connectorx/sources/dummy/
mod.rs1mod typesystem;
5
6pub use self::typesystem::DummyTypeSystem;
7use super::{PartitionParser, Produce, Source, SourcePartition};
8use crate::data_order::DataOrder;
9use crate::errors::{ConnectorXError, Result};
10use crate::sql::CXQuery;
11use chrono::{offset, DateTime, Utc};
12use fehler::{throw, throws};
13use num_traits::cast::FromPrimitive;
14
15pub struct DummySource {
16 names: Vec<String>,
17 schema: Vec<DummyTypeSystem>,
18 queries: Vec<CXQuery<String>>,
19}
20
21impl DummySource {
22 pub fn new<S: AsRef<str>>(names: &[S], schema: &[DummyTypeSystem]) -> Self {
23 assert_eq!(names.len(), schema.len());
24 DummySource {
25 names: names.iter().map(|s| s.as_ref().to_string()).collect(),
26 schema: schema.to_vec(),
27 queries: vec![],
28 }
29 }
30}
31
32impl Source for DummySource {
33 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
34 type TypeSystem = DummyTypeSystem;
35 type Partition = DummySourcePartition;
36 type Error = ConnectorXError;
37
38 #[throws(ConnectorXError)]
39 fn set_data_order(&mut self, data_order: DataOrder) {
40 if !matches!(data_order, DataOrder::RowMajor) {
41 throw!(ConnectorXError::UnsupportedDataOrder(data_order))
42 }
43 }
44
45 fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
47 self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
48 }
49
50 fn set_origin_query(&mut self, _query: Option<String>) {}
51
52 fn fetch_metadata(&mut self) -> Result<()> {
53 Ok(())
54 }
55
56 fn result_rows(&mut self) -> Result<Option<usize>> {
57 Ok(None)
58 }
59
60 fn names(&self) -> Vec<String> {
61 self.names.clone()
62 }
63
64 fn schema(&self) -> Vec<Self::TypeSystem> {
65 self.schema.clone()
66 }
67
68 fn partition(self) -> Result<Vec<Self::Partition>> {
69 assert!(!self.queries.is_empty());
70 let queries = self.queries;
71 let schema = self.schema;
72
73 Ok(queries
74 .into_iter()
75 .map(|q| DummySourcePartition::new(&schema, &q))
76 .collect())
77 }
78}
79
80pub struct DummySourcePartition {
81 nrows: usize,
82 ncols: usize,
83 counter: usize,
84}
85
86impl DummySourcePartition {
87 pub fn new(_schema: &[DummyTypeSystem], q: &CXQuery<String>) -> Self {
88 let v: Vec<usize> = q.as_str().split(',').map(|s| s.parse().unwrap()).collect();
89
90 DummySourcePartition {
91 nrows: v[0],
92 ncols: v[1],
93 counter: 0,
94 }
95 }
96}
97
98impl SourcePartition for DummySourcePartition {
99 type TypeSystem = DummyTypeSystem;
100 type Parser<'a> = DummySourcePartitionParser<'a>;
101 type Error = ConnectorXError;
102
103 fn result_rows(&mut self) -> Result<()> {
104 Ok(())
105 }
106
107 fn parser(&mut self) -> Result<Self::Parser<'_>> {
108 Ok(DummySourcePartitionParser::new(
109 &mut self.counter,
110 self.nrows,
111 self.ncols,
112 ))
113 }
114
115 fn nrows(&self) -> usize {
116 self.nrows
117 }
118
119 fn ncols(&self) -> usize {
120 self.ncols
121 }
122}
123
124pub struct DummySourcePartitionParser<'a> {
125 counter: &'a mut usize,
126 #[allow(unused)]
127 nrows: usize,
128 ncols: usize,
129}
130
131impl<'a> DummySourcePartitionParser<'a> {
132 fn new(counter: &'a mut usize, nrows: usize, ncols: usize) -> Self {
133 DummySourcePartitionParser {
134 counter,
135 ncols,
136 nrows,
137 }
138 }
139
140 fn next_val(&mut self) -> usize {
141 let ret = *self.counter / self.ncols;
142 *self.counter += 1;
143 ret
144 }
145}
146
147impl<'a> PartitionParser<'a> for DummySourcePartitionParser<'a> {
148 type TypeSystem = DummyTypeSystem;
149 type Error = ConnectorXError;
150
151 fn fetch_next(&mut self) -> Result<(usize, bool)> {
152 Ok((self.nrows, true))
153 }
154}
155
156macro_rules! numeric_impl {
157 ($($t: ty),+) => {
158 $(
159 impl<'r, 'a> Produce<'r, $t> for DummySourcePartitionParser<'a> {
160 type Error = ConnectorXError;
161
162 fn produce(&mut self) -> Result<$t> {
163 let ret = self.next_val();
164 Ok(FromPrimitive::from_usize(ret).unwrap_or_default())
165 }
166 }
167
168 impl<'r, 'a> Produce<'r, Option<$t>> for DummySourcePartitionParser<'a> {
169 type Error = ConnectorXError;
170
171 fn produce(&mut self) -> Result<Option<$t>> {
172 let ret = self.next_val();
173 Ok(Some(FromPrimitive::from_usize(ret).unwrap_or_default()))
174 }
175 }
176 )+
177 };
178}
179
180numeric_impl!(u64, i32, i64, f64);
181
182impl<'r, 'a> Produce<'r, String> for DummySourcePartitionParser<'a> {
183 type Error = ConnectorXError;
184
185 fn produce(&mut self) -> Result<String> {
186 let ret = self.next_val().to_string();
187 Ok(ret)
188 }
189}
190
191impl<'r, 'a> Produce<'r, Option<String>> for DummySourcePartitionParser<'a> {
192 type Error = ConnectorXError;
193
194 fn produce(&mut self) -> Result<Option<String>> {
195 let ret = self.next_val().to_string();
196 Ok(Some(ret))
197 }
198}
199
200impl<'r, 'a> Produce<'r, bool> for DummySourcePartitionParser<'a> {
201 type Error = ConnectorXError;
202
203 fn produce(&mut self) -> Result<bool> {
204 let ret = self.next_val() % 2 == 0;
205 Ok(ret)
206 }
207}
208
209impl<'r, 'a> Produce<'r, Option<bool>> for DummySourcePartitionParser<'a> {
210 type Error = ConnectorXError;
211
212 fn produce(&mut self) -> Result<Option<bool>> {
213 let ret = match self.next_val() % 3 {
214 0 => Some(true),
215 1 => Some(false),
216 2 => None,
217 _ => unreachable!(),
218 };
219
220 Ok(ret)
221 }
222}
223
224impl<'r, 'a> Produce<'r, DateTime<Utc>> for DummySourcePartitionParser<'a> {
225 type Error = ConnectorXError;
226
227 fn produce(&mut self) -> Result<DateTime<Utc>> {
228 self.next_val();
229 let ret = offset::Utc::now();
230
231 Ok(ret)
232 }
233}
234
235impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for DummySourcePartitionParser<'a> {
236 type Error = ConnectorXError;
237
238 fn produce(&mut self) -> Result<Option<DateTime<Utc>>> {
239 self.next_val();
240 let ret = match self.next_val() % 2 {
241 0 => Some(offset::Utc::now()),
242 1 => None,
243 _ => unreachable!(),
244 };
245 Ok(ret)
246 }
247}