connectorx/transports/
trino_arrow.rs1use crate::{
4 destinations::arrow::{typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError},
5 impl_transport,
6 sources::trino::{TrinoSource, TrinoSourceError, TrinoTypeSystem},
7 typesystem::TypeConversion,
8};
9use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
10use num_traits::ToPrimitive;
11use rust_decimal::Decimal;
12use serde_json::{to_string, Value};
13use thiserror::Error;
14
15#[derive(Error, Debug)]
16pub enum TrinoArrowTransportError {
17 #[error(transparent)]
18 Source(#[from] TrinoSourceError),
19
20 #[error(transparent)]
21 Destination(#[from] ArrowDestinationError),
22
23 #[error(transparent)]
24 ConnectorX(#[from] crate::errors::ConnectorXError),
25}
26
27pub struct TrinoArrowTransport();
29
30impl_transport!(
31 name = TrinoArrowTransport,
32 error = TrinoArrowTransportError,
33 systems = TrinoTypeSystem => ArrowTypeSystem,
34 route = TrinoSource => ArrowDestination,
35 mappings = {
36 { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
37 { Time[NaiveTime] => Time64[NaiveTime] | conversion auto }
38 { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto }
39 { Boolean[bool] => Boolean[bool] | conversion auto }
40 { Bigint[i32] => Int64[i64] | conversion auto }
41 { Integer[i32] => Int64[i64] | conversion none }
42 { Smallint[i16] => Int64[i64] | conversion auto }
43 { Tinyint[i8] => Int64[i64] | conversion auto }
44 { Double[f64] => Float64[f64] | conversion auto }
45 { Real[f32] => Float64[f64] | conversion auto }
46 { Varchar[String] => LargeUtf8[String] | conversion auto }
47 { Char[String] => LargeUtf8[String] | conversion none }
48 }
49);
50
51impl TypeConversion<Decimal, f64> for TrinoArrowTransport {
52 fn convert(val: Decimal) -> f64 {
53 val.to_f64()
54 .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
55 }
56}
57
58impl TypeConversion<Value, String> for TrinoArrowTransport {
59 fn convert(val: Value) -> String {
60 to_string(&val).unwrap()
61 }
62}