connectorx/transports/postgres_arrow.rs

1//! Transport from Postgres Source to Arrow Destination.
2
3use crate::destinations::arrow::{
4    typesystem::{
5        ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
6    },
7    ArrowDestination, ArrowDestinationError,
8};
9use crate::sources::postgres::{
10    BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
11    PostgresTypeSystem, SimpleProtocol,
12};
13use crate::typesystem::TypeConversion;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use cidr_02::IpInet;
16use num_traits::ToPrimitive;
17use pgvector::{Bit, HalfVector, SparseVector, Vector};
18use postgres::NoTls;
19use postgres_openssl::MakeTlsConnector;
20use rust_decimal::Decimal;
21use serde_json::Value;
22use std::marker::PhantomData;
23use thiserror::Error;
24use uuid::Uuid;
25
26#[derive(Error, Debug)]
27pub enum PostgresArrowTransportError {
28    #[error(transparent)]
29    Source(#[from] PostgresSourceError),
30
31    #[error(transparent)]
32    Destination(#[from] ArrowDestinationError),
33
34    #[error(transparent)]
35    ConnectorX(#[from] crate::errors::ConnectorXError),
36}
37
/// Converts Postgres data types into Arrow data types.
///
/// This is a zero-sized marker type. `P` is the Postgres protocol (for example
/// `BinaryProtocol`) and `C` is the connector (`NoTls` or `MakeTlsConnector`). The
/// `impl_postgres_transport!` invocations in this file attach the transport implementation for
/// each pair.
pub struct PostgresArrowTransport<P, C>(
    PhantomData<P>, // protocol marker
    PhantomData<C>, // connector marker
);
40
/// Instantiates the Postgres -> Arrow transport for one `(protocol, connector)` pair.
///
/// Each row of `mappings` pairs a Postgres type, along with the Rust value read from the
/// source, with an Arrow type, along with the Rust value handed to the destination. The
/// trailing `conversion` keyword tells `impl_transport!` how to obtain that value conversion.
///
/// NOTE(review): rows marked `option` appear to rely on a hand-written base `TypeConversion`
/// impl later in this file (e.g. `Uuid -> String`, `Value -> String`). Rows marked `none`
/// appear to reuse another row's value pair (e.g. `BpChar` shares `Text`'s `&str -> String`)
/// or the hand-written `IpInet` impls. Confirm the exact semantics against `impl_transport!`.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // numeric and boolean scalars
                { Float4[f32]                        => Float32[f32]                           | conversion auto   }
                { Float8[f64]                        => Float64[f64]                           | conversion auto   }
                { Numeric[Decimal]                   => Decimal[Decimal]                       | conversion auto   }
                { Int2[i16]                          => Int16[i16]                             | conversion auto   }
                { Int4[i32]                          => Int32[i32]                             | conversion auto   }
                { Int8[i64]                          => Int64[i64]                             | conversion auto   }
                { UInt4[u32]                         => UInt32[u32]                            | conversion auto   }
                { Bool[bool]                         => Boolean[bool]                          | conversion auto   }
                // text-like values (all land in LargeUtf8)
                { Text[&'r str]                      => LargeUtf8[String]                      | conversion owned  }
                { BpChar[&'r str]                    => LargeUtf8[String]                      | conversion none   }
                { VarChar[&'r str]                   => LargeUtf8[String]                      | conversion none   }
                { Name[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                { Enum[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                // temporal values
                { Timestamp[NaiveDateTime]           => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
                { Date[NaiveDate]                    => Date32[NaiveDate]                      | conversion auto   }
                { Time[NaiveTime]                    => Time64Micro[NaiveTimeWrapperMicro]     | conversion option }
                { TimestampTz[DateTime<Utc>]         => DateTimeTzMicro[DateTimeWrapperMicro]  | conversion option }
                // miscellaneous scalars
                { UUID[Uuid]                         => LargeUtf8[String]                      | conversion option }
                { Char[&'r str]                      => LargeUtf8[String]                      | conversion none   }
                { ByteA[Vec<u8>]                     => LargeBinary[Vec<u8>]                   | conversion auto   }
                { JSON[Value]                        => LargeUtf8[String]                      | conversion option }
                { JSONB[Value]                       => LargeUtf8[String]                      | conversion none   }
                { Inet[IpInet]                       => LargeUtf8[String]                      | conversion none   }
                // arrays
                { BoolArray[Vec<Option<bool>>]       => BoolArray[Vec<Option<bool>>]           | conversion auto   }
                { VarcharArray[Vec<Option<String>>]  => Utf8Array[Vec<Option<String>>]         | conversion auto   }
                { TextArray[Vec<Option<String>>]     => Utf8Array[Vec<Option<String>>]         | conversion none   }
                { Int2Array[Vec<Option<i16>>]        => Int16Array[Vec<Option<i16>>]           | conversion auto   }
                { Int4Array[Vec<Option<i32>>]        => Int32Array[Vec<Option<i32>>]           | conversion auto   }
                { Int8Array[Vec<Option<i64>>]        => Int64Array[Vec<Option<i64>>]           | conversion auto   }
                { Float4Array[Vec<Option<f32>>]      => Float32Array[Vec<Option<f32>>]         | conversion auto   }
                { Float8Array[Vec<Option<f64>>]      => Float64Array[Vec<Option<f64>>]         | conversion auto   }
                { NumericArray[Vec<Option<Decimal>>] => DecimalArray[Vec<Option<Decimal>>]     | conversion auto   }
                // pgvector extension types
                { Vector[Vector]                     => Float32Array[Vec<Option<f32>>]         | conversion option }
                { HalfVec[HalfVector]                => Float32Array[Vec<Option<f32>>]         | conversion option }
                { Bit[Bit]                           => LargeBinary[Vec<u8>]                   | conversion option }
                { SparseVec[SparseVector]            => Float32Array[Vec<Option<f32>>]         | conversion option }
            }
        );
    }
}
89
90impl_postgres_transport!(BinaryProtocol, NoTls);
91impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
92impl_postgres_transport!(CSVProtocol, NoTls);
93impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
94impl_postgres_transport!(CursorProtocol, NoTls);
95impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
96impl_postgres_transport!(SimpleProtocol, NoTls);
97impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
98
99impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
100    fn convert(val: IpInet) -> String {
101        val.to_string()
102    }
103}
104
105impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
106    fn convert(val: Option<IpInet>) -> Option<String> {
107        val.map(|val| val.to_string())
108    }
109}
110
111impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
112    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
113        NaiveTimeWrapperMicro(val)
114    }
115}
116
117impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
118    for PostgresArrowTransport<P, C>
119{
120    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
121        NaiveDateTimeWrapperMicro(val)
122    }
123}
124
125impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrowTransport<P, C> {
126    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
127        DateTimeWrapperMicro(val)
128    }
129}
130
131impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
132    fn convert(val: Uuid) -> String {
133        val.to_string()
134    }
135}
136
137impl<P, C> TypeConversion<Decimal, f64> for PostgresArrowTransport<P, C> {
138    fn convert(val: Decimal) -> f64 {
139        val.to_f64()
140            .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
141    }
142}
143
144impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
145    fn convert(val: Value) -> String {
146        val.to_string()
147    }
148}
149
150impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
151    fn convert(val: Vector) -> Vec<Option<f32>> {
152        val.to_vec().into_iter().map(Some).collect()
153    }
154}
155
156impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
157    fn convert(val: HalfVector) -> Vec<Option<f32>> {
158        val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
159    }
160}
161
162impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
163    fn convert(val: Bit) -> Vec<u8> {
164        val.as_bytes().into()
165    }
166}
167
168impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
169    fn convert(val: SparseVector) -> Vec<Option<f32>> {
170        val.to_vec().into_iter().map(Some).collect()
171    }
172}