connectorx/
source_router.rs1use crate::constants::CONNECTORX_PROTOCOL;
2use crate::errors::{ConnectorXError, Result};
3use anyhow::anyhow;
4use fehler::throws;
5use std::convert::TryFrom;
6use url::Url;
7
/// The database backend a connection string is routed to.
///
/// The variant is selected from the URL scheme by
/// `SourceConn::try_from`; schemes that match no known backend map to
/// [`SourceType::Unknown`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceType {
    /// Selected for the `postgres` and `postgresql` schemes.
    Postgres,
    /// Selected for the `sqlite` scheme.
    SQLite,
    /// Selected for the `mysql` scheme.
    MySQL,
    /// Selected for the `mssql` scheme.
    MsSQL,
    /// Selected for the `oracle` scheme.
    Oracle,
    /// Selected for the `bigquery` scheme.
    BigQuery,
    /// Selected for the `duckdb` scheme.
    DuckDB,
    /// Selected for the `trino` scheme.
    Trino,
    /// Fallback for any scheme not listed above.
    Unknown,
}
20
21#[derive(Debug, Clone)]
22pub struct SourceConn {
23 pub ty: SourceType,
24 pub conn: Url,
25 pub proto: String,
26}
27
28impl TryFrom<&str> for SourceConn {
29 type Error = ConnectorXError;
30
31 fn try_from(conn: &str) -> Result<SourceConn> {
32 let old_url = Url::parse(conn).map_err(|e| anyhow!("parse error: {}", e))?;
33
34 let proto = match old_url.query_pairs().find(|p| p.0 == CONNECTORX_PROTOCOL) {
36 Some((_, proto)) => proto.to_owned().to_string(),
37 None => "binary".to_string(),
38 };
39
40 let stripped_query: Vec<(_, _)> = old_url
42 .query_pairs()
43 .filter(|p| &*p.0 != CONNECTORX_PROTOCOL)
44 .collect();
45 let mut url = old_url.clone();
46 url.set_query(None);
47 for pair in stripped_query {
48 url.query_pairs_mut()
49 .append_pair(&pair.0.to_string()[..], &pair.1.to_string()[..]);
50 }
51
52 match url.scheme().split('+').collect::<Vec<&str>>()[0] {
55 "postgres" | "postgresql" => Ok(SourceConn::new(SourceType::Postgres, url, proto)),
56 "sqlite" => Ok(SourceConn::new(SourceType::SQLite, url, proto)),
57 "mysql" => Ok(SourceConn::new(SourceType::MySQL, url, proto)),
58 "mssql" => Ok(SourceConn::new(SourceType::MsSQL, url, proto)),
59 "oracle" => Ok(SourceConn::new(SourceType::Oracle, url, proto)),
60 "bigquery" => Ok(SourceConn::new(SourceType::BigQuery, url, proto)),
61 "duckdb" => Ok(SourceConn::new(SourceType::DuckDB, url, proto)),
62 "trino" => Ok(SourceConn::new(SourceType::Trino, url, proto)),
63 _ => Ok(SourceConn::new(SourceType::Unknown, url, proto)),
64 }
65 }
66}
67
68impl SourceConn {
69 pub fn new(ty: SourceType, conn: Url, proto: String) -> Self {
70 Self { ty, conn, proto }
71 }
72 pub fn set_protocol(&mut self, protocol: &str) {
73 self.proto = protocol.to_string();
74 }
75}
76
77#[throws(ConnectorXError)]
78pub fn parse_source(conn: &str, protocol: Option<&str>) -> SourceConn {
79 let mut source_conn = SourceConn::try_from(conn)?;
80 match protocol {
81 Some(p) => source_conn.set_protocol(p),
82 None => {}
83 }
84 source_conn
85}