// connectorx/transports/postgres_arrowstream.rs
use crate::destinations::arrowstream::{
4 typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
5};
6use crate::sources::postgres::{
7 BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
8 PostgresTypeSystem, SimpleProtocol,
9};
10use crate::typesystem::TypeConversion;
11use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
12use cidr_02::IpInet;
13use pgvector::{Bit, HalfVector, SparseVector, Vector};
14use postgres::NoTls;
15use postgres_openssl::MakeTlsConnector;
16use rust_decimal::Decimal;
17use serde_json::Value;
18use std::marker::PhantomData;
19use thiserror::Error;
20use uuid::Uuid;
21
22#[derive(Error, Debug)]
23pub enum PostgresArrowTransportError {
24 #[error(transparent)]
25 Source(#[from] PostgresSourceError),
26
27 #[error(transparent)]
28 Destination(#[from] ArrowDestinationError),
29
30 #[error(transparent)]
31 ConnectorX(#[from] crate::errors::ConnectorXError),
32}
33
/// Marker type naming the transport from a Postgres source to the Arrow
/// stream destination.
///
/// `P` and `C` are carried only as `PhantomData`, so the type stores no data;
/// in this file they are instantiated with a protocol type and a TLS
/// connector type respectively (see the `impl_postgres_transport!` calls).
pub struct PostgresArrowTransport<P, C>(PhantomData<P>, PhantomData<C>);
36
// Generates the transport implementation for one (protocol, TLS connector)
// pair by forwarding to `impl_transport!` with the Postgres -> Arrow type
// mapping table below.
//
// Each row reads `SourceVariant[rust type] => DestVariant[rust type]`.
// NOTE(review): the `conversion` keyword is interpreted by `impl_transport!`,
// which is not visible here. Judging from this file, `none` and `option` rows
// rely on `TypeConversion` impls that are hand-written further down (or on a
// pair already covered by an earlier row) — confirm against the macro.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // Scalars: floats and integers are widened to 64-bit columns.
                { Float4[f32]                          => Float64[f64]                       | conversion auto }
                { Float8[f64]                          => Float64[f64]                       | conversion auto }
                { Numeric[Decimal]                     => Decimal[Decimal]                   | conversion auto }
                { Int2[i16]                            => Int64[i64]                         | conversion auto }
                { Int4[i32]                            => Int64[i64]                         | conversion auto }
                { Int8[i64]                            => Int64[i64]                         | conversion auto }
                { UInt4[u32]                           => UInt64[u64]                        | conversion auto }
                { Bool[bool]                           => Boolean[bool]                      | conversion auto }
                // Borrowed string types all land in LargeUtf8.
                { Text[&'r str]                        => LargeUtf8[String]                  | conversion owned }
                { BpChar[&'r str]                      => LargeUtf8[String]                  | conversion none }
                { VarChar[&'r str]                     => LargeUtf8[String]                  | conversion none }
                { Name[&'r str]                        => LargeUtf8[String]                  | conversion none }
                { Enum[&'r str]                        => LargeUtf8[String]                  | conversion none }
                // Date and time types.
                { Timestamp[NaiveDateTime]             => Date64[NaiveDateTime]              | conversion auto }
                { Date[NaiveDate]                      => Date32[NaiveDate]                  | conversion auto }
                { Time[NaiveTime]                      => Time64[NaiveTime]                  | conversion auto }
                { TimestampTz[DateTime<Utc>]           => DateTimeTz[DateTime<Utc>]          | conversion auto }
                // Types rendered as text or raw bytes.
                { UUID[Uuid]                           => LargeUtf8[String]                  | conversion option }
                { Char[&'r str]                        => LargeUtf8[String]                  | conversion none }
                { ByteA[Vec<u8>]                       => LargeBinary[Vec<u8>]               | conversion auto }
                { JSON[Value]                          => LargeUtf8[String]                  | conversion option }
                { JSONB[Value]                         => LargeUtf8[String]                  | conversion none }
                { Inet[IpInet]                         => LargeUtf8[String]                  | conversion none }
                // Array types keep their element width.
                { BoolArray[Vec<Option<bool>>]         => BoolArray[Vec<Option<bool>>]       | conversion auto }
                { VarcharArray[Vec<Option<String>>]    => Utf8Array[Vec<Option<String>>]     | conversion auto }
                { TextArray[Vec<Option<String>>]       => Utf8Array[Vec<Option<String>>]     | conversion none }
                { Int2Array[Vec<Option<i16>>]          => Int16Array[Vec<Option<i16>>]       | conversion auto }
                { Int4Array[Vec<Option<i32>>]          => Int32Array[Vec<Option<i32>>]       | conversion auto }
                { Int8Array[Vec<Option<i64>>]          => Int64Array[Vec<Option<i64>>]       | conversion auto }
                { Float4Array[Vec<Option<f32>>]        => Float32Array[Vec<Option<f32>>]     | conversion auto }
                { Float8Array[Vec<Option<f64>>]        => Float64Array[Vec<Option<f64>>]     | conversion auto }
                { NumericArray[Vec<Option<Decimal>>]   => DecimalArray[Vec<Option<Decimal>>] | conversion auto }
                // pgvector types.
                { Vector[Vector]                       => Float32Array[Vec<Option<f32>>]     | conversion option }
                { HalfVec[HalfVector]                  => Float32Array[Vec<Option<f32>>]     | conversion option }
                { Bit[Bit]                             => LargeBinary[Vec<u8>]               | conversion option }
                { SparseVec[SparseVector]              => Float32Array[Vec<Option<f32>>]     | conversion option }
            }
        );
    }
}
85
86impl_postgres_transport!(BinaryProtocol, NoTls);
87impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
88impl_postgres_transport!(CSVProtocol, NoTls);
89impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
90impl_postgres_transport!(CursorProtocol, NoTls);
91impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
92impl_postgres_transport!(SimpleProtocol, NoTls);
93impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
94
95impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
96 fn convert(val: IpInet) -> String {
97 val.to_string()
98 }
99}
100
101impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
102 fn convert(val: Option<IpInet>) -> Option<String> {
103 val.map(|val| val.to_string())
104 }
105}
106
107impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
108 fn convert(val: Uuid) -> String {
109 val.to_string()
110 }
111}
112
113impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
114 fn convert(val: Value) -> String {
115 val.to_string()
116 }
117}
118
119impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
120 fn convert(val: Vector) -> Vec<Option<f32>> {
121 val.to_vec().into_iter().map(Some).collect()
122 }
123}
124
125impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
126 fn convert(val: HalfVector) -> Vec<Option<f32>> {
127 val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
128 }
129}
130
131impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
132 fn convert(val: Bit) -> Vec<u8> {
133 val.as_bytes().into()
134 }
135}
136
137impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
138 fn convert(val: SparseVector) -> Vec<Option<f32>> {
139 val.to_vec().into_iter().map(Some).collect()
140 }
141}