connectorx/sources/dummy/
mod.rs

1//! A dummy source that generates different values based on an internal counter.
2//! This source is for test purpose.
3
4mod typesystem;
5
6pub use self::typesystem::DummyTypeSystem;
7use super::{PartitionParser, Produce, Source, SourcePartition};
8use crate::data_order::DataOrder;
9use crate::errors::{ConnectorXError, Result};
10use crate::sql::CXQuery;
11use chrono::{offset, DateTime, Utc};
12use fehler::{throw, throws};
13use num_traits::cast::FromPrimitive;
14
15pub struct DummySource {
16    names: Vec<String>,
17    schema: Vec<DummyTypeSystem>,
18    queries: Vec<CXQuery<String>>,
19}
20
21impl DummySource {
22    pub fn new<S: AsRef<str>>(names: &[S], schema: &[DummyTypeSystem]) -> Self {
23        assert_eq!(names.len(), schema.len());
24        DummySource {
25            names: names.iter().map(|s| s.as_ref().to_string()).collect(),
26            schema: schema.to_vec(),
27            queries: vec![],
28        }
29    }
30}
31
32impl Source for DummySource {
33    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
34    type TypeSystem = DummyTypeSystem;
35    type Partition = DummySourcePartition;
36    type Error = ConnectorXError;
37
38    #[throws(ConnectorXError)]
39    fn set_data_order(&mut self, data_order: DataOrder) {
40        if !matches!(data_order, DataOrder::RowMajor) {
41            throw!(ConnectorXError::UnsupportedDataOrder(data_order))
42        }
43    }
44
45    // query: nrows,ncols
46    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
47        self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
48    }
49
50    fn set_origin_query(&mut self, _query: Option<String>) {}
51
52    fn fetch_metadata(&mut self) -> Result<()> {
53        Ok(())
54    }
55
56    fn result_rows(&mut self) -> Result<Option<usize>> {
57        Ok(None)
58    }
59
60    fn names(&self) -> Vec<String> {
61        self.names.clone()
62    }
63
64    fn schema(&self) -> Vec<Self::TypeSystem> {
65        self.schema.clone()
66    }
67
68    fn partition(self) -> Result<Vec<Self::Partition>> {
69        assert!(!self.queries.is_empty());
70        let queries = self.queries;
71        let schema = self.schema;
72
73        Ok(queries
74            .into_iter()
75            .map(|q| DummySourcePartition::new(&schema, &q))
76            .collect())
77    }
78}
79
80pub struct DummySourcePartition {
81    nrows: usize,
82    ncols: usize,
83    counter: usize,
84}
85
86impl DummySourcePartition {
87    pub fn new(_schema: &[DummyTypeSystem], q: &CXQuery<String>) -> Self {
88        let v: Vec<usize> = q.as_str().split(',').map(|s| s.parse().unwrap()).collect();
89
90        DummySourcePartition {
91            nrows: v[0],
92            ncols: v[1],
93            counter: 0,
94        }
95    }
96}
97
98impl SourcePartition for DummySourcePartition {
99    type TypeSystem = DummyTypeSystem;
100    type Parser<'a> = DummySourcePartitionParser<'a>;
101    type Error = ConnectorXError;
102
103    fn result_rows(&mut self) -> Result<()> {
104        Ok(())
105    }
106
107    fn parser(&mut self) -> Result<Self::Parser<'_>> {
108        Ok(DummySourcePartitionParser::new(
109            &mut self.counter,
110            self.nrows,
111            self.ncols,
112        ))
113    }
114
115    fn nrows(&self) -> usize {
116        self.nrows
117    }
118
119    fn ncols(&self) -> usize {
120        self.ncols
121    }
122}
123
124pub struct DummySourcePartitionParser<'a> {
125    counter: &'a mut usize,
126    #[allow(unused)]
127    nrows: usize,
128    ncols: usize,
129}
130
131impl<'a> DummySourcePartitionParser<'a> {
132    fn new(counter: &'a mut usize, nrows: usize, ncols: usize) -> Self {
133        DummySourcePartitionParser {
134            counter,
135            ncols,
136            nrows,
137        }
138    }
139
140    fn next_val(&mut self) -> usize {
141        let ret = *self.counter / self.ncols;
142        *self.counter += 1;
143        ret
144    }
145}
146
147impl<'a> PartitionParser<'a> for DummySourcePartitionParser<'a> {
148    type TypeSystem = DummyTypeSystem;
149    type Error = ConnectorXError;
150
151    fn fetch_next(&mut self) -> Result<(usize, bool)> {
152        Ok((self.nrows, true))
153    }
154}
155
156macro_rules! numeric_impl {
157    ($($t: ty),+) => {
158        $(
159            impl<'r, 'a> Produce<'r, $t> for DummySourcePartitionParser<'a> {
160                type Error = ConnectorXError;
161
162                fn produce(&mut self) -> Result<$t> {
163                    let ret = self.next_val();
164                    Ok(FromPrimitive::from_usize(ret).unwrap_or_default())
165                }
166            }
167
168            impl<'r, 'a> Produce<'r, Option<$t>> for DummySourcePartitionParser<'a> {
169                type Error = ConnectorXError;
170
171                fn produce(&mut self) -> Result<Option<$t>> {
172                    let ret = self.next_val();
173                    Ok(Some(FromPrimitive::from_usize(ret).unwrap_or_default()))
174                }
175            }
176        )+
177    };
178}
179
180numeric_impl!(u64, i32, i64, f64);
181
182impl<'r, 'a> Produce<'r, String> for DummySourcePartitionParser<'a> {
183    type Error = ConnectorXError;
184
185    fn produce(&mut self) -> Result<String> {
186        let ret = self.next_val().to_string();
187        Ok(ret)
188    }
189}
190
191impl<'r, 'a> Produce<'r, Option<String>> for DummySourcePartitionParser<'a> {
192    type Error = ConnectorXError;
193
194    fn produce(&mut self) -> Result<Option<String>> {
195        let ret = self.next_val().to_string();
196        Ok(Some(ret))
197    }
198}
199
200impl<'r, 'a> Produce<'r, bool> for DummySourcePartitionParser<'a> {
201    type Error = ConnectorXError;
202
203    fn produce(&mut self) -> Result<bool> {
204        let ret = self.next_val() % 2 == 0;
205        Ok(ret)
206    }
207}
208
209impl<'r, 'a> Produce<'r, Option<bool>> for DummySourcePartitionParser<'a> {
210    type Error = ConnectorXError;
211
212    fn produce(&mut self) -> Result<Option<bool>> {
213        let ret = match self.next_val() % 3 {
214            0 => Some(true),
215            1 => Some(false),
216            2 => None,
217            _ => unreachable!(),
218        };
219
220        Ok(ret)
221    }
222}
223
224impl<'r, 'a> Produce<'r, DateTime<Utc>> for DummySourcePartitionParser<'a> {
225    type Error = ConnectorXError;
226
227    fn produce(&mut self) -> Result<DateTime<Utc>> {
228        self.next_val();
229        let ret = offset::Utc::now();
230
231        Ok(ret)
232    }
233}
234
235impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for DummySourcePartitionParser<'a> {
236    type Error = ConnectorXError;
237
238    fn produce(&mut self) -> Result<Option<DateTime<Utc>>> {
239        self.next_val();
240        let ret = match self.next_val() % 2 {
241            0 => Some(offset::Utc::now()),
242            1 => None,
243            _ => unreachable!(),
244        };
245        Ok(ret)
246    }
247}