connectorx/transports/
trino_arrow.rs

1//! Transport from Trino Source to Arrow Destination.
2
3use crate::{
4    destinations::arrow::{typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError},
5    impl_transport,
6    sources::trino::{TrinoSource, TrinoSourceError, TrinoTypeSystem},
7    typesystem::TypeConversion,
8};
9use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
10use num_traits::ToPrimitive;
11use rust_decimal::Decimal;
12use serde_json::{to_string, Value};
13use thiserror::Error;
14
15#[derive(Error, Debug)]
16pub enum TrinoArrowTransportError {
17    #[error(transparent)]
18    Source(#[from] TrinoSourceError),
19
20    #[error(transparent)]
21    Destination(#[from] ArrowDestinationError),
22
23    #[error(transparent)]
24    ConnectorX(#[from] crate::errors::ConnectorXError),
25}
26
/// Transport that carries values of the Trino type system into the Arrow
/// type system.
///
/// The type holds no data; it only serves as the implementor of the
/// conversion traits for the Trino -> Arrow route.
pub struct TrinoArrowTransport();
29
30impl_transport!(
31    name = TrinoArrowTransport,
32    error = TrinoArrowTransportError,
33    systems = TrinoTypeSystem => ArrowTypeSystem,
34    route = TrinoSource => ArrowDestination,
35    mappings = {
36        { Date[NaiveDate]            => Date32[NaiveDate]       | conversion auto }
37        { Time[NaiveTime]            => Time64[NaiveTime]       | conversion auto }
38        { Timestamp[NaiveDateTime]   => Date64[NaiveDateTime]   | conversion auto }
39        { Boolean[bool]              => Boolean[bool]           | conversion auto }
40        { Bigint[i32]                => Int64[i64]              | conversion auto }
41        { Integer[i32]               => Int64[i64]              | conversion none }
42        { Smallint[i16]              => Int64[i64]              | conversion auto }
43        { Tinyint[i8]                => Int64[i64]              | conversion auto }
44        { Double[f64]                => Float64[f64]            | conversion auto }
45        { Real[f32]                  => Float64[f64]            | conversion auto }
46        { Varchar[String]            => LargeUtf8[String]       | conversion auto }
47        { Char[String]               => LargeUtf8[String]       | conversion none }
48    }
49);
50
51impl TypeConversion<Decimal, f64> for TrinoArrowTransport {
52    fn convert(val: Decimal) -> f64 {
53        val.to_f64()
54            .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
55    }
56}
57
58impl TypeConversion<Value, String> for TrinoArrowTransport {
59    fn convert(val: Value) -> String {
60        to_string(&val).unwrap()
61    }
62}