connectorx/sources/csv/
mod.rs1mod errors;
4mod typesystem;
5
6pub use self::errors::CSVSourceError;
7pub use self::typesystem::CSVTypeSystem;
8use super::{PartitionParser, Produce, Source, SourcePartition};
9use crate::{data_order::DataOrder, errors::ConnectorXError, sql::CXQuery};
10use anyhow::anyhow;
11use chrono::{DateTime, Utc};
12use fehler::{throw, throws};
13#[cfg(feature = "src_csv")]
14use regex::{Regex, RegexBuilder};
15use std::collections::HashSet;
16use std::fs::File;
17
/// A data source backed by one or more CSV files.
pub struct CSVSource {
    /// Column types; when empty, `fetch_metadata` fills it via `infer_schema`.
    schema: Vec<CSVTypeSystem>,
    /// Queries set through `set_queries`; each query string is opened as a file path.
    files: Vec<CXQuery<String>>,
    /// Column names read from the header row of the first file.
    names: Vec<String>,
}
23
24impl CSVSource {
25 pub fn new(schema: &[CSVTypeSystem]) -> Self {
26 CSVSource {
27 schema: schema.to_vec(),
28 files: vec![],
29 names: vec![],
30 }
31 }
32
33 #[throws(CSVSourceError)]
34 pub fn infer_schema(&mut self) -> Vec<CSVTypeSystem> {
35 let decimal_re: Regex = Regex::new(r"^-?(\d+\.\d+)$")?;
37 let integer_re: Regex = Regex::new(r"^-?(\d+)$")?;
38 let boolean_re: Regex = RegexBuilder::new(r"^(true)$|^(false)$")
39 .case_insensitive(true)
40 .build()?;
41 let datetime_re: Regex = Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$")?;
42
43 let mut reader = csv::ReaderBuilder::new()
45 .has_headers(true)
46 .from_reader(File::open(self.files[0].as_str())?);
47
48 let max_records_to_read = 50;
49 let num_cols = self.names.len();
50
51 let mut column_types: Vec<HashSet<CSVTypeSystem>> = vec![HashSet::new(); num_cols];
52 let mut nulls: Vec<bool> = vec![false; num_cols];
53
54 let mut record = csv::StringRecord::new();
55
56 for _record_counter in 0..max_records_to_read {
57 if !reader.read_record(&mut record)? {
58 break;
59 }
60 for field_counter in 0..num_cols {
61 if let Some(string) = record.get(field_counter) {
62 if string.is_empty() {
63 nulls[field_counter] = true;
64 } else {
65 let dt: CSVTypeSystem;
66
67 if string.starts_with('"') {
68 dt = CSVTypeSystem::String(false);
69 } else if boolean_re.is_match(string) {
70 dt = CSVTypeSystem::Bool(false);
71 } else if decimal_re.is_match(string) {
72 dt = CSVTypeSystem::F64(false);
73 } else if integer_re.is_match(string) {
74 dt = CSVTypeSystem::I64(false);
75 } else if datetime_re.is_match(string) {
76 dt = CSVTypeSystem::DateTime(false);
77 } else {
78 dt = CSVTypeSystem::String(false);
79 }
80 column_types[field_counter].insert(dt);
81 }
82 }
83 }
84 }
85
86 let mut schema = vec![];
88
89 for field_counter in 0..num_cols {
90 let possibilities = &column_types[field_counter];
91 let has_nulls = nulls[field_counter];
92
93 match possibilities.len() {
94 1 => {
95 for dt in possibilities.iter() {
96 match *dt {
97 CSVTypeSystem::I64(false) => {
98 schema.push(CSVTypeSystem::I64(has_nulls));
99 }
100 CSVTypeSystem::F64(false) => {
101 schema.push(CSVTypeSystem::F64(has_nulls));
102 }
103 CSVTypeSystem::Bool(false) => {
104 schema.push(CSVTypeSystem::Bool(has_nulls));
105 }
106 CSVTypeSystem::String(false) => {
107 schema.push(CSVTypeSystem::String(has_nulls));
108 }
109 CSVTypeSystem::DateTime(false) => {
110 schema.push(CSVTypeSystem::DateTime(has_nulls));
111 }
112 _ => {}
113 }
114 }
115 }
116 2 => {
117 if possibilities.contains(&CSVTypeSystem::I64(false))
118 && possibilities.contains(&CSVTypeSystem::F64(false))
119 {
120 schema.push(CSVTypeSystem::F64(has_nulls));
122 } else {
123 schema.push(CSVTypeSystem::String(has_nulls));
125 }
126 }
127 _ => {
128 schema.push(CSVTypeSystem::String(has_nulls));
130 }
131 }
132 }
133 schema
134 }
135}
136
137impl Source for CSVSource {
138 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
139 type Partition = CSVSourcePartition;
140 type TypeSystem = CSVTypeSystem;
141 type Error = CSVSourceError;
142
143 #[throws(CSVSourceError)]
144 fn set_data_order(&mut self, data_order: DataOrder) {
145 if !matches!(data_order, DataOrder::RowMajor) {
146 throw!(ConnectorXError::UnsupportedDataOrder(data_order))
147 }
148 }
149
150 fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
151 self.files = queries.iter().map(|q| q.map(Q::to_string)).collect();
152 }
153
154 fn set_origin_query(&mut self, _query: Option<String>) {}
155
156 #[throws(CSVSourceError)]
157 fn fetch_metadata(&mut self) {
158 let mut reader = csv::ReaderBuilder::new()
159 .has_headers(true)
160 .from_reader(File::open(self.files[0].as_str())?);
161 let header = reader.headers()?;
162
163 self.names = header.iter().map(|s| s.to_string()).collect();
164
165 if self.schema.is_empty() {
166 self.schema = self.infer_schema()?;
167 }
168
169 assert_eq!(header.len(), self.schema.len());
170 }
171
172 #[throws(CSVSourceError)]
173 fn result_rows(&mut self) -> Option<usize> {
174 None
175 }
176
177 fn names(&self) -> Vec<String> {
178 self.names.clone()
179 }
180
181 fn schema(&self) -> Vec<Self::TypeSystem> {
182 self.schema.clone()
183 }
184
185 #[throws(CSVSourceError)]
186 fn partition(self) -> Vec<Self::Partition> {
187 let mut partitions = vec![];
188 for file in self.files {
189 partitions.push(CSVSourcePartition::new(file)?);
190 }
191 partitions
192 }
193}
194
/// The fully loaded contents of a single CSV file.
pub struct CSVSourcePartition {
    /// Every data record of the file, read eagerly in `new`.
    records: Vec<csv::StringRecord>,
    /// Flat index of the next cell to hand out; lent mutably to the parser.
    counter: usize,
    /// Number of records loaded.
    nrows: usize,
    /// Number of fields in the first record, or 0 when there are no records.
    ncols: usize,
}
201
202impl CSVSourcePartition {
203 #[throws(CSVSourceError)]
204 pub fn new(fname: CXQuery<String>) -> Self {
205 let reader = csv::ReaderBuilder::new()
206 .has_headers(true)
207 .from_reader(File::open(fname.as_str())?);
208 let mut records = vec![];
209 reader
210 .into_records()
211 .try_for_each(|v| -> Result<(), CSVSourceError> {
212 records.push(v.map_err(|e| anyhow!(e))?);
213 Ok(())
214 })?;
215
216 let nrows = records.len();
217 let ncols = if nrows > 0 { records[0].len() } else { 0 };
218
219 Self {
220 records,
221 counter: 0,
222 nrows,
223 ncols,
224 }
225 }
226}
227
228impl SourcePartition for CSVSourcePartition {
229 type TypeSystem = CSVTypeSystem;
230 type Parser<'a> = CSVSourcePartitionParser<'a>;
231 type Error = CSVSourceError;
232
233 #[throws(CSVSourceError)]
235 fn result_rows(&mut self) {}
236
237 fn nrows(&self) -> usize {
238 self.nrows
239 }
240
241 fn ncols(&self) -> usize {
242 self.ncols
243 }
244
245 #[throws(CSVSourceError)]
246 fn parser(&mut self) -> Self::Parser<'_> {
247 CSVSourcePartitionParser {
248 records: &mut self.records,
249 counter: &mut self.counter,
250 ncols: self.ncols,
251 }
252 }
253}
254
/// Cursor that hands out the cells of a partition one at a time.
pub struct CSVSourcePartitionParser<'a> {
    /// Records borrowed from the owning partition.
    records: &'a mut [csv::StringRecord],
    /// Flat index of the next cell; row = counter / ncols, column = counter % ncols.
    counter: &'a mut usize,
    /// Number of fields per record used to split the flat index.
    ncols: usize,
}
260
261impl<'a> CSVSourcePartitionParser<'a> {
262 fn next_val(&mut self) -> &str {
263 let v: &str = self.records[*self.counter / self.ncols][*self.counter % self.ncols].as_ref();
264 *self.counter += 1;
265
266 v
267 }
268}
269
270impl<'a> PartitionParser<'a> for CSVSourcePartitionParser<'a> {
271 type TypeSystem = CSVTypeSystem;
272 type Error = CSVSourceError;
273
274 #[throws(CSVSourceError)]
275 fn fetch_next(&mut self) -> (usize, bool) {
276 (self.records.len(), true)
277 }
278}
279
280impl<'r, 'a> Produce<'r, i64> for CSVSourcePartitionParser<'a> {
281 type Error = CSVSourceError;
282
283 #[throws(CSVSourceError)]
284 fn produce(&mut self) -> i64 {
285 let v = self.next_val();
286 v.parse()
287 .map_err(|_| ConnectorXError::cannot_produce::<i64>(Some(v.into())))?
288 }
289}
290
291impl<'r, 'a> Produce<'r, Option<i64>> for CSVSourcePartitionParser<'a> {
292 type Error = CSVSourceError;
293
294 #[throws(CSVSourceError)]
295 fn produce(&mut self) -> Option<i64> {
296 let v = self.next_val();
297 if v.is_empty() {
298 return None;
299 }
300 let v = v
301 .parse()
302 .map_err(|_| ConnectorXError::cannot_produce::<Option<i64>>(Some(v.into())))?;
303
304 Some(v)
305 }
306}
307
308impl<'r, 'a> Produce<'r, f64> for CSVSourcePartitionParser<'a> {
309 type Error = CSVSourceError;
310
311 #[throws(CSVSourceError)]
312 fn produce(&mut self) -> f64 {
313 let v = self.next_val();
314 v.parse()
315 .map_err(|_| ConnectorXError::cannot_produce::<f64>(Some(v.into())))?
316 }
317}
318
319impl<'r, 'a> Produce<'r, Option<f64>> for CSVSourcePartitionParser<'a> {
320 type Error = CSVSourceError;
321
322 #[throws(CSVSourceError)]
323 fn produce(&mut self) -> Option<f64> {
324 let v = self.next_val();
325 if v.is_empty() {
326 return None;
327 }
328 let v = v
329 .parse()
330 .map_err(|_| ConnectorXError::cannot_produce::<Option<f64>>(Some(v.into())))?;
331
332 Some(v)
333 }
334}
335
336impl<'r, 'a> Produce<'r, bool> for CSVSourcePartitionParser<'a> {
337 type Error = CSVSourceError;
338
339 #[throws(CSVSourceError)]
340 fn produce(&mut self) -> bool {
341 let v = self.next_val();
342 v.parse()
343 .map_err(|_| ConnectorXError::cannot_produce::<bool>(Some(v.into())))?
344 }
345}
346
347impl<'r, 'a> Produce<'r, Option<bool>> for CSVSourcePartitionParser<'a> {
348 type Error = CSVSourceError;
349
350 #[throws(CSVSourceError)]
351 fn produce(&mut self) -> Option<bool> {
352 let v = self.next_val();
353 if v.is_empty() {
354 return None;
355 }
356 let v = v
357 .parse()
358 .map_err(|_| ConnectorXError::cannot_produce::<Option<bool>>(Some(v.into())))?;
359
360 Some(v)
361 }
362}
363
364impl<'r, 'a> Produce<'r, String> for CSVSourcePartitionParser<'a> {
365 type Error = CSVSourceError;
366
367 #[throws(CSVSourceError)]
368 fn produce(&mut self) -> String {
369 let v = self.next_val();
370 String::from(v)
371 }
372}
373
374impl<'a, 'r> Produce<'r, Option<String>> for CSVSourcePartitionParser<'a> {
375 type Error = CSVSourceError;
376
377 #[throws(CSVSourceError)]
378 fn produce(&'r mut self) -> Option<String> {
379 let v = self.next_val();
380
381 Some(String::from(v))
382 }
383}
384
385impl<'r, 'a> Produce<'r, DateTime<Utc>> for CSVSourcePartitionParser<'a> {
386 type Error = CSVSourceError;
387
388 #[throws(CSVSourceError)]
389 fn produce(&mut self) -> DateTime<Utc> {
390 let v = self.next_val();
391 v.parse()
392 .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?
393 }
394}
395
396impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for CSVSourcePartitionParser<'a> {
397 type Error = CSVSourceError;
398
399 #[throws(CSVSourceError)]
400 fn produce(&mut self) -> Option<DateTime<Utc>> {
401 let v = self.next_val();
402 if v.is_empty() {
403 return None;
404 }
405 let v = v
406 .parse()
407 .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?;
408 Some(v)
409 }
410}