connectorx/transports/
trino_arrowstream.rs

1//! Transport from Trino Source to Arrow Destination.
2
3use crate::{
4    destinations::arrowstream::{
5        typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
6    },
7    impl_transport,
8    sources::trino::{TrinoSource, TrinoSourceError, TrinoTypeSystem},
9    typesystem::TypeConversion,
10};
11use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
12use num_traits::ToPrimitive;
13use rust_decimal::Decimal;
14use serde_json::{to_string, Value};
15use thiserror::Error;
16
17#[derive(Error, Debug)]
18pub enum TrinoArrowTransportError {
19    #[error(transparent)]
20    Source(#[from] TrinoSourceError),
21
22    #[error(transparent)]
23    Destination(#[from] ArrowDestinationError),
24
25    #[error(transparent)]
26    ConnectorX(#[from] crate::errors::ConnectorXError),
27}
28
29/// Convert Trino data types to Arrow data types.
30pub struct TrinoArrowTransport();
31
32impl_transport!(
33    name = TrinoArrowTransport,
34    error = TrinoArrowTransportError,
35    systems = TrinoTypeSystem => ArrowTypeSystem,
36    route = TrinoSource => ArrowDestination,
37    mappings = {
38        { Date[NaiveDate]            => Date32[NaiveDate]       | conversion auto }
39        { Time[NaiveTime]            => Time64[NaiveTime]       | conversion auto }
40        { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]   | conversion auto }
41        { Boolean[bool]              => Boolean[bool]           | conversion auto }
42        { Bigint[i32]                => Int64[i64]              | conversion auto }
43        { Integer[i32]               => Int64[i64]              | conversion none }
44        { Smallint[i16]              => Int64[i64]              | conversion auto }
45        { Tinyint[i8]                => Int64[i64]              | conversion auto }
46        { Double[f64]                => Float64[f64]            | conversion auto }
47        { Real[f32]                  => Float64[f64]            | conversion auto }
48        { Varchar[String]            => LargeUtf8[String]       | conversion auto }
49        { Char[String]               => LargeUtf8[String]       | conversion none }
50    }
51);
52
53impl TypeConversion<Decimal, f64> for TrinoArrowTransport {
54    fn convert(val: Decimal) -> f64 {
55        val.to_f64()
56            .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
57    }
58}
59
60impl TypeConversion<Value, String> for TrinoArrowTransport {
61    fn convert(val: Value) -> String {
62        to_string(&val).unwrap()
63    }
64}