connectorx/transports/
postgres_arrow.rs

1//! Transport from Postgres Source to Arrow Destination.
2
3use crate::destinations::arrow::{
4    typesystem::{
5        ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
6    },
7    ArrowDestination, ArrowDestinationError,
8};
9use crate::sources::postgres::{
10    BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
11    PostgresTypeSystem, SimpleProtocol,
12};
13use crate::typesystem::TypeConversion;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use cidr_02::IpInet;
16use num_traits::ToPrimitive;
17use pgvector::{Bit, HalfVector, SparseVector, Vector};
18use postgres::NoTls;
19use postgres_openssl::MakeTlsConnector;
20use rust_decimal::Decimal;
21use serde_json::Value;
22use std::marker::PhantomData;
23use thiserror::Error;
24use uuid::Uuid;
25
26#[derive(Error, Debug)]
27pub enum PostgresArrowTransportError {
28    #[error(transparent)]
29    Source(#[from] PostgresSourceError),
30
31    #[error(transparent)]
32    Destination(#[from] ArrowDestinationError),
33
34    #[error(transparent)]
35    ConnectorX(#[from] crate::errors::ConnectorXError),
36}
37
38/// Convert Postgres data types to Arrow data types.
39pub struct PostgresArrowTransport<P, C>(PhantomData<P>, PhantomData<C>);
40
// Expands to one `impl_transport!` invocation wiring
// `PostgresSource<protocol, connector>` to `ArrowDestination` through
// `PostgresArrowTransport<protocol, connector>`.
//
// Each mapping row has the shape
// `{ PostgresVariant[RustType] => ArrowVariant[RustType] | conversion <mode> }`.
// The trailing `conversion <mode>` keyword is interpreted by `impl_transport!`,
// which is defined elsewhere in the crate.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // Numeric scalars and booleans.
                { Float4[f32]                        => Float32[f32]                           | conversion auto   }
                { Float8[f64]                        => Float64[f64]                           | conversion auto   }
                { Numeric[Decimal]                   => Decimal[Decimal]                       | conversion auto   }
                { Int2[i16]                          => Int16[i16]                             | conversion auto   }
                { Int4[i32]                          => Int32[i32]                             | conversion auto   }
                { Int8[i64]                          => Int64[i64]                             | conversion auto   }
                { Bool[bool]                         => Boolean[bool]                          | conversion auto   }
                // String-like types, all stored as LargeUtf8.
                { Text[&'r str]                      => LargeUtf8[String]                      | conversion owned  }
                { BpChar[&'r str]                    => LargeUtf8[String]                      | conversion none   }
                { VarChar[&'r str]                   => LargeUtf8[String]                      | conversion none   }
                { Name[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                { Enum[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                // Date and time types.
                { Timestamp[NaiveDateTime]           => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
                { Date[NaiveDate]                    => Date32[NaiveDate]                      | conversion auto   }
                { Time[NaiveTime]                    => Time64Micro[NaiveTimeWrapperMicro]     | conversion option }
                { TimestampTz[DateTime<Utc>]         => DateTimeTzMicro[DateTimeWrapperMicro]  | conversion option }
                // Remaining scalars, stored as text or raw bytes.
                { UUID[Uuid]                         => LargeUtf8[String]                      | conversion option }
                { Char[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                { ByteA[Vec<u8>]                     => LargeBinary[Vec<u8>]                   | conversion auto   }
                { JSON[Value]                        => LargeUtf8[String]                      | conversion option }
                { JSONB[Value]                       => LargeUtf8[String]                      | conversion none   }
                { Inet[IpInet]                       => LargeUtf8[String]                      | conversion none   }
                // Array types with nullable elements.
                { BoolArray[Vec<Option<bool>>]       => BoolArray[Vec<Option<bool>>]           | conversion auto   }
                { VarcharArray[Vec<Option<String>>]  => Utf8Array[Vec<Option<String>>]         | conversion auto   }
                { TextArray[Vec<Option<String>>]     => Utf8Array[Vec<Option<String>>]         | conversion none   }
                { Int2Array[Vec<Option<i16>>]        => Int16Array[Vec<Option<i16>>]           | conversion auto   }
                { Int4Array[Vec<Option<i32>>]        => Int32Array[Vec<Option<i32>>]           | conversion auto   }
                { Int8Array[Vec<Option<i64>>]        => Int64Array[Vec<Option<i64>>]           | conversion auto   }
                { Float4Array[Vec<Option<f32>>]      => Float32Array[Vec<Option<f32>>]         | conversion auto   }
                { Float8Array[Vec<Option<f64>>]      => Float64Array[Vec<Option<f64>>]         | conversion auto   }
                { NumericArray[Vec<Option<Decimal>>] => DecimalArray[Vec<Option<Decimal>>]     | conversion auto   }
                // pgvector types.
                { Vector[Vector]                     => Float32Array[Vec<Option<f32>>]         | conversion option }
                { HalfVec[HalfVector]                => Float32Array[Vec<Option<f32>>]         | conversion option }
                { Bit[Bit]                           => LargeBinary[Vec<u8>]                   | conversion option }
                { SparseVec[SparseVector]            => Float32Array[Vec<Option<f32>>]         | conversion option }
            }
        );
    }
}
88
89impl_postgres_transport!(BinaryProtocol, NoTls);
90impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
91impl_postgres_transport!(CSVProtocol, NoTls);
92impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
93impl_postgres_transport!(CursorProtocol, NoTls);
94impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
95impl_postgres_transport!(SimpleProtocol, NoTls);
96impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
97
98impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
99    fn convert(val: IpInet) -> String {
100        val.to_string()
101    }
102}
103
104impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
105    fn convert(val: Option<IpInet>) -> Option<String> {
106        val.map(|val| val.to_string())
107    }
108}
109
110impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
111    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
112        NaiveTimeWrapperMicro(val)
113    }
114}
115
116impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
117    for PostgresArrowTransport<P, C>
118{
119    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
120        NaiveDateTimeWrapperMicro(val)
121    }
122}
123
124impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrowTransport<P, C> {
125    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
126        DateTimeWrapperMicro(val)
127    }
128}
129
130impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
131    fn convert(val: Uuid) -> String {
132        val.to_string()
133    }
134}
135
136impl<P, C> TypeConversion<Decimal, f64> for PostgresArrowTransport<P, C> {
137    fn convert(val: Decimal) -> f64 {
138        val.to_f64()
139            .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
140    }
141}
142
143impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
144    fn convert(val: Value) -> String {
145        val.to_string()
146    }
147}
148
149impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
150    fn convert(val: Vector) -> Vec<Option<f32>> {
151        val.to_vec().into_iter().map(Some).collect()
152    }
153}
154
155impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
156    fn convert(val: HalfVector) -> Vec<Option<f32>> {
157        val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
158    }
159}
160
161impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
162    fn convert(val: Bit) -> Vec<u8> {
163        val.as_bytes().into()
164    }
165}
166
167impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
168    fn convert(val: SparseVector) -> Vec<Option<f32>> {
169        val.to_vec().into_iter().map(Some).collect()
170    }
171}