connectorx/transports/
trino_arrowstream.rs1use crate::{
4 destinations::arrowstream::{
5 typesystem::{ArrowTypeSystem, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
6 ArrowDestination, ArrowDestinationError,
7 },
8 impl_transport,
9 sources::trino::{TrinoSource, TrinoSourceError, TrinoTypeSystem},
10 typesystem::TypeConversion,
11};
12use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
13use num_traits::ToPrimitive;
14use rust_decimal::Decimal;
15use serde_json::{to_string, Value};
16use thiserror::Error;
17
18#[derive(Error, Debug)]
19pub enum TrinoArrowTransportError {
20 #[error(transparent)]
21 Source(#[from] TrinoSourceError),
22
23 #[error(transparent)]
24 Destination(#[from] ArrowDestinationError),
25
26 #[error(transparent)]
27 ConnectorX(#[from] crate::errors::ConnectorXError),
28}
29
30pub struct TrinoArrowTransport();
32
33impl_transport!(
34 name = TrinoArrowTransport,
35 error = TrinoArrowTransportError,
36 systems = TrinoTypeSystem => ArrowTypeSystem,
37 route = TrinoSource => ArrowDestination,
38 mappings = {
39 { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
40 { Time[NaiveTime] => Time64Micro[NaiveTimeWrapperMicro] | conversion option }
41 { Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
42 { Boolean[bool] => Boolean[bool] | conversion auto }
43 { Bigint[i32] => Int64[i64] | conversion auto }
44 { Integer[i32] => Int64[i64] | conversion none }
45 { Smallint[i16] => Int64[i64] | conversion auto }
46 { Tinyint[i8] => Int64[i64] | conversion auto }
47 { Double[f64] => Float64[f64] | conversion auto }
48 { Real[f32] => Float64[f64] | conversion auto }
49 { Varchar[String] => LargeUtf8[String] | conversion auto }
50 { Char[String] => LargeUtf8[String] | conversion none }
51 }
52);
53
54impl TypeConversion<Decimal, f64> for TrinoArrowTransport {
55 fn convert(val: Decimal) -> f64 {
56 val.to_f64()
57 .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
58 }
59}
60
61impl TypeConversion<Value, String> for TrinoArrowTransport {
62 fn convert(val: Value) -> String {
63 to_string(&val).unwrap()
64 }
65}
66
67impl TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for TrinoArrowTransport {
68 fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
69 NaiveTimeWrapperMicro(val)
70 }
71}
72
73impl TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for TrinoArrowTransport {
74 fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
75 NaiveDateTimeWrapperMicro(val)
76 }
77}