// connectorx/sources/mod.rs

//! This module defines the four traits that together describe a source: [`Source`],
//! [`SourcePartition`], [`PartitionParser`], and [`Produce`].
//! It also contains the source implementations for various databases.

// Each database source lives in its own module, compiled only when the matching `src_*`
// cargo feature is enabled.
#[cfg(feature = "src_bigquery")]
pub mod bigquery;
#[cfg(feature = "src_clickhouse")]
pub mod clickhouse;
#[cfg(feature = "src_csv")]
pub mod csv;
#[cfg(feature = "src_dummy")]
pub mod dummy;
#[cfg(feature = "src_mssql")]
pub mod mssql;
#[cfg(feature = "src_mysql")]
pub mod mysql;
#[cfg(feature = "src_oracle")]
pub mod oracle;
#[cfg(feature = "src_postgres")]
pub mod postgres;
#[cfg(feature = "src_sqlite")]
pub mod sqlite;
#[cfg(feature = "src_trino")]
pub mod trino;

use crate::data_order::DataOrder;
use crate::errors::ConnectorXError;
use crate::sql::CXQuery;
use crate::typesystem::{TypeAssoc, TypeSystem};
use std::fmt::Debug;

31pub trait Source {
32    /// Supported data orders, ordering by preference.
33    const DATA_ORDERS: &'static [DataOrder];
34    /// The type system this `Source` associated with.
35    type TypeSystem: TypeSystem;
36    // Partition needs to be send to different threads for parallel execution
37    type Partition: SourcePartition<TypeSystem = Self::TypeSystem, Error = Self::Error> + Send;
38    type Error: From<ConnectorXError> + Send + Debug;
39
40    fn set_data_order(&mut self, data_order: DataOrder) -> Result<(), Self::Error>;
41
42    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]);
43
44    fn set_origin_query(&mut self, query: Option<String>);
45
46    fn set_pre_execution_queries(&mut self, _pre_execution_queries: Option<&[String]>) {
47        unimplemented!("pre_execution_queries is not implemented in this source type");
48    }
49
50    fn fetch_metadata(&mut self) -> Result<(), Self::Error>;
51    /// Get total number of rows if available
52    fn result_rows(&mut self) -> Result<Option<usize>, Self::Error>;
53
54    fn names(&self) -> Vec<String>;
55
56    fn schema(&self) -> Vec<Self::TypeSystem>;
57
58    fn partition(self) -> Result<Vec<Self::Partition>, Self::Error>;
59}
60
61/// In general, a `DataSource` abstracts the data source as a stream, which can produce
62/// a sequence of values of variate types by repetitively calling the function `produce`.
63pub trait SourcePartition {
64    type TypeSystem: TypeSystem;
65    type Parser<'a>: PartitionParser<'a, TypeSystem = Self::TypeSystem, Error = Self::Error>
66    where
67        Self: 'a;
68    type Error: From<ConnectorXError> + Send + Debug;
69
70    /// Count total number of rows in each partition.
71    fn result_rows(&mut self) -> Result<(), Self::Error>;
72
73    fn parser(&mut self) -> Result<Self::Parser<'_>, Self::Error>;
74
75    /// Number of rows this `DataSource` got.
76    /// Sometimes it is not possible for the source to know how many rows it gets before reading the whole data.
77    fn nrows(&self) -> usize;
78
79    /// Number of cols this `DataSource` got.
80    fn ncols(&self) -> usize;
81}
82
83pub trait PartitionParser<'a>: Send {
84    type TypeSystem: TypeSystem;
85    type Error: From<ConnectorXError> + Send + Debug;
86
87    /// Read a value `T` by calling `Produce<T>::produce`. Usually this function does not need to be
88    /// implemented.
89    fn parse<'r, T>(&'r mut self) -> Result<T, <Self as PartitionParser<'a>>::Error>
90    where
91        T: TypeAssoc<Self::TypeSystem>,
92        Self: Produce<'r, T, Error = <Self as PartitionParser<'a>>::Error>,
93    {
94        self.produce()
95    }
96
97    /// Fetch next batch of rows from database, return (number of rows fetched to local, whether all rows are fechted from database).
98    /// There might be rows that are not consumed yet when calling the next fetch_next.
99    /// The function might be called even after the last batch is fetched.
100    fn fetch_next(&mut self) -> Result<(usize, bool), Self::Error>;
101}
102
103/// A type implemented `Produce<T>` means that it can produce a value `T` by consuming part of it's raw data buffer.
104pub trait Produce<'r, T> {
105    type Error: From<ConnectorXError> + Send;
106
107    fn produce(&'r mut self) -> Result<T, Self::Error>;
108}