Skip to main content

connectorx/transports/
trino_arrow.rs

1//! Transport from Trino Source to Arrow Destination.
2
3use crate::{
4    destinations::arrow::{
5        typesystem::{ArrowTypeSystem, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
6        ArrowDestination, ArrowDestinationError,
7    },
8    impl_transport,
9    sources::trino::{TrinoSource, TrinoSourceError, TrinoTypeSystem},
10    typesystem::TypeConversion,
11};
12use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
13use num_traits::ToPrimitive;
14use rust_decimal::Decimal;
15use serde_json::{to_string, Value};
16use thiserror::Error;
17
18#[derive(Error, Debug)]
19pub enum TrinoArrowTransportError {
20    #[error(transparent)]
21    Source(#[from] TrinoSourceError),
22
23    #[error(transparent)]
24    Destination(#[from] ArrowDestinationError),
25
26    #[error(transparent)]
27    ConnectorX(#[from] crate::errors::ConnectorXError),
28}
29
30/// Convert Trino data types to Arrow data types.
31pub struct TrinoArrowTransport();
32
33impl_transport!(
34    name = TrinoArrowTransport,
35    error = TrinoArrowTransportError,
36    systems = TrinoTypeSystem => ArrowTypeSystem,
37    route = TrinoSource => ArrowDestination,
38    mappings = {
39        { Date[NaiveDate]            => Date32[NaiveDate]       | conversion auto }
40        { Time[NaiveTime]            => Time64Micro[NaiveTimeWrapperMicro]       | conversion option }
41        { Timestamp[NaiveDateTime]   => Date64Micro[NaiveDateTimeWrapperMicro]   | conversion option }
42        { Boolean[bool]              => Boolean[bool]           | conversion auto }
43        { Bigint[i32]                => Int64[i64]              | conversion auto }
44        { Integer[i32]               => Int64[i64]              | conversion none }
45        { Smallint[i16]              => Int64[i64]              | conversion auto }
46        { Tinyint[i8]                => Int64[i64]              | conversion auto }
47        { Double[f64]                => Float64[f64]            | conversion auto }
48        { Real[f32]                  => Float64[f64]            | conversion auto }
49        { Varchar[String]            => LargeUtf8[String]       | conversion auto }
50        { Char[String]               => LargeUtf8[String]       | conversion none }
51    }
52);
53
54impl TypeConversion<Decimal, f64> for TrinoArrowTransport {
55    fn convert(val: Decimal) -> f64 {
56        val.to_f64()
57            .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
58    }
59}
60
61impl TypeConversion<Value, String> for TrinoArrowTransport {
62    fn convert(val: Value) -> String {
63        to_string(&val).unwrap()
64    }
65}
66
67impl TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for TrinoArrowTransport {
68    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
69        NaiveTimeWrapperMicro(val)
70    }
71}
72
73impl TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for TrinoArrowTransport {
74    fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
75        NaiveDateTimeWrapperMicro(val)
76    }
77}