// connectorx/transports/postgres_arrowstream.rs

//! Transport from a Postgres source to an Arrow stream (`arrowstream`) destination.
2
3use crate::destinations::arrowstream::{
4    typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
5};
6use crate::sources::postgres::{
7    BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
8    PostgresTypeSystem, SimpleProtocol,
9};
10use crate::typesystem::TypeConversion;
11use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
12use cidr_02::IpInet;
13use pgvector::{Bit, HalfVector, SparseVector, Vector};
14use postgres::NoTls;
15use postgres_openssl::MakeTlsConnector;
16use rust_decimal::Decimal;
17use serde_json::Value;
18use std::marker::PhantomData;
19use thiserror::Error;
20use uuid::Uuid;
21
22#[derive(Error, Debug)]
23pub enum PostgresArrowTransportError {
24    #[error(transparent)]
25    Source(#[from] PostgresSourceError),
26
27    #[error(transparent)]
28    Destination(#[from] ArrowDestinationError),
29
30    #[error(transparent)]
31    ConnectorX(#[from] crate::errors::ConnectorXError),
32}
33
/// Converts Postgres data types into Arrow data types.
///
/// This transport carries values read by a [`PostgresSource`] into an [`ArrowDestination`].
/// `P` is the Postgres protocol marker (e.g. `BinaryProtocol`, `CSVProtocol`) and `C` is the
/// TLS connector (`NoTls` or `MakeTlsConnector`). Both are type-level parameters only: they are
/// held through `PhantomData`, so the struct stores no runtime data.
pub struct PostgresArrowTransport<P, C>(PhantomData<P>, PhantomData<C>);
36
/// Instantiates `PostgresArrowTransport` for one (protocol, TLS connector) pair.
///
/// Each row of `mappings` has three parts:
/// - a Postgres source type, with the Rust type it is read as, in brackets;
/// - the Arrow destination type, with the Rust type it is written as;
/// - the conversion strategy `impl_transport!` should use for that pair.
///
/// NOTE(review): the exact meaning of `auto` / `owned` / `option` / `none` is defined by
/// `impl_transport!` elsewhere in the crate. Judging from this file:
/// - every `option` row (Uuid, Value, Vector, HalfVector, Bit, SparseVector) has a hand-written
///   non-`Option` `TypeConversion` impl further down;
/// - `Inet` is marked `none` and has both its plain and `Option` impls written by hand.
/// Confirm against the macro before relying on this.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // Numeric and boolean scalars (Float4 is widened to Float64, ints to Int64/UInt64).
                { Float4[f32]                        => Float64[f64]                       | conversion auto   }
                { Float8[f64]                        => Float64[f64]                       | conversion auto   }
                { Numeric[Decimal]                   => Decimal[Decimal]                   | conversion auto   }
                { Int2[i16]                          => Int64[i64]                         | conversion auto   }
                { Int4[i32]                          => Int64[i64]                         | conversion auto   }
                { Int8[i64]                          => Int64[i64]                         | conversion auto   }
                { UInt4[u32]                         => UInt64[u64]                        | conversion auto   }
                { Bool[bool]                         => Boolean[bool]                      | conversion auto   }
                // Borrowed string types; only `Text` declares the &str -> String conversion.
                { Text[&'r str]                      => LargeUtf8[String]                  | conversion owned  }
                { BpChar[&'r str]                    => LargeUtf8[String]                  | conversion none   }
                { VarChar[&'r str]                   => LargeUtf8[String]                  | conversion none   }
                { Name[&'r str]                      => LargeUtf8[String]                  | conversion none   }
                { Enum[&'r str]                      => LargeUtf8[String]                  | conversion none   }
                // Date and time types.
                { Timestamp[NaiveDateTime]           => Date64[NaiveDateTime]              | conversion auto   }
                { Date[NaiveDate]                    => Date32[NaiveDate]                  | conversion auto   }
                { Time[NaiveTime]                    => Time64[NaiveTime]                  | conversion auto   }
                { TimestampTz[DateTime<Utc>]         => DateTimeTz[DateTime<Utc>]          | conversion auto   }
                // Identifiers, raw bytes, JSON and network addresses.
                { UUID[Uuid]                         => LargeUtf8[String]                  | conversion option }
                { Char[&'r str]                      => LargeUtf8[String]                  | conversion none   }
                { ByteA[Vec<u8>]                     => LargeBinary[Vec<u8>]               | conversion auto   }
                { JSON[Value]                        => LargeUtf8[String]                  | conversion option }
                { JSONB[Value]                       => LargeUtf8[String]                  | conversion none   }
                { Inet[IpInet]                       => LargeUtf8[String]                  | conversion none   }
                // Postgres arrays map to Arrow list types with nullable elements.
                { BoolArray[Vec<Option<bool>>]       => BoolArray[Vec<Option<bool>>]       | conversion auto   }
                { VarcharArray[Vec<Option<String>>]  => Utf8Array[Vec<Option<String>>]     | conversion auto   }
                { TextArray[Vec<Option<String>>]     => Utf8Array[Vec<Option<String>>]     | conversion none   }
                { Int2Array[Vec<Option<i16>>]        => Int16Array[Vec<Option<i16>>]       | conversion auto   }
                { Int4Array[Vec<Option<i32>>]        => Int32Array[Vec<Option<i32>>]       | conversion auto   }
                { Int8Array[Vec<Option<i64>>]        => Int64Array[Vec<Option<i64>>]       | conversion auto   }
                { Float4Array[Vec<Option<f32>>]      => Float32Array[Vec<Option<f32>>]     | conversion auto   }
                { Float8Array[Vec<Option<f64>>]      => Float64Array[Vec<Option<f64>>]     | conversion auto   }
                { NumericArray[Vec<Option<Decimal>>] => DecimalArray[Vec<Option<Decimal>>] | conversion auto   }
                // pgvector extension types.
                { Vector[Vector]                     => Float32Array[Vec<Option<f32>>]     | conversion option }
                { HalfVec[HalfVector]                => Float32Array[Vec<Option<f32>>]     | conversion option }
                { Bit[Bit]                           => LargeBinary[Vec<u8>]               | conversion option }
                { SparseVec[SparseVector]            => Float32Array[Vec<Option<f32>>]     | conversion option }
            }
        );
    };
}
85
86impl_postgres_transport!(BinaryProtocol, NoTls);
87impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
88impl_postgres_transport!(CSVProtocol, NoTls);
89impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
90impl_postgres_transport!(CursorProtocol, NoTls);
91impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
92impl_postgres_transport!(SimpleProtocol, NoTls);
93impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
94
95impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
96    fn convert(val: IpInet) -> String {
97        val.to_string()
98    }
99}
100
101impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
102    fn convert(val: Option<IpInet>) -> Option<String> {
103        val.map(|val| val.to_string())
104    }
105}
106
107impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
108    fn convert(val: Uuid) -> String {
109        val.to_string()
110    }
111}
112
113impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
114    fn convert(val: Value) -> String {
115        val.to_string()
116    }
117}
118
119impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
120    fn convert(val: Vector) -> Vec<Option<f32>> {
121        val.to_vec().into_iter().map(Some).collect()
122    }
123}
124
125impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
126    fn convert(val: HalfVector) -> Vec<Option<f32>> {
127        val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
128    }
129}
130
131impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
132    fn convert(val: Bit) -> Vec<u8> {
133        val.as_bytes().into()
134    }
135}
136
137impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
138    fn convert(val: SparseVector) -> Vec<Option<f32>> {
139        val.to_vec().into_iter().map(Some).collect()
140    }
141}