connectorx/transports/
postgres_arrow.rs1use crate::destinations::arrow::{
4 typesystem::{
5 ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
6 },
7 ArrowDestination, ArrowDestinationError,
8};
9use crate::sources::postgres::{
10 BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
11 PostgresTypeSystem, SimpleProtocol,
12};
13use crate::typesystem::TypeConversion;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use cidr_02::IpInet;
16use num_traits::ToPrimitive;
17use pgvector::{Bit, HalfVector, SparseVector, Vector};
18use postgres::NoTls;
19use postgres_openssl::MakeTlsConnector;
20use rust_decimal::Decimal;
21use serde_json::Value;
22use std::marker::PhantomData;
23use thiserror::Error;
24use uuid::Uuid;
25
26#[derive(Error, Debug)]
27pub enum PostgresArrowTransportError {
28 #[error(transparent)]
29 Source(#[from] PostgresSourceError),
30
31 #[error(transparent)]
32 Destination(#[from] ArrowDestinationError),
33
34 #[error(transparent)]
35 ConnectorX(#[from] crate::errors::ConnectorXError),
36}
37
/// Zero-sized transport marker connecting `PostgresSource` to `ArrowDestination`.
///
/// `P` selects the Postgres wire protocol (e.g. `BinaryProtocol`, `CSVProtocol`)
/// and `C` the connector (`NoTls` or `MakeTlsConnector`). Both parameters only
/// exist at the type level, so the struct stores no data.
pub struct PostgresArrowTransport<P, C>(
    PhantomData<P>,
    PhantomData<C>,
);

/// Instantiates `impl_transport!` for a single `(protocol, connector)` pair.
///
/// The mapping table lists, per Postgres type, the Rust value read from the
/// source, the Arrow destination type, and the conversion mode. The rows
/// marked `conversion option` (and the `conversion none` Inet row) line up with
/// the hand-written `TypeConversion` impls further down in this file.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // Numeric and boolean scalars.
                { Float4[f32] => Float32[f32] | conversion auto }
                { Float8[f64] => Float64[f64] | conversion auto }
                { Numeric[Decimal] => Decimal[Decimal] | conversion auto }
                { Int2[i16] => Int16[i16] | conversion auto }
                { Int4[i32] => Int32[i32] | conversion auto }
                { Int8[i64] => Int64[i64] | conversion auto }
                { Bool[bool] => Boolean[bool] | conversion auto }
                // Borrowed text; only the `Text` row declares the &str -> String
                // conversion, the other rows share the same Rust types.
                { Text[&'r str] => LargeUtf8[String] | conversion owned }
                { BpChar[&'r str] => LargeUtf8[String] | conversion none }
                { VarChar[&'r str] => LargeUtf8[String] | conversion none }
                { Name[&'r str] => LargeUtf8[String] | conversion none }
                { Enum[&'r str] => LargeUtf8[String] | conversion none }
                // Date/time values.
                { Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
                { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
                { Time[NaiveTime] => Time64Micro[NaiveTimeWrapperMicro] | conversion option }
                { TimestampTz[DateTime<Utc>] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option }
                // Miscellaneous scalars rendered as text or bytes.
                { UUID[Uuid] => LargeUtf8[String] | conversion option }
                { Char[&'r str] => LargeUtf8[String] | conversion none }
                { ByteA[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
                { JSON[Value] => LargeUtf8[String] | conversion option }
                { JSONB[Value] => LargeUtf8[String] | conversion none }
                { Inet[IpInet] => LargeUtf8[String] | conversion none }
                // Array types.
                { BoolArray[Vec<Option<bool>>] => BoolArray[Vec<Option<bool>>] | conversion auto }
                { VarcharArray[Vec<Option<String>>] => Utf8Array[Vec<Option<String>>] | conversion auto }
                { TextArray[Vec<Option<String>>] => Utf8Array[Vec<Option<String>>] | conversion none }
                { Int2Array[Vec<Option<i16>>] => Int16Array[Vec<Option<i16>>] | conversion auto }
                { Int4Array[Vec<Option<i32>>] => Int32Array[Vec<Option<i32>>] | conversion auto }
                { Int8Array[Vec<Option<i64>>] => Int64Array[Vec<Option<i64>>] | conversion auto }
                { Float4Array[Vec<Option<f32>>] => Float32Array[Vec<Option<f32>>] | conversion auto }
                { Float8Array[Vec<Option<f64>>] => Float64Array[Vec<Option<f64>>] | conversion auto }
                { NumericArray[Vec<Option<Decimal>>] => DecimalArray[Vec<Option<Decimal>>] | conversion auto }
                // pgvector extension types.
                { Vector[Vector] => Float32Array[Vec<Option<f32>>] | conversion option }
                { HalfVec[HalfVector] => Float32Array[Vec<Option<f32>>] | conversion option }
                { Bit[Bit] => LargeBinary[Vec<u8>] | conversion option }
                { SparseVec[SparseVector] => Float32Array[Vec<Option<f32>>] | conversion option }
            }
        );
    };
}

89impl_postgres_transport!(BinaryProtocol, NoTls);
90impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
91impl_postgres_transport!(CSVProtocol, NoTls);
92impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
93impl_postgres_transport!(CursorProtocol, NoTls);
94impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
95impl_postgres_transport!(SimpleProtocol, NoTls);
96impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
97
98impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
99 fn convert(val: IpInet) -> String {
100 val.to_string()
101 }
102}
103
104impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
105 fn convert(val: Option<IpInet>) -> Option<String> {
106 val.map(|val| val.to_string())
107 }
108}
109
110impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
111 fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
112 NaiveTimeWrapperMicro(val)
113 }
114}
115
116impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
117 for PostgresArrowTransport<P, C>
118{
119 fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
120 NaiveDateTimeWrapperMicro(val)
121 }
122}
123
124impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrowTransport<P, C> {
125 fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
126 DateTimeWrapperMicro(val)
127 }
128}
129
130impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
131 fn convert(val: Uuid) -> String {
132 val.to_string()
133 }
134}
135
136impl<P, C> TypeConversion<Decimal, f64> for PostgresArrowTransport<P, C> {
137 fn convert(val: Decimal) -> f64 {
138 val.to_f64()
139 .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
140 }
141}
142
143impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
144 fn convert(val: Value) -> String {
145 val.to_string()
146 }
147}
148
149impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
150 fn convert(val: Vector) -> Vec<Option<f32>> {
151 val.to_vec().into_iter().map(Some).collect()
152 }
153}
154
155impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
156 fn convert(val: HalfVector) -> Vec<Option<f32>> {
157 val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
158 }
159}
160
161impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
162 fn convert(val: Bit) -> Vec<u8> {
163 val.as_bytes().into()
164 }
165}
166
167impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
168 fn convert(val: SparseVector) -> Vec<Option<f32>> {
169 val.to_vec().into_iter().map(Some).collect()
170 }
171}