connectorx/dispatcher.rs

//! This module provides [`dispatcher::Dispatcher`], the core struct in ConnectorX
//! that drives the data loading from a source to a destination.
use crate::{
    data_order::{coordinate, DataOrder},
    destinations::{Destination, DestinationPartition},
    errors::{ConnectorXError, Result as CXResult},
    sources::{PartitionParser, Source, SourcePartition},
    sql::CXQuery,
    typesystem::Transport,
};
use itertools::Itertools;
use log::debug;
use rayon::prelude::*;
use std::marker::PhantomData;

/// A dispatcher takes an `S: Source`, a `D: Destination`, a `TP: Transport`, and a vector of
/// `queries` as input, and loads data from `S` into `D` using those queries.
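///
/// The sketch below shows how a dispatcher is typically driven. It is only illustrative:
/// `MySource`, `MyDestination`, and `MyTransport` stand for concrete `Source`, `Destination`,
/// and `Transport` implementations that are not defined in this module.
///
/// ```ignore
/// let src = MySource::new(/* connection info */)?;
/// let mut dst = MyDestination::new();
/// let queries = [CXQuery::naked("SELECT * FROM lineitem")];
///
/// // `run` prepares both sides (schemas, partitions, allocation) and then copies
/// // every source partition into its destination partition in parallel.
/// Dispatcher::<MySource, MyDestination, MyTransport>::new(src, &mut dst, &queries, None).run()?;
/// ```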
pub struct Dispatcher<'a, S, D, TP> {
    src: S,
    dst: &'a mut D,
    queries: Vec<CXQuery<String>>,
    origin_query: Option<String>,
    _phantom: PhantomData<TP>,
}

impl<'w, S, D, TP> Dispatcher<'w, S, D, TP>
where
    S: Source,
    D: Destination,
    TP: Transport<TSS = S::TypeSystem, TSD = D::TypeSystem, S = S, D = D>,
{
    /// Create a new dispatcher by providing a source, a destination, and the queries.
    pub fn new<Q>(src: S, dst: &'w mut D, queries: &[Q], origin_query: Option<String>) -> Self
    where
        for<'a> &'a Q: Into<CXQuery>,
    {
        Self {
            src,
            dst,
            queries: queries.iter().map(Into::into).collect(),
            origin_query,
            _phantom: PhantomData,
        }
    }

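    /// Set queries that the source should execute before the main queries run
    /// (for example session-level settings); they are forwarded to the source as-is.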
    pub fn set_pre_execution_queries(&mut self, pre_execution_queries: Option<&[String]>) {
        self.src.set_pre_execution_queries(pre_execution_queries);
    }

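    /// Prepare both sides of the transfer: coordinate the data order, fetch the source
    /// metadata, derive the destination schema through the transport, obtain a total row
    /// count if the destination needs one for allocation, allocate the destination, and
    /// split the source and the destination into partitions.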
    pub fn prepare(
        mut self,
    ) -> Result<
        (
            DataOrder,
            Vec<S::Partition>,
            Vec<D::Partition<'w>>,
            Vec<S::TypeSystem>,
            Vec<D::TypeSystem>,
        ),
        TP::Error,
    > {
        debug!("Prepare");
        let dorder = coordinate(S::DATA_ORDERS, D::DATA_ORDERS)?;
        self.src.set_data_order(dorder)?;
        self.src.set_queries(self.queries.as_slice());
        self.src.set_origin_query(self.origin_query);

        debug!("Fetching metadata");
        self.src.fetch_metadata()?;
        let src_schema = self.src.schema();
        let dst_schema = src_schema
            .iter()
            .map(|&s| TP::convert_typesystem(s))
            .collect::<CXResult<Vec<_>>>()?;
        let names = self.src.names();

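        // Destinations that preallocate need the exact number of rows up front.
        // Ask the source for the total result size first; if it cannot derive it,
        // fall back to counting each partitioned query below.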
        let mut total_rows = if self.dst.needs_count() {
            // `result_rows` returns None if the total count cannot be derived
            debug!("Try to get the row count for the entire result");
            self.src.result_rows()?
        } else {
            debug!("Do not need counts in advance");
            Some(0)
        };
        let mut src_partitions: Vec<S::Partition> = self.src.partition()?;
        if self.dst.needs_count() && total_rows.is_none() {
            debug!("Manually count rows of each partitioned query and sum up");
            // run the queries so that each partition can report its number of rows
            src_partitions
                .par_iter_mut()
                .try_for_each(|partition| -> Result<(), S::Error> { partition.result_rows() })?;

            // get the number of rows of each partition from the source
            let part_rows: Vec<usize> = src_partitions
                .iter()
                .map(|partition| partition.nrows())
                .collect();
            total_rows = Some(part_rows.iter().sum());
        }
        let total_rows = total_rows.ok_or_else(ConnectorXError::CountError)?;

        debug!(
            "Allocate destination memory: {}x{}",
            total_rows,
            src_schema.len()
        );
        self.dst.allocate(total_rows, &names, &dst_schema, dorder)?;

        debug!("Create destination partition");
        let dst_partitions = self.dst.partition(self.queries.len())?;

        Ok((
            dorder,
            src_partitions,
            dst_partitions,
            src_schema,
            dst_schema,
        ))
    }

    /// Start the data loading process.
    pub fn run(self) -> Result<(), TP::Error> {
        debug!("Run dispatcher");
        let (dorder, src_partitions, dst_partitions, src_schema, dst_schema) = self.prepare()?;

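        // How a (source type, destination type) pair is processed depends on the enabled
        // feature: `fptr` resolves a processor function pointer per column before the copy
        // loop, while `branch` keeps the type pair and dispatches on it inside the loop.
        // At least one of the two features must be enabled.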
        #[cfg(all(not(feature = "branch"), not(feature = "fptr")))]
        compile_error!("branch or fptr, pick one");

        #[cfg(feature = "branch")]
        let schemas: Vec<_> = src_schema
            .iter()
            .zip_eq(&dst_schema)
            .map(|(&src_ty, &dst_ty)| (src_ty, dst_ty))
            .collect();

        debug!("Start writing");
        // parse and write
        dst_partitions
            .into_par_iter()
            .zip_eq(src_partitions)
            .enumerate()
            .try_for_each(|(i, (mut dst, mut src))| -> Result<(), TP::Error> {
                #[cfg(feature = "fptr")]
                let f: Vec<_> = src_schema
                    .iter()
                    .zip_eq(&dst_schema)
                    .map(|(&src_ty, &dst_ty)| TP::processor(src_ty, dst_ty))
                    .collect::<CXResult<Vec<_>>>()?;

                let mut parser = src.parser()?;

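                // Stream batches from the source: for every batch, reserve `n` rows in the
                // destination and copy cell by cell, iterating rows-then-columns or
                // columns-then-rows depending on the coordinated data order.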
                match dorder {
                    DataOrder::RowMajor => loop {
                        let (n, is_last) = parser.fetch_next()?;
                        dst.aquire_row(n)?;
                        for _ in 0..n {
                            #[allow(clippy::needless_range_loop)]
                            for col in 0..dst.ncols() {
                                #[cfg(feature = "fptr")]
                                f[col](&mut parser, &mut dst)?;

                                #[cfg(feature = "branch")]
                                {
                                    let (s1, s2) = schemas[col];
                                    TP::process(s1, s2, &mut parser, &mut dst)?;
                                }
                            }
                        }
                        if is_last {
                            break;
                        }
                    },
                    DataOrder::ColumnMajor => loop {
                        let (n, is_last) = parser.fetch_next()?;
                        dst.aquire_row(n)?;
                        #[allow(clippy::needless_range_loop)]
                        for col in 0..dst.ncols() {
                            for _ in 0..n {
                                #[cfg(feature = "fptr")]
                                f[col](&mut parser, &mut dst)?;
                                #[cfg(feature = "branch")]
                                {
                                    let (s1, s2) = schemas[col];
                                    TP::process(s1, s2, &mut parser, &mut dst)?;
                                }
                            }
                        }
                        if is_last {
                            break;
                        }
                    },
                }

                debug!("Finalize partition {}", i);
                dst.finalize()?;
                debug!("Partition {} finished", i);
                Ok(())
            })?;

        debug!("Writing finished");

        Ok(())
    }

    /// Fetch only the metadata (header) of the query result; no rows are loaded.
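    ///
    /// The source schema is converted through the transport and the destination is
    /// allocated with zero rows, so it ends up holding just the column names and types.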
    pub fn get_meta(&mut self) -> Result<(), TP::Error> {
        let dorder = coordinate(S::DATA_ORDERS, D::DATA_ORDERS)?;
        self.src.set_data_order(dorder)?;
        self.src.set_queries(self.queries.as_slice());
        self.src.set_origin_query(self.origin_query.clone());
        self.src.fetch_metadata()?;
        let src_schema = self.src.schema();
        let dst_schema = src_schema
            .iter()
            .map(|&s| TP::convert_typesystem(s))
            .collect::<CXResult<Vec<_>>>()?;
        let names = self.src.names();
        self.dst.allocate(0, &names, &dst_schema, dorder)?;
        Ok(())
    }
}