connectorx/sources/postgres/
connection.rs1use crate::sources::postgres::errors::PostgresSourceError;
2use openssl::ssl::{SslConnector, SslFiletype, SslMethod, SslVerifyMode};
3use postgres::{config::SslMode, Config};
4use postgres_openssl::MakeTlsConnector;
5use std::collections::HashMap;
6use std::convert::TryFrom;
7use std::path::PathBuf;
8use url::Url;
9
10#[derive(Clone, Debug)]
11pub struct TlsConfig {
12 pub pg_config: Config,
14 pub client_cert: Option<(PathBuf, PathBuf)>,
16 pub root_cert: Option<PathBuf>,
18}
19
20impl TryFrom<TlsConfig> for MakeTlsConnector {
21 type Error = PostgresSourceError;
22 fn try_from(tls_config: TlsConfig) -> Result<Self, Self::Error> {
26 let mut builder = SslConnector::builder(SslMethod::tls_client())?;
27 let ssl_mode = tls_config.pg_config.get_ssl_mode();
28 let (verify_ca, verify_hostname) = match ssl_mode {
29 SslMode::Disable | SslMode::Prefer => (false, false),
30 SslMode::Require => match tls_config.root_cert {
31 Some(_) => (true, false),
37 None => (false, false),
38 },
39 _ => panic!("unexpected sslmode {:?}", ssl_mode),
45 };
46
47 if let Some((cert, key)) = tls_config.client_cert {
48 builder.set_certificate_file(cert, SslFiletype::PEM)?;
49 builder.set_private_key_file(key, SslFiletype::PEM)?;
50 }
51
52 if let Some(root_cert) = tls_config.root_cert {
53 builder.set_ca_file(root_cert)?;
54 }
55
56 if !verify_ca {
57 builder.set_verify(SslVerifyMode::NONE); }
59
60 let mut tls_connector = MakeTlsConnector::new(builder.build());
61
62 if !verify_hostname {
63 tls_connector.set_callback(|connect, _| {
64 connect.set_verify_hostname(false);
65 Ok(())
66 });
67 }
68
69 Ok(tls_connector)
70 }
71}
72
73fn strip_bad_opts(url: &Url) -> Url {
75 let stripped_query: Vec<(_, _)> = url
76 .query_pairs()
77 .filter(|p| !matches!(&*p.0, "sslkey" | "sslcert" | "sslrootcert"))
78 .collect();
79
80 let mut url2 = url.clone();
81 url2.set_query(None);
82
83 for pair in stripped_query {
84 url2.query_pairs_mut()
85 .append_pair(&pair.0.to_string()[..], &pair.1.to_string()[..]);
86 }
87
88 url2
89}
90
91pub fn rewrite_tls_args(
92 conn: &Url,
93) -> Result<(Config, Option<MakeTlsConnector>), PostgresSourceError> {
94 let params: HashMap<String, String> = conn.query_pairs().into_owned().collect();
101
102 let sslcert = params.get("sslcert").map(PathBuf::from);
103 let sslkey = params.get("sslkey").map(PathBuf::from);
104 let root_cert = params.get("sslrootcert").map(PathBuf::from);
105 let client_cert = match (sslcert, sslkey) {
106 (Some(a), Some(b)) => Some((a, b)),
107 _ => None,
108 };
109
110 let stripped_url = strip_bad_opts(conn);
111 let pg_config: Config = stripped_url.as_str().parse().unwrap();
112
113 let tls_config = TlsConfig {
114 pg_config: pg_config.clone(),
115 client_cert,
116 root_cert,
117 };
118
119 let tls_connector = match pg_config.get_ssl_mode() {
120 SslMode::Disable => None,
121 _ => Some(MakeTlsConnector::try_from(tls_config)?),
122 };
123
124 Ok((pg_config, tls_connector))
125}