// connectorx/sources/postgres/connection.rs

1use crate::sources::postgres::errors::PostgresSourceError;
2use openssl::ssl::{SslConnector, SslFiletype, SslMethod, SslVerifyMode};
3use postgres::{config::SslMode, Config};
4use postgres_openssl::MakeTlsConnector;
5use std::collections::HashMap;
6use std::convert::TryFrom;
7use std::path::PathBuf;
8use url::Url;
9
10#[derive(Clone, Debug)]
11pub struct TlsConfig {
12    /// Postgres config, pg_config.sslmode (`sslmode`).
13    pub pg_config: Config,
14    /// Location of the client cert and key (`sslcert`, `sslkey`).
15    pub client_cert: Option<(PathBuf, PathBuf)>,
16    /// Location of the root certificate (`sslrootcert`).
17    pub root_cert: Option<PathBuf>,
18}
19
20impl TryFrom<TlsConfig> for MakeTlsConnector {
21    type Error = PostgresSourceError;
22    // The logic of this function adapted primarily from:
23    // https://github.com/sfackler/rust-postgres/pull/774
24    // We only support server side authentication (`sslrootcert`) for now
25    fn try_from(tls_config: TlsConfig) -> Result<Self, Self::Error> {
26        let mut builder = SslConnector::builder(SslMethod::tls_client())?;
27        let ssl_mode = tls_config.pg_config.get_ssl_mode();
28        let (verify_ca, verify_hostname) = match ssl_mode {
29            SslMode::Disable | SslMode::Prefer => (false, false),
30            SslMode::Require => match tls_config.root_cert {
31                // If a root CA file exists, the behavior of sslmode=require will be the same as
32                // that of verify-ca, meaning the server certificate is validated against the CA.
33                //
34                // For more details, check out the note about backwards compatibility in
35                // https://postgresql.org/docs/current/libpq-ssl.html#LIBQ-SSL-CERTIFICATES.
36                Some(_) => (true, false),
37                None => (false, false),
38            },
39            // These two modes will not work until upstream rust-postgres supports parsing
40            // them as part of the TLS config.
41            //
42            // SslMode::VerifyCa => (true, false),
43            // SslMode::VerifyFull => (true, true),
44            _ => panic!("unexpected sslmode {:?}", ssl_mode),
45        };
46
47        if let Some((cert, key)) = tls_config.client_cert {
48            builder.set_certificate_file(cert, SslFiletype::PEM)?;
49            builder.set_private_key_file(key, SslFiletype::PEM)?;
50        }
51
52        if let Some(root_cert) = tls_config.root_cert {
53            builder.set_ca_file(root_cert)?;
54        }
55
56        if !verify_ca {
57            builder.set_verify(SslVerifyMode::NONE); // do not verify CA
58        }
59
60        let mut tls_connector = MakeTlsConnector::new(builder.build());
61
62        if !verify_hostname {
63            tls_connector.set_callback(|connect, _| {
64                connect.set_verify_hostname(false);
65                Ok(())
66            });
67        }
68
69        Ok(tls_connector)
70    }
71}
72
73// Strip URL params not accepted by upstream rust-postgres
74fn strip_bad_opts(url: &Url) -> Url {
75    let stripped_query: Vec<(_, _)> = url
76        .query_pairs()
77        .filter(|p| !matches!(&*p.0, "sslkey" | "sslcert" | "sslrootcert"))
78        .collect();
79
80    let mut url2 = url.clone();
81    url2.set_query(None);
82
83    for pair in stripped_query {
84        url2.query_pairs_mut()
85            .append_pair(&pair.0.to_string()[..], &pair.1.to_string()[..]);
86    }
87
88    url2
89}
90
91pub fn rewrite_tls_args(
92    conn: &Url,
93) -> Result<(Config, Option<MakeTlsConnector>), PostgresSourceError> {
94    // We parse the config, then strip unsupported SSL opts and rewrite the URI
95    // before calling conn.parse().
96    //
97    // For more details on this approach, see the conversation here:
98    // https://github.com/sfackler/rust-postgres/pull/774#discussion_r641784774
99
100    let params: HashMap<String, String> = conn.query_pairs().into_owned().collect();
101
102    let sslcert = params.get("sslcert").map(PathBuf::from);
103    let sslkey = params.get("sslkey").map(PathBuf::from);
104    let root_cert = params.get("sslrootcert").map(PathBuf::from);
105    let client_cert = match (sslcert, sslkey) {
106        (Some(a), Some(b)) => Some((a, b)),
107        _ => None,
108    };
109
110    let stripped_url = strip_bad_opts(conn);
111    let pg_config: Config = stripped_url.as_str().parse().unwrap();
112
113    let tls_config = TlsConfig {
114        pg_config: pg_config.clone(),
115        client_cert,
116        root_cert,
117    };
118
119    let tls_connector = match pg_config.get_ssl_mode() {
120        SslMode::Disable => None,
121        _ => Some(MakeTlsConnector::try_from(tls_config)?),
122    };
123
124    Ok((pg_config, tls_connector))
125}