connectorx/transports/
postgres_arrow.rs

//! Transport from the Postgres source to the Arrow destination.
2
3use crate::destinations::arrow::{
4    typesystem::{
5        ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
6    },
7    ArrowDestination, ArrowDestinationError,
8};
9use crate::sources::postgres::{
10    BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
11    PostgresTypeSystem, SimpleProtocol,
12};
13use crate::typesystem::TypeConversion;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use cidr_02::IpInet;
16use num_traits::ToPrimitive;
17use postgres::NoTls;
18use postgres_openssl::MakeTlsConnector;
19use rust_decimal::Decimal;
20use serde_json::Value;
21use std::marker::PhantomData;
22use thiserror::Error;
23use uuid::Uuid;
24
25#[derive(Error, Debug)]
26pub enum PostgresArrowTransportError {
27    #[error(transparent)]
28    Source(#[from] PostgresSourceError),
29
30    #[error(transparent)]
31    Destination(#[from] ArrowDestinationError),
32
33    #[error(transparent)]
34    ConnectorX(#[from] crate::errors::ConnectorXError),
35}
36
/// Marker type for the transport that converts Postgres data types to Arrow
/// data types.
///
/// `P` and `C` appear only inside `PhantomData`, so no value of either type is
/// stored. Elsewhere in this file `P` is filled with the protocol types and
/// `C` with the TLS connector types.
pub struct PostgresArrowTransport<P, C>(PhantomData<P>, PhantomData<C>);
39
// Generates the transport implementation for one `(protocol, TLS connector)`
// pair by forwarding to `impl_transport!`.
//
// Each mapping row reads `{ PostgresVariant[RustType] => ArrowVariant[RustType] | conversion <mode> }`.
// NOTE(review): the exact meaning of the `auto` / `owned` / `option` / `none`
// modes is defined by `impl_transport!`, which is not visible here. Rows marked
// `option` and `none` appear to rely on the hand-written `TypeConversion` impls
// further down in this file — confirm against the macro's documentation before
// changing a mode.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // Numeric scalars and booleans.
                { Float4[f32]                        => Float32[f32]                           | conversion auto   }
                { Float8[f64]                        => Float64[f64]                           | conversion auto   }
                { Numeric[Decimal]                   => Decimal[Decimal]                       | conversion auto   }
                { Int2[i16]                          => Int16[i16]                             | conversion auto   }
                { Int4[i32]                          => Int32[i32]                             | conversion auto   }
                { Int8[i64]                          => Int64[i64]                             | conversion auto   }
                { Bool[bool]                         => Boolean[bool]                          | conversion auto   }
                // Character data, all stored as LargeUtf8.
                { Text[&'r str]                      => LargeUtf8[String]                      | conversion owned  }
                { BpChar[&'r str]                    => LargeUtf8[String]                      | conversion none   }
                { VarChar[&'r str]                   => LargeUtf8[String]                      | conversion none   }
                { Name[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                { Enum[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                // Dates and times.
                { Timestamp[NaiveDateTime]           => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
                { Date[NaiveDate]                    => Date32[NaiveDate]                      | conversion auto   }
                { Time[NaiveTime]                    => Time64Micro[NaiveTimeWrapperMicro]     | conversion option }
                { TimestampTz[DateTime<Utc>]         => DateTimeTzMicro[DateTimeWrapperMicro]  | conversion option }
                // Remaining scalar types, stored as text or raw bytes.
                { UUID[Uuid]                         => LargeUtf8[String]                      | conversion option }
                { Char[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                { ByteA[Vec<u8>]                     => LargeBinary[Vec<u8>]                   | conversion auto   }
                { JSON[Value]                        => LargeUtf8[String]                      | conversion option }
                { JSONB[Value]                       => LargeUtf8[String]                      | conversion none   }
                { Inet[IpInet]                       => LargeUtf8[String]                      | conversion none   }
                // Array types with nullable elements.
                { BoolArray[Vec<Option<bool>>]       => BoolArray[Vec<Option<bool>>]           | conversion auto   }
                { VarcharArray[Vec<Option<String>>]  => Utf8Array[Vec<Option<String>>]         | conversion auto   }
                { TextArray[Vec<Option<String>>]     => Utf8Array[Vec<Option<String>>]         | conversion none   }
                { Int2Array[Vec<Option<i16>>]        => Int16Array[Vec<Option<i16>>]           | conversion auto   }
                { Int4Array[Vec<Option<i32>>]        => Int32Array[Vec<Option<i32>>]           | conversion auto   }
                { Int8Array[Vec<Option<i64>>]        => Int64Array[Vec<Option<i64>>]           | conversion auto   }
                { Float4Array[Vec<Option<f32>>]      => Float32Array[Vec<Option<f32>>]         | conversion auto   }
                { Float8Array[Vec<Option<f64>>]      => Float64Array[Vec<Option<f64>>]         | conversion auto   }
                { NumericArray[Vec<Option<Decimal>>] => DecimalArray[Vec<Option<Decimal>>]     | conversion auto   }
            }
        );
    }
}
83
84impl_postgres_transport!(BinaryProtocol, NoTls);
85impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
86impl_postgres_transport!(CSVProtocol, NoTls);
87impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
88impl_postgres_transport!(CursorProtocol, NoTls);
89impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
90impl_postgres_transport!(SimpleProtocol, NoTls);
91impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
92
93impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
94    fn convert(val: IpInet) -> String {
95        val.to_string()
96    }
97}
98
99impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
100    fn convert(val: Option<IpInet>) -> Option<String> {
101        val.map(|val| val.to_string())
102    }
103}
104
105impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
106    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
107        NaiveTimeWrapperMicro(val)
108    }
109}
110
111impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
112    for PostgresArrowTransport<P, C>
113{
114    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
115        NaiveDateTimeWrapperMicro(val)
116    }
117}
118
119impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrowTransport<P, C> {
120    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
121        DateTimeWrapperMicro(val)
122    }
123}
124
125impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
126    fn convert(val: Uuid) -> String {
127        val.to_string()
128    }
129}
130
131impl<P, C> TypeConversion<Decimal, f64> for PostgresArrowTransport<P, C> {
132    fn convert(val: Decimal) -> f64 {
133        val.to_f64()
134            .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
135    }
136}
137
138impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
139    fn convert(val: Value) -> String {
140        val.to_string()
141    }
142}