connectorx/sources/sqlite/
mod.rs

1//! Source implementation for SQLite embedded database.
2
3mod errors;
4mod typesystem;
5
6pub use self::errors::SQLiteSourceError;
7use crate::{
8    data_order::DataOrder,
9    errors::ConnectorXError,
10    sources::{PartitionParser, Produce, Source, SourcePartition},
11    sql::{count_query, limit1_query, CXQuery},
12    utils::DummyBox,
13};
14use anyhow::anyhow;
15use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
16use fallible_streaming_iterator::FallibleStreamingIterator;
17use fehler::{throw, throws};
18use log::debug;
19use owning_ref::OwningHandle;
20use r2d2::{Pool, PooledConnection};
21use r2d2_sqlite::SqliteConnectionManager;
22use rusqlite::{Row, Rows, Statement};
23use sqlparser::dialect::SQLiteDialect;
24use std::convert::TryFrom;
25pub use typesystem::SQLiteTypeSystem;
26use urlencoding::decode;
27
28pub struct SQLiteSource {
29    pool: Pool<SqliteConnectionManager>,
30    origin_query: Option<String>,
31    queries: Vec<CXQuery<String>>,
32    names: Vec<String>,
33    schema: Vec<SQLiteTypeSystem>,
34}
35
36impl SQLiteSource {
37    #[throws(SQLiteSourceError)]
38    pub fn new(conn: &str, nconn: usize) -> Self {
39        let decoded_conn = decode(conn)?.into_owned();
40        debug!("decoded conn: {}", decoded_conn);
41        let manager = SqliteConnectionManager::file(decoded_conn);
42        let pool = r2d2::Pool::builder()
43            .max_size(nconn as u32)
44            .build(manager)?;
45
46        Self {
47            pool,
48            origin_query: None,
49            queries: vec![],
50            names: vec![],
51            schema: vec![],
52        }
53    }
54}
55
56impl Source for SQLiteSource
57where
58    SQLiteSourcePartition: SourcePartition<TypeSystem = SQLiteTypeSystem>,
59{
60    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
61    type Partition = SQLiteSourcePartition;
62    type TypeSystem = SQLiteTypeSystem;
63    type Error = SQLiteSourceError;
64
65    #[throws(SQLiteSourceError)]
66    fn set_data_order(&mut self, data_order: DataOrder) {
67        if !matches!(data_order, DataOrder::RowMajor) {
68            throw!(ConnectorXError::UnsupportedDataOrder(data_order));
69        }
70    }
71
72    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
73        self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
74    }
75
76    fn set_origin_query(&mut self, query: Option<String>) {
77        self.origin_query = query;
78    }
79
80    #[throws(SQLiteSourceError)]
81    fn fetch_metadata(&mut self) {
82        assert!(!self.queries.is_empty());
83        let conn = self.pool.get()?;
84        let mut names = vec![];
85        let mut types = vec![];
86        let mut num_empty = 0;
87
88        // assuming all the partition queries yield same schema
89        for (i, query) in self.queries.iter().enumerate() {
90            let l1query = limit1_query(query, &SQLiteDialect {})?;
91
92            let is_sucess = conn.query_row(l1query.as_str(), [], |row| {
93                for (j, col) in row.as_ref().columns().iter().enumerate() {
94                    if j >= names.len() {
95                        names.push(col.name().to_string());
96                    }
97                    if j >= types.len() {
98                        let vr = row.get_ref(j)?;
99                        match SQLiteTypeSystem::try_from((col.decl_type(), vr.data_type())) {
100                            Ok(t) => types.push(Some(t)),
101                            Err(_) => {
102                                types.push(None);
103                            }
104                        }
105                    } else if types[j].is_none() {
106                        // We didn't get the type in the previous round
107                        let vr = row.get_ref(j)?;
108                        if let Ok(t) = SQLiteTypeSystem::try_from((col.decl_type(), vr.data_type()))
109                        {
110                            types[j] = Some(t)
111                        }
112                    }
113                }
114                Ok(())
115            });
116
117            match is_sucess {
118                Ok(()) => {
119                    if !types.contains(&None) {
120                        self.names = names;
121                        self.schema = types.into_iter().map(|t| t.unwrap()).collect();
122                        return;
123                    } else if i == self.queries.len() - 1 {
124                        debug!(
125                            "cannot get metadata for '{}' due to null value: {:?}",
126                            query, types
127                        );
128                        throw!(SQLiteSourceError::InferTypeFromNull);
129                    }
130                }
131                Err(e) => {
132                    if let rusqlite::Error::QueryReturnedNoRows = e {
133                        num_empty += 1; // make sure when all partition results are empty, do not throw error
134                    }
135                    if i == self.queries.len() - 1 && num_empty < self.queries.len() {
136                        // tried the last query but still get an error
137                        debug!("cannot get metadata for '{}': {}", query, e);
138                        throw!(e)
139                    }
140                }
141            }
142        }
143
144        // tried all queries but all get empty result set
145        let stmt = conn.prepare(self.queries[0].as_str())?;
146
147        self.names = stmt
148            .column_names()
149            .into_iter()
150            .map(|s| s.to_string())
151            .collect();
152        // set all columns as string (align with pandas)
153        self.schema = vec![SQLiteTypeSystem::Text(false); self.names.len()];
154    }
155
156    #[throws(SQLiteSourceError)]
157    fn result_rows(&mut self) -> Option<usize> {
158        match &self.origin_query {
159            Some(q) => {
160                let cxq = CXQuery::Naked(q.clone());
161                let conn = self.pool.get()?;
162                let nrows =
163                    conn.query_row(count_query(&cxq, &SQLiteDialect {})?.as_str(), [], |row| {
164                        Ok(row.get::<_, i64>(0)? as usize)
165                    })?;
166                Some(nrows)
167            }
168            None => None,
169        }
170    }
171
172    fn names(&self) -> Vec<String> {
173        self.names.clone()
174    }
175
176    fn schema(&self) -> Vec<Self::TypeSystem> {
177        self.schema.clone()
178    }
179
180    #[throws(SQLiteSourceError)]
181    fn partition(self) -> Vec<Self::Partition> {
182        let mut ret = vec![];
183        for query in self.queries {
184            let conn = self.pool.get()?;
185
186            ret.push(SQLiteSourcePartition::new(conn, &query, &self.schema));
187        }
188        ret
189    }
190}
191
192pub struct SQLiteSourcePartition {
193    conn: PooledConnection<SqliteConnectionManager>,
194    query: CXQuery<String>,
195    schema: Vec<SQLiteTypeSystem>,
196    nrows: usize,
197    ncols: usize,
198}
199
200impl SQLiteSourcePartition {
201    pub fn new(
202        conn: PooledConnection<SqliteConnectionManager>,
203        query: &CXQuery<String>,
204        schema: &[SQLiteTypeSystem],
205    ) -> Self {
206        Self {
207            conn,
208            query: query.clone(),
209            schema: schema.to_vec(),
210            nrows: 0,
211            ncols: schema.len(),
212        }
213    }
214}
215
216impl SourcePartition for SQLiteSourcePartition {
217    type TypeSystem = SQLiteTypeSystem;
218    type Parser<'a> = SQLiteSourcePartitionParser<'a>;
219    type Error = SQLiteSourceError;
220
221    #[throws(SQLiteSourceError)]
222    fn result_rows(&mut self) {
223        self.nrows = self.conn.query_row(
224            count_query(&self.query, &SQLiteDialect {})?.as_str(),
225            [],
226            |row| Ok(row.get::<_, i64>(0)? as usize),
227        )?;
228    }
229
230    #[throws(SQLiteSourceError)]
231    fn parser(&mut self) -> Self::Parser<'_> {
232        SQLiteSourcePartitionParser::new(&self.conn, self.query.as_str(), &self.schema)?
233    }
234
235    fn nrows(&self) -> usize {
236        self.nrows
237    }
238
239    fn ncols(&self) -> usize {
240        self.ncols
241    }
242}
243
244unsafe impl<'a> Send for SQLiteSourcePartitionParser<'a> {}
245
246pub struct SQLiteSourcePartitionParser<'a> {
247    rows: OwningHandle<Box<Statement<'a>>, DummyBox<Rows<'a>>>,
248    ncols: usize,
249    current_col: usize,
250    current_consumed: bool,
251    is_finished: bool,
252}
253
254impl<'a> SQLiteSourcePartitionParser<'a> {
255    #[throws(SQLiteSourceError)]
256    pub fn new(
257        conn: &'a PooledConnection<SqliteConnectionManager>,
258        query: &str,
259        schema: &[SQLiteTypeSystem],
260    ) -> Self {
261        let stmt: Statement<'a> = conn.prepare(query)?;
262
263        // Safety: DummyBox borrows the on-heap stmt, which is owned by the OwningHandle.
264        // No matter how we move the owning handle (thus the Box<Statment>), the Statement
265        // keeps its address static on the heap, thus the borrow of MyRows keeps valid.
266        let rows: OwningHandle<Box<Statement<'a>>, DummyBox<Rows<'a>>> =
267            OwningHandle::new_with_fn(Box::new(stmt), |stmt: *const Statement<'a>| unsafe {
268                DummyBox((*(stmt as *mut Statement<'_>)).query([]).unwrap())
269            });
270        Self {
271            rows,
272            ncols: schema.len(),
273            current_col: 0,
274            current_consumed: true,
275            is_finished: false,
276        }
277    }
278
279    #[throws(SQLiteSourceError)]
280    fn next_loc(&mut self) -> (&Row, usize) {
281        self.current_consumed = true;
282        let row: &Row = (*self.rows)
283            .get()
284            .ok_or_else(|| anyhow!("Sqlite empty current row"))?;
285        let col = self.current_col;
286        self.current_col = (self.current_col + 1) % self.ncols;
287        (row, col)
288    }
289}
290
291impl<'a> PartitionParser<'a> for SQLiteSourcePartitionParser<'a> {
292    type TypeSystem = SQLiteTypeSystem;
293    type Error = SQLiteSourceError;
294
295    #[throws(SQLiteSourceError)]
296    fn fetch_next(&mut self) -> (usize, bool) {
297        assert!(self.current_col == 0);
298
299        if !self.current_consumed {
300            return (1, false);
301        } else if self.is_finished {
302            return (0, true);
303        }
304
305        match (*self.rows).next()? {
306            Some(_) => {
307                self.current_consumed = false;
308                (1, false)
309            }
310            None => {
311                self.is_finished = true;
312                (0, true)
313            }
314        }
315    }
316}
317
318macro_rules! impl_produce {
319    ($($t: ty,)+) => {
320        $(
321            impl<'r, 'a> Produce<'r, $t> for SQLiteSourcePartitionParser<'a> {
322                type Error = SQLiteSourceError;
323
324                #[throws(SQLiteSourceError)]
325                fn produce(&'r mut self) -> $t {
326                    let (row, col) = self.next_loc()?;
327                    let val = row.get(col)?;
328                    val
329                }
330            }
331
332            impl<'r, 'a> Produce<'r, Option<$t>> for SQLiteSourcePartitionParser<'a> {
333                type Error = SQLiteSourceError;
334
335                #[throws(SQLiteSourceError)]
336                fn produce(&'r mut self) -> Option<$t> {
337                    let (row, col) = self.next_loc()?;
338                    let val = row.get(col)?;
339                    val
340                }
341            }
342        )+
343    };
344}
345
346impl_produce!(
347    bool,
348    i64,
349    i32,
350    i16,
351    f64,
352    Box<str>,
353    NaiveDate,
354    NaiveTime,
355    NaiveDateTime,
356    Vec<u8>,
357);