// connectorx/sources/sqlite/mod.rs
mod errors;
mod typesystem;

pub use self::errors::SQLiteSourceError;
use crate::{
    data_order::DataOrder,
    errors::ConnectorXError,
    sources::{PartitionParser, Produce, Source, SourcePartition},
    sql::{count_query, limit1_query, CXQuery},
    utils::DummyBox,
};
use anyhow::anyhow;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use fallible_streaming_iterator::FallibleStreamingIterator;
use fehler::{throw, throws};
use log::debug;
use owning_ref::OwningHandle;
use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{Row, Rows, Statement};
use sqlparser::dialect::SQLiteDialect;
use std::convert::TryFrom;
pub use typesystem::SQLiteTypeSystem;
use urlencoding::decode;

28pub struct SQLiteSource {
29 pool: Pool<SqliteConnectionManager>,
30 origin_query: Option<String>,
31 queries: Vec<CXQuery<String>>,
32 names: Vec<String>,
33 schema: Vec<SQLiteTypeSystem>,
34}
35
36impl SQLiteSource {
37 #[throws(SQLiteSourceError)]
38 pub fn new(conn: &str, nconn: usize) -> Self {
39 let decoded_conn = decode(conn)?.into_owned();
40 debug!("decoded conn: {}", decoded_conn);
41 let manager = SqliteConnectionManager::file(decoded_conn);
42 let pool = r2d2::Pool::builder()
43 .max_size(nconn as u32)
44 .build(manager)?;
45
46 Self {
47 pool,
48 origin_query: None,
49 queries: vec![],
50 names: vec![],
51 schema: vec![],
52 }
53 }
54}
55
56impl Source for SQLiteSource
57where
58 SQLiteSourcePartition: SourcePartition<TypeSystem = SQLiteTypeSystem>,
59{
60 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
61 type Partition = SQLiteSourcePartition;
62 type TypeSystem = SQLiteTypeSystem;
63 type Error = SQLiteSourceError;
64
65 #[throws(SQLiteSourceError)]
66 fn set_data_order(&mut self, data_order: DataOrder) {
67 if !matches!(data_order, DataOrder::RowMajor) {
68 throw!(ConnectorXError::UnsupportedDataOrder(data_order));
69 }
70 }
71
72 fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
73 self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
74 }
75
76 fn set_origin_query(&mut self, query: Option<String>) {
77 self.origin_query = query;
78 }
79
80 #[throws(SQLiteSourceError)]
81 fn fetch_metadata(&mut self) {
82 assert!(!self.queries.is_empty());
83 let conn = self.pool.get()?;
84 let mut names = vec![];
85 let mut types = vec![];
86 let mut num_empty = 0;
87
88 for (i, query) in self.queries.iter().enumerate() {
90 let l1query = limit1_query(query, &SQLiteDialect {})?;
91
92 let is_sucess = conn.query_row(l1query.as_str(), [], |row| {
93 for (j, col) in row.as_ref().columns().iter().enumerate() {
94 if j >= names.len() {
95 names.push(col.name().to_string());
96 }
97 if j >= types.len() {
98 let vr = row.get_ref(j)?;
99 match SQLiteTypeSystem::try_from((col.decl_type(), vr.data_type())) {
100 Ok(t) => types.push(Some(t)),
101 Err(_) => {
102 types.push(None);
103 }
104 }
105 } else if types[j].is_none() {
106 let vr = row.get_ref(j)?;
108 if let Ok(t) = SQLiteTypeSystem::try_from((col.decl_type(), vr.data_type()))
109 {
110 types[j] = Some(t)
111 }
112 }
113 }
114 Ok(())
115 });
116
117 match is_sucess {
118 Ok(()) => {
119 if !types.contains(&None) {
120 self.names = names;
121 self.schema = types.into_iter().map(|t| t.unwrap()).collect();
122 return;
123 } else if i == self.queries.len() - 1 {
124 debug!(
125 "cannot get metadata for '{}' due to null value: {:?}",
126 query, types
127 );
128 throw!(SQLiteSourceError::InferTypeFromNull);
129 }
130 }
131 Err(e) => {
132 if let rusqlite::Error::QueryReturnedNoRows = e {
133 num_empty += 1; }
135 if i == self.queries.len() - 1 && num_empty < self.queries.len() {
136 debug!("cannot get metadata for '{}': {}", query, e);
138 throw!(e)
139 }
140 }
141 }
142 }
143
144 let stmt = conn.prepare(self.queries[0].as_str())?;
146
147 self.names = stmt
148 .column_names()
149 .into_iter()
150 .map(|s| s.to_string())
151 .collect();
152 self.schema = vec![SQLiteTypeSystem::Text(false); self.names.len()];
154 }
155
156 #[throws(SQLiteSourceError)]
157 fn result_rows(&mut self) -> Option<usize> {
158 match &self.origin_query {
159 Some(q) => {
160 let cxq = CXQuery::Naked(q.clone());
161 let conn = self.pool.get()?;
162 let nrows =
163 conn.query_row(count_query(&cxq, &SQLiteDialect {})?.as_str(), [], |row| {
164 Ok(row.get::<_, i64>(0)? as usize)
165 })?;
166 Some(nrows)
167 }
168 None => None,
169 }
170 }
171
172 fn names(&self) -> Vec<String> {
173 self.names.clone()
174 }
175
176 fn schema(&self) -> Vec<Self::TypeSystem> {
177 self.schema.clone()
178 }
179
180 #[throws(SQLiteSourceError)]
181 fn partition(self) -> Vec<Self::Partition> {
182 let mut ret = vec![];
183 for query in self.queries {
184 let conn = self.pool.get()?;
185
186 ret.push(SQLiteSourcePartition::new(conn, &query, &self.schema));
187 }
188 ret
189 }
190}
191
192pub struct SQLiteSourcePartition {
193 conn: PooledConnection<SqliteConnectionManager>,
194 query: CXQuery<String>,
195 schema: Vec<SQLiteTypeSystem>,
196 nrows: usize,
197 ncols: usize,
198}
199
200impl SQLiteSourcePartition {
201 pub fn new(
202 conn: PooledConnection<SqliteConnectionManager>,
203 query: &CXQuery<String>,
204 schema: &[SQLiteTypeSystem],
205 ) -> Self {
206 Self {
207 conn,
208 query: query.clone(),
209 schema: schema.to_vec(),
210 nrows: 0,
211 ncols: schema.len(),
212 }
213 }
214}
215
216impl SourcePartition for SQLiteSourcePartition {
217 type TypeSystem = SQLiteTypeSystem;
218 type Parser<'a> = SQLiteSourcePartitionParser<'a>;
219 type Error = SQLiteSourceError;
220
221 #[throws(SQLiteSourceError)]
222 fn result_rows(&mut self) {
223 self.nrows = self.conn.query_row(
224 count_query(&self.query, &SQLiteDialect {})?.as_str(),
225 [],
226 |row| Ok(row.get::<_, i64>(0)? as usize),
227 )?;
228 }
229
230 #[throws(SQLiteSourceError)]
231 fn parser(&mut self) -> Self::Parser<'_> {
232 SQLiteSourcePartitionParser::new(&self.conn, self.query.as_str(), &self.schema)?
233 }
234
235 fn nrows(&self) -> usize {
236 self.nrows
237 }
238
239 fn ncols(&self) -> usize {
240 self.ncols
241 }
242}
243
244unsafe impl<'a> Send for SQLiteSourcePartitionParser<'a> {}
245
246pub struct SQLiteSourcePartitionParser<'a> {
247 rows: OwningHandle<Box<Statement<'a>>, DummyBox<Rows<'a>>>,
248 ncols: usize,
249 current_col: usize,
250 current_consumed: bool,
251 is_finished: bool,
252}
253
254impl<'a> SQLiteSourcePartitionParser<'a> {
255 #[throws(SQLiteSourceError)]
256 pub fn new(
257 conn: &'a PooledConnection<SqliteConnectionManager>,
258 query: &str,
259 schema: &[SQLiteTypeSystem],
260 ) -> Self {
261 let stmt: Statement<'a> = conn.prepare(query)?;
262
263 let rows: OwningHandle<Box<Statement<'a>>, DummyBox<Rows<'a>>> =
267 OwningHandle::new_with_fn(Box::new(stmt), |stmt: *const Statement<'a>| unsafe {
268 DummyBox((*(stmt as *mut Statement<'_>)).query([]).unwrap())
269 });
270 Self {
271 rows,
272 ncols: schema.len(),
273 current_col: 0,
274 current_consumed: true,
275 is_finished: false,
276 }
277 }
278
279 #[throws(SQLiteSourceError)]
280 fn next_loc(&mut self) -> (&Row, usize) {
281 self.current_consumed = true;
282 let row: &Row = (*self.rows)
283 .get()
284 .ok_or_else(|| anyhow!("Sqlite empty current row"))?;
285 let col = self.current_col;
286 self.current_col = (self.current_col + 1) % self.ncols;
287 (row, col)
288 }
289}
290
291impl<'a> PartitionParser<'a> for SQLiteSourcePartitionParser<'a> {
292 type TypeSystem = SQLiteTypeSystem;
293 type Error = SQLiteSourceError;
294
295 #[throws(SQLiteSourceError)]
296 fn fetch_next(&mut self) -> (usize, bool) {
297 assert!(self.current_col == 0);
298
299 if !self.current_consumed {
300 return (1, false);
301 } else if self.is_finished {
302 return (0, true);
303 }
304
305 match (*self.rows).next()? {
306 Some(_) => {
307 self.current_consumed = false;
308 (1, false)
309 }
310 None => {
311 self.is_finished = true;
312 (0, true)
313 }
314 }
315 }
316}
317
/// Implements `Produce<T>` and `Produce<Option<T>>` on the parser for every
/// listed type by reading the next cell with `Row::get`.
macro_rules! impl_produce {
    ($($t: ty,)+) => {
        $(
            impl<'r, 'a> Produce<'r, $t> for SQLiteSourcePartitionParser<'a> {
                type Error = SQLiteSourceError;

                /// Reads the next cell as a value of this type.
                #[throws(SQLiteSourceError)]
                fn produce(&'r mut self) -> $t {
                    let (row, col) = self.next_loc()?;
                    let val = row.get(col)?;
                    val
                }
            }

            impl<'r, 'a> Produce<'r, Option<$t>> for SQLiteSourcePartitionParser<'a> {
                type Error = SQLiteSourceError;

                /// Reads the next cell as an optional value of this type.
                #[throws(SQLiteSourceError)]
                fn produce(&'r mut self) -> Option<$t> {
                    let (row, col) = self.next_loc()?;
                    let val = row.get(col)?;
                    val
                }
            }
        )+
    };
}

346impl_produce!(
347 bool,
348 i64,
349 i32,
350 i16,
351 f64,
352 Box<str>,
353 NaiveDate,
354 NaiveTime,
355 NaiveDateTime,
356 Vec<u8>,
357);