connectorx/transports/
postgres_arrow.rs1use crate::destinations::arrow::{
4 typesystem::{
5 ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
6 },
7 ArrowDestination, ArrowDestinationError,
8};
9use crate::sources::postgres::{
10 BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
11 PostgresTypeSystem, SimpleProtocol,
12};
13use crate::typesystem::TypeConversion;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use cidr_02::IpInet;
16use num_traits::ToPrimitive;
17use postgres::NoTls;
18use postgres_openssl::MakeTlsConnector;
19use rust_decimal::Decimal;
20use serde_json::Value;
21use std::marker::PhantomData;
22use thiserror::Error;
23use uuid::Uuid;
24
25#[derive(Error, Debug)]
26pub enum PostgresArrowTransportError {
27 #[error(transparent)]
28 Source(#[from] PostgresSourceError),
29
30 #[error(transparent)]
31 Destination(#[from] ArrowDestinationError),
32
33 #[error(transparent)]
34 ConnectorX(#[from] crate::errors::ConnectorXError),
35}
36
/// Marker type naming the transport from a Postgres source to an Arrow
/// destination.
///
/// `P` and `C` are carried only through `PhantomData`, so the struct stores
/// no data of either type. In this file they are instantiated with a
/// protocol type and a TLS connector type respectively.
pub struct PostgresArrowTransport<P, C>(
    PhantomData<P>,
    PhantomData<C>,
);
39
/// Generates the `PostgresArrowTransport` implementation for one pairing of
/// wire protocol (`$protocol`) and TLS connector (`$connector`).
///
/// The expansion is a single `impl_transport!` invocation that routes
/// `PostgresSource<$protocol, $connector>` to `ArrowDestination` and lists,
/// row by row, which `PostgresTypeSystem` variant maps to which
/// `ArrowTypeSystem` variant.
///
/// NOTE(review): the meaning of the `conversion auto | owned | option | none`
/// keywords is defined by `impl_transport!`, which is not visible here —
/// confirm against that macro before changing any row.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // Numeric and boolean scalars.
                { Float4[f32]                          => Float32[f32]                           | conversion auto   }
                { Float8[f64]                          => Float64[f64]                           | conversion auto   }
                { Numeric[Decimal]                     => Decimal[Decimal]                       | conversion auto   }
                { Int2[i16]                            => Int16[i16]                             | conversion auto   }
                { Int4[i32]                            => Int32[i32]                             | conversion auto   }
                { Int8[i64]                            => Int64[i64]                             | conversion auto   }
                { Bool[bool]                           => Boolean[bool]                          | conversion auto   }
                // Borrowed string types, all landing in LargeUtf8.
                { Text[&'r str]                        => LargeUtf8[String]                      | conversion owned  }
                { BpChar[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                { VarChar[&'r str]                     => LargeUtf8[String]                      | conversion none   }
                { Name[&'r str]                        => LargeUtf8[String]                      | conversion none   }
                { Enum[&'r str]                        => LargeUtf8[String]                      | conversion none   }
                // Date and time types.
                { Timestamp[NaiveDateTime]             => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
                { Date[NaiveDate]                      => Date32[NaiveDate]                      | conversion auto   }
                { Time[NaiveTime]                      => Time64Micro[NaiveTimeWrapperMicro]     | conversion option }
                { TimestampTz[DateTime<Utc>]           => DateTimeTzMicro[DateTimeWrapperMicro]  | conversion option }
                // Remaining scalar types.
                { UUID[Uuid]                           => LargeUtf8[String]                      | conversion option }
                { Char[&'r str]                        => LargeUtf8[String]                      | conversion none   }
                { ByteA[Vec<u8>]                       => LargeBinary[Vec<u8>]                   | conversion auto   }
                { JSON[Value]                          => LargeUtf8[String]                      | conversion option }
                { JSONB[Value]                         => LargeUtf8[String]                      | conversion none   }
                { Inet[IpInet]                         => LargeUtf8[String]                      | conversion none   }
                // Array types.
                { BoolArray[Vec<Option<bool>>]         => BoolArray[Vec<Option<bool>>]           | conversion auto   }
                { VarcharArray[Vec<Option<String>>]    => Utf8Array[Vec<Option<String>>]         | conversion auto   }
                { TextArray[Vec<Option<String>>]       => Utf8Array[Vec<Option<String>>]         | conversion none   }
                { Int2Array[Vec<Option<i16>>]          => Int16Array[Vec<Option<i16>>]           | conversion auto   }
                { Int4Array[Vec<Option<i32>>]          => Int32Array[Vec<Option<i32>>]           | conversion auto   }
                { Int8Array[Vec<Option<i64>>]          => Int64Array[Vec<Option<i64>>]           | conversion auto   }
                { Float4Array[Vec<Option<f32>>]        => Float32Array[Vec<Option<f32>>]         | conversion auto   }
                { Float8Array[Vec<Option<f64>>]        => Float64Array[Vec<Option<f64>>]         | conversion auto   }
                { NumericArray[Vec<Option<Decimal>>]   => DecimalArray[Vec<Option<Decimal>>]     | conversion auto   }
            }
        );
    }
}
83
84impl_postgres_transport!(BinaryProtocol, NoTls);
85impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
86impl_postgres_transport!(CSVProtocol, NoTls);
87impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
88impl_postgres_transport!(CursorProtocol, NoTls);
89impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
90impl_postgres_transport!(SimpleProtocol, NoTls);
91impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
92
93impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
94 fn convert(val: IpInet) -> String {
95 val.to_string()
96 }
97}
98
99impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
100 fn convert(val: Option<IpInet>) -> Option<String> {
101 val.map(|val| val.to_string())
102 }
103}
104
105impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
106 fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
107 NaiveTimeWrapperMicro(val)
108 }
109}
110
111impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
112 for PostgresArrowTransport<P, C>
113{
114 fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
115 NaiveDateTimeWrapperMicro(val)
116 }
117}
118
119impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrowTransport<P, C> {
120 fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
121 DateTimeWrapperMicro(val)
122 }
123}
124
125impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
126 fn convert(val: Uuid) -> String {
127 val.to_string()
128 }
129}
130
131impl<P, C> TypeConversion<Decimal, f64> for PostgresArrowTransport<P, C> {
132 fn convert(val: Decimal) -> f64 {
133 val.to_f64()
134 .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
135 }
136}
137
138impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
139 fn convert(val: Value) -> String {
140 val.to_string()
141 }
142}