Skip to main content

connectorx/
source_router.rs

1use crate::constants::CONNECTORX_PROTOCOL;
2use crate::errors::{ConnectorXError, Result};
3use anyhow::anyhow;
4use fehler::throws;
5#[cfg(feature = "src_postgres")]
6use redshift_iam::redshift_to_postgres;
7use std::convert::TryFrom;
8use url::Url;
9
10#[derive(Debug, Clone)]
11pub enum SourceType {
12    Postgres,
13    SQLite,
14    MySQL,
15    MsSQL,
16    Oracle,
17    BigQuery,
18    DuckDB,
19    Trino,
20    ClickHouse,
21    Unknown,
22}
23
24#[derive(Debug, Clone)]
25pub struct SourceConn {
26    pub ty: SourceType,
27    pub conn: Url,
28    pub proto: String,
29}
30
31impl TryFrom<&str> for SourceConn {
32    type Error = ConnectorXError;
33
34    fn try_from(conn: &str) -> Result<SourceConn> {
35        let old_url = Url::parse(conn).map_err(|e| anyhow!("parse error: {}", e))?;
36
37        // parse connectorx protocol
38        let proto = match old_url.query_pairs().find(|p| p.0 == CONNECTORX_PROTOCOL) {
39            Some((_, proto)) => proto.to_owned().to_string(),
40            None => "binary".to_string(),
41        };
42
43        // create url by removing connectorx protocol
44        let stripped_query: Vec<(_, _)> = old_url
45            .query_pairs()
46            .filter(|p| &*p.0 != CONNECTORX_PROTOCOL)
47            .collect();
48        let mut url = old_url.clone();
49        url.set_query(None);
50        for pair in stripped_query {
51            url.query_pairs_mut()
52                .append_pair(&pair.0.to_string()[..], &pair.1.to_string()[..]);
53        }
54
55        // users from sqlalchemy may set engine in connection url (e.g. mssql+pymssql://...)
56        // only for compatablility, we don't use the same engine
57        match url.scheme().split('+').collect::<Vec<&str>>()[0] {
58            "postgres" | "postgresql" => Ok(SourceConn::new(SourceType::Postgres, url, proto)),
59            #[cfg(feature = "src_postgres")]
60            "redshift-iam" => Ok(SourceConn::new(
61                SourceType::Postgres,
62                redshift_to_postgres(url),
63                "cursor".to_string(),
64            )),
65            "sqlite" => Ok(SourceConn::new(SourceType::SQLite, url, proto)),
66            "mysql" => Ok(SourceConn::new(SourceType::MySQL, url, proto)),
67            "mssql" => Ok(SourceConn::new(SourceType::MsSQL, url, proto)),
68            "oracle" => Ok(SourceConn::new(SourceType::Oracle, url, proto)),
69            "bigquery" => Ok(SourceConn::new(SourceType::BigQuery, url, proto)),
70            "duckdb" => Ok(SourceConn::new(SourceType::DuckDB, url, proto)),
71            "trino" => Ok(SourceConn::new(SourceType::Trino, url, proto)),
72            "clickhouse" => Ok(SourceConn::new(SourceType::ClickHouse, url, proto)),
73            _ => Ok(SourceConn::new(SourceType::Unknown, url, proto)),
74        }
75    }
76}
77
78impl SourceConn {
79    pub fn new(ty: SourceType, conn: Url, proto: String) -> Self {
80        Self { ty, conn, proto }
81    }
82    pub fn set_protocol(&mut self, protocol: &str) {
83        self.proto = protocol.to_string();
84    }
85}
86
87#[throws(ConnectorXError)]
88pub fn parse_source(conn: &str, protocol: Option<&str>) -> SourceConn {
89    let mut source_conn = SourceConn::try_from(conn)?;
90    match protocol {
91        Some(p) => source_conn.set_protocol(p),
92        None => {}
93    }
94    source_conn
95}