connectorx/transports/
postgres_arrowstream.rs

1//! Transport from Postgres Source to Arrow Destination.
2
3use crate::destinations::arrowstream::{
4    typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
5};
6use crate::sources::postgres::{
7    BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
8    PostgresTypeSystem, SimpleProtocol,
9};
10use crate::typesystem::TypeConversion;
11use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
12use postgres::NoTls;
13use postgres_openssl::MakeTlsConnector;
14use rust_decimal::Decimal;
15use serde_json::Value;
16use std::marker::PhantomData;
17use thiserror::Error;
18use uuid::Uuid;
19
20#[derive(Error, Debug)]
21pub enum PostgresArrowTransportError {
22    #[error(transparent)]
23    Source(#[from] PostgresSourceError),
24
25    #[error(transparent)]
26    Destination(#[from] ArrowDestinationError),
27
28    #[error(transparent)]
29    ConnectorX(#[from] crate::errors::ConnectorXError),
30}
31
/// Transport that converts Postgres data types into Arrow data types.
///
/// The struct holds no data: both type parameters are carried purely through
/// `PhantomData`. Within this file `P` is instantiated with a Postgres
/// protocol type and `C` with a TLS connector type (see the
/// `impl_postgres_transport!` invocations).
pub struct PostgresArrowTransport<P, C>(
    PhantomData<P>,
    PhantomData<C>,
);
34
/// Expands to one `impl_transport!` invocation for a single
/// `(protocol, TLS connector)` pair.
///
/// The generated transport is named `PostgresArrowTransport<protocol, connector>`,
/// reports `PostgresArrowTransportError`, and routes
/// `PostgresSource<protocol, connector>` into `ArrowDestination`, translating
/// `PostgresTypeSystem` variants into `ArrowTypeSystem` variants according to
/// the mapping table below.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            // Row format: { SourceVariant[rust type] => DestVariant[rust type] | conversion <mode> }
            // NOTE(review): the `none`/`option` rows appear to rely on a conversion
            // supplied elsewhere (an earlier row for the same type pair, or the
            // hand-written `TypeConversion` impls at the bottom of this file) --
            // confirm against the `impl_transport!` definition before reordering.
            mappings = {
                { Float4[f32]                => Float64[f64]              | conversion auto }
                { Float8[f64]                => Float64[f64]              | conversion auto }
                { Numeric[Decimal]           => Decimal[Decimal]          | conversion auto }
                { Int2[i16]                  => Int64[i64]                | conversion auto }
                { Int4[i32]                  => Int64[i64]                | conversion auto }
                { Int8[i64]                  => Int64[i64]                | conversion auto }
                { Bool[bool]                 => Boolean[bool]             | conversion auto }
                { Text[&'r str]              => LargeUtf8[String]         | conversion owned }
                { BpChar[&'r str]            => LargeUtf8[String]         | conversion none }
                { VarChar[&'r str]           => LargeUtf8[String]         | conversion none }
                { Name[&'r str]              => LargeUtf8[String]         | conversion none }
                { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]     | conversion auto }
                { Date[NaiveDate]            => Date32[NaiveDate]         | conversion auto }
                { Time[NaiveTime]            => Time64[NaiveTime]         | conversion auto }
                { TimestampTz[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion auto }
                { UUID[Uuid]                 => LargeUtf8[String]         | conversion option }
                { Char[&'r str]              => LargeUtf8[String]         | conversion none }
                { ByteA[Vec<u8>]             => LargeBinary[Vec<u8>]      | conversion auto }
                { JSON[Value]                => LargeUtf8[String]         | conversion option }
                { JSONB[Value]               => LargeUtf8[String]         | conversion none }
            }
        );
    };
}
67
68impl_postgres_transport!(BinaryProtocol, NoTls);
69impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
70impl_postgres_transport!(CSVProtocol, NoTls);
71impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
72impl_postgres_transport!(CursorProtocol, NoTls);
73impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
74impl_postgres_transport!(SimpleProtocol, NoTls);
75impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
76
77impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
78    fn convert(val: Uuid) -> String {
79        val.to_string()
80    }
81}
82
83impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
84    fn convert(val: Value) -> String {
85        val.to_string()
86    }
87}