connectorx/transports/
clickhouse_arrowstream.rs

1//! Transport from ClickHouse Source to Arrow Stream Destination.
2
3use crate::{
4    destinations::arrowstream::{
5        typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
6    },
7    impl_transport,
8    sources::clickhouse::{ClickHouseSource, ClickHouseSourceError, ClickHouseTypeSystem},
9    typesystem::TypeConversion,
10};
11use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
12use rust_decimal::Decimal;
13use std::net::IpAddr;
14use thiserror::Error;
15use uuid::Uuid;
16
17#[derive(Error, Debug)]
18pub enum ClickHouseArrowTransportError {
19    #[error(transparent)]
20    Source(#[from] ClickHouseSourceError),
21
22    #[error(transparent)]
23    Destination(#[from] ArrowDestinationError),
24
25    #[error(transparent)]
26    ConnectorX(#[from] crate::errors::ConnectorXError),
27}
28
/// Transport that routes values from a [`ClickHouseSource`] into an Arrow stream
/// [`ArrowDestination`].
///
/// This is a field-less marker type: the type mappings and the `TypeConversion`
/// implementations in this module are all attached to it.
pub struct ClickHouseArrowStreamTransport;
31
// Wires `ClickHouseSource` to `ArrowDestination` through the crate's `impl_transport!` macro.
// Each mapping row reads `SourceVariant[RustType] => DestVariant[RustType] | conversion MODE`.
//
// NOTE(review): the meaning of MODE is defined by `impl_transport!`, which is not visible in
// this file. Judging only from what is written here: `auto` rows have no hand-written
// `TypeConversion` impl; `option` rows (UUID, IPv4) have a hand-written non-`Option` impl
// below but no `Option` one; `none` rows either repeat a Rust type pair already listed on an
// earlier row or (ArrayInt8, ArrayUInt8) have both impls written by hand below.
// Confirm against the macro definition before adding or changing rows.
impl_transport!(
    name = ClickHouseArrowStreamTransport,
    error = ClickHouseArrowTransportError,
    systems = ClickHouseTypeSystem => ArrowTypeSystem,
    route = ClickHouseSource => ArrowDestination,
    mappings = {
        // Signed integers. Int8 is widened to a 16-bit destination column
        // (presumably because the destination has no 8-bit variant; TODO confirm).
        { Int8[i8]                   => Int16[i16]                              | conversion auto }
        { Int16[i16]                 => Int16[i16]                              | conversion auto }
        { Int32[i32]                 => Int32[i32]                              | conversion auto }
        { Int64[i64]                 => Int64[i64]                              | conversion auto }

        // Unsigned integers. UInt8 is widened to 16 bits, mirroring Int8 above.
        { UInt8[u8]                  => UInt16[u16]                             | conversion auto }
        { UInt16[u16]                => UInt16[u16]                             | conversion auto }
        { UInt32[u32]                => UInt32[u32]                             | conversion auto }
        { UInt64[u64]                => UInt64[u64]                             | conversion auto }

        // Floating point.
        { Float32[f32]               => Float32[f32]                            | conversion auto }
        { Float64[f64]               => Float64[f64]                            | conversion auto }

        // Fixed-point decimals.
        { Decimal[Decimal]           => Decimal[Decimal]                        | conversion auto }

        // Text and raw bytes.
        { String[String]             => LargeUtf8[String]                       | conversion auto }
        { FixedString[Vec<u8>]       => LargeBinary[Vec<u8>]                    | conversion auto }

        // Enums arrive as `String`; same Rust type pair as the String row above.
        { Enum8[String]              => LargeUtf8[String]                       | conversion none }
        { Enum16[String]             => LargeUtf8[String]                       | conversion none }

        // Dates and times. The second row of each pair repeats the Rust types of the first.
        { Date[NaiveDate]            => Date32[NaiveDate]                       | conversion auto }
        { Date32[NaiveDate]          => Date32[NaiveDate]                       | conversion none }
        { DateTime[DateTime<Utc>]    => DateTimeTz[DateTime<Utc>]               | conversion auto }
        { DateTime64[DateTime<Utc>]  => DateTimeTz[DateTime<Utc>]               | conversion none }
        { Time[NaiveTime]            => Time64[NaiveTime]                       | conversion auto }
        { Time64[NaiveTime]          => Time64[NaiveTime]                       | conversion none }

        // UUIDs and IP addresses become strings via the hand-written impls below.
        // IPv4 and IPv6 share the `IpAddr -> String` pair.
        { UUID[Uuid]                 => LargeUtf8[String]                       | conversion option }
        { IPv4[IpAddr]               => LargeUtf8[String]                       | conversion option }
        { IPv6[IpAddr]               => LargeUtf8[String]                       | conversion none }
        { Bool[bool]                 => Boolean[bool]                           | conversion auto }

        // Arrays with nullable elements. The 8-bit integer arrays are widened to 16 bits by
        // the hand-written `Vec<Option<_>>` impls below.
        { ArrayBool[Vec<Option<bool>>]         => BoolArray[Vec<Option<bool>>]           | conversion auto }
        { ArrayString[Vec<Option<String>>]     => Utf8Array[Vec<Option<String>>]         | conversion auto }
        { ArrayInt8[Vec<Option<i8>>]           => Int16Array[Vec<Option<i16>>]           | conversion none }
        { ArrayInt16[Vec<Option<i16>>]         => Int16Array[Vec<Option<i16>>]           | conversion auto }
        { ArrayInt32[Vec<Option<i32>>]         => Int32Array[Vec<Option<i32>>]           | conversion auto }
        { ArrayInt64[Vec<Option<i64>>]         => Int64Array[Vec<Option<i64>>]           | conversion auto }
        { ArrayUInt8[Vec<Option<u8>>]          => UInt16Array[Vec<Option<u16>>]          | conversion none }
        { ArrayUInt16[Vec<Option<u16>>]        => UInt16Array[Vec<Option<u16>>]          | conversion auto }
        { ArrayUInt32[Vec<Option<u32>>]        => UInt32Array[Vec<Option<u32>>]          | conversion auto }
        { ArrayUInt64[Vec<Option<u64>>]        => UInt64Array[Vec<Option<u64>>]          | conversion auto }
        { ArrayFloat32[Vec<Option<f32>>]       => Float32Array[Vec<Option<f32>>]         | conversion auto }
        { ArrayFloat64[Vec<Option<f64>>]       => Float64Array[Vec<Option<f64>>]         | conversion auto }
        { ArrayDecimal[Vec<Option<Decimal>>]   => DecimalArray[Vec<Option<Decimal>>]     | conversion auto }
    }
);
86
87impl TypeConversion<Uuid, String> for ClickHouseArrowStreamTransport {
88    fn convert(val: Uuid) -> String {
89        val.to_string()
90    }
91}
92
93impl TypeConversion<IpAddr, String> for ClickHouseArrowStreamTransport {
94    fn convert(val: IpAddr) -> String {
95        val.to_string()
96    }
97}
98
99impl TypeConversion<Vec<Option<i8>>, Vec<Option<i16>>> for ClickHouseArrowStreamTransport {
100    fn convert(val: Vec<Option<i8>>) -> Vec<Option<i16>> {
101        val.into_iter().map(|opt| opt.map(|v| v as i16)).collect()
102    }
103}
104
105impl TypeConversion<Vec<Option<u8>>, Vec<Option<u16>>> for ClickHouseArrowStreamTransport {
106    fn convert(val: Vec<Option<u8>>) -> Vec<Option<u16>> {
107        val.into_iter().map(|opt| opt.map(|v| v as u16)).collect()
108    }
109}
110
111impl TypeConversion<Option<Vec<Option<i8>>>, Option<Vec<Option<i16>>>>
112    for ClickHouseArrowStreamTransport
113{
114    fn convert(val: Option<Vec<Option<i8>>>) -> Option<Vec<Option<i16>>> {
115        val.map(Self::convert)
116    }
117}
118
119impl TypeConversion<Option<Vec<Option<u8>>>, Option<Vec<Option<u16>>>>
120    for ClickHouseArrowStreamTransport
121{
122    fn convert(val: Option<Vec<Option<u8>>>) -> Option<Vec<Option<u16>>> {
123        val.map(Self::convert)
124    }
125}