connectorx/transports/
postgres_arrowstream.rs1use crate::destinations::arrowstream::{
4 typesystem::{
5 ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
6 },
7 ArrowDestination, ArrowDestinationError,
8};
9use crate::sources::postgres::{
10 BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
11 PostgresTypeSystem, SimpleProtocol,
12};
13use crate::typesystem::TypeConversion;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use cidr_02::IpInet;
16use pgvector::{Bit, HalfVector, SparseVector, Vector};
17use postgres::NoTls;
18use postgres_openssl::MakeTlsConnector;
19use rust_decimal::Decimal;
20use serde_json::Value;
21use std::marker::PhantomData;
22use thiserror::Error;
23use uuid::Uuid;
24
25#[derive(Error, Debug)]
26pub enum PostgresArrowTransportError {
27 #[error(transparent)]
28 Source(#[from] PostgresSourceError),
29
30 #[error(transparent)]
31 Destination(#[from] ArrowDestinationError),
32
33 #[error(transparent)]
34 ConnectorX(#[from] crate::errors::ConnectorXError),
35}
36
/// Zero-sized marker type that carries the transport's `TypeConversion` impls.
///
/// `P` is the Postgres wire protocol and `C` is the TLS connector type. Both are
/// held only as `PhantomData`, so the struct stores no data.
pub struct PostgresArrowTransport<P, C>(
    PhantomData<P>, // protocol marker
    PhantomData<C>, // TLS connector marker
);
39
/// Instantiates `impl_transport!` for one `(protocol, TLS connector)` pairing of
/// `PostgresSource` feeding `ArrowDestination`.
///
/// Each mapping row reads `{ PgType[RustType] => ArrowType[RustType] | conversion KIND }`.
/// Rows whose KIND is `option` or `none` rely on `TypeConversion` impls that are
/// written by hand in this file (for example `Uuid -> String`, `IpInet -> String`)
/// or that another row already produces for the same Rust type pair (for example
/// `BpChar` shares `Text`'s `&str -> String`).
/// NOTE(review): the exact impls each KIND emits are defined by `impl_transport!`;
/// confirm there before changing a KIND.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // Numbers and booleans (narrower ints/floats widen to 64-bit).
                { Float4[f32]                => Float64[f64]                           | conversion auto }
                { Float8[f64]                => Float64[f64]                           | conversion auto }
                { Numeric[Decimal]           => Decimal[Decimal]                       | conversion auto }
                { Int2[i16]                  => Int64[i64]                             | conversion auto }
                { Int4[i32]                  => Int64[i64]                             | conversion auto }
                { Int8[i64]                  => Int64[i64]                             | conversion auto }
                { UInt4[u32]                 => UInt64[u64]                            | conversion auto }
                { Bool[bool]                 => Boolean[bool]                          | conversion auto }
                // Borrowed strings, all copied into owned `String`s.
                { Text[&'r str]              => LargeUtf8[String]                      | conversion owned }
                { BpChar[&'r str]            => LargeUtf8[String]                      | conversion none }
                { VarChar[&'r str]           => LargeUtf8[String]                      | conversion none }
                { Name[&'r str]              => LargeUtf8[String]                      | conversion none }
                { Enum[&'r str]              => LargeUtf8[String]                      | conversion none }
                // Date / time values.
                { Timestamp[NaiveDateTime]   => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
                { Date[NaiveDate]            => Date32[NaiveDate]                      | conversion auto }
                { Time[NaiveTime]            => Time64Micro[NaiveTimeWrapperMicro]     | conversion option }
                { TimestampTz[DateTime<Utc>] => DateTimeTzMicro[DateTimeWrapperMicro]  | conversion option }
                // Identifiers, single chars, raw bytes, JSON and network addresses.
                { UUID[Uuid]                 => LargeUtf8[String]                      | conversion option }
                { Char[&'r str]              => LargeUtf8[String]                      | conversion none }
                { ByteA[Vec<u8>]             => LargeBinary[Vec<u8>]                   | conversion auto }
                { JSON[Value]                => LargeUtf8[String]                      | conversion option }
                { JSONB[Value]               => LargeUtf8[String]                      | conversion none }
                { Inet[IpInet]               => LargeUtf8[String]                      | conversion none }
                // Postgres arrays with nullable elements.
                { BoolArray[Vec<Option<bool>>]         => BoolArray[Vec<Option<bool>>]       | conversion auto }
                { VarcharArray[Vec<Option<String>>]    => Utf8Array[Vec<Option<String>>]     | conversion auto }
                { TextArray[Vec<Option<String>>]       => Utf8Array[Vec<Option<String>>]     | conversion none }
                { Int2Array[Vec<Option<i16>>]          => Int16Array[Vec<Option<i16>>]       | conversion auto }
                { Int4Array[Vec<Option<i32>>]          => Int32Array[Vec<Option<i32>>]       | conversion auto }
                { Int8Array[Vec<Option<i64>>]          => Int64Array[Vec<Option<i64>>]       | conversion auto }
                { Float4Array[Vec<Option<f32>>]        => Float32Array[Vec<Option<f32>>]     | conversion auto }
                { Float8Array[Vec<Option<f64>>]        => Float64Array[Vec<Option<f64>>]     | conversion auto }
                { NumericArray[Vec<Option<Decimal>>]   => DecimalArray[Vec<Option<Decimal>>] | conversion auto }
                // pgvector extension types.
                { Vector[Vector]             => Float32Array[Vec<Option<f32>>]         | conversion option }
                { HalfVec[HalfVector]        => Float32Array[Vec<Option<f32>>]         | conversion option }
                { Bit[Bit]                   => LargeBinary[Vec<u8>]                   | conversion option }
                { SparseVec[SparseVector]    => Float32Array[Vec<Option<f32>>]         | conversion option }
            }
        );
    }
}
88
89impl_postgres_transport!(BinaryProtocol, NoTls);
90impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
91impl_postgres_transport!(CSVProtocol, NoTls);
92impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
93impl_postgres_transport!(CursorProtocol, NoTls);
94impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
95impl_postgres_transport!(SimpleProtocol, NoTls);
96impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
97
98impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
99 fn convert(val: IpInet) -> String {
100 val.to_string()
101 }
102}
103
104impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
105 fn convert(val: Option<IpInet>) -> Option<String> {
106 val.map(|val| val.to_string())
107 }
108}
109
110impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
111 fn convert(val: Uuid) -> String {
112 val.to_string()
113 }
114}
115
116impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
117 fn convert(val: Value) -> String {
118 val.to_string()
119 }
120}
121
122impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
123 fn convert(val: Vector) -> Vec<Option<f32>> {
124 val.to_vec().into_iter().map(Some).collect()
125 }
126}
127
128impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
129 fn convert(val: HalfVector) -> Vec<Option<f32>> {
130 val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
131 }
132}
133
134impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
135 fn convert(val: Bit) -> Vec<u8> {
136 val.as_bytes().into()
137 }
138}
139
140impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
141 fn convert(val: SparseVector) -> Vec<Option<f32>> {
142 val.to_vec().into_iter().map(Some).collect()
143 }
144}
145
146impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
147 for PostgresArrowTransport<P, C>
148{
149 fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
150 NaiveDateTimeWrapperMicro(val)
151 }
152}
153
154impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrowTransport<P, C> {
155 fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
156 NaiveTimeWrapperMicro(val)
157 }
158}
159
160impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrowTransport<P, C> {
161 fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
162 DateTimeWrapperMicro(val)
163 }
164}