connectorx/sources/clickhouse/
mod.rs

//! Source implementation for ClickHouse using the HTTP interface (JSONCompact for metadata, RowBinary for data).
2
3mod errors;
4mod typesystem;
5
6pub use self::errors::ClickHouseSourceError;
7pub use self::typesystem::{ClickHouseTypeSystem, TypeMetadata};
8
9use crate::{
10    data_order::DataOrder,
11    errors::ConnectorXError,
12    sources::{
13        clickhouse::typesystem::DataType, PartitionParser, Produce, Source, SourcePartition,
14    },
15    sql::{count_query, limit1_query, CXQuery},
16};
17use anyhow::anyhow;
18use chrono::{DateTime, Duration, NaiveDate, NaiveTime, Utc};
19use chrono_tz::Tz;
20use clickhouse::Client;
21use fehler::{throw, throws};
22use rust_decimal::Decimal;
23use serde::Deserialize;
24use serde_json::Value as JsonValue;
25use sqlparser::dialect::{ClickHouseDialect, GenericDialect};
26use std::io::{Cursor, Read};
27use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
28use std::sync::Arc;
29use tokio::runtime::Runtime;
30use uuid::Uuid;
31
/// ClickHouse source that uses the HTTP protocol.
///
/// Holds the client, the queries to run and the result schema discovered by
/// `fetch_metadata`; `partition` turns it into one `ClickHouseSourcePartition`
/// per query.
pub struct ClickHouseSource {
    /// Tokio runtime used to drive the async client through `block_on`.
    rt: Arc<Runtime>,
    /// ClickHouse client configured from the connection URL in `new`.
    pub client: Client,
    /// Unpartitioned query, if provided; used by `result_rows` to count all rows.
    origin_query: Option<String>,
    /// One query per partition, set through `set_queries`.
    queries: Vec<CXQuery<String>>,
    /// Column names, filled by `fetch_metadata`.
    names: Vec<String>,
    /// Column types, filled by `fetch_metadata` (parallel to `names`).
    schema: Vec<ClickHouseTypeSystem>,
    /// Per-column type metadata, filled by `fetch_metadata` (parallel to `names`).
    metadata: Vec<TypeMetadata>,
}
42
43impl ClickHouseSource {
44    #[throws(ClickHouseSourceError)]
45    pub fn new(rt: Arc<Runtime>, conn: &str) -> Self {
46        let url = url::Url::parse(conn)?;
47
48        let use_https = url
49            .query_pairs()
50            .find(|(k, v)| k == "protocol" && v == "https")
51            .is_some();
52
53        let base_url = format!(
54            "{}://{}:{}",
55            if use_https { "https" } else { "http" },
56            url.host_str().unwrap_or("localhost"),
57            url.port().unwrap_or(8123)
58        );
59
60        let mut client = Client::default().with_url(&base_url);
61
62        let database = url.path().trim_start_matches('/');
63        if !database.is_empty() {
64            client = client.with_database(database);
65        }
66
67        let username = url.username();
68        if !username.is_empty() {
69            client = client.with_user(username);
70        }
71
72        let password = url.password().unwrap_or("");
73        if !password.is_empty() {
74            client = client.with_password(password);
75        }
76
77        Self {
78            rt,
79            client,
80            origin_query: None,
81            queries: vec![],
82            names: vec![],
83            schema: vec![],
84            metadata: vec![],
85        }
86    }
87}
88
89impl Source for ClickHouseSource
90where
91    ClickHouseSourcePartition:
92        SourcePartition<TypeSystem = ClickHouseTypeSystem, Error = ClickHouseSourceError>,
93{
94    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
95    type Partition = ClickHouseSourcePartition;
96    type TypeSystem = ClickHouseTypeSystem;
97    type Error = ClickHouseSourceError;
98
99    #[throws(ClickHouseSourceError)]
100    fn set_data_order(&mut self, data_order: DataOrder) {
101        if !matches!(data_order, DataOrder::RowMajor) {
102            throw!(ConnectorXError::UnsupportedDataOrder(data_order));
103        }
104    }
105
106    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
107        self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
108    }
109
110    fn set_origin_query(&mut self, query: Option<String>) {
111        self.origin_query = query;
112    }
113
114    #[throws(ClickHouseSourceError)]
115    fn fetch_metadata(&mut self) {
116        assert!(!self.queries.is_empty());
117
118        let first_query = &self.queries[0];
119        let l1query = limit1_query(first_query, &ClickHouseDialect {})?;
120
121        let describe_query = format!("DESCRIBE ({})", l1query.as_str());
122
123        let response = self.rt.block_on(async {
124            let mut cursor = self
125                .client
126                .query(&describe_query)
127                .fetch_bytes("JSONCompact")
128                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
129            let bytes = cursor
130                .collect()
131                .await
132                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
133            Ok::<_, ClickHouseSourceError>(bytes)
134        })?;
135
136        #[derive(Debug, Deserialize)]
137        struct DescribeResponse {
138            data: Vec<Vec<JsonValue>>,
139        }
140
141        let parsed: DescribeResponse = serde_json::from_slice(&response)
142            .map_err(|e| anyhow!("Failed to parse DESCRIBE response: {}", e))?;
143
144        let mut names = Vec::new();
145        let mut types = Vec::new();
146        let mut metadata = Vec::new();
147
148        for row in parsed.data {
149            if row.len() >= 2 {
150                let name = row[0].as_str().unwrap_or("").to_string();
151                let type_str = row[1].as_str().unwrap_or("String");
152                let (ts, meta) = ClickHouseTypeSystem::from_type_str_with_metadata(type_str);
153                names.push(name);
154                types.push(ts);
155                metadata.push(meta);
156            }
157        }
158
159        self.names = names;
160        self.schema = types;
161        self.metadata = metadata;
162    }
163
164    #[throws(ClickHouseSourceError)]
165    fn result_rows(&mut self) -> Option<usize> {
166        match &self.origin_query {
167            Some(q) => {
168                let cxq = CXQuery::Naked(q.clone());
169                let cquery = count_query(&cxq, &ClickHouseDialect {})?;
170
171                let response = self.rt.block_on(async {
172                    let mut cursor = self
173                        .client
174                        .query(cquery.as_str())
175                        .fetch_bytes("JSONCompact")
176                        .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
177                    let bytes = cursor
178                        .collect()
179                        .await
180                        .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
181                    Ok::<_, ClickHouseSourceError>(bytes)
182                })?;
183
184                #[derive(Debug, Deserialize)]
185                struct CountResponse {
186                    data: Vec<Vec<JsonValue>>,
187                }
188
189                let parsed: CountResponse = serde_json::from_slice(&response)
190                    .map_err(|e| anyhow!("Failed to parse count response: {}", e))?;
191
192                if let Some(row) = parsed.data.first() {
193                    if let Some(count_val) = row.first() {
194                        let count = match count_val {
195                            JsonValue::Number(n) => n.as_u64().unwrap_or(0),
196                            JsonValue::String(s) => s.parse().unwrap_or(0),
197                            _ => 0,
198                        };
199                        return Some(count as usize);
200                    }
201                }
202                None
203            }
204            None => None,
205        }
206    }
207
208    fn names(&self) -> Vec<String> {
209        self.names.clone()
210    }
211
212    fn schema(&self) -> Vec<Self::TypeSystem> {
213        self.schema.clone()
214    }
215
216    #[throws(ClickHouseSourceError)]
217    fn partition(self) -> Vec<Self::Partition> {
218        let mut ret = vec![];
219        for query in self.queries {
220            ret.push(ClickHouseSourcePartition::new(
221                self.rt.clone(),
222                self.client.clone(),
223                &query,
224                &self.schema,
225                &self.metadata,
226            ));
227        }
228        ret
229    }
230}
231
/// One partition of a ClickHouse source: a single query plus the schema
/// shared by all partitions.
pub struct ClickHouseSourcePartition {
    /// Tokio runtime used to drive the async client through `block_on`.
    rt: Arc<Runtime>,
    /// ClickHouse client used to run this partition's queries.
    client: Client,
    /// Query whose rows this partition produces.
    query: CXQuery<String>,
    /// Column types of the result.
    schema: Vec<ClickHouseTypeSystem>,
    /// Per-column type metadata (parallel to `schema`).
    metadata: Vec<TypeMetadata>,
    /// Row count; 0 until `result_rows` has run.
    nrows: usize,
    /// Column count, taken from `schema.len()` at construction.
    ncols: usize,
}
241
242impl ClickHouseSourcePartition {
243    pub fn new(
244        rt: Arc<Runtime>,
245        client: Client,
246        query: &CXQuery<String>,
247        schema: &[ClickHouseTypeSystem],
248        metadata: &[TypeMetadata],
249    ) -> Self {
250        Self {
251            rt,
252            client,
253            query: query.clone(),
254            schema: schema.to_vec(),
255            metadata: metadata.to_vec(),
256            nrows: 0,
257            ncols: schema.len(),
258        }
259    }
260}
261
262impl SourcePartition for ClickHouseSourcePartition {
263    type TypeSystem = ClickHouseTypeSystem;
264    type Parser<'a> = ClickHouseSourceParser<'a>;
265    type Error = ClickHouseSourceError;
266
267    #[throws(ClickHouseSourceError)]
268    fn result_rows(&mut self) {
269        let cquery = count_query(&self.query, &GenericDialect {})?;
270
271        let response = self.rt.block_on(async {
272            let mut cursor = self
273                .client
274                .query(cquery.as_str())
275                .fetch_bytes("JSONCompact")
276                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
277            let bytes = cursor
278                .collect()
279                .await
280                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
281            Ok::<_, ClickHouseSourceError>(bytes)
282        })?;
283
284        #[derive(Debug, Deserialize)]
285        struct CountResponse {
286            data: Vec<Vec<JsonValue>>,
287        }
288
289        let parsed: CountResponse = serde_json::from_slice(&response)
290            .map_err(|e| anyhow!("Failed to parse count response: {}", e))?;
291
292        if let Some(row) = parsed.data.first() {
293            if let Some(count_val) = row.first() {
294                let count = match count_val {
295                    JsonValue::Number(n) => n.as_u64().unwrap_or(0),
296                    JsonValue::String(s) => s.parse().unwrap_or(0),
297                    _ => 0,
298                };
299                self.nrows = count as usize;
300            }
301        }
302    }
303
304    #[throws(ClickHouseSourceError)]
305    fn parser(&mut self) -> Self::Parser<'_> {
306        ClickHouseSourceParser::new(
307            self.rt.clone(),
308            self.client.clone(),
309            self.query.clone(),
310            &self.schema,
311            &self.metadata,
312        )?
313    }
314
315    fn nrows(&self) -> usize {
316        self.nrows
317    }
318
319    fn ncols(&self) -> usize {
320        self.ncols
321    }
322}
323
/// Sequential decoder over an in-memory byte slice; each `read_*` method
/// consumes bytes from the current cursor position.
struct BinaryReader<'a> {
    /// Cursor tracking the read position within the borrowed bytes.
    cursor: Cursor<&'a [u8]>,
}
327
328impl<'a> BinaryReader<'a> {
329    fn new(data: &'a [u8]) -> Self {
330        Self {
331            cursor: Cursor::new(data),
332        }
333    }
334
335    fn read_bytes<const N: usize>(&mut self) -> Result<[u8; N], ClickHouseSourceError> {
336        let mut buf = [0u8; N];
337        self.cursor
338            .read_exact(&mut buf)
339            .map_err(|e| anyhow!("Failed to read {} bytes: {}", N, e))?;
340        Ok(buf)
341    }
342
343    fn read_u8(&mut self) -> Result<u8, ClickHouseSourceError> {
344        Ok(self.read_bytes::<1>()?[0])
345    }
346
347    fn read_i8(&mut self) -> Result<i8, ClickHouseSourceError> {
348        Ok(self.read_u8()? as i8)
349    }
350
351    fn read_u16(&mut self) -> Result<u16, ClickHouseSourceError> {
352        Ok(u16::from_le_bytes(self.read_bytes()?))
353    }
354
355    fn read_i16(&mut self) -> Result<i16, ClickHouseSourceError> {
356        Ok(i16::from_le_bytes(self.read_bytes()?))
357    }
358
359    fn read_u32(&mut self) -> Result<u32, ClickHouseSourceError> {
360        Ok(u32::from_le_bytes(self.read_bytes()?))
361    }
362
363    fn read_i32(&mut self) -> Result<i32, ClickHouseSourceError> {
364        Ok(i32::from_le_bytes(self.read_bytes()?))
365    }
366
367    fn read_u64(&mut self) -> Result<u64, ClickHouseSourceError> {
368        Ok(u64::from_le_bytes(self.read_bytes()?))
369    }
370
371    fn read_i64(&mut self) -> Result<i64, ClickHouseSourceError> {
372        Ok(i64::from_le_bytes(self.read_bytes()?))
373    }
374
375    fn read_f32(&mut self) -> Result<f32, ClickHouseSourceError> {
376        Ok(f32::from_le_bytes(self.read_bytes()?))
377    }
378
379    fn read_f64(&mut self) -> Result<f64, ClickHouseSourceError> {
380        Ok(f64::from_le_bytes(self.read_bytes()?))
381    }
382
383    fn read_varint(&mut self) -> Result<u64, ClickHouseSourceError> {
384        let mut result: u64 = 0;
385        let mut shift = 0;
386        loop {
387            let byte = self.read_u8()?;
388            result |= ((byte & 0x7f) as u64) << shift;
389            if byte & 0x80 == 0 {
390                break;
391            }
392            shift += 7;
393            if shift >= 64 {
394                return Err(anyhow!("Varint too long").into());
395            }
396        }
397        Ok(result)
398    }
399
400    fn read_fixed_string(&mut self, len: usize) -> Result<Vec<u8>, ClickHouseSourceError> {
401        let mut buf = vec![0u8; len];
402        self.cursor
403            .read_exact(&mut buf)
404            .map_err(|e| anyhow!("Failed to read FixedString: {}", e))?;
405        Ok(buf)
406    }
407
408    fn read_string(&mut self) -> Result<String, ClickHouseSourceError> {
409        let len = self.read_varint()? as usize;
410        let mut buf = vec![0u8; len];
411        self.cursor
412            .read_exact(&mut buf)
413            .map_err(|e| anyhow!("Failed to read string: {}", e))?;
414        String::from_utf8(buf).map_err(|e| anyhow!("Invalid UTF-8: {}", e).into())
415    }
416
417    fn read_uuid(&mut self) -> Result<Uuid, ClickHouseSourceError> {
418        // ClickHouse stores UUID as two UInt64 in big-endian order
419        let high = self.read_u64()?;
420        let low = self.read_u64()?;
421
422        Ok(Uuid::from_u64_pair(high, low))
423    }
424
425    fn read_bool(&mut self) -> Result<bool, ClickHouseSourceError> {
426        Ok(self.read_u8()? != 0)
427    }
428
429    fn read_date(&mut self) -> Result<NaiveDate, ClickHouseSourceError> {
430        // ClickHouse stores Date as UInt16 representing days since 1970-01-01
431        let days = self.read_u16()? as i64;
432        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
433        epoch
434            .checked_add_signed(Duration::days(days))
435            .ok_or_else(|| anyhow!("Invalid date value: {} days since epoch", days).into())
436    }
437
438    fn read_date32(&mut self) -> Result<NaiveDate, ClickHouseSourceError> {
439        // ClickHouse stores Date32 as Int32 representing days since 1970-01-01
440        // negative values represent dates before 1970-01-01
441        let days = self.read_i32()? as i64;
442        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
443        epoch
444            .checked_add_signed(Duration::days(days))
445            .ok_or_else(|| anyhow!("Invalid date32 value: {} days since epoch", days).into())
446    }
447
448    fn read_datetime(&mut self, tz: Option<&Tz>) -> Result<DateTime<Utc>, ClickHouseSourceError> {
449        let seconds = self.read_u32()? as i64;
450        chrono::DateTime::from_timestamp(seconds, 0)
451            .map(|dt| dt.with_timezone(tz.unwrap_or(&Tz::UTC)).to_utc())
452            .ok_or_else(|| {
453                anyhow!("Invalid datetime value: {} seconds since epoch", seconds).into()
454            })
455    }
456
457    fn read_datetime64(
458        &mut self,
459        precision: u8,
460        tz: Option<&Tz>,
461    ) -> Result<DateTime<Utc>, ClickHouseSourceError> {
462        if precision > 9 {
463            return Err(anyhow!("Unsupported DateTime64 precision: {}", precision).into());
464        }
465        let ticks = self.read_i64()?;
466        let nanos = ticks * 10_i64.pow(9 - precision as u32);
467
468        Ok(DateTime::from_timestamp_nanos(nanos)
469            .with_timezone(tz.unwrap_or(&Tz::UTC))
470            .to_utc())
471    }
472
473    fn read_decimal(&mut self, precision: u8, scale: u8) -> Result<Decimal, ClickHouseSourceError> {
474        match precision {
475            1..=9 => {
476                let value = self.read_i32()?;
477                Ok(Decimal::new(value as i64, scale as u32))
478            }
479            10..=18 => {
480                let value = self.read_i64()?;
481                Ok(Decimal::new(value, scale as u32))
482            }
483            _ => Err(anyhow!("Unsupported Decimal precision: {}", precision).into()),
484        }
485    }
486
487    fn read_time(&mut self) -> Result<NaiveTime, ClickHouseSourceError> {
488        let seconds = self.read_u32()? as i64;
489        Ok(
490            NaiveTime::from_num_seconds_from_midnight_opt(seconds as u32, 0)
491                .ok_or_else(|| anyhow!("Invalid time value: {} seconds since midnight", seconds))?,
492        )
493    }
494
495    fn read_time64(&mut self, precision: u8) -> Result<NaiveTime, ClickHouseSourceError> {
496        if precision > 9 {
497            return Err(anyhow!("Unsupported Time64 precision: {}", precision).into());
498        }
499        let ticks = self.read_i64()?;
500        let nanos = ticks * 10_i64.pow(9 - precision as u32);
501        Ok(NaiveTime::from_num_seconds_from_midnight_opt(
502            (nanos / 1_000_000_000) as u32,
503            (nanos % 1_000_000_000) as u32,
504        )
505        .ok_or_else(|| anyhow!("Invalid time64 value: {} ticks since midnight", ticks))?)
506    }
507
508    fn read_ipv4(&mut self) -> Result<IpAddr, ClickHouseSourceError> {
509        let bytes = self.read_u32()?;
510
511        Ok(IpAddr::V4(Ipv4Addr::from_bits(bytes)))
512    }
513
514    fn read_ipv6(&mut self) -> Result<IpAddr, ClickHouseSourceError> {
515        let seg1 = u16::from_be(self.read_u16()?);
516        let seg2 = u16::from_be(self.read_u16()?);
517        let seg3 = u16::from_be(self.read_u16()?);
518        let seg4 = u16::from_be(self.read_u16()?);
519        let seg5 = u16::from_be(self.read_u16()?);
520        let seg6 = u16::from_be(self.read_u16()?);
521        let seg7 = u16::from_be(self.read_u16()?);
522        let seg8 = u16::from_be(self.read_u16()?);
523
524        Ok(IpAddr::V6(Ipv6Addr::from_segments([
525            seg1, seg2, seg3, seg4, seg5, seg6, seg7, seg8,
526        ])))
527    }
528
529    fn read_enum8(&mut self) -> Result<i8, ClickHouseSourceError> {
530        self.read_i8()
531    }
532
533    fn read_enum16(&mut self) -> Result<i16, ClickHouseSourceError> {
534        self.read_i16()
535    }
536
537    fn read_array<T, F>(&mut self, read_elem: F) -> Result<Vec<Option<T>>, ClickHouseSourceError>
538    where
539        F: Fn(&mut Self) -> Result<T, ClickHouseSourceError>,
540    {
541        let len = self.read_varint()? as usize;
542        let mut result = Vec::with_capacity(len);
543        for _ in 0..len {
544            result.push(Some(read_elem(self)?));
545        }
546        Ok(result)
547    }
548
549    fn is_empty(&self) -> bool {
550        self.cursor.position() as usize >= self.cursor.get_ref().len()
551    }
552}
553
/// Parser for one partition: runs the query, decodes the whole `RowBinary`
/// response into `rowbuf`, and hands out values cell by cell.
pub struct ClickHouseSourceParser<'a> {
    /// Tokio runtime used to drive the async client through `block_on`.
    rt: Arc<Runtime>,
    /// ClickHouse client used to run `query`.
    client: Client,
    /// Query whose result is parsed.
    query: CXQuery<String>,
    /// Column types, in result order.
    schema: Vec<ClickHouseTypeSystem>,
    /// Per-column type metadata (parallel to `schema`).
    metadata: Vec<TypeMetadata>,
    /// Decoded rows, filled by `fetch_next`.
    rowbuf: Vec<Vec<DataType>>,
    /// Number of columns (`schema.len()` at construction).
    ncols: usize,
    /// Row index of the next cell to produce.
    current_row: usize,
    /// Column index of the next cell to produce.
    current_col: usize,
    /// Set once the query result has been fetched.
    is_finished: bool,
    /// Carries the `'a` lifetime parameter; holds no data.
    _phantom: std::marker::PhantomData<&'a ()>,
}
567
568impl<'a> ClickHouseSourceParser<'a> {
569    #[throws(ClickHouseSourceError)]
570    pub fn new(
571        rt: Arc<Runtime>,
572        client: Client,
573        query: CXQuery<String>,
574        schema: &[ClickHouseTypeSystem],
575        metadata: &[TypeMetadata],
576    ) -> Self {
577        Self {
578            rt,
579            client,
580            query,
581            schema: schema.to_vec(),
582            metadata: metadata.to_vec(),
583            rowbuf: Vec::new(),
584            ncols: schema.len(),
585            current_row: 0,
586            current_col: 0,
587            is_finished: false,
588            _phantom: std::marker::PhantomData,
589        }
590    }
591
592    fn parse_row_binary(
593        &self,
594        reader: &mut BinaryReader,
595    ) -> Result<Vec<DataType>, ClickHouseSourceError> {
596        let mut row = Vec::with_capacity(self.ncols);
597
598        for (col_idx, col_type) in self.schema.iter().enumerate() {
599            let is_nullable = col_type.is_nullable();
600            let meta = &self.metadata[col_idx];
601
602            if is_nullable {
603                let null_flag = reader.read_u8()?;
604                if null_flag == 1 {
605                    row.push(DataType::Null);
606                    continue;
607                }
608            }
609
610            let value = match col_type {
611                ClickHouseTypeSystem::Int8(_) => DataType::Int8(reader.read_i8()?),
612                ClickHouseTypeSystem::Int16(_) => DataType::Int16(reader.read_i16()?),
613                ClickHouseTypeSystem::Int32(_) => DataType::Int32(reader.read_i32()?),
614                ClickHouseTypeSystem::Int64(_) => DataType::Int64(reader.read_i64()?),
615                ClickHouseTypeSystem::UInt8(_) => DataType::UInt8(reader.read_u8()?),
616                ClickHouseTypeSystem::UInt16(_) => DataType::UInt16(reader.read_u16()?),
617                ClickHouseTypeSystem::UInt32(_) => DataType::UInt32(reader.read_u32()?),
618                ClickHouseTypeSystem::UInt64(_) => DataType::UInt64(reader.read_u64()?),
619
620                ClickHouseTypeSystem::Float32(_) => DataType::Float32(reader.read_f32()?),
621                ClickHouseTypeSystem::Float64(_) => DataType::Float64(reader.read_f64()?),
622
623                ClickHouseTypeSystem::Decimal(_) => {
624                    DataType::Decimal(reader.read_decimal(meta.precision, meta.scale)?)
625                }
626
627                ClickHouseTypeSystem::String(_) => DataType::String(reader.read_string()?),
628
629                ClickHouseTypeSystem::FixedString(_) => {
630                    DataType::FixedString(reader.read_fixed_string(meta.length)?)
631                }
632
633                ClickHouseTypeSystem::Date(_) => DataType::Date(reader.read_date()?),
634                ClickHouseTypeSystem::Date32(_) => DataType::Date32(reader.read_date32()?),
635
636                ClickHouseTypeSystem::DateTime(_) => {
637                    DataType::DateTime(reader.read_datetime(meta.timezone.as_ref())?)
638                }
639                ClickHouseTypeSystem::DateTime64(_) => DataType::DateTime64(
640                    reader.read_datetime64(meta.precision, meta.timezone.as_ref())?,
641                ),
642
643                ClickHouseTypeSystem::Time(_) => DataType::Time(reader.read_time()?),
644                ClickHouseTypeSystem::Time64(_) => {
645                    DataType::Time64(reader.read_time64(meta.precision)?)
646                }
647
648                ClickHouseTypeSystem::Enum8(_) => {
649                    let enum_value = reader.read_enum8()?;
650                    let enum_str = meta
651                        .named_values
652                        .as_ref()
653                        .and_then(|h| h.get(&(enum_value as i16)));
654                    DataType::Enum8(enum_str.cloned().unwrap_or_default())
655                }
656                ClickHouseTypeSystem::Enum16(_) => {
657                    let enum_value = reader.read_enum16()?;
658                    let enum_str = meta.named_values.as_ref().and_then(|h| h.get(&enum_value));
659                    DataType::Enum16(enum_str.cloned().unwrap_or_default())
660                }
661
662                ClickHouseTypeSystem::UUID(_) => DataType::UUID(reader.read_uuid()?),
663
664                ClickHouseTypeSystem::IPv4(_) => DataType::IPv4(reader.read_ipv4()?),
665                ClickHouseTypeSystem::IPv6(_) => DataType::IPv6(reader.read_ipv6()?),
666
667                ClickHouseTypeSystem::Bool(_) => DataType::Bool(reader.read_bool()?),
668
669                ClickHouseTypeSystem::ArrayBool(_) => {
670                    DataType::ArrayBool(reader.read_array(BinaryReader::read_bool)?)
671                }
672                ClickHouseTypeSystem::ArrayString(_) => {
673                    DataType::ArrayString(reader.read_array(BinaryReader::read_string)?)
674                }
675                ClickHouseTypeSystem::ArrayInt8(_) => {
676                    DataType::ArrayInt8(reader.read_array(BinaryReader::read_i8)?)
677                }
678                ClickHouseTypeSystem::ArrayInt16(_) => {
679                    DataType::ArrayInt16(reader.read_array(BinaryReader::read_i16)?)
680                }
681                ClickHouseTypeSystem::ArrayInt32(_) => {
682                    DataType::ArrayInt32(reader.read_array(BinaryReader::read_i32)?)
683                }
684                ClickHouseTypeSystem::ArrayInt64(_) => {
685                    DataType::ArrayInt64(reader.read_array(BinaryReader::read_i64)?)
686                }
687                ClickHouseTypeSystem::ArrayUInt8(_) => {
688                    DataType::ArrayUInt8(reader.read_array(BinaryReader::read_u8)?)
689                }
690                ClickHouseTypeSystem::ArrayUInt16(_) => {
691                    DataType::ArrayUInt16(reader.read_array(BinaryReader::read_u16)?)
692                }
693                ClickHouseTypeSystem::ArrayUInt32(_) => {
694                    DataType::ArrayUInt32(reader.read_array(BinaryReader::read_u32)?)
695                }
696                ClickHouseTypeSystem::ArrayUInt64(_) => {
697                    DataType::ArrayUInt64(reader.read_array(BinaryReader::read_u64)?)
698                }
699                ClickHouseTypeSystem::ArrayFloat32(_) => {
700                    DataType::ArrayFloat32(reader.read_array(BinaryReader::read_f32)?)
701                }
702                ClickHouseTypeSystem::ArrayFloat64(_) => {
703                    DataType::ArrayFloat64(reader.read_array(BinaryReader::read_f64)?)
704                }
705                ClickHouseTypeSystem::ArrayDecimal(_) => {
706                    let precision = meta.precision;
707                    let scale = meta.scale;
708                    DataType::ArrayDecimal(reader.read_array(|r| r.read_decimal(precision, scale))?)
709                }
710            };
711
712            row.push(value);
713        }
714
715        Ok(row)
716    }
717
718    #[throws(ClickHouseSourceError)]
719    fn next_loc(&mut self) -> (usize, usize) {
720        let ret = (self.current_row, self.current_col);
721        self.current_row += (self.current_col + 1) / self.ncols;
722        self.current_col = (self.current_col + 1) % self.ncols;
723        ret
724    }
725}
726
727impl<'a> PartitionParser<'a> for ClickHouseSourceParser<'a> {
728    type TypeSystem = ClickHouseTypeSystem;
729    type Error = ClickHouseSourceError;
730
731    #[throws(ClickHouseSourceError)]
732    fn fetch_next(&mut self) -> (usize, bool) {
733        assert!(self.current_col == 0);
734
735        if self.is_finished {
736            return (0, true);
737        }
738
739        let response = self.rt.block_on(async {
740            let mut cursor = self
741                .client
742                .query(self.query.as_str())
743                .fetch_bytes("RowBinary")
744                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
745            let bytes = cursor
746                .collect()
747                .await
748                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
749            Ok::<_, ClickHouseSourceError>(bytes)
750        })?;
751        let mut reader = BinaryReader::new(&response);
752        let mut rows = Vec::new();
753
754        while !reader.is_empty() {
755            match self.parse_row_binary(&mut reader) {
756                Ok(row) => rows.push(row),
757                Err(_) => break,
758            }
759        }
760
761        self.rowbuf = rows;
762        self.current_row = 0;
763        self.is_finished = true;
764
765        (self.rowbuf.len(), true)
766    }
767}
768
/// Implements `Produce<T>` and `Produce<Option<T>>` for the parser, for a
/// `Copy` target type `T`.
///
/// Each listed `DataType` variant is accepted and its payload copied out;
/// the `Option` form additionally maps `DataType::Null` to `None`. Any other
/// variant yields a `cannot_produce` error carrying the value's debug text.
macro_rules! impl_produce {
    ($target:ty, [$($kind:ident),+]) => {
        impl<'r, 'a> Produce<'r, $target> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> $target {
                let (row, col) = self.next_loc()?;

                match &self.rowbuf[row][col] {
                    $(DataType::$kind(inner) => *inner as $target,)+
                    other => throw!(ConnectorXError::cannot_produce::<$target>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }

        impl<'r, 'a> Produce<'r, Option<$target>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Option<$target> {
                let (row, col) = self.next_loc()?;

                match &self.rowbuf[row][col] {
                    DataType::Null => None,
                    $(DataType::$kind(inner) => Some(*inner as $target),)+
                    other => throw!(ConnectorXError::cannot_produce::<$target>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }
    };
}
807
/// Implements `Produce<T>` and `Produce<Option<T>>` for the parser, for a
/// target type `T` that must be cloned out of the row buffer.
///
/// Each listed `DataType` variant is accepted and its payload cloned; the
/// `Option` form additionally maps `DataType::Null` to `None`. Any other
/// variant yields a `cannot_produce` error carrying the value's debug text.
macro_rules! impl_produce_with_clone {
    ($target:ty, [$($kind:ident),+]) => {
        impl<'r, 'a> Produce<'r, $target> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> $target {
                let (row, col) = self.next_loc()?;

                match &self.rowbuf[row][col] {
                    $(DataType::$kind(inner) => inner.clone() as $target,)+
                    other => throw!(ConnectorXError::cannot_produce::<$target>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }

        impl<'r, 'a> Produce<'r, Option<$target>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Option<$target> {
                let (row, col) = self.next_loc()?;

                match &self.rowbuf[row][col] {
                    DataType::Null => None,
                    $(DataType::$kind(inner) => Some(inner.clone() as $target),)+
                    other => throw!(ConnectorXError::cannot_produce::<$target>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }
    };
}
846
/// Implements `Produce<Vec<Option<T>>>` and `Produce<Option<Vec<Option<T>>>>`
/// for the parser, for array columns with element type `T`.
///
/// Each listed `DataType` variant is accepted and its vector cloned; the
/// `Option` form additionally maps `DataType::Null` to `None`. Any other
/// variant yields a `cannot_produce` error carrying the value's debug text.
macro_rules! impl_produce_vec {
    ($elem:ty, [$($kind:ident),+]) => {
        impl<'r, 'a> Produce<'r, Vec<Option<$elem>>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Vec<Option<$elem>> {
                let (row, col) = self.next_loc()?;

                match &self.rowbuf[row][col] {
                    $(DataType::$kind(items) => items.clone(),)+
                    other => throw!(ConnectorXError::cannot_produce::<Vec<Option<$elem>>>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }

        impl<'r, 'a> Produce<'r, Option<Vec<Option<$elem>>>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Option<Vec<Option<$elem>>> {
                let (row, col) = self.next_loc()?;

                match &self.rowbuf[row][col] {
                    DataType::Null => None,
                    $(DataType::$kind(items) => Some(items.clone()),)+
                    other => throw!(ConnectorXError::cannot_produce::<Option<Vec<Option<$elem>>>>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }
    };
}
885
// `Produce` implementations for scalar values. Each line maps a Rust type to
// the `DataType` variants it can be produced from.
impl_produce!(i8, [Int8]);
impl_produce!(i16, [Int16]);
impl_produce!(i32, [Int32]);
impl_produce!(i64, [Int64]);
impl_produce!(u8, [UInt8]);
impl_produce!(u16, [UInt16]);
impl_produce!(u32, [UInt32]);
impl_produce!(u64, [UInt64]);
impl_produce!(f32, [Float32]);
impl_produce!(f64, [Float64]);
impl_produce!(Decimal, [Decimal]);
// Enum values are produced as their string labels.
impl_produce_with_clone!(String, [String, Enum8, Enum16]);
impl_produce_with_clone!(Vec<u8>, [FixedString]);
impl_produce!(NaiveDate, [Date, Date32]);
impl_produce!(DateTime<Utc>, [DateTime, DateTime64]);
impl_produce!(NaiveTime, [Time, Time64]);
impl_produce!(Uuid, [UUID]);
impl_produce!(IpAddr, [IPv4, IPv6]);
impl_produce!(bool, [Bool]);
// `Produce` implementations for array columns.
impl_produce_vec!(bool, [ArrayBool]);
impl_produce_vec!(String, [ArrayString]);
impl_produce_vec!(i8, [ArrayInt8]);
impl_produce_vec!(i16, [ArrayInt16]);
impl_produce_vec!(i32, [ArrayInt32]);
impl_produce_vec!(i64, [ArrayInt64]);
impl_produce_vec!(u8, [ArrayUInt8]);
impl_produce_vec!(u16, [ArrayUInt16]);
impl_produce_vec!(u32, [ArrayUInt32]);
impl_produce_vec!(u64, [ArrayUInt64]);
impl_produce_vec!(f32, [ArrayFloat32]);
impl_produce_vec!(f64, [ArrayFloat64]);
impl_produce_vec!(Decimal, [ArrayDecimal]);