connectorx/transports/
postgres_arrowstream.rs1use crate::destinations::arrowstream::{
4 typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
5};
6use crate::sources::postgres::{
7 BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
8 PostgresTypeSystem, SimpleProtocol,
9};
10use crate::typesystem::TypeConversion;
11use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
12use cidr_02::IpInet;
13use pgvector::{Bit, HalfVector, SparseVector, Vector};
14use postgres::NoTls;
15use postgres_openssl::MakeTlsConnector;
16use rust_decimal::Decimal;
17use serde_json::Value;
18use std::marker::PhantomData;
19use thiserror::Error;
20use uuid::Uuid;
21
22#[derive(Error, Debug)]
23pub enum PostgresArrowTransportError {
24 #[error(transparent)]
25 Source(#[from] PostgresSourceError),
26
27 #[error(transparent)]
28 Destination(#[from] ArrowDestinationError),
29
30 #[error(transparent)]
31 ConnectorX(#[from] crate::errors::ConnectorXError),
32}
33
/// Marker type naming the transport from a Postgres source to the
/// Arrow-stream destination.
///
/// `P` and `C` are carried only through `PhantomData`, so the struct stores
/// no data of either type. In this file they are instantiated with a
/// protocol type and a TLS connector type respectively.
pub struct PostgresArrowTransport<P, C>(
    PhantomData<P>,
    PhantomData<C>,
);
36
// Expands to one `impl_transport!` invocation for
// `PostgresArrowTransport<$proto, $tls>`, routing
// `PostgresSource<$proto, $tls>` into `ArrowDestination`.
//
// Each mapping row reads
// `{ SourceVariant[RustType] => DestVariant[RustType] | conversion MODE }`.
// NOTE(review): the meaning of each MODE is defined by `impl_transport!`,
// which is not visible here. In this file every `option` row is backed by a
// hand-written non-optional `TypeConversion` impl, and the `none` Inet row
// has both the plain and the `Option` impl written by hand. Confirm against
// the macro before adding rows.
macro_rules! impl_postgres_transport {
    ($proto:ty, $tls:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$proto, $tls>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$proto, $tls> => ArrowDestination,
            mappings = {
                // Floating point, arbitrary precision and integers.
                { Float4[f32]                => Float64[f64]                   | conversion auto }
                { Float8[f64]                => Float64[f64]                   | conversion auto }
                { Numeric[Decimal]           => Decimal[Decimal]               | conversion auto }
                { Int2[i16]                  => Int64[i64]                     | conversion auto }
                { Int4[i32]                  => Int64[i64]                     | conversion auto }
                { Int8[i64]                  => Int64[i64]                     | conversion auto }
                { Bool[bool]                 => Boolean[bool]                  | conversion auto }
                // Borrowed string columns, all landing in LargeUtf8.
                { Text[&'r str]              => LargeUtf8[String]              | conversion owned }
                { BpChar[&'r str]            => LargeUtf8[String]              | conversion none }
                { VarChar[&'r str]           => LargeUtf8[String]              | conversion none }
                { Name[&'r str]              => LargeUtf8[String]              | conversion none }
                // Date and time columns.
                { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]          | conversion auto }
                { Date[NaiveDate]            => Date32[NaiveDate]              | conversion auto }
                { Time[NaiveTime]            => Time64[NaiveTime]              | conversion auto }
                { TimestampTz[DateTime<Utc>] => DateTimeTz[DateTime<Utc>]      | conversion auto }
                // Identifier, single char, binary, JSON and network columns.
                { UUID[Uuid]                 => LargeUtf8[String]              | conversion option }
                { Char[&'r str]              => LargeUtf8[String]              | conversion none }
                { ByteA[Vec<u8>]             => LargeBinary[Vec<u8>]           | conversion auto }
                { JSON[Value]                => LargeUtf8[String]              | conversion option }
                { JSONB[Value]               => LargeUtf8[String]              | conversion none }
                { Inet[IpInet]               => LargeUtf8[String]              | conversion none }
                // pgvector columns.
                { Vector[Vector]             => Float32Array[Vec<Option<f32>>] | conversion option }
                { HalfVec[HalfVector]        => Float32Array[Vec<Option<f32>>] | conversion option }
                { Bit[Bit]                   => LargeBinary[Vec<u8>]           | conversion option }
                { SparseVec[SparseVector]    => Float32Array[Vec<Option<f32>>] | conversion option }
            }
        );
    };
}
74
75impl_postgres_transport!(BinaryProtocol, NoTls);
76impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
77impl_postgres_transport!(CSVProtocol, NoTls);
78impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
79impl_postgres_transport!(CursorProtocol, NoTls);
80impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
81impl_postgres_transport!(SimpleProtocol, NoTls);
82impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
83
84impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
85 fn convert(val: IpInet) -> String {
86 val.to_string()
87 }
88}
89
90impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
91 fn convert(val: Option<IpInet>) -> Option<String> {
92 val.map(|val| val.to_string())
93 }
94}
95
96impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
97 fn convert(val: Uuid) -> String {
98 val.to_string()
99 }
100}
101
102impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
103 fn convert(val: Value) -> String {
104 val.to_string()
105 }
106}
107
108impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
109 fn convert(val: Vector) -> Vec<Option<f32>> {
110 val.to_vec().into_iter().map(Some).collect()
111 }
112}
113
114impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
115 fn convert(val: HalfVector) -> Vec<Option<f32>> {
116 val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
117 }
118}
119
120impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
121 fn convert(val: Bit) -> Vec<u8> {
122 val.as_bytes().into()
123 }
124}
125
126impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
127 fn convert(val: SparseVector) -> Vec<Option<f32>> {
128 val.to_vec().into_iter().map(Some).collect()
129 }
130}