connectorx/sources/csv/mod.rs

1//! Source implementation for CSV files.
2
3mod errors;
4mod typesystem;
5
pub use self::errors::CSVSourceError;
pub use self::typesystem::CSVTypeSystem;
use super::{PartitionParser, Produce, Source, SourcePartition};
use crate::{data_order::DataOrder, errors::ConnectorXError, sql::CXQuery};
use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use fehler::{throw, throws};
#[cfg(feature = "src_csv")]
use regex::{Regex, RegexBuilder};
use std::collections::HashSet;
use std::fs::File;
17
18pub struct CSVSource {
19    schema: Vec<CSVTypeSystem>,
20    files: Vec<CXQuery<String>>,
21    names: Vec<String>,
22}
23
24impl CSVSource {
25    pub fn new(schema: &[CSVTypeSystem]) -> Self {
26        CSVSource {
27            schema: schema.to_vec(),
28            files: vec![],
29            names: vec![],
30        }
31    }
32
33    #[throws(CSVSourceError)]
34    pub fn infer_schema(&mut self) -> Vec<CSVTypeSystem> {
35        // regular expressions for infer CSVTypeSystem from string
36        let decimal_re: Regex = Regex::new(r"^-?(\d+\.\d+)$")?;
37        let integer_re: Regex = Regex::new(r"^-?(\d+)$")?;
38        let boolean_re: Regex = RegexBuilder::new(r"^(true)$|^(false)$")
39            .case_insensitive(true)
40            .build()?;
41        let datetime_re: Regex = Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$")?;
42
43        // read max_records rows to infer possible CSVTypeSystems for each field
44        let mut reader = csv::ReaderBuilder::new()
45            .has_headers(true)
46            .from_reader(File::open(self.files[0].as_str())?);
47
48        let max_records_to_read = 50;
49        let num_cols = self.names.len();
50
51        let mut column_types: Vec<HashSet<CSVTypeSystem>> = vec![HashSet::new(); num_cols];
52        let mut nulls: Vec<bool> = vec![false; num_cols];
53
54        let mut record = csv::StringRecord::new();
55
56        for _record_counter in 0..max_records_to_read {
57            if !reader.read_record(&mut record)? {
58                break;
59            }
60            for field_counter in 0..num_cols {
61                if let Some(string) = record.get(field_counter) {
62                    if string.is_empty() {
63                        nulls[field_counter] = true;
64                    } else {
65                        let dt: CSVTypeSystem;
66
67                        if string.starts_with('"') {
68                            dt = CSVTypeSystem::String(false);
69                        } else if boolean_re.is_match(string) {
70                            dt = CSVTypeSystem::Bool(false);
71                        } else if decimal_re.is_match(string) {
72                            dt = CSVTypeSystem::F64(false);
73                        } else if integer_re.is_match(string) {
74                            dt = CSVTypeSystem::I64(false);
75                        } else if datetime_re.is_match(string) {
76                            dt = CSVTypeSystem::DateTime(false);
77                        } else {
78                            dt = CSVTypeSystem::String(false);
79                        }
80                        column_types[field_counter].insert(dt);
81                    }
82                }
83            }
84        }
85
86        // determine CSVTypeSystem based on possible candidates
87        let mut schema = vec![];
88
89        for field_counter in 0..num_cols {
90            let possibilities = &column_types[field_counter];
91            let has_nulls = nulls[field_counter];
92
93            match possibilities.len() {
94                1 => {
95                    for dt in possibilities.iter() {
96                        match *dt {
97                            CSVTypeSystem::I64(false) => {
98                                schema.push(CSVTypeSystem::I64(has_nulls));
99                            }
100                            CSVTypeSystem::F64(false) => {
101                                schema.push(CSVTypeSystem::F64(has_nulls));
102                            }
103                            CSVTypeSystem::Bool(false) => {
104                                schema.push(CSVTypeSystem::Bool(has_nulls));
105                            }
106                            CSVTypeSystem::String(false) => {
107                                schema.push(CSVTypeSystem::String(has_nulls));
108                            }
109                            CSVTypeSystem::DateTime(false) => {
110                                schema.push(CSVTypeSystem::DateTime(has_nulls));
111                            }
112                            _ => {}
113                        }
114                    }
115                }
116                2 => {
117                    if possibilities.contains(&CSVTypeSystem::I64(false))
118                        && possibilities.contains(&CSVTypeSystem::F64(false))
119                    {
120                        // Integer && Float -> Float
121                        schema.push(CSVTypeSystem::F64(has_nulls));
122                    } else {
123                        // Conflicting CSVTypeSystems -> String
124                        schema.push(CSVTypeSystem::String(has_nulls));
125                    }
126                }
127                _ => {
128                    // Conflicting CSVTypeSystems -> String
129                    schema.push(CSVTypeSystem::String(has_nulls));
130                }
131            }
132        }
133        schema
134    }
135}
136
137impl Source for CSVSource {
138    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
139    type Partition = CSVSourcePartition;
140    type TypeSystem = CSVTypeSystem;
141    type Error = CSVSourceError;
142
143    #[throws(CSVSourceError)]
144    fn set_data_order(&mut self, data_order: DataOrder) {
145        if !matches!(data_order, DataOrder::RowMajor) {
146            throw!(ConnectorXError::UnsupportedDataOrder(data_order))
147        }
148    }
149
150    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
151        self.files = queries.iter().map(|q| q.map(Q::to_string)).collect();
152    }
153
154    fn set_origin_query(&mut self, _query: Option<String>) {}
155
156    #[throws(CSVSourceError)]
157    fn fetch_metadata(&mut self) {
158        let mut reader = csv::ReaderBuilder::new()
159            .has_headers(true)
160            .from_reader(File::open(self.files[0].as_str())?);
161        let header = reader.headers()?;
162
163        self.names = header.iter().map(|s| s.to_string()).collect();
164
165        if self.schema.is_empty() {
166            self.schema = self.infer_schema()?;
167        }
168
169        assert_eq!(header.len(), self.schema.len());
170    }
171
172    #[throws(CSVSourceError)]
173    fn result_rows(&mut self) -> Option<usize> {
174        None
175    }
176
177    fn names(&self) -> Vec<String> {
178        self.names.clone()
179    }
180
181    fn schema(&self) -> Vec<Self::TypeSystem> {
182        self.schema.clone()
183    }
184
185    #[throws(CSVSourceError)]
186    fn partition(self) -> Vec<Self::Partition> {
187        let mut partitions = vec![];
188        for file in self.files {
189            partitions.push(CSVSourcePartition::new(file)?);
190        }
191        partitions
192    }
193}
194
195pub struct CSVSourcePartition {
196    records: Vec<csv::StringRecord>,
197    counter: usize,
198    nrows: usize,
199    ncols: usize,
200}
201
202impl CSVSourcePartition {
203    #[throws(CSVSourceError)]
204    pub fn new(fname: CXQuery<String>) -> Self {
205        let reader = csv::ReaderBuilder::new()
206            .has_headers(true)
207            .from_reader(File::open(fname.as_str())?);
208        let mut records = vec![];
209        reader
210            .into_records()
211            .try_for_each(|v| -> Result<(), CSVSourceError> {
212                records.push(v.map_err(|e| anyhow!(e))?);
213                Ok(())
214            })?;
215
216        let nrows = records.len();
217        let ncols = if nrows > 0 { records[0].len() } else { 0 };
218
219        Self {
220            records,
221            counter: 0,
222            nrows,
223            ncols,
224        }
225    }
226}
227
228impl SourcePartition for CSVSourcePartition {
229    type TypeSystem = CSVTypeSystem;
230    type Parser<'a> = CSVSourcePartitionParser<'a>;
231    type Error = CSVSourceError;
232
233    /// The parameter `query` is the path of the csv file
234    #[throws(CSVSourceError)]
235    fn result_rows(&mut self) {}
236
237    fn nrows(&self) -> usize {
238        self.nrows
239    }
240
241    fn ncols(&self) -> usize {
242        self.ncols
243    }
244
245    #[throws(CSVSourceError)]
246    fn parser(&mut self) -> Self::Parser<'_> {
247        CSVSourcePartitionParser {
248            records: &mut self.records,
249            counter: &mut self.counter,
250            ncols: self.ncols,
251        }
252    }
253}
254
255pub struct CSVSourcePartitionParser<'a> {
256    records: &'a mut [csv::StringRecord],
257    counter: &'a mut usize,
258    ncols: usize,
259}
260
261impl<'a> CSVSourcePartitionParser<'a> {
262    fn next_val(&mut self) -> &str {
263        let v: &str = self.records[*self.counter / self.ncols][*self.counter % self.ncols].as_ref();
264        *self.counter += 1;
265
266        v
267    }
268}
269
270impl<'a> PartitionParser<'a> for CSVSourcePartitionParser<'a> {
271    type TypeSystem = CSVTypeSystem;
272    type Error = CSVSourceError;
273
274    #[throws(CSVSourceError)]
275    fn fetch_next(&mut self) -> (usize, bool) {
276        (self.records.len(), true)
277    }
278}
279
280impl<'r, 'a> Produce<'r, i64> for CSVSourcePartitionParser<'a> {
281    type Error = CSVSourceError;
282
283    #[throws(CSVSourceError)]
284    fn produce(&mut self) -> i64 {
285        let v = self.next_val();
286        v.parse()
287            .map_err(|_| ConnectorXError::cannot_produce::<i64>(Some(v.into())))?
288    }
289}
290
291impl<'r, 'a> Produce<'r, Option<i64>> for CSVSourcePartitionParser<'a> {
292    type Error = CSVSourceError;
293
294    #[throws(CSVSourceError)]
295    fn produce(&mut self) -> Option<i64> {
296        let v = self.next_val();
297        if v.is_empty() {
298            return None;
299        }
300        let v = v
301            .parse()
302            .map_err(|_| ConnectorXError::cannot_produce::<Option<i64>>(Some(v.into())))?;
303
304        Some(v)
305    }
306}
307
308impl<'r, 'a> Produce<'r, f64> for CSVSourcePartitionParser<'a> {
309    type Error = CSVSourceError;
310
311    #[throws(CSVSourceError)]
312    fn produce(&mut self) -> f64 {
313        let v = self.next_val();
314        v.parse()
315            .map_err(|_| ConnectorXError::cannot_produce::<f64>(Some(v.into())))?
316    }
317}
318
319impl<'r, 'a> Produce<'r, Option<f64>> for CSVSourcePartitionParser<'a> {
320    type Error = CSVSourceError;
321
322    #[throws(CSVSourceError)]
323    fn produce(&mut self) -> Option<f64> {
324        let v = self.next_val();
325        if v.is_empty() {
326            return None;
327        }
328        let v = v
329            .parse()
330            .map_err(|_| ConnectorXError::cannot_produce::<Option<f64>>(Some(v.into())))?;
331
332        Some(v)
333    }
334}
335
336impl<'r, 'a> Produce<'r, bool> for CSVSourcePartitionParser<'a> {
337    type Error = CSVSourceError;
338
339    #[throws(CSVSourceError)]
340    fn produce(&mut self) -> bool {
341        let v = self.next_val();
342        v.parse()
343            .map_err(|_| ConnectorXError::cannot_produce::<bool>(Some(v.into())))?
344    }
345}
346
347impl<'r, 'a> Produce<'r, Option<bool>> for CSVSourcePartitionParser<'a> {
348    type Error = CSVSourceError;
349
350    #[throws(CSVSourceError)]
351    fn produce(&mut self) -> Option<bool> {
352        let v = self.next_val();
353        if v.is_empty() {
354            return None;
355        }
356        let v = v
357            .parse()
358            .map_err(|_| ConnectorXError::cannot_produce::<Option<bool>>(Some(v.into())))?;
359
360        Some(v)
361    }
362}
363
364impl<'r, 'a> Produce<'r, String> for CSVSourcePartitionParser<'a> {
365    type Error = CSVSourceError;
366
367    #[throws(CSVSourceError)]
368    fn produce(&mut self) -> String {
369        let v = self.next_val();
370        String::from(v)
371    }
372}
373
374impl<'a, 'r> Produce<'r, Option<String>> for CSVSourcePartitionParser<'a> {
375    type Error = CSVSourceError;
376
377    #[throws(CSVSourceError)]
378    fn produce(&'r mut self) -> Option<String> {
379        let v = self.next_val();
380
381        Some(String::from(v))
382    }
383}
384
385impl<'r, 'a> Produce<'r, DateTime<Utc>> for CSVSourcePartitionParser<'a> {
386    type Error = CSVSourceError;
387
388    #[throws(CSVSourceError)]
389    fn produce(&mut self) -> DateTime<Utc> {
390        let v = self.next_val();
391        v.parse()
392            .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?
393    }
394}
395
396impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for CSVSourcePartitionParser<'a> {
397    type Error = CSVSourceError;
398
399    #[throws(CSVSourceError)]
400    fn produce(&mut self) -> Option<DateTime<Utc>> {
401        let v = self.next_val();
402        if v.is_empty() {
403            return None;
404        }
405        let v = v
406            .parse()
407            .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?;
408        Some(v)
409    }
410}