connectorx/transports/
clickhouse_arrowstream.rs1use crate::{
4 destinations::arrowstream::{
5 typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
6 },
7 impl_transport,
8 sources::clickhouse::{ClickHouseSource, ClickHouseSourceError, ClickHouseTypeSystem},
9 typesystem::TypeConversion,
10};
11use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
12use rust_decimal::Decimal;
13use std::net::IpAddr;
14use thiserror::Error;
15use uuid::Uuid;
16
17#[derive(Error, Debug)]
18pub enum ClickHouseArrowTransportError {
19 #[error(transparent)]
20 Source(#[from] ClickHouseSourceError),
21
22 #[error(transparent)]
23 Destination(#[from] ArrowDestinationError),
24
25 #[error(transparent)]
26 ConnectorX(#[from] crate::errors::ConnectorXError),
27}
28
/// Field-less marker type for the ClickHouse → Arrow stream transport.
///
/// It carries no data: it is the `name` passed to `impl_transport!` and the type on
/// which this file's `TypeConversion` impls are declared.
pub struct ClickHouseArrowStreamTransport;
31
// Wires `ClickHouseSource` to `ArrowDestination` under the name
// `ClickHouseArrowStreamTransport`, with `ClickHouseArrowTransportError` as its error type.
//
// Each mapping row has the shape
//     { SourceVariant[RustType] => DestinationVariant[RustType] | conversion <mode> }
// where the variants presumably belong to `ClickHouseTypeSystem` and `ArrowTypeSystem`
// respectively (per the `systems` line) — confirm against those enums.
//
// NOTE(review): the three modes used here (`auto`, `option`, `none`) are interpreted by
// `impl_transport!`, whose definition is not in this file. What this file does show:
//   * every `option` row has a handwritten non-`Option` `TypeConversion` impl below;
//   * every `none` row either repeats the Rust type pair of an earlier row or has both
//     of its `TypeConversion` impls handwritten below.
// Presumably `none` emits no impl and `option` emits only the `Option` wrapper — verify
// against the macro before adding rows.
impl_transport!(
    name = ClickHouseArrowStreamTransport,
    error = ClickHouseArrowTransportError,
    systems = ClickHouseTypeSystem => ArrowTypeSystem,
    route = ClickHouseSource => ArrowDestination,
    mappings = {
        // Signed integers; 8-bit values are carried as 16-bit on the Arrow side.
        { Int8[i8] => Int16[i16] | conversion auto }
        { Int16[i16] => Int16[i16] | conversion auto }
        { Int32[i32] => Int32[i32] | conversion auto }
        { Int64[i64] => Int64[i64] | conversion auto }

        // Unsigned integers; 8-bit values are carried as 16-bit on the Arrow side.
        { UInt8[u8] => UInt16[u16] | conversion auto }
        { UInt16[u16] => UInt16[u16] | conversion auto }
        { UInt32[u32] => UInt32[u32] | conversion auto }
        { UInt64[u64] => UInt64[u64] | conversion auto }

        // Floating point.
        { Float32[f32] => Float32[f32] | conversion auto }
        { Float64[f64] => Float64[f64] | conversion auto }

        { Decimal[Decimal] => Decimal[Decimal] | conversion auto }

        // Text and raw bytes.
        { String[String] => LargeUtf8[String] | conversion auto }
        { FixedString[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }

        // Enums arrive as `String`; same Rust type pair as the `String` row above.
        { Enum8[String] => LargeUtf8[String] | conversion none }
        { Enum16[String] => LargeUtf8[String] | conversion none }

        // Temporal types; the second row of each pair repeats the first row's Rust types.
        { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
        { Date32[NaiveDate] => Date32[NaiveDate] | conversion none }
        { DateTime[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion auto }
        { DateTime64[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion none }
        { Time[NaiveTime] => Time64[NaiveTime] | conversion auto }
        { Time64[NaiveTime] => Time64[NaiveTime] | conversion none }

        // UUIDs and IP addresses are delivered as strings; the value-level conversions
        // are the handwritten `TypeConversion<Uuid, String>` / `<IpAddr, String>` impls.
        { UUID[Uuid] => LargeUtf8[String] | conversion option }
        { IPv4[IpAddr] => LargeUtf8[String] | conversion option }
        { IPv6[IpAddr] => LargeUtf8[String] | conversion none }
        { Bool[bool] => Boolean[bool] | conversion auto }

        // Arrays of nullable elements. The 8-bit integer arrays change element type
        // (i8 -> i16, u8 -> u16) and use the handwritten impls further down this file.
        { ArrayBool[Vec<Option<bool>>] => BoolArray[Vec<Option<bool>>] | conversion auto }
        { ArrayString[Vec<Option<String>>] => Utf8Array[Vec<Option<String>>] | conversion auto }
        { ArrayInt8[Vec<Option<i8>>] => Int16Array[Vec<Option<i16>>] | conversion none }
        { ArrayInt16[Vec<Option<i16>>] => Int16Array[Vec<Option<i16>>] | conversion auto }
        { ArrayInt32[Vec<Option<i32>>] => Int32Array[Vec<Option<i32>>] | conversion auto }
        { ArrayInt64[Vec<Option<i64>>] => Int64Array[Vec<Option<i64>>] | conversion auto }
        { ArrayUInt8[Vec<Option<u8>>] => UInt16Array[Vec<Option<u16>>] | conversion none }
        { ArrayUInt16[Vec<Option<u16>>] => UInt16Array[Vec<Option<u16>>] | conversion auto }
        { ArrayUInt32[Vec<Option<u32>>] => UInt32Array[Vec<Option<u32>>] | conversion auto }
        { ArrayUInt64[Vec<Option<u64>>] => UInt64Array[Vec<Option<u64>>] | conversion auto }
        { ArrayFloat32[Vec<Option<f32>>] => Float32Array[Vec<Option<f32>>] | conversion auto }
        { ArrayFloat64[Vec<Option<f64>>] => Float64Array[Vec<Option<f64>>] | conversion auto }
        { ArrayDecimal[Vec<Option<Decimal>>] => DecimalArray[Vec<Option<Decimal>>] | conversion auto }
    }
);
86
87impl TypeConversion<Uuid, String> for ClickHouseArrowStreamTransport {
88 fn convert(val: Uuid) -> String {
89 val.to_string()
90 }
91}
92
93impl TypeConversion<IpAddr, String> for ClickHouseArrowStreamTransport {
94 fn convert(val: IpAddr) -> String {
95 val.to_string()
96 }
97}
98
99impl TypeConversion<Vec<Option<i8>>, Vec<Option<i16>>> for ClickHouseArrowStreamTransport {
100 fn convert(val: Vec<Option<i8>>) -> Vec<Option<i16>> {
101 val.into_iter().map(|opt| opt.map(|v| v as i16)).collect()
102 }
103}
104
105impl TypeConversion<Vec<Option<u8>>, Vec<Option<u16>>> for ClickHouseArrowStreamTransport {
106 fn convert(val: Vec<Option<u8>>) -> Vec<Option<u16>> {
107 val.into_iter().map(|opt| opt.map(|v| v as u16)).collect()
108 }
109}
110
111impl TypeConversion<Option<Vec<Option<i8>>>, Option<Vec<Option<i16>>>>
112 for ClickHouseArrowStreamTransport
113{
114 fn convert(val: Option<Vec<Option<i8>>>) -> Option<Vec<Option<i16>>> {
115 val.map(Self::convert)
116 }
117}
118
119impl TypeConversion<Option<Vec<Option<u8>>>, Option<Vec<Option<u16>>>>
120 for ClickHouseArrowStreamTransport
121{
122 fn convert(val: Option<Vec<Option<u8>>>) -> Option<Vec<Option<u16>>> {
123 val.map(Self::convert)
124 }
125}