Skip to main content

connectorx/transports/
postgres_arrowstream.rs

1//! Transport from Postgres Source to Arrow Destination.
2
3use crate::destinations::arrowstream::{
4    typesystem::{
5        ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
6    },
7    ArrowDestination, ArrowDestinationError,
8};
9use crate::sources::postgres::{
10    BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
11    PostgresTypeSystem, SimpleProtocol,
12};
13use crate::typesystem::TypeConversion;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use cidr_02::IpInet;
16use pgvector::{Bit, HalfVector, SparseVector, Vector};
17use postgres::NoTls;
18use postgres_openssl::MakeTlsConnector;
19use rust_decimal::Decimal;
20use serde_json::Value;
21use std::marker::PhantomData;
22use thiserror::Error;
23use uuid::Uuid;
24
25#[derive(Error, Debug)]
26pub enum PostgresArrowTransportError {
27    #[error(transparent)]
28    Source(#[from] PostgresSourceError),
29
30    #[error(transparent)]
31    Destination(#[from] ArrowDestinationError),
32
33    #[error(transparent)]
34    ConnectorX(#[from] crate::errors::ConnectorXError),
35}
36
/// Convert Postgres data types to Arrow data types.
///
/// The struct stores no data: both fields are `PhantomData` markers that only
/// carry the type parameters. In this file `P` is instantiated with the
/// protocol types (`BinaryProtocol`, `CSVProtocol`, `CursorProtocol`,
/// `SimpleProtocol`) and `C` with the TLS connector types (`NoTls`,
/// `MakeTlsConnector`).
pub struct PostgresArrowTransport<P, C>(
    // Marker for the protocol type parameter.
    PhantomData<P>,
    // Marker for the TLS connector type parameter.
    PhantomData<C>,
);
39
/// Instantiates the Postgres -> Arrow transport for one protocol / TLS
/// connector pair by forwarding a fixed type-mapping table to
/// `impl_transport!`.
///
/// Each mapping row reads `{ PgVariant[RustType] => ArrowVariant[RustType] | conversion KIND }`.
/// NOTE(review): the meaning of each `KIND` is defined by `impl_transport!`,
/// which is not visible here. Rows marked `option` have a matching non-`Option`
/// `TypeConversion` impl written by hand in this file, and the `Inet` row
/// (`none`) has both its plain and `Option` impls written by hand. Presumably
/// `none` means "generate no conversion impl" - confirm against the macro.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // Numeric scalars and booleans.
                { Float4[f32] => Float64[f64] | conversion auto }
                { Float8[f64] => Float64[f64] | conversion auto }
                { Numeric[Decimal] => Decimal[Decimal] | conversion auto }
                { Int2[i16] => Int64[i64] | conversion auto }
                { Int4[i32] => Int64[i64] | conversion auto }
                { Int8[i64] => Int64[i64] | conversion auto }
                { UInt4[u32] => UInt64[u64] | conversion auto }
                { Bool[bool] => Boolean[bool] | conversion auto }

                // Borrowed string types, all landing in LargeUtf8.
                { Text[&'r str] => LargeUtf8[String] | conversion owned }
                { BpChar[&'r str] => LargeUtf8[String] | conversion none }
                { VarChar[&'r str] => LargeUtf8[String] | conversion none }
                { Name[&'r str] => LargeUtf8[String] | conversion none }
                { Enum[&'r str] => LargeUtf8[String] | conversion none }

                // Date and time types; the wrapper types come from the Arrow
                // stream destination's type system.
                { Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
                { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
                { Time[NaiveTime] => Time64Micro[NaiveTimeWrapperMicro] | conversion option }
                { TimestampTz[DateTime<Utc>] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option }

                // Miscellaneous scalars: rendered as text or passed through as bytes.
                { UUID[Uuid] => LargeUtf8[String] | conversion option }
                { Char[&'r str] => LargeUtf8[String] | conversion none }
                { ByteA[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
                { JSON[Value] => LargeUtf8[String] | conversion option }
                { JSONB[Value] => LargeUtf8[String] | conversion none }
                { Inet[IpInet] => LargeUtf8[String] | conversion none }

                // Array types.
                { BoolArray[Vec<Option<bool>>] => BoolArray[Vec<Option<bool>>] | conversion auto }
                { VarcharArray[Vec<Option<String>>] => Utf8Array[Vec<Option<String>>] | conversion auto }
                { TextArray[Vec<Option<String>>] => Utf8Array[Vec<Option<String>>] | conversion none }
                { Int2Array[Vec<Option<i16>>] => Int16Array[Vec<Option<i16>>] | conversion auto }
                { Int4Array[Vec<Option<i32>>] => Int32Array[Vec<Option<i32>>] | conversion auto }
                { Int8Array[Vec<Option<i64>>] => Int64Array[Vec<Option<i64>>] | conversion auto }
                { Float4Array[Vec<Option<f32>>] => Float32Array[Vec<Option<f32>>] | conversion auto }
                { Float8Array[Vec<Option<f64>>] => Float64Array[Vec<Option<f64>>] | conversion auto }
                { NumericArray[Vec<Option<Decimal>>] => DecimalArray[Vec<Option<Decimal>>] | conversion auto }

                // pgvector types.
                { Vector[Vector] => Float32Array[Vec<Option<f32>>] | conversion option }
                { HalfVec[HalfVector] => Float32Array[Vec<Option<f32>>] | conversion option }
                { Bit[Bit] => LargeBinary[Vec<u8>] | conversion option }
                { SparseVec[SparseVector] => Float32Array[Vec<Option<f32>>] | conversion option }
            }
        );
    }
}
88
89impl_postgres_transport!(BinaryProtocol, NoTls);
90impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
91impl_postgres_transport!(CSVProtocol, NoTls);
92impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
93impl_postgres_transport!(CursorProtocol, NoTls);
94impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
95impl_postgres_transport!(SimpleProtocol, NoTls);
96impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
97
98impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
99    fn convert(val: IpInet) -> String {
100        val.to_string()
101    }
102}
103
104impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
105    fn convert(val: Option<IpInet>) -> Option<String> {
106        val.map(|val| val.to_string())
107    }
108}
109
110impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
111    fn convert(val: Uuid) -> String {
112        val.to_string()
113    }
114}
115
116impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
117    fn convert(val: Value) -> String {
118        val.to_string()
119    }
120}
121
122impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
123    fn convert(val: Vector) -> Vec<Option<f32>> {
124        val.to_vec().into_iter().map(Some).collect()
125    }
126}
127
128impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
129    fn convert(val: HalfVector) -> Vec<Option<f32>> {
130        val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
131    }
132}
133
134impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
135    fn convert(val: Bit) -> Vec<u8> {
136        val.as_bytes().into()
137    }
138}
139
140impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
141    fn convert(val: SparseVector) -> Vec<Option<f32>> {
142        val.to_vec().into_iter().map(Some).collect()
143    }
144}
145
146impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
147    for PostgresArrowTransport<P, C>
148{
149    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
150        NaiveDateTimeWrapperMicro(val)
151    }
152}
153
154impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
155    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
156        NaiveTimeWrapperMicro(val)
157    }
158}
159
160impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrowTransport<P, C> {
161    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
162        DateTimeWrapperMicro(val)
163    }
164}