connectorx/sources/
mod.rs

//! This module defines four traits, [`Source`], [`SourcePartition`], [`PartitionParser`], and
//! [`Produce`], which together define a source.
//! This module also contains source implementations for various databases.
3
4#[cfg(feature = "src_bigquery")]
5pub mod bigquery;
6#[cfg(feature = "src_csv")]
7pub mod csv;
8#[cfg(feature = "src_dummy")]
9pub mod dummy;
10#[cfg(feature = "src_mssql")]
11pub mod mssql;
12#[cfg(feature = "src_mysql")]
13pub mod mysql;
14#[cfg(feature = "src_oracle")]
15pub mod oracle;
16#[cfg(feature = "src_postgres")]
17pub mod postgres;
18#[cfg(feature = "src_sqlite")]
19pub mod sqlite;
20#[cfg(feature = "src_trino")]
21pub mod trino;
22
23use crate::data_order::DataOrder;
24use crate::errors::ConnectorXError;
25use crate::sql::CXQuery;
26use crate::typesystem::{TypeAssoc, TypeSystem};
27use std::fmt::Debug;
28
/// A data source that is configured with queries and then split into partitions.
///
/// An implementor declares the data orders it supports, the type system of its values,
/// the partition type handed out by [`Source::partition`], and an error type that can be
/// built from a [`ConnectorXError`].
pub trait Source {
    /// Supported data orders, ordered by preference.
    const DATA_ORDERS: &'static [DataOrder];
    /// The type system this `Source` is associated with.
    type TypeSystem: TypeSystem;
    // Partitions need to be sent to different threads for parallel execution, hence `Send`.
    /// The partition type produced by [`Source::partition`]; it shares this source's
    /// type system and error type.
    type Partition: SourcePartition<TypeSystem = Self::TypeSystem, Error = Self::Error> + Send;
    /// Error type of the fallible operations; must be constructible from a
    /// [`ConnectorXError`].
    type Error: From<ConnectorXError> + Send + Debug;

    /// Selects the data order to use. Implementations may reject the request with an error
    /// (presumably when `data_order` is not one of `DATA_ORDERS` -- confirm per source).
    fn set_data_order(&mut self, data_order: DataOrder) -> Result<(), Self::Error>;

    /// Sets the queries this source will work with.
    /// NOTE(review): presumably one query per partition -- confirm against implementations.
    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]);

    /// Sets the origin query, or clears it when `None` is passed.
    fn set_origin_query(&mut self, query: Option<String>);

    /// Sets the queries to run before execution.
    ///
    /// # Panics
    ///
    /// The default implementation always panics through `unimplemented!`; a source that
    /// supports pre-execution queries must override this method.
    fn set_pre_execution_queries(&mut self, _pre_execution_queries: Option<&[String]>) {
        unimplemented!("pre_execution_queries is not implemented in this source type");
    }

    /// Fetches the metadata of the source.
    /// NOTE(review): presumably this populates what `names` and `schema` report -- confirm.
    fn fetch_metadata(&mut self) -> Result<(), Self::Error>;
    /// Gets the total number of rows if available; `None` when it is not.
    fn result_rows(&mut self) -> Result<Option<usize>, Self::Error>;

    /// Returns the names held by this source.
    /// NOTE(review): presumably the column names, in the same order as `schema` -- confirm.
    fn names(&self) -> Vec<String>;

    /// Returns the schema as a list of values of the associated type system.
    /// NOTE(review): presumably one entry per column -- confirm.
    fn schema(&self) -> Vec<Self::TypeSystem>;

    /// Consumes the source and returns its partitions.
    fn partition(self) -> Result<Vec<Self::Partition>, Self::Error>;
}
58
/// One partition of a source, from which a [`PartitionParser`] can be obtained.
///
/// In general, a `SourcePartition` abstracts the data source as a stream, which can produce
/// a sequence of values of various types by repeatedly calling `produce` on its parser.
pub trait SourcePartition {
    /// The type system of the values in this partition.
    type TypeSystem: TypeSystem;
    /// The parser type returned by [`SourcePartition::parser`]. It may borrow from the
    /// partition for `'a` and shares the partition's type system and error type.
    type Parser<'a>: PartitionParser<'a, TypeSystem = Self::TypeSystem, Error = Self::Error>
    where
        Self: 'a;
    /// Error type of the fallible operations; must be constructible from a
    /// [`ConnectorXError`].
    type Error: From<ConnectorXError> + Send + Debug;

    /// Count total number of rows in each partition.
    ///
    /// The count itself is not returned.
    /// NOTE(review): presumably it is stored and later exposed through `nrows` -- confirm.
    fn result_rows(&mut self) -> Result<(), Self::Error>;

    /// Creates a parser that mutably borrows this partition for as long as it is alive.
    fn parser(&mut self) -> Result<Self::Parser<'_>, Self::Error>;

    /// Number of rows this partition got.
    /// Sometimes it is not possible for the source to know how many rows it gets before reading the whole data.
    fn nrows(&self) -> usize;

    /// Number of columns this partition got.
    fn ncols(&self) -> usize;
}
80
81pub trait PartitionParser<'a>: Send {
82    type TypeSystem: TypeSystem;
83    type Error: From<ConnectorXError> + Send + Debug;
84
85    /// Read a value `T` by calling `Produce<T>::produce`. Usually this function does not need to be
86    /// implemented.
87    fn parse<'r, T>(&'r mut self) -> Result<T, <Self as PartitionParser<'a>>::Error>
88    where
89        T: TypeAssoc<Self::TypeSystem>,
90        Self: Produce<'r, T, Error = <Self as PartitionParser<'a>>::Error>,
91    {
92        self.produce()
93    }
94
95    /// Fetch next batch of rows from database, return (number of rows fetched to local, whether all rows are fechted from database).
96    /// There might be rows that are not consumed yet when calling the next fetch_next.
97    /// The function might be called even after the last batch is fetched.
98    fn fetch_next(&mut self) -> Result<(usize, bool), Self::Error>;
99}
100
/// A type implementing `Produce<T>` can produce a value `T` by consuming part of its raw data buffer.
///
/// The lifetime `'r` is the duration of the mutable borrow taken by [`Produce::produce`].
pub trait Produce<'r, T> {
    /// Error returned when a value cannot be produced; must be constructible from a
    /// [`ConnectorXError`].
    type Error: From<ConnectorXError> + Send;

    /// Produces the next value of type `T`, borrowing `self` mutably for `'r`.
    fn produce(&'r mut self) -> Result<T, Self::Error>;
}
106}