1use crate::{
4 data_order::{coordinate, DataOrder},
5 destinations::{Destination, DestinationPartition},
6 errors::{ConnectorXError, Result as CXResult},
7 sources::{PartitionParser, Source, SourcePartition},
8 sql::CXQuery,
9 typesystem::Transport,
10};
11use itertools::Itertools;
12use log::debug;
13use rayon::prelude::*;
14use std::marker::PhantomData;
15
/// Orchestrates a complete `Source` -> `Destination` transfer: it holds the
/// partitioned queries, coordinates schema/type conversion through the
/// transport `TP`, and drives the parallel copy in `run`.
pub struct Dispatcher<'a, S, D, TP> {
    /// Data source the queries are executed against.
    src: S,
    /// Destination receiving the converted rows; mutably borrowed for `'a`.
    dst: &'a mut D,
    /// One query per partition; each source/destination partition pair serves one.
    queries: Vec<CXQuery<String>>,
    /// Optional un-partitioned original query, forwarded to the source.
    origin_query: Option<String>,
    /// Ties the transport type `TP` to this dispatcher; no `TP` value is stored.
    _phantom: PhantomData<TP>,
}
25
26impl<'w, S, D, TP> Dispatcher<'w, S, D, TP>
27where
28 S: Source,
29 D: Destination,
30 TP: Transport<TSS = S::TypeSystem, TSD = D::TypeSystem, S = S, D = D>,
31{
32 pub fn new<Q>(src: S, dst: &'w mut D, queries: &[Q], origin_query: Option<String>) -> Self
34 where
35 for<'a> &'a Q: Into<CXQuery>,
36 {
37 Self {
38 src,
39 dst,
40 queries: queries.iter().map(Into::into).collect(),
41 origin_query,
42 _phantom: PhantomData,
43 }
44 }
45
    /// Forward optional SQL statements to the source so it can execute them
    /// before running the partition queries (e.g. session configuration).
    pub fn set_pre_execution_queries(&mut self, pre_execution_queries: Option<&[String]>) {
        self.src.set_pre_execution_queries(pre_execution_queries);
    }
49
50 pub fn prepare(
51 mut self,
52 ) -> Result<
53 (
54 DataOrder,
55 Vec<S::Partition>,
56 Vec<D::Partition<'w>>,
57 Vec<S::TypeSystem>,
58 Vec<D::TypeSystem>,
59 ),
60 TP::Error,
61 > {
62 debug!("Prepare");
63 let dorder = coordinate(S::DATA_ORDERS, D::DATA_ORDERS)?;
64 self.src.set_data_order(dorder)?;
65 self.src.set_queries(self.queries.as_slice());
66 self.src.set_origin_query(self.origin_query);
67
68 debug!("Fetching metadata");
69 self.src.fetch_metadata()?;
70 let src_schema = self.src.schema();
71 let dst_schema = src_schema
72 .iter()
73 .map(|&s| TP::convert_typesystem(s))
74 .collect::<CXResult<Vec<_>>>()?;
75 let names = self.src.names();
76
77 let mut total_rows = if self.dst.needs_count() {
78 debug!("Try get row rounts for entire result");
80 self.src.result_rows()?
81 } else {
82 debug!("Do not need counts in advance");
83 Some(0)
84 };
85 let mut src_partitions: Vec<S::Partition> = self.src.partition()?;
86 if self.dst.needs_count() && total_rows.is_none() {
87 debug!("Manually count rows of each partitioned query and sum up");
88 src_partitions
90 .par_iter_mut()
91 .try_for_each(|partition| -> Result<(), S::Error> { partition.result_rows() })?;
92
93 let part_rows: Vec<usize> = src_partitions
95 .iter()
96 .map(|partition| partition.nrows())
97 .collect();
98 total_rows = Some(part_rows.iter().sum());
99 }
100 let total_rows = total_rows.ok_or_else(ConnectorXError::CountError)?;
101
102 debug!(
103 "Allocate destination memory: {}x{}",
104 total_rows,
105 src_schema.len()
106 );
107 self.dst.allocate(total_rows, &names, &dst_schema, dorder)?;
108
109 debug!("Create destination partition");
110 let dst_partitions = self.dst.partition(self.queries.len())?;
111
112 Ok((
113 dorder,
114 src_partitions,
115 dst_partitions,
116 src_schema,
117 dst_schema,
118 ))
119 }
120
    /// Execute the transfer end-to-end: `prepare` both sides, then copy each
    /// source partition into its matching destination partition in parallel.
    pub fn run(self) -> Result<(), TP::Error> {
        debug!("Run dispatcher");
        let (dorder, src_partitions, dst_partitions, src_schema, dst_schema) = self.prepare()?;

        // Exactly one of the two dispatch strategies must be compiled in.
        #[cfg(all(not(feature = "branch"), not(feature = "fptr")))]
        compile_error!("branch or fptr, pick one");

        // "branch" strategy: keep the (src, dst) type pair per column and
        // let `TP::process` dispatch on it for every value.
        #[cfg(feature = "branch")]
        let schemas: Vec<_> = src_schema
            .iter()
            .zip_eq(&dst_schema)
            .map(|(&src_ty, &dst_ty)| (src_ty, dst_ty))
            .collect();

        debug!("Start writing");
        // Pair destination and source partitions one-to-one; `zip_eq`
        // panics if the two partition counts ever diverge.
        dst_partitions
            .into_par_iter()
            .zip_eq(src_partitions)
            .enumerate()
            .try_for_each(|(i, (mut dst, mut src))| -> Result<(), TP::Error> {
                // "fptr" strategy: resolve one conversion function per
                // column up front, avoiding per-value type dispatch.
                #[cfg(feature = "fptr")]
                let f: Vec<_> = src_schema
                    .iter()
                    .zip_eq(&dst_schema)
                    .map(|(&src_ty, &dst_ty)| TP::processor(src_ty, dst_ty))
                    .collect::<CXResult<Vec<_>>>()?;

                let mut parser = src.parser()?;

                match dorder {
                    // Row-major: for each fetched batch of `n` rows, walk the
                    // columns of every row and convert value-by-value.
                    DataOrder::RowMajor => loop {
                        let (n, is_last) = parser.fetch_next()?;
                        dst.aquire_row(n)?;
                        for _ in 0..n {
                            #[allow(clippy::needless_range_loop)]
                            for col in 0..dst.ncols() {
                                #[cfg(feature = "fptr")]
                                f[col](&mut parser, &mut dst)?;

                                #[cfg(feature = "branch")]
                                {
                                    let (s1, s2) = schemas[col];
                                    TP::process(s1, s2, &mut parser, &mut dst)?;
                                }
                            }
                        }
                        if is_last {
                            break;
                        }
                    },
                    // Column-major: same conversion, but iterate the rows of
                    // each column before moving to the next column.
                    DataOrder::ColumnMajor => loop {
                        let (n, is_last) = parser.fetch_next()?;
                        dst.aquire_row(n)?;
                        #[allow(clippy::needless_range_loop)]
                        for col in 0..dst.ncols() {
                            for _ in 0..n {
                                #[cfg(feature = "fptr")]
                                f[col](&mut parser, &mut dst)?;
                                #[cfg(feature = "branch")]
                                {
                                    let (s1, s2) = schemas[col];
                                    TP::process(s1, s2, &mut parser, &mut dst)?;
                                }
                            }
                        }
                        if is_last {
                            break;
                        }
                    },
                }

                debug!("Finalize partition {}", i);
                dst.finalize()?;
                debug!("Partition {} finished", i);
                Ok(())
            })?;

        debug!("Writing finished");

        Ok(())
    }
204
205 pub fn get_meta(&mut self) -> Result<(), TP::Error> {
207 let dorder = coordinate(S::DATA_ORDERS, D::DATA_ORDERS)?;
208 self.src.set_data_order(dorder)?;
209 self.src.set_queries(self.queries.as_slice());
210 self.src.set_origin_query(self.origin_query.clone());
211 self.src.fetch_metadata()?;
212 let src_schema = self.src.schema();
213 let dst_schema = src_schema
214 .iter()
215 .map(|&s| TP::convert_typesystem(s))
216 .collect::<CXResult<Vec<_>>>()?;
217 let names = self.src.names();
218 self.dst.allocate(0, &names, &dst_schema, dorder)?;
219 Ok(())
220 }
221}