connectorx/
source_router.rs

1use crate::constants::CONNECTORX_PROTOCOL;
2use crate::errors::{ConnectorXError, Result};
3use anyhow::anyhow;
4use fehler::throws;
5use std::convert::TryFrom;
6use url::Url;
7
8#[derive(Debug, Clone)]
9pub enum SourceType {
10    Postgres,
11    SQLite,
12    MySQL,
13    MsSQL,
14    Oracle,
15    BigQuery,
16    DuckDB,
17    Trino,
18    ClickHouse,
19    Unknown,
20}
21
22#[derive(Debug, Clone)]
23pub struct SourceConn {
24    pub ty: SourceType,
25    pub conn: Url,
26    pub proto: String,
27}
28
29impl TryFrom<&str> for SourceConn {
30    type Error = ConnectorXError;
31
32    fn try_from(conn: &str) -> Result<SourceConn> {
33        let old_url = Url::parse(conn).map_err(|e| anyhow!("parse error: {}", e))?;
34
35        // parse connectorx protocol
36        let proto = match old_url.query_pairs().find(|p| p.0 == CONNECTORX_PROTOCOL) {
37            Some((_, proto)) => proto.to_owned().to_string(),
38            None => "binary".to_string(),
39        };
40
41        // create url by removing connectorx protocol
42        let stripped_query: Vec<(_, _)> = old_url
43            .query_pairs()
44            .filter(|p| &*p.0 != CONNECTORX_PROTOCOL)
45            .collect();
46        let mut url = old_url.clone();
47        url.set_query(None);
48        for pair in stripped_query {
49            url.query_pairs_mut()
50                .append_pair(&pair.0.to_string()[..], &pair.1.to_string()[..]);
51        }
52
53        // users from sqlalchemy may set engine in connection url (e.g. mssql+pymssql://...)
54        // only for compatablility, we don't use the same engine
55        match url.scheme().split('+').collect::<Vec<&str>>()[0] {
56            "postgres" | "postgresql" => Ok(SourceConn::new(SourceType::Postgres, url, proto)),
57            "sqlite" => Ok(SourceConn::new(SourceType::SQLite, url, proto)),
58            "mysql" => Ok(SourceConn::new(SourceType::MySQL, url, proto)),
59            "mssql" => Ok(SourceConn::new(SourceType::MsSQL, url, proto)),
60            "oracle" => Ok(SourceConn::new(SourceType::Oracle, url, proto)),
61            "bigquery" => Ok(SourceConn::new(SourceType::BigQuery, url, proto)),
62            "duckdb" => Ok(SourceConn::new(SourceType::DuckDB, url, proto)),
63            "trino" => Ok(SourceConn::new(SourceType::Trino, url, proto)),
64            "clickhouse" => Ok(SourceConn::new(SourceType::ClickHouse, url, proto)),
65            _ => Ok(SourceConn::new(SourceType::Unknown, url, proto)),
66        }
67    }
68}
69
70impl SourceConn {
71    pub fn new(ty: SourceType, conn: Url, proto: String) -> Self {
72        Self { ty, conn, proto }
73    }
74    pub fn set_protocol(&mut self, protocol: &str) {
75        self.proto = protocol.to_string();
76    }
77}
78
79#[throws(ConnectorXError)]
80pub fn parse_source(conn: &str, protocol: Option<&str>) -> SourceConn {
81    let mut source_conn = SourceConn::try_from(conn)?;
82    match protocol {
83        Some(p) => source_conn.set_protocol(p),
84        None => {}
85    }
86    source_conn
87}