connectorx/transports/
postgres_arrowstream.rs1use crate::destinations::arrowstream::{
4 typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
5};
6use crate::sources::postgres::{
7 BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
8 PostgresTypeSystem, SimpleProtocol,
9};
10use crate::typesystem::TypeConversion;
11use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
12use postgres::NoTls;
13use postgres_openssl::MakeTlsConnector;
14use rust_decimal::Decimal;
15use serde_json::Value;
16use std::marker::PhantomData;
17use thiserror::Error;
18use uuid::Uuid;
19
20#[derive(Error, Debug)]
21pub enum PostgresArrowTransportError {
22 #[error(transparent)]
23 Source(#[from] PostgresSourceError),
24
25 #[error(transparent)]
26 Destination(#[from] ArrowDestinationError),
27
28 #[error(transparent)]
29 ConnectorX(#[from] crate::errors::ConnectorXError),
30}
31
/// Marker type naming the transport from a Postgres source to the Arrow
/// destination.
///
/// The struct stores no data: both fields are `PhantomData`, used only to
/// carry the two type parameters. In this file `P` is instantiated with a
/// protocol type (`BinaryProtocol`, `CSVProtocol`, `CursorProtocol`,
/// `SimpleProtocol`) and `C` with a TLS connector type (`NoTls`,
/// `MakeTlsConnector`).
pub struct PostgresArrowTransport<P, C>(
    PhantomData<P>,
    PhantomData<C>,
);
34
/// Implements the Postgres-to-Arrow transport for one
/// `(protocol, TLS connector)` pair.
///
/// Expands to a single `impl_transport!` invocation that names
/// `PostgresArrowTransport<$protocol, $connector>`, routes
/// `PostgresSource<$protocol, $connector>` into `ArrowDestination`, and lists
/// how each `PostgresTypeSystem` variant maps onto an `ArrowTypeSystem`
/// variant.
macro_rules! impl_postgres_transport {
    ($protocol:ty, $connector:ty) => {
        impl_transport!(
            name = PostgresArrowTransport<$protocol, $connector>,
            error = PostgresArrowTransportError,
            systems = PostgresTypeSystem => ArrowTypeSystem,
            route = PostgresSource<$protocol, $connector> => ArrowDestination,
            mappings = {
                // Row shape: { Source[rust type] => Destination[rust type] | conversion <kind> }
                // NOTE(review): the meaning of each conversion kind is defined by
                // `impl_transport!`, which is not visible here. The `option` rows
                // (UUID, JSON) appear to pair with the hand-written
                // `TypeConversion` impls in this file -- confirm.

                // Numeric types: every integer width lands in Int64, both float widths in Float64.
                { Float4[f32]                => Float64[f64]              | conversion auto }
                { Float8[f64]                => Float64[f64]              | conversion auto }
                { Numeric[Decimal]           => Decimal[Decimal]          | conversion auto }
                { Int2[i16]                  => Int64[i64]                | conversion auto }
                { Int4[i32]                  => Int64[i64]                | conversion auto }
                { Int8[i64]                  => Int64[i64]                | conversion auto }
                { Bool[bool]                 => Boolean[bool]             | conversion auto }

                // Borrowed string types all become owned `String`s in LargeUtf8.
                { Text[&'r str]              => LargeUtf8[String]         | conversion owned }
                { BpChar[&'r str]            => LargeUtf8[String]         | conversion none }
                { VarChar[&'r str]           => LargeUtf8[String]         | conversion none }
                { Name[&'r str]              => LargeUtf8[String]         | conversion none }

                // Date and time types.
                { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]     | conversion auto }
                { Date[NaiveDate]            => Date32[NaiveDate]         | conversion auto }
                { Time[NaiveTime]            => Time64[NaiveTime]         | conversion auto }
                { TimestampTz[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion auto }

                // Remaining types: UUID and JSON values are rendered as text.
                { UUID[Uuid]                 => LargeUtf8[String]         | conversion option }
                { Char[&'r str]              => LargeUtf8[String]         | conversion none }
                { ByteA[Vec<u8>]             => LargeBinary[Vec<u8>]      | conversion auto }
                { JSON[Value]                => LargeUtf8[String]         | conversion option }
                { JSONB[Value]               => LargeUtf8[String]         | conversion none }
            }
        );
    }
}
67
68impl_postgres_transport!(BinaryProtocol, NoTls);
69impl_postgres_transport!(BinaryProtocol, MakeTlsConnector);
70impl_postgres_transport!(CSVProtocol, NoTls);
71impl_postgres_transport!(CSVProtocol, MakeTlsConnector);
72impl_postgres_transport!(CursorProtocol, NoTls);
73impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
74impl_postgres_transport!(SimpleProtocol, NoTls);
75impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
76
77impl<P, C> TypeConversion<Uuid, String> for PostgresArrowTransport<P, C> {
78 fn convert(val: Uuid) -> String {
79 val.to_string()
80 }
81}
82
83impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
84 fn convert(val: Value) -> String {
85 val.to_string()
86 }
87}