connectorx/transports/
postgres_arrow.rs1use crate::destinations::arrow::{
4 typesystem::{
5 ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
6 },
7 ArrowDestination, ArrowDestinationError,
8};
9use crate::sources::postgres::{
10 BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
11 PostgresTypeSystem, SimpleProtocol,
12};
13use crate::typesystem::TypeConversion;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use cidr_02::IpInet;
16use num_traits::ToPrimitive;
17use pgvector::{Bit, HalfVector, SparseVector, Vector};
18use postgres::NoTls;
19use postgres_openssl::MakeTlsConnector;
20use rust_decimal::Decimal;
21use serde_json::Value;
22use std::marker::PhantomData;
23use thiserror::Error;
24use uuid::Uuid;
25
26#[derive(Error, Debug)]
27pub enum PostgresArrowTransportError {
28 #[error(transparent)]
29 Source(#[from] PostgresSourceError),
30
31 #[error(transparent)]
32 Destination(#[from] ArrowDestinationError),
33
34 #[error(transparent)]
35 ConnectorX(#[from] crate::errors::ConnectorXError),
36}
37
/// Type-level marker for the PostgreSQL → Arrow transport.
///
/// `P` is the Postgres wire protocol and `C` the connector used to open the connection. The
/// struct holds only `PhantomData`, so it is zero-sized; it exists to host the transport and
/// `TypeConversion` implementations for each `(P, C)` combination.
pub struct PostgresArrowTransport<P, C>(
    PhantomData<P>,
    PhantomData<C>,
);

/// Instantiates `impl_transport!` for one `(protocol, connector)` combination, routing
/// `PostgresSource<$proto, $tls>` into `ArrowDestination`.
///
/// Each mapping row reads
/// `PostgresType[source value] => ArrowType[destination value] | conversion <kind>`.
/// The meaning of each conversion kind is defined by `impl_transport!` (not visible here).
/// NOTE(review): every row marked `option` has a matching hand-written non-optional
/// `TypeConversion` impl in this file, and `Inet` (`none`) has both the plain and the optional
/// impl written by hand; confirm against `impl_transport!` before changing a kind.
macro_rules! impl_postgres_transport {
    ($proto:ty, $tls:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$proto, $tls>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$proto, $tls> => ArrowDestination,
            mappings = {
                // Numeric and boolean scalars.
                { Float4[f32]                        => Float32[f32]                           | conversion auto }
                { Float8[f64]                        => Float64[f64]                           | conversion auto }
                { Numeric[Decimal]                   => Decimal[Decimal]                       | conversion auto }
                { Int2[i16]                          => Int16[i16]                             | conversion auto }
                { Int4[i32]                          => Int32[i32]                             | conversion auto }
                { Int8[i64]                          => Int64[i64]                             | conversion auto }
                { UInt4[u32]                         => UInt32[u32]                            | conversion auto }
                { Bool[bool]                         => Boolean[bool]                          | conversion auto }
                // Textual types, all landing in LargeUtf8.
                { Text[&'r str]                      => LargeUtf8[String]                      | conversion owned }
                { BpChar[&'r str]                    => LargeUtf8[String]                      | conversion none }
                { VarChar[&'r str]                   => LargeUtf8[String]                      | conversion none }
                { Name[&'r str]                      => LargeUtf8[String]                      | conversion none }
                { Enum[&'r str]                      => LargeUtf8[String]                      | conversion none }
                // Date and time types.
                { Timestamp[NaiveDateTime]           => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
                { Date[NaiveDate]                    => Date32[NaiveDate]                      | conversion auto }
                { Time[NaiveTime]                    => Time64Micro[NaiveTimeWrapperMicro]     | conversion option }
                { TimestampTz[DateTime<Utc>]         => DateTimeTzMicro[DateTimeWrapperMicro]  | conversion option }
                // Identifiers, single characters, binary, JSON and network addresses.
                { UUID[Uuid]                         => LargeUtf8[String]                      | conversion option }
                { Char[&'r str]                      => LargeUtf8[String]                      | conversion none }
                { ByteA[Vec<u8>]                     => LargeBinary[Vec<u8>]                   | conversion auto }
                { JSON[Value]                        => LargeUtf8[String]                      | conversion option }
                { JSONB[Value]                       => LargeUtf8[String]                      | conversion none }
                { Inet[IpInet]                       => LargeUtf8[String]                      | conversion none }
                // Array types with nullable elements.
                { BoolArray[Vec<Option<bool>>]       => BoolArray[Vec<Option<bool>>]           | conversion auto }
                { VarcharArray[Vec<Option<String>>]  => Utf8Array[Vec<Option<String>>]         | conversion auto }
                { TextArray[Vec<Option<String>>]     => Utf8Array[Vec<Option<String>>]         | conversion none }
                { Int2Array[Vec<Option<i16>>]        => Int16Array[Vec<Option<i16>>]           | conversion auto }
                { Int4Array[Vec<Option<i32>>]        => Int32Array[Vec<Option<i32>>]           | conversion auto }
                { Int8Array[Vec<Option<i64>>]        => Int64Array[Vec<Option<i64>>]           | conversion auto }
                { Float4Array[Vec<Option<f32>>]      => Float32Array[Vec<Option<f32>>]         | conversion auto }
                { Float8Array[Vec<Option<f64>>]      => Float64Array[Vec<Option<f64>>]         | conversion auto }
                { NumericArray[Vec<Option<Decimal>>] => DecimalArray[Vec<Option<Decimal>>]     | conversion auto }
                // pgvector extension types.
                { Vector[Vector]                     => Float32Array[Vec<Option<f32>>]         | conversion option }
                { HalfVec[HalfVector]                => Float32Array[Vec<Option<f32>>]         | conversion option }
                { Bit[Bit]                           => LargeBinary[Vec<u8>]                   | conversion option }
                { SparseVec[SparseVector]            => Float32Array[Vec<Option<f32>>]         | conversion option }
            }
        );
    };
}

90impl_postgres_transport!(BinaryProtocol, NoTls);
91impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
92impl_postgres_transport!(CSVProtocol, NoTls);
93impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
94impl_postgres_transport!(CursorProtocol, NoTls);
95impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
96impl_postgres_transport!(SimpleProtocol, NoTls);
97impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
98
99impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
100 fn convert(val: IpInet) -> String {
101 val.to_string()
102 }
103}
104
105impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
106 fn convert(val: Option<IpInet>) -> Option<String> {
107 val.map(|val| val.to_string())
108 }
109}
110
111impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
112 fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
113 NaiveTimeWrapperMicro(val)
114 }
115}
116
117impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
118 for PostgresArrowTransport<P, C>
119{
120 fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
121 NaiveDateTimeWrapperMicro(val)
122 }
123}
124
125impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrowTransport<P, C> {
126 fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
127 DateTimeWrapperMicro(val)
128 }
129}
130
131impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
132 fn convert(val: Uuid) -> String {
133 val.to_string()
134 }
135}
136
137impl<P, C> TypeConversion<Decimal, f64> for PostgresArrowTransport<P, C> {
138 fn convert(val: Decimal) -> f64 {
139 val.to_f64()
140 .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
141 }
142}
143
144impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
145 fn convert(val: Value) -> String {
146 val.to_string()
147 }
148}
149
150impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
151 fn convert(val: Vector) -> Vec<Option<f32>> {
152 val.to_vec().into_iter().map(Some).collect()
153 }
154}
155
156impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
157 fn convert(val: HalfVector) -> Vec<Option<f32>> {
158 val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
159 }
160}
161
162impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
163 fn convert(val: Bit) -> Vec<u8> {
164 val.as_bytes().into()
165 }
166}
167
168impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
169 fn convert(val: SparseVector) -> Vec<Option<f32>> {
170 val.to_vec().into_iter().map(Some).collect()
171 }
172}