connectorx/transports/
trino_arrowstream.rs1use crate::{
4 destinations::arrowstream::{
5 typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
6 },
7 impl_transport,
8 sources::trino::{TrinoSource, TrinoSourceError, TrinoTypeSystem},
9 typesystem::TypeConversion,
10};
11use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
12use num_traits::ToPrimitive;
13use rust_decimal::Decimal;
14use serde_json::{to_string, Value};
15use thiserror::Error;
16
17#[derive(Error, Debug)]
18pub enum TrinoArrowTransportError {
19 #[error(transparent)]
20 Source(#[from] TrinoSourceError),
21
22 #[error(transparent)]
23 Destination(#[from] ArrowDestinationError),
24
25 #[error(transparent)]
26 ConnectorX(#[from] crate::errors::ConnectorXError),
27}
28
29pub struct TrinoArrowTransport();
31
32impl_transport!(
33 name = TrinoArrowTransport,
34 error = TrinoArrowTransportError,
35 systems = TrinoTypeSystem => ArrowTypeSystem,
36 route = TrinoSource => ArrowDestination,
37 mappings = {
38 { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
39 { Time[NaiveTime] => Time64[NaiveTime] | conversion auto }
40 { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto }
41 { Boolean[bool] => Boolean[bool] | conversion auto }
42 { Bigint[i32] => Int64[i64] | conversion auto }
43 { Integer[i32] => Int64[i64] | conversion none }
44 { Smallint[i16] => Int64[i64] | conversion auto }
45 { Tinyint[i8] => Int64[i64] | conversion auto }
46 { Double[f64] => Float64[f64] | conversion auto }
47 { Real[f32] => Float64[f64] | conversion auto }
48 { Varchar[String] => LargeUtf8[String] | conversion auto }
49 { Char[String] => LargeUtf8[String] | conversion none }
50 }
51);
52
53impl TypeConversion<Decimal, f64> for TrinoArrowTransport {
54 fn convert(val: Decimal) -> f64 {
55 val.to_f64()
56 .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
57 }
58}
59
60impl TypeConversion<Value, String> for TrinoArrowTransport {
61 fn convert(val: Value) -> String {
62 to_string(&val).unwrap()
63 }
64}