Skip to main content

connectorx/transports/
clickhouse_arrow.rs

1//! Transport from ClickHouse Source to Arrow Destination.
2
3use crate::{
4    destinations::arrow::{
5        typesystem::{ArrowTypeSystem, DateTimeWrapperMicro, NaiveTimeWrapperMicro},
6        ArrowDestination, ArrowDestinationError,
7    },
8    impl_transport,
9    sources::clickhouse::{ClickHouseSource, ClickHouseSourceError, ClickHouseTypeSystem},
10    typesystem::TypeConversion,
11};
12use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
13use rust_decimal::Decimal;
14use std::net::IpAddr;
15use thiserror::Error;
16use uuid::Uuid;
17
18#[derive(Error, Debug)]
19pub enum ClickHouseArrowTransportError {
20    #[error(transparent)]
21    Source(#[from] ClickHouseSourceError),
22
23    #[error(transparent)]
24    Destination(#[from] ArrowDestinationError),
25
26    #[error(transparent)]
27    ConnectorX(#[from] crate::errors::ConnectorXError),
28}
29
/// Zero-sized marker type that names the ClickHouse → Arrow transport.
///
/// It holds no data. The `impl_transport!` invocation and the
/// `TypeConversion` impls in this module attach the type mappings and value
/// conversions to it.
pub struct ClickHouseArrowTransport;
32
33impl_transport!(
34    name = ClickHouseArrowTransport,
35    error = ClickHouseArrowTransportError,
36    systems = ClickHouseTypeSystem => ArrowTypeSystem,
37    route = ClickHouseSource => ArrowDestination,
38    mappings = {
39        { Int8[i8]                   => Int16[i16]                              | conversion auto }
40        { Int16[i16]                 => Int16[i16]                              | conversion auto }
41        { Int32[i32]                 => Int32[i32]                              | conversion auto }
42        { Int64[i64]                 => Int64[i64]                              | conversion auto }
43
44        { UInt8[u8]                  => UInt16[u16]                             | conversion auto }
45        { UInt16[u16]                => UInt16[u16]                             | conversion auto }
46        { UInt32[u32]                => UInt32[u32]                             | conversion auto }
47        { UInt64[u64]                => UInt64[u64]                             | conversion auto }
48
49        { Float32[f32]               => Float32[f32]                            | conversion auto }
50        { Float64[f64]               => Float64[f64]                            | conversion auto }
51
52        { Decimal[Decimal]           => Decimal[Decimal]                        | conversion auto }
53
54        { String[String]             => LargeUtf8[String]                       | conversion auto }
55        { FixedString[Vec<u8>]       => LargeBinary[Vec<u8>]                    | conversion auto }
56
57        { Enum8[String]              => LargeUtf8[String]                       | conversion none }
58        { Enum16[String]             => LargeUtf8[String]                       | conversion none }
59
60        { Date[NaiveDate]            => Date32[NaiveDate]                       | conversion auto }
61        { Date32[NaiveDate]          => Date32[NaiveDate]                       | conversion none }
62        { DateTime[DateTime<Utc>]    => DateTimeTzMicro[DateTimeWrapperMicro]   | conversion option }
63        { DateTime64[DateTime<Utc>]  => DateTimeTzMicro[DateTimeWrapperMicro]   | conversion none }
64        { Time[NaiveTime]            => Time64Micro[NaiveTimeWrapperMicro]      | conversion option }
65        { Time64[NaiveTime]          => Time64Micro[NaiveTimeWrapperMicro]      | conversion none }
66
67        { UUID[Uuid]                 => LargeUtf8[String]                       | conversion option }
68        { IPv4[IpAddr]               => LargeUtf8[String]                       | conversion option }
69        { IPv6[IpAddr]               => LargeUtf8[String]                       | conversion none }
70        { Bool[bool]                 => Boolean[bool]                           | conversion auto }
71
72        { ArrayBool[Vec<Option<bool>>]         => BoolArray[Vec<Option<bool>>]           | conversion auto }
73        { ArrayString[Vec<Option<String>>]     => Utf8Array[Vec<Option<String>>]         | conversion auto }
74        { ArrayInt8[Vec<Option<i8>>]           => Int16Array[Vec<Option<i16>>]           | conversion none }
75        { ArrayInt16[Vec<Option<i16>>]         => Int16Array[Vec<Option<i16>>]           | conversion auto }
76        { ArrayInt32[Vec<Option<i32>>]         => Int32Array[Vec<Option<i32>>]           | conversion auto }
77        { ArrayInt64[Vec<Option<i64>>]         => Int64Array[Vec<Option<i64>>]           | conversion auto }
78        { ArrayUInt8[Vec<Option<u8>>]          => UInt16Array[Vec<Option<u16>>]          | conversion none }
79        { ArrayUInt16[Vec<Option<u16>>]        => UInt16Array[Vec<Option<u16>>]          | conversion auto }
80        { ArrayUInt32[Vec<Option<u32>>]        => UInt32Array[Vec<Option<u32>>]          | conversion auto }
81        { ArrayUInt64[Vec<Option<u64>>]        => UInt64Array[Vec<Option<u64>>]          | conversion auto }
82        { ArrayFloat32[Vec<Option<f32>>]       => Float32Array[Vec<Option<f32>>]         | conversion auto }
83        { ArrayFloat64[Vec<Option<f64>>]       => Float64Array[Vec<Option<f64>>]         | conversion auto }
84        { ArrayDecimal[Vec<Option<Decimal>>]   => DecimalArray[Vec<Option<Decimal>>]     | conversion auto }
85    }
86);
87
88impl TypeConversion<Uuid, String> for ClickHouseArrowTransport {
89    fn convert(val: Uuid) -> String {
90        val.to_string()
91    }
92}
93
94impl TypeConversion<IpAddr, String> for ClickHouseArrowTransport {
95    fn convert(val: IpAddr) -> String {
96        val.to_string()
97    }
98}
99
100impl TypeConversion<Vec<Option<i8>>, Vec<Option<i16>>> for ClickHouseArrowTransport {
101    fn convert(val: Vec<Option<i8>>) -> Vec<Option<i16>> {
102        val.into_iter().map(|opt| opt.map(|v| v as i16)).collect()
103    }
104}
105
106impl TypeConversion<Vec<Option<u8>>, Vec<Option<u16>>> for ClickHouseArrowTransport {
107    fn convert(val: Vec<Option<u8>>) -> Vec<Option<u16>> {
108        val.into_iter().map(|opt| opt.map(|v| v as u16)).collect()
109    }
110}
111
112impl TypeConversion<Option<Vec<Option<i8>>>, Option<Vec<Option<i16>>>>
113    for ClickHouseArrowTransport
114{
115    fn convert(val: Option<Vec<Option<i8>>>) -> Option<Vec<Option<i16>>> {
116        val.map(Self::convert)
117    }
118}
119
120impl TypeConversion<Option<Vec<Option<u8>>>, Option<Vec<Option<u16>>>>
121    for ClickHouseArrowTransport
122{
123    fn convert(val: Option<Vec<Option<u8>>>) -> Option<Vec<Option<u16>>> {
124        val.map(Self::convert)
125    }
126}
127
128impl TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for ClickHouseArrowTransport {
129    fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
130        DateTimeWrapperMicro(val)
131    }
132}
133
134impl TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for ClickHouseArrowTransport {
135    fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
136        NaiveTimeWrapperMicro(val)
137    }
138}