connectorx/lib.rs
#![allow(clippy::upper_case_acronyms)]

//! # ConnectorX
//!
//! ConnectorX enables you to load data from databases into dataframes in the fastest and most memory-efficient way by leveraging
//! zero-copy and partition-based parallelism.
//!
//! Currently, ConnectorX consists of a Rust core library and a Python library. This is the documentation for the Rust crate.
//! For the documentation of the Python library, please refer to our [GitHub README](https://github.com/sfu-db/connector-x).
//!
//! # Design
//!
//! A data loading problem consists of three sub-problems:
//! 1. How to connect to the data source and read data.
//! 2. How to connect to the data destination and write data.
//! 3. How to map the types between the source and destination.
//!
//! Additionally, since ConnectorX splits a query into partitions and executes them in parallel, there is a fourth sub-problem:
//! 4. How to partition the query and run the partitions in parallel.
//!
//! ConnectorX approaches these problems by defining abstractions on sources, destinations, and mapping rules.
//! For partition-based parallelism, ConnectorX partitions the query together with the source and the destination
//! and distributes them across threads.
//! Each thread owns exactly one query, one partitioned source, and one partitioned destination.
//!
//! The following diagram depicts the internal mechanism when ConnectorX downloads data.
//!
//! ```text
//!                  +---------------------------------------------------------------+
//!                  |                           Thread 1                            |
//!                  |                                                               |
//! +---+            |  +-----------------+   +-------------+   +-----------------+  |            +---+
//! |   +------------+->| Partitioned Src +-->| Type Mapper +-->| Partitioned Dst +--+----------->|   |
//! |   |            |  +-----------------+   +-------------+   +-----------------+  |            |   |
//! | D |            |                                                               |            | D |
//! | a |            +---------------------------------------------------------------+            | a |
//! | t |                                            .                                            | t |
//! | a |                                            .                                            | a |
//! | b |                                            .                                            | f |
//! | a |            +---------------------------------------------------------------+            | r |
//! | s |            |                           Thread n                            |            | a |
//! | e |            |                                                               |            | m |
//! |   |            |  +-----------------+   +-------------+   +-----------------+  |            | e |
//! |   +------------+->| Partitioned Src +-->| Type Mapper +-->| Partitioned Dst +--+----------->|   |
//! +---+            |  +-----------------+   +-------------+   +-----------------+  |            +---+
//!                  |                                                               |
//!                  +---------------------------------------------------------------+
//!
//! ```
//! ## How does ConnectorX download the data?
//!
//! Upon receiving a query, e.g. `SELECT * FROM lineitem`, ConnectorX will first issue a `LIMIT 1` query `SELECT * FROM lineitem LIMIT 1` to get the schema of the result set.
//!
//! Then, if `partition_on` is specified, ConnectorX will issue `SELECT MIN($partition_on), MAX($partition_on) FROM (SELECT * FROM lineitem)` to know the range of the partition column.
//! After that, the original query is split into partitions based on the min/max information, e.g. `SELECT * FROM (SELECT * FROM lineitem) WHERE $partition_on > 0 AND $partition_on < 10000`.
//! ConnectorX will then run a count query to get the size of each partition (e.g. `SELECT COUNT(*) FROM (SELECT * FROM lineitem) WHERE $partition_on > 0 AND $partition_on < 10000`).
//! If `partition_on` is not specified, the count query will be `SELECT COUNT(*) FROM (SELECT * FROM lineitem)`.
//!
//! Finally, ConnectorX will use the schema info as well as the count info to allocate memory and download data by executing the queries normally.
//! Once the downloading begins, there will be one thread for each partition so that the data are downloaded in parallel at the partition level.
//! Each thread will issue the query of the corresponding partition to the database and then write the returned data to the destination row-wise or column-wise (depending on the database) in a streaming fashion.
//! This mechanism implies that having an index on the partition column is recommended to make full use of the parallel downloading power provided by ConnectorX.
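//!
//! For intuition, the sketch below shows how the range obtained from the min/max query could be split into
//! equally sized partition queries. It is only an illustration: the helper name and the exact splitting
//! policy are hypothetical, not ConnectorX's internal partitioning code.
//!
//! ```no_run
//! // Hypothetical helper: split [min, max] into `n` half-open ranges and wrap the
//! // original query in a range predicate on the partition column.
//! fn split_into_partitions(query: &str, col: &str, min: i64, max: i64, n: i64) -> Vec<String> {
//!     let step = ((max - min + 1) as f64 / n as f64).ceil() as i64;
//!     (0..n)
//!         .map(|i| {
//!             let lower = min + i * step;
//!             let upper = (lower + step).min(max + 1);
//!             format!(
//!                 "SELECT * FROM ({}) AS t WHERE {} >= {} AND {} < {}",
//!                 query, col, lower, col, upper
//!             )
//!         })
//!         .collect()
//! }
//!
//! let _queries = split_into_partitions("SELECT * FROM lineitem", "l_orderkey", 1, 6_000_000, 4);
//! ```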
//!
//! # Extending ConnectorX
//! ## Adding a new source
//!
//! To add a new data source, you need to implement [`sources::Source`], [`sources::SourcePartition`], [`sources::PartitionParser`], and [`sources::Produce`] for the source.
//! In detail, [`sources::Source`] describes how to connect to the database from a connection string, as well as how to partition the source to produce a list of [`sources::SourcePartition`].
//! [`sources::SourcePartition`] describes how to get the row count for the specific partition so that the destination can preallocate the memory.
//! Finally, [`sources::PartitionParser`] and [`sources::Produce`] abstract away the details of how each partition parses different types.
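//!
//! To see how these pieces fit together, here is a deliberately simplified, *hypothetical* sketch of the
//! roles described above. The trait names and signatures below are made up for illustration; the real
//! interfaces live in the [`sources`] module.
//!
//! ```no_run
//! // Hypothetical, simplified traits -- not ConnectorX's actual definitions.
//! trait MySource {
//!     type Partition: MySourcePartition;
//!     // Connect to the database, inspect the result schema, and split the work
//!     // into one partition per query.
//!     fn partition(self, queries: &[String]) -> Vec<Self::Partition>;
//! }
//!
//! trait MySourcePartition {
//!     type Parser: MyPartitionParser;
//!     // Report the number of rows so the destination can preallocate memory.
//!     fn row_count(&mut self) -> usize;
//!     // Start streaming this partition's result set.
//!     fn parser(&mut self) -> Self::Parser;
//! }
//!
//! trait MyPartitionParser {
//!     // Produce the next value of one concrete type (the role played by `Produce<T>`).
//!     fn produce_i64(&mut self) -> i64;
//! }
//! ```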
//!
//! ## Adding a new destination
//!
//! To add a new data destination, you need to implement [`destinations::Destination`], [`destinations::DestinationPartition`], and [`destinations::Consume`]. Similar to the sources,
//! [`destinations::Destination`] describes how to allocate the memory for the data destination, as well as how to partition the destination to produce a list of [`destinations::DestinationPartition`].
//! [`destinations::DestinationPartition`] and [`destinations::Consume`] abstract away the details of how each partition writes different types.
//!
//! ## Adding a new transport (type mapping)
//!
//! After having a source and a destination that describe how to read and write the data,
//! ConnectorX also needs to know how to convert values of different types from the source to the destination.
//! For example, Postgres can produce a `uuid` type, but there is no uuid type in Arrow. It is the transport's duty to convert
//! the `uuid` into an Arrow-compatible type, e.g. string. You can use the [`impl_transport!`] macro to define a transport.
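//!
//! As a purely illustrative example (the type names below are made up and are not ConnectorX's real type
//! systems), a transport boils down to a total mapping between the source's and the destination's type
//! systems, plus a conversion routine for every mapped pair:
//!
//! ```no_run
//! // Made-up type systems, for illustration only.
//! enum SrcType { Int8, Float8, Uuid }
//! enum DstType { Int64, Float64, LargeUtf8 }
//!
//! // The mapping a transport encodes: every source type gets a destination type.
//! fn map_type(t: SrcType) -> DstType {
//!     match t {
//!         SrcType::Int8 => DstType::Int64,
//!         SrcType::Float8 => DstType::Float64,
//!         // Arrow has no uuid type, so the transport converts uuids to strings.
//!         SrcType::Uuid => DstType::LargeUtf8,
//!     }
//! }
//! ```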
//!
//! ## Putting things together
//!
//! Say you decide to load data from SQLite to Arrow. ConnectorX already provides the SQLite source [`sources::sqlite::SQLiteSource`], the
//! Arrow destination [`destinations::arrow::ArrowDestination`], and the transport [`transports::SQLiteArrowTransport`].
//! With the source, destination, and transport already implemented, you can use [`dispatcher::Dispatcher`] to load the data:
//!
//! ```no_run
//! use connectorx::prelude::*;
//!
//! let mut destination = ArrowDestination::new();
//! let source = SQLiteSource::new("/path/to/db", 10).expect("cannot create the source");
//! let queries = &["SELECT * FROM db WHERE id < 100", "SELECT * FROM db WHERE id >= 100"];
//! let dispatcher = Dispatcher::<SQLiteSource, ArrowDestination, SQLiteArrowTransport>::new(source, &mut destination, queries, None);
//! dispatcher.run().expect("run failed");
//!
//! let data = destination.arrow();
//! ```
//!
//! Or, more simply, you can use [`get_arrow::get_arrow`], which wraps the above procedure:
//!
//! ```no_run
//! use connectorx::prelude::*;
//! use std::convert::TryFrom;
//!
//! let mut source_conn = SourceConn::try_from("postgresql://username:password@host:port/db?cxprotocol=binary").expect("parse conn str failed");
//! let queries = &[CXQuery::from("SELECT * FROM table WHERE id < 100"), CXQuery::from("SELECT * FROM table WHERE id >= 100")];
//! let destination = get_arrow(&source_conn, None, queries).expect("run failed");
//!
//! let data = destination.arrow();
//! ```
//!
//! NOTE: the pool size parameter `nconn` used when initializing the source should be greater than or equal to the number of partitioned queries passed in later.
//!
//! ## Need more examples?
//! You can use the existing implementations as examples:
//! [MySQL source](https://github.com/sfu-db/connector-x/tree/main/connectorx/src/sources/mysql),
//! [Arrow destination](https://github.com/sfu-db/connector-x/tree/main/connectorx/src/destinations/arrow),
//! [MySQL to Arrow transport](https://github.com/sfu-db/connector-x/blob/main/connectorx/src/transports/mysql_arrow.rs).
//!
//! # Source protocols & destinations implemented in the Rust core
//!
//! ## Sources
//! - [x] Postgres
//! - [x] MySQL
//! - [x] SQLite
//! - [x] SQL Server
//! - [x] Oracle
//! - [x] BigQuery
//!
//! ## Destinations
//! - [x] Arrow
//! - [x] Polars
//!
//! # Feature gates
//! By default, ConnectorX does not enable any sources / destinations to keep the dependencies minimal.
//! Instead, we provide the following opt-in features: `src_sqlite`, `src_postgres`, `src_mysql`, `src_mssql`, `src_oracle`, `dst_arrow`, `dst_polars`.
//! For example, if you'd like to load data from Postgres to Arrow, you can enable `src_postgres` and `dst_arrow` in `Cargo.toml`.
//! This will enable [`sources::postgres`], [`destinations::arrow`] and [`transports::PostgresArrowTransport`].
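//!
//! For example, a `Cargo.toml` entry along the following lines enables the Postgres source and the Arrow
//! destination (the version is left as a placeholder; substitute the release you need):
//!
//! ```toml
//! [dependencies]
//! connectorx = { version = "*", features = ["src_postgres", "dst_arrow"] }
//! ```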

pub mod typesystem;
#[macro_use]
mod macros;
#[cfg(feature = "dst_arrow")]
pub mod arrow_batch_iter;
pub mod constants;
pub mod data_order;
pub mod destinations;
mod dispatcher;
pub mod errors;
#[cfg(feature = "fed_exec")]
pub mod fed_dispatcher;
#[cfg(feature = "federation")]
pub mod fed_rewriter;
#[cfg(feature = "dst_arrow")]
pub mod get_arrow;
pub mod partition;
pub mod source_router;
pub mod sources;
#[doc(hidden)]
pub mod sql;
pub mod transports;
#[doc(hidden)]
pub mod utils;

pub mod prelude {
    #[cfg(feature = "dst_arrow")]
    pub use crate::arrow_batch_iter::{set_global_num_thread, RecordBatchIterator};
    pub use crate::data_order::{coordinate, DataOrder};
    #[cfg(feature = "dst_arrow")]
    pub use crate::destinations::arrow::{ArrowDestination, ArrowPartitionWriter, ArrowTypeSystem};
    #[cfg(feature = "dst_arrow")]
    pub use crate::destinations::arrowstream::{
        ArrowDestination as ArrowStreamDestination,
        ArrowPartitionWriter as ArrowStreamPartitionWriter,
        ArrowTypeSystem as ArrowStreamTypeSystem,
    };
    pub use crate::destinations::{Consume, Destination, DestinationPartition};
    pub use crate::dispatcher::Dispatcher;
    pub use crate::errors::{ConnectorXError, ConnectorXOutError};
    #[cfg(feature = "federation")]
    pub use crate::fed_rewriter::{rewrite_sql, FederatedDataSourceInfo, Plan};
    #[cfg(feature = "dst_arrow")]
    pub use crate::get_arrow::{get_arrow, new_record_batch_iter};
    pub use crate::source_router::*;
    #[cfg(feature = "src_bigquery")]
    pub use crate::sources::bigquery::BigQuerySource;
    #[cfg(feature = "src_csv")]
    pub use crate::sources::csv::CSVSource;
    #[cfg(feature = "src_dummy")]
    pub use crate::sources::dummy::DummySource;
    #[cfg(feature = "src_mssql")]
    pub use crate::sources::mssql::MsSQLSource;
    #[cfg(feature = "src_mysql")]
    pub use crate::sources::mysql::MySQLSource;
    #[cfg(feature = "src_oracle")]
    pub use crate::sources::oracle::OracleSource;
    #[cfg(feature = "src_postgres")]
    pub use crate::sources::postgres::PostgresSource;
    #[cfg(feature = "src_sqlite")]
    pub use crate::sources::sqlite::SQLiteSource;
    #[cfg(feature = "src_trino")]
    pub use crate::sources::trino::TrinoSource;
    pub use crate::sources::{PartitionParser, Produce, Source, SourcePartition};
    pub use crate::sql::CXQuery;
    pub use crate::transports::*;
    pub use crate::typesystem::{
        ParameterizedFunc, ParameterizedOn, Realize, Transport, TypeAssoc, TypeConversion,
        TypeSystem,
    };
}