connectorx/transports/
postgres_arrowstream.rs

//! Transport from the Postgres source to the Arrow stream (`arrowstream`) destination.
2
3use crate::destinations::arrowstream::{
4    typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
5};
6use crate::sources::postgres::{
7    BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
8    PostgresTypeSystem, SimpleProtocol,
9};
10use crate::typesystem::TypeConversion;
11use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
12use cidr_02::IpInet;
13use pgvector::{Bit, HalfVector, SparseVector, Vector};
14use postgres::NoTls;
15use postgres_openssl::MakeTlsConnector;
16use rust_decimal::Decimal;
17use serde_json::Value;
18use std::marker::PhantomData;
19use thiserror::Error;
20use uuid::Uuid;
21
22#[derive(Error, Debug)]
23pub enum PostgresArrowTransportError {
24    #[error(transparent)]
25    Source(#[from] PostgresSourceError),
26
27    #[error(transparent)]
28    Destination(#[from] ArrowDestinationError),
29
30    #[error(transparent)]
31    ConnectorX(#[from] crate::errors::ConnectorXError),
32}
33
/// Convert Postgres data types to Arrow data types.
///
/// This is a zero-sized marker type: both fields are `PhantomData`, so it
/// carries no runtime data. The type parameters select which instantiation is
/// used. `P` is the Postgres protocol (`BinaryProtocol`, `CSVProtocol`,
/// `CursorProtocol` or `SimpleProtocol`). `C` is the TLS connector (`NoTls` or
/// `MakeTlsConnector`). See the `impl_postgres_transport!` invocations in this
/// file.
pub struct PostgresArrowTransport<P, C>(PhantomData<P>, PhantomData<C>);
36
// Implements the transport `PostgresArrowTransport<$proto, $tls>` for one
// (Postgres protocol, TLS connector) pair. It does this by expanding
// `impl_transport!` with the Postgres -> Arrow type mapping table below.
// `impl_transport!` is defined outside this file.
macro_rules! impl_postgres_transport {
    ($proto:ty, $tls:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$proto, $tls>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$proto, $tls> => ArrowDestination,
            // Row layout:
            //   { PgType[value type read from the source] => ArrowType[value type written
            //     to the destination] | conversion <mode> }
            // NOTE(review): the meaning of each mode is defined by `impl_transport!` and is
            // not visible here. What this file shows:
            //   - `auto` and `owned` (Text) rows have no hand-written `TypeConversion`
            //     in this file.
            //   - `option` rows (UUID, JSON, Vector, HalfVec, Bit, SparseVec) have only a
            //     hand-written non-`Option` impl below.
            //   - `none` rows either repeat a Rust type pair used by an earlier row
            //     (BpChar/VarChar/Name/Char share `&str => String` with Text; JSONB
            //     shares `Value => String` with JSON), or, like Inet, have both the
            //     plain and the `Option` impl hand-written below.
            // Confirm these readings against the `impl_transport!` definition.
            mappings = {
                // Float4 is widened to Float64; Int2/Int4 are widened to Int64.
                { Float4[f32]                => Float64[f64]                           | conversion auto   }
                { Float8[f64]                => Float64[f64]                           | conversion auto   }
                { Numeric[Decimal]           => Decimal[Decimal]                       | conversion auto   }
                { Int2[i16]                  => Int64[i64]                             | conversion auto   }
                { Int4[i32]                  => Int64[i64]                             | conversion auto   }
                { Int8[i64]                  => Int64[i64]                             | conversion auto   }
                { Bool[bool]                 => Boolean[bool]                          | conversion auto   }
                // Textual Postgres types are read as borrowed `&str` and stored as owned `String`.
                { Text[&'r str]              => LargeUtf8[String]                      | conversion owned  }
                { BpChar[&'r str]            => LargeUtf8[String]                      | conversion none   }
                { VarChar[&'r str]           => LargeUtf8[String]                      | conversion none   }
                { Name[&'r str]              => LargeUtf8[String]                      | conversion none   }
                { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]                  | conversion auto   }
                { Date[NaiveDate]            => Date32[NaiveDate]                      | conversion auto   }
                { Time[NaiveTime]            => Time64[NaiveTime]                      | conversion auto   }
                { TimestampTz[DateTime<Utc>] => DateTimeTz[DateTime<Utc>]              | conversion auto   }
                // UUID, JSON/JSONB and Inet values are emitted as strings (see the impls below).
                { UUID[Uuid]                 => LargeUtf8[String]                      | conversion option }
                { Char[&'r str]              => LargeUtf8[String]                      | conversion none   }
                { ByteA[Vec<u8>]             => LargeBinary[Vec<u8>]                   | conversion auto   }
                { JSON[Value]                => LargeUtf8[String]                      | conversion option }
                { JSONB[Value]               => LargeUtf8[String]                      | conversion none   }
                { Inet[IpInet]               => LargeUtf8[String]                      | conversion none   }
                // pgvector types: vector-like values become f32 lists; `bit` becomes raw bytes.
                { Vector[Vector]             => Float32Array[Vec<Option<f32>>]         | conversion option }
                { HalfVec[HalfVector]        => Float32Array[Vec<Option<f32>>]         | conversion option }
                { Bit[Bit]                   => LargeBinary[Vec<u8>]                   | conversion option }
                { SparseVec[SparseVector]    => Float32Array[Vec<Option<f32>>]         | conversion option }
            }
        );
    }
}
74
75impl_postgres_transport!(BinaryProtocol, NoTls);
76impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
77impl_postgres_transport!(CSVProtocol, NoTls);
78impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
79impl_postgres_transport!(CursorProtocol, NoTls);
80impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
81impl_postgres_transport!(SimpleProtocol, NoTls);
82impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
83
84impl<P, C> TypeConversion<IpInet, String> for PostgresArrowTransport<P, C> {
85    fn convert(val: IpInet) -> String {
86        val.to_string()
87    }
88}
89
90impl<P, C> TypeConversion<Option<IpInet>, Option<String>> for PostgresArrowTransport<P, C> {
91    fn convert(val: Option<IpInet>) -> Option<String> {
92        val.map(|val| val.to_string())
93    }
94}
95
96impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
97    fn convert(val: Uuid) -> String {
98        val.to_string()
99    }
100}
101
102impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
103    fn convert(val: Value) -> String {
104        val.to_string()
105    }
106}
107
108impl<P, C> TypeConversion<Vector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
109    fn convert(val: Vector) -> Vec<Option<f32>> {
110        val.to_vec().into_iter().map(Some).collect()
111    }
112}
113
114impl<P, C> TypeConversion<HalfVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
115    fn convert(val: HalfVector) -> Vec<Option<f32>> {
116        val.to_vec().into_iter().map(|v| Some(v.to_f32())).collect()
117    }
118}
119
120impl<P, C> TypeConversion<Bit, Vec<u8>> for PostgresArrowTransport<P, C> {
121    fn convert(val: Bit) -> Vec<u8> {
122        val.as_bytes().into()
123    }
124}
125
126impl<P, C> TypeConversion<SparseVector, Vec<Option<f32>>> for PostgresArrowTransport<P, C> {
127    fn convert(val: SparseVector) -> Vec<Option<f32>> {
128        val.to_vec().into_iter().map(Some).collect()
129    }
130}