Skip to main content

connectorx/sources/clickhouse/
mod.rs

//! Source implementation for ClickHouse using its HTTP interface (via the `clickhouse` crate).
2
3mod errors;
4mod typesystem;
5
6pub use self::errors::ClickHouseSourceError;
7pub use self::typesystem::{ClickHouseTypeSystem, TypeMetadata};
8
9use crate::{
10    data_order::DataOrder,
11    errors::ConnectorXError,
12    sources::{
13        clickhouse::typesystem::DataType, PartitionParser, Produce, Source, SourcePartition,
14    },
15    sql::{count_query, limit1_query, CXQuery},
16};
17use anyhow::anyhow;
18use chrono::{DateTime, Duration, NaiveDate, NaiveTime, Utc};
19use chrono_tz::Tz;
20use clickhouse::Client;
21use fehler::{throw, throws};
22use rust_decimal::Decimal;
23use serde::Deserialize;
24use serde_json::Value as JsonValue;
25use sqlparser::dialect::{ClickHouseDialect, GenericDialect};
26use std::io::{Cursor, Read};
27use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
28use std::sync::Arc;
29use tokio::runtime::Runtime;
30use uuid::Uuid;
31
32/// ClickHouse source that uses the HTTP protocol.
33pub struct ClickHouseSource {
34    rt: Arc<Runtime>,
35    pub client: Client,
36    origin_query: Option<String>,
37    queries: Vec<CXQuery<String>>,
38    names: Vec<String>,
39    schema: Vec<ClickHouseTypeSystem>,
40    metadata: Vec<TypeMetadata>,
41}
42
43impl ClickHouseSource {
44    #[throws(ClickHouseSourceError)]
45    pub fn new(rt: Arc<Runtime>, conn: &str) -> Self {
46        let url = url::Url::parse(conn)?;
47
48        let use_https = url
49            .query_pairs()
50            .find(|(k, v)| k == "protocol" && v == "https")
51            .is_some();
52
53        let base_url = format!(
54            "{}://{}:{}",
55            if use_https { "https" } else { "http" },
56            url.host_str().unwrap_or("localhost"),
57            url.port().unwrap_or(8123)
58        );
59
60        let mut client = Client::default().with_url(&base_url);
61
62        let database = url.path().trim_start_matches('/');
63        if !database.is_empty() {
64            client = client.with_database(database);
65        }
66
67        let username = url.username();
68        if !username.is_empty() {
69            client = client.with_user(username);
70        }
71
72        let password = url.password().unwrap_or("");
73        if !password.is_empty() {
74            client = client.with_password(password);
75        }
76
77        Self {
78            rt,
79            client,
80            origin_query: None,
81            queries: vec![],
82            names: vec![],
83            schema: vec![],
84            metadata: vec![],
85        }
86    }
87}
88
89impl Source for ClickHouseSource
90where
91    ClickHouseSourcePartition:
92        SourcePartition<TypeSystem = ClickHouseTypeSystem, Error = ClickHouseSourceError>,
93{
94    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
95    type Partition = ClickHouseSourcePartition;
96    type TypeSystem = ClickHouseTypeSystem;
97    type Error = ClickHouseSourceError;
98
99    #[throws(ClickHouseSourceError)]
100    fn set_data_order(&mut self, data_order: DataOrder) {
101        if !matches!(data_order, DataOrder::RowMajor) {
102            throw!(ConnectorXError::UnsupportedDataOrder(data_order));
103        }
104    }
105
106    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
107        self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
108    }
109
110    fn set_origin_query(&mut self, query: Option<String>) {
111        self.origin_query = query;
112    }
113
114    #[throws(ClickHouseSourceError)]
115    fn fetch_metadata(&mut self) {
116        assert!(!self.queries.is_empty());
117
118        let first_query = &self.queries[0];
119        let l1query = limit1_query(first_query, &ClickHouseDialect {})?;
120
121        let describe_query = format!("DESCRIBE ({})", l1query.as_str());
122
123        let response = self.rt.block_on(async {
124            let mut cursor = self
125                .client
126                .query(&describe_query)
127                .fetch_bytes("JSONCompact")
128                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
129            let bytes = cursor
130                .collect()
131                .await
132                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
133            Ok::<_, ClickHouseSourceError>(bytes)
134        })?;
135
136        #[derive(Debug, Deserialize)]
137        struct DescribeResponse {
138            data: Vec<Vec<JsonValue>>,
139        }
140
141        let parsed: DescribeResponse = serde_json::from_slice(&response)
142            .map_err(|e| anyhow!("Failed to parse DESCRIBE response: {}", e))?;
143
144        let mut names = Vec::new();
145        let mut types = Vec::new();
146        let mut metadata = Vec::new();
147
148        for row in parsed.data {
149            if row.len() >= 2 {
150                let name = row[0].as_str().unwrap_or("").to_string();
151                let type_str = row[1].as_str().unwrap_or("String");
152                let (ts, meta) = ClickHouseTypeSystem::from_type_str_with_metadata(type_str);
153                names.push(name);
154                types.push(ts);
155                metadata.push(meta);
156            }
157        }
158
159        self.names = names;
160        self.schema = types;
161        self.metadata = metadata;
162    }
163
164    #[throws(ClickHouseSourceError)]
165    fn result_rows(&mut self) -> Option<usize> {
166        match &self.origin_query {
167            Some(q) => {
168                let cxq = CXQuery::Naked(q.clone());
169                let cquery = count_query(&cxq, &ClickHouseDialect {})?;
170
171                let response = self.rt.block_on(async {
172                    let mut cursor = self
173                        .client
174                        .query(cquery.as_str())
175                        .fetch_bytes("JSONCompact")
176                        .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
177                    let bytes = cursor
178                        .collect()
179                        .await
180                        .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
181                    Ok::<_, ClickHouseSourceError>(bytes)
182                })?;
183
184                #[derive(Debug, Deserialize)]
185                struct CountResponse {
186                    data: Vec<Vec<JsonValue>>,
187                }
188
189                let parsed: CountResponse = serde_json::from_slice(&response)
190                    .map_err(|e| anyhow!("Failed to parse count response: {}", e))?;
191
192                if let Some(row) = parsed.data.first() {
193                    if let Some(count_val) = row.first() {
194                        let count = match count_val {
195                            JsonValue::Number(n) => n.as_u64().unwrap_or(0),
196                            JsonValue::String(s) => s.parse().unwrap_or(0),
197                            _ => 0,
198                        };
199                        return Some(count as usize);
200                    }
201                }
202                None
203            }
204            None => None,
205        }
206    }
207
208    fn names(&self) -> Vec<String> {
209        self.names.clone()
210    }
211
212    fn schema(&self) -> Vec<Self::TypeSystem> {
213        self.schema.clone()
214    }
215
216    #[throws(ClickHouseSourceError)]
217    fn partition(self) -> Vec<Self::Partition> {
218        let mut ret = vec![];
219        for query in self.queries {
220            ret.push(ClickHouseSourcePartition::new(
221                self.rt.clone(),
222                self.client.clone(),
223                &query,
224                &self.schema,
225                &self.metadata,
226            ));
227        }
228        ret
229    }
230}
231
232pub struct ClickHouseSourcePartition {
233    rt: Arc<Runtime>,
234    client: Client,
235    query: CXQuery<String>,
236    schema: Vec<ClickHouseTypeSystem>,
237    metadata: Vec<TypeMetadata>,
238    nrows: usize,
239    ncols: usize,
240}
241
242impl ClickHouseSourcePartition {
243    pub fn new(
244        rt: Arc<Runtime>,
245        client: Client,
246        query: &CXQuery<String>,
247        schema: &[ClickHouseTypeSystem],
248        metadata: &[TypeMetadata],
249    ) -> Self {
250        Self {
251            rt,
252            client,
253            query: query.clone(),
254            schema: schema.to_vec(),
255            metadata: metadata.to_vec(),
256            nrows: 0,
257            ncols: schema.len(),
258        }
259    }
260}
261
262impl SourcePartition for ClickHouseSourcePartition {
263    type TypeSystem = ClickHouseTypeSystem;
264    type Parser<'a> = ClickHouseSourceParser<'a>;
265    type Error = ClickHouseSourceError;
266
267    #[throws(ClickHouseSourceError)]
268    fn result_rows(&mut self) {
269        let cquery = count_query(&self.query, &GenericDialect {})?;
270
271        let response = self.rt.block_on(async {
272            let mut cursor = self
273                .client
274                .query(cquery.as_str())
275                .fetch_bytes("JSONCompact")
276                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
277            let bytes = cursor
278                .collect()
279                .await
280                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
281            Ok::<_, ClickHouseSourceError>(bytes)
282        })?;
283
284        #[derive(Debug, Deserialize)]
285        struct CountResponse {
286            data: Vec<Vec<JsonValue>>,
287        }
288
289        let parsed: CountResponse = serde_json::from_slice(&response)
290            .map_err(|e| anyhow!("Failed to parse count response: {}", e))?;
291
292        if let Some(row) = parsed.data.first() {
293            if let Some(count_val) = row.first() {
294                let count = match count_val {
295                    JsonValue::Number(n) => n.as_u64().unwrap_or(0),
296                    JsonValue::String(s) => s.parse().unwrap_or(0),
297                    _ => 0,
298                };
299                self.nrows = count as usize;
300            }
301        }
302    }
303
304    #[throws(ClickHouseSourceError)]
305    fn parser(&mut self) -> Self::Parser<'_> {
306        ClickHouseSourceParser::new(
307            self.rt.clone(),
308            self.client.clone(),
309            self.query.clone(),
310            &self.schema,
311            &self.metadata,
312        )?
313    }
314
315    fn nrows(&self) -> usize {
316        self.nrows
317    }
318
319    fn ncols(&self) -> usize {
320        self.ncols
321    }
322}
323
/// Sequential reader over a `RowBinary` response body; multi-byte integers and
/// floats are decoded as little-endian.
struct BinaryReader<'a> {
    /// Current read position within the borrowed response bytes.
    cursor: Cursor<&'a [u8]>,
}
327
328impl<'a> BinaryReader<'a> {
329    fn new(data: &'a [u8]) -> Self {
330        Self {
331            cursor: Cursor::new(data),
332        }
333    }
334
335    fn read_bytes<const N: usize>(&mut self) -> Result<[u8; N], ClickHouseSourceError> {
336        let mut buf = [0u8; N];
337        self.cursor
338            .read_exact(&mut buf)
339            .map_err(|e| anyhow!("Failed to read {} bytes: {}", N, e))?;
340        Ok(buf)
341    }
342
343    fn read_u8(&mut self) -> Result<u8, ClickHouseSourceError> {
344        Ok(self.read_bytes::<1>()?[0])
345    }
346
347    fn read_i8(&mut self) -> Result<i8, ClickHouseSourceError> {
348        Ok(self.read_u8()? as i8)
349    }
350
351    fn read_u16(&mut self) -> Result<u16, ClickHouseSourceError> {
352        Ok(u16::from_le_bytes(self.read_bytes()?))
353    }
354
355    fn read_i16(&mut self) -> Result<i16, ClickHouseSourceError> {
356        Ok(i16::from_le_bytes(self.read_bytes()?))
357    }
358
359    fn read_u32(&mut self) -> Result<u32, ClickHouseSourceError> {
360        Ok(u32::from_le_bytes(self.read_bytes()?))
361    }
362
363    fn read_i32(&mut self) -> Result<i32, ClickHouseSourceError> {
364        Ok(i32::from_le_bytes(self.read_bytes()?))
365    }
366
367    fn read_u64(&mut self) -> Result<u64, ClickHouseSourceError> {
368        Ok(u64::from_le_bytes(self.read_bytes()?))
369    }
370
371    fn read_i64(&mut self) -> Result<i64, ClickHouseSourceError> {
372        Ok(i64::from_le_bytes(self.read_bytes()?))
373    }
374
375    fn read_f32(&mut self) -> Result<f32, ClickHouseSourceError> {
376        Ok(f32::from_le_bytes(self.read_bytes()?))
377    }
378
379    fn read_f64(&mut self) -> Result<f64, ClickHouseSourceError> {
380        Ok(f64::from_le_bytes(self.read_bytes()?))
381    }
382
383    fn read_varint(&mut self) -> Result<u64, ClickHouseSourceError> {
384        let mut result: u64 = 0;
385        let mut shift = 0;
386        loop {
387            let byte = self.read_u8()?;
388            result |= ((byte & 0x7f) as u64) << shift;
389            if byte & 0x80 == 0 {
390                break;
391            }
392            shift += 7;
393            if shift >= 64 {
394                return Err(anyhow!("Varint too long").into());
395            }
396        }
397        Ok(result)
398    }
399
400    fn read_fixed_string(&mut self, len: usize) -> Result<Vec<u8>, ClickHouseSourceError> {
401        let mut buf = vec![0u8; len];
402        self.cursor
403            .read_exact(&mut buf)
404            .map_err(|e| anyhow!("Failed to read FixedString: {}", e))?;
405        Ok(buf)
406    }
407
408    fn read_string(&mut self) -> Result<String, ClickHouseSourceError> {
409        let len = self.read_varint()? as usize;
410        let mut buf = vec![0u8; len];
411        self.cursor
412            .read_exact(&mut buf)
413            .map_err(|e| anyhow!("Failed to read string: {}", e))?;
414        String::from_utf8(buf).map_err(|e| anyhow!("Invalid UTF-8: {}", e).into())
415    }
416
417    fn read_uuid(&mut self) -> Result<Uuid, ClickHouseSourceError> {
418        // ClickHouse stores UUID as two UInt64 in big-endian order
419        let high = self.read_u64()?;
420        let low = self.read_u64()?;
421
422        Ok(Uuid::from_u64_pair(high, low))
423    }
424
425    fn read_bool(&mut self) -> Result<bool, ClickHouseSourceError> {
426        Ok(self.read_u8()? != 0)
427    }
428
429    fn read_date(&mut self) -> Result<NaiveDate, ClickHouseSourceError> {
430        // ClickHouse stores Date as UInt16 representing days since 1970-01-01
431        let days = self.read_u16()? as i64;
432        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
433        epoch
434            .checked_add_signed(Duration::days(days))
435            .ok_or_else(|| anyhow!("Invalid date value: {} days since epoch", days).into())
436    }
437
438    fn read_date32(&mut self) -> Result<NaiveDate, ClickHouseSourceError> {
439        // ClickHouse stores Date32 as Int32 representing days since 1970-01-01
440        // negative values represent dates before 1970-01-01
441        let days = self.read_i32()? as i64;
442        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
443        epoch
444            .checked_add_signed(Duration::days(days))
445            .ok_or_else(|| anyhow!("Invalid date32 value: {} days since epoch", days).into())
446    }
447
448    fn read_datetime(&mut self, tz: Option<&Tz>) -> Result<DateTime<Utc>, ClickHouseSourceError> {
449        let seconds = self.read_u32()? as i64;
450        chrono::DateTime::from_timestamp(seconds, 0)
451            .map(|dt| dt.with_timezone(tz.unwrap_or(&Tz::UTC)).to_utc())
452            .ok_or_else(|| {
453                anyhow!("Invalid datetime value: {} seconds since epoch", seconds).into()
454            })
455    }
456
457    fn read_datetime64(
458        &mut self,
459        precision: u8,
460        tz: Option<&Tz>,
461    ) -> Result<DateTime<Utc>, ClickHouseSourceError> {
462        if precision > 9 {
463            return Err(anyhow!("Unsupported DateTime64 precision: {}", precision).into());
464        }
465        let ticks = self.read_i64()?;
466        let scale = 10_i64.pow(precision as u32);
467        let seconds = ticks.div_euclid(scale);
468        let fractional_ticks = ticks.rem_euclid(scale) as u32;
469        let nanos = fractional_ticks * 10_u32.pow(9 - precision as u32);
470
471        DateTime::from_timestamp(seconds, nanos)
472            .map(|dt| dt.with_timezone(tz.unwrap_or(&Tz::UTC)).to_utc())
473            .ok_or_else(|| anyhow!("Invalid datetime64 value: {} ticks", ticks).into())
474    }
475
476    fn read_decimal(&mut self, precision: u8, scale: u8) -> Result<Decimal, ClickHouseSourceError> {
477        match precision {
478            1..=9 => {
479                let value = self.read_i32()?;
480                Ok(Decimal::new(value as i64, scale as u32))
481            }
482            10..=18 => {
483                let value = self.read_i64()?;
484                Ok(Decimal::new(value, scale as u32))
485            }
486            _ => Err(anyhow!("Unsupported Decimal precision: {}", precision).into()),
487        }
488    }
489
490    fn read_time(&mut self) -> Result<NaiveTime, ClickHouseSourceError> {
491        let seconds = self.read_u32()? as i64;
492        Ok(
493            NaiveTime::from_num_seconds_from_midnight_opt(seconds as u32, 0)
494                .ok_or_else(|| anyhow!("Invalid time value: {} seconds since midnight", seconds))?,
495        )
496    }
497
498    fn read_time64(&mut self, precision: u8) -> Result<NaiveTime, ClickHouseSourceError> {
499        if precision > 9 {
500            return Err(anyhow!("Unsupported Time64 precision: {}", precision).into());
501        }
502        let ticks = self.read_i64()?;
503        let nanos = ticks * 10_i64.pow(9 - precision as u32);
504        Ok(NaiveTime::from_num_seconds_from_midnight_opt(
505            (nanos / 1_000_000_000) as u32,
506            (nanos % 1_000_000_000) as u32,
507        )
508        .ok_or_else(|| anyhow!("Invalid time64 value: {} ticks since midnight", ticks))?)
509    }
510
511    fn read_ipv4(&mut self) -> Result<IpAddr, ClickHouseSourceError> {
512        let bytes = self.read_u32()?;
513
514        Ok(IpAddr::V4(Ipv4Addr::from_bits(bytes)))
515    }
516
517    fn read_ipv6(&mut self) -> Result<IpAddr, ClickHouseSourceError> {
518        let seg1 = u16::from_be(self.read_u16()?);
519        let seg2 = u16::from_be(self.read_u16()?);
520        let seg3 = u16::from_be(self.read_u16()?);
521        let seg4 = u16::from_be(self.read_u16()?);
522        let seg5 = u16::from_be(self.read_u16()?);
523        let seg6 = u16::from_be(self.read_u16()?);
524        let seg7 = u16::from_be(self.read_u16()?);
525        let seg8 = u16::from_be(self.read_u16()?);
526
527        Ok(IpAddr::V6(Ipv6Addr::from_segments([
528            seg1, seg2, seg3, seg4, seg5, seg6, seg7, seg8,
529        ])))
530    }
531
532    fn read_enum8(&mut self) -> Result<i8, ClickHouseSourceError> {
533        self.read_i8()
534    }
535
536    fn read_enum16(&mut self) -> Result<i16, ClickHouseSourceError> {
537        self.read_i16()
538    }
539
540    fn read_array<T, F>(&mut self, read_elem: F) -> Result<Vec<Option<T>>, ClickHouseSourceError>
541    where
542        F: Fn(&mut Self) -> Result<T, ClickHouseSourceError>,
543    {
544        let len = self.read_varint()? as usize;
545        let mut result = Vec::with_capacity(len);
546        for _ in 0..len {
547            result.push(Some(read_elem(self)?));
548        }
549        Ok(result)
550    }
551
552    fn is_empty(&self) -> bool {
553        self.cursor.position() as usize >= self.cursor.get_ref().len()
554    }
555}
556
557pub struct ClickHouseSourceParser<'a> {
558    rt: Arc<Runtime>,
559    client: Client,
560    query: CXQuery<String>,
561    schema: Vec<ClickHouseTypeSystem>,
562    metadata: Vec<TypeMetadata>,
563    rowbuf: Vec<Vec<DataType>>,
564    ncols: usize,
565    current_row: usize,
566    current_col: usize,
567    is_finished: bool,
568    _phantom: std::marker::PhantomData<&'a ()>,
569}
570
571impl<'a> ClickHouseSourceParser<'a> {
572    #[throws(ClickHouseSourceError)]
573    pub fn new(
574        rt: Arc<Runtime>,
575        client: Client,
576        query: CXQuery<String>,
577        schema: &[ClickHouseTypeSystem],
578        metadata: &[TypeMetadata],
579    ) -> Self {
580        Self {
581            rt,
582            client,
583            query,
584            schema: schema.to_vec(),
585            metadata: metadata.to_vec(),
586            rowbuf: Vec::new(),
587            ncols: schema.len(),
588            current_row: 0,
589            current_col: 0,
590            is_finished: false,
591            _phantom: std::marker::PhantomData,
592        }
593    }
594
595    fn parse_row_binary(
596        &self,
597        reader: &mut BinaryReader,
598    ) -> Result<Vec<DataType>, ClickHouseSourceError> {
599        let mut row = Vec::with_capacity(self.ncols);
600
601        for (col_idx, col_type) in self.schema.iter().enumerate() {
602            let is_nullable = col_type.is_nullable();
603            let meta = &self.metadata[col_idx];
604
605            if is_nullable {
606                let null_flag = reader.read_u8()?;
607                if null_flag == 1 {
608                    row.push(DataType::Null);
609                    continue;
610                }
611            }
612
613            let value = match col_type {
614                ClickHouseTypeSystem::Int8(_) => DataType::Int8(reader.read_i8()?),
615                ClickHouseTypeSystem::Int16(_) => DataType::Int16(reader.read_i16()?),
616                ClickHouseTypeSystem::Int32(_) => DataType::Int32(reader.read_i32()?),
617                ClickHouseTypeSystem::Int64(_) => DataType::Int64(reader.read_i64()?),
618                ClickHouseTypeSystem::UInt8(_) => DataType::UInt8(reader.read_u8()?),
619                ClickHouseTypeSystem::UInt16(_) => DataType::UInt16(reader.read_u16()?),
620                ClickHouseTypeSystem::UInt32(_) => DataType::UInt32(reader.read_u32()?),
621                ClickHouseTypeSystem::UInt64(_) => DataType::UInt64(reader.read_u64()?),
622
623                ClickHouseTypeSystem::Float32(_) => DataType::Float32(reader.read_f32()?),
624                ClickHouseTypeSystem::Float64(_) => DataType::Float64(reader.read_f64()?),
625
626                ClickHouseTypeSystem::Decimal(_) => {
627                    DataType::Decimal(reader.read_decimal(meta.precision, meta.scale)?)
628                }
629
630                ClickHouseTypeSystem::String(_) => DataType::String(reader.read_string()?),
631
632                ClickHouseTypeSystem::FixedString(_) => {
633                    DataType::FixedString(reader.read_fixed_string(meta.length)?)
634                }
635
636                ClickHouseTypeSystem::Date(_) => DataType::Date(reader.read_date()?),
637                ClickHouseTypeSystem::Date32(_) => DataType::Date32(reader.read_date32()?),
638
639                ClickHouseTypeSystem::DateTime(_) => {
640                    DataType::DateTime(reader.read_datetime(meta.timezone.as_ref())?)
641                }
642                ClickHouseTypeSystem::DateTime64(_) => DataType::DateTime64(
643                    reader.read_datetime64(meta.precision, meta.timezone.as_ref())?,
644                ),
645
646                ClickHouseTypeSystem::Time(_) => DataType::Time(reader.read_time()?),
647                ClickHouseTypeSystem::Time64(_) => {
648                    DataType::Time64(reader.read_time64(meta.precision)?)
649                }
650
651                ClickHouseTypeSystem::Enum8(_) => {
652                    let enum_value = reader.read_enum8()?;
653                    let enum_str = meta
654                        .named_values
655                        .as_ref()
656                        .and_then(|h| h.get(&(enum_value as i16)));
657                    DataType::Enum8(enum_str.cloned().unwrap_or_default())
658                }
659                ClickHouseTypeSystem::Enum16(_) => {
660                    let enum_value = reader.read_enum16()?;
661                    let enum_str = meta.named_values.as_ref().and_then(|h| h.get(&enum_value));
662                    DataType::Enum16(enum_str.cloned().unwrap_or_default())
663                }
664
665                ClickHouseTypeSystem::UUID(_) => DataType::UUID(reader.read_uuid()?),
666
667                ClickHouseTypeSystem::IPv4(_) => DataType::IPv4(reader.read_ipv4()?),
668                ClickHouseTypeSystem::IPv6(_) => DataType::IPv6(reader.read_ipv6()?),
669
670                ClickHouseTypeSystem::Bool(_) => DataType::Bool(reader.read_bool()?),
671
672                ClickHouseTypeSystem::ArrayBool(_) => {
673                    DataType::ArrayBool(reader.read_array(BinaryReader::read_bool)?)
674                }
675                ClickHouseTypeSystem::ArrayString(_) => {
676                    DataType::ArrayString(reader.read_array(BinaryReader::read_string)?)
677                }
678                ClickHouseTypeSystem::ArrayInt8(_) => {
679                    DataType::ArrayInt8(reader.read_array(BinaryReader::read_i8)?)
680                }
681                ClickHouseTypeSystem::ArrayInt16(_) => {
682                    DataType::ArrayInt16(reader.read_array(BinaryReader::read_i16)?)
683                }
684                ClickHouseTypeSystem::ArrayInt32(_) => {
685                    DataType::ArrayInt32(reader.read_array(BinaryReader::read_i32)?)
686                }
687                ClickHouseTypeSystem::ArrayInt64(_) => {
688                    DataType::ArrayInt64(reader.read_array(BinaryReader::read_i64)?)
689                }
690                ClickHouseTypeSystem::ArrayUInt8(_) => {
691                    DataType::ArrayUInt8(reader.read_array(BinaryReader::read_u8)?)
692                }
693                ClickHouseTypeSystem::ArrayUInt16(_) => {
694                    DataType::ArrayUInt16(reader.read_array(BinaryReader::read_u16)?)
695                }
696                ClickHouseTypeSystem::ArrayUInt32(_) => {
697                    DataType::ArrayUInt32(reader.read_array(BinaryReader::read_u32)?)
698                }
699                ClickHouseTypeSystem::ArrayUInt64(_) => {
700                    DataType::ArrayUInt64(reader.read_array(BinaryReader::read_u64)?)
701                }
702                ClickHouseTypeSystem::ArrayFloat32(_) => {
703                    DataType::ArrayFloat32(reader.read_array(BinaryReader::read_f32)?)
704                }
705                ClickHouseTypeSystem::ArrayFloat64(_) => {
706                    DataType::ArrayFloat64(reader.read_array(BinaryReader::read_f64)?)
707                }
708                ClickHouseTypeSystem::ArrayDecimal(_) => {
709                    let precision = meta.precision;
710                    let scale = meta.scale;
711                    DataType::ArrayDecimal(reader.read_array(|r| r.read_decimal(precision, scale))?)
712                }
713            };
714
715            row.push(value);
716        }
717
718        Ok(row)
719    }
720
721    #[throws(ClickHouseSourceError)]
722    fn next_loc(&mut self) -> (usize, usize) {
723        let ret = (self.current_row, self.current_col);
724        self.current_row += (self.current_col + 1) / self.ncols;
725        self.current_col = (self.current_col + 1) % self.ncols;
726        ret
727    }
728}
729
730impl<'a> PartitionParser<'a> for ClickHouseSourceParser<'a> {
731    type TypeSystem = ClickHouseTypeSystem;
732    type Error = ClickHouseSourceError;
733
734    #[throws(ClickHouseSourceError)]
735    fn fetch_next(&mut self) -> (usize, bool) {
736        assert!(self.current_col == 0);
737
738        if self.is_finished {
739            return (0, true);
740        }
741
742        let response = self.rt.block_on(async {
743            let mut cursor = self
744                .client
745                .query(self.query.as_str())
746                .fetch_bytes("RowBinary")
747                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
748            let bytes = cursor
749                .collect()
750                .await
751                .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
752            Ok::<_, ClickHouseSourceError>(bytes)
753        })?;
754        let mut reader = BinaryReader::new(&response);
755        let mut rows = Vec::new();
756
757        while !reader.is_empty() {
758            match self.parse_row_binary(&mut reader) {
759                Ok(row) => rows.push(row),
760                Err(_) => break,
761            }
762        }
763
764        self.rowbuf = rows;
765        self.current_row = 0;
766        self.is_finished = true;
767
768        (self.rowbuf.len(), true)
769    }
770}
771
/// Implements `Produce<T>` and `Produce<Option<T>>` for a `Copy` Rust type `T`.
///
/// Each listed `DataType` variant is accepted and converted with `as`. The `Option`
/// form also maps `DataType::Null` to `None`. Any other variant is reported via
/// `ConnectorXError::cannot_produce`.
macro_rules! impl_produce {
    ($ty:ty, [$($var:ident),+]) => {
        impl<'r, 'a> Produce<'r, $ty> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> $ty {
                let (row, col) = self.next_loc()?;
                let cell = &self.rowbuf[row][col];

                match cell {
                    $(DataType::$var(inner) => *inner as $ty,)+
                    other => throw!(ConnectorXError::cannot_produce::<$ty>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }

        impl<'r, 'a> Produce<'r, Option<$ty>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Option<$ty> {
                let (row, col) = self.next_loc()?;
                let cell = &self.rowbuf[row][col];

                match cell {
                    DataType::Null => None,
                    $(DataType::$var(inner) => Some(*inner as $ty),)+
                    other => throw!(ConnectorXError::cannot_produce::<$ty>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }
    };
}
810
/// Implements `Produce<T>` and `Produce<Option<T>>` for an owned (non-`Copy`) type
/// `T`, cloning the buffered value out of the row buffer.
///
/// The `Option` form maps `DataType::Null` to `None`. Unlisted variants are reported
/// via `ConnectorXError::cannot_produce`.
macro_rules! impl_produce_with_clone {
    ($ty:ty, [$($var:ident),+]) => {
        impl<'r, 'a> Produce<'r, $ty> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> $ty {
                let (row, col) = self.next_loc()?;
                let cell = &self.rowbuf[row][col];

                match cell {
                    $(DataType::$var(inner) => inner.clone() as $ty,)+
                    other => throw!(ConnectorXError::cannot_produce::<$ty>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }

        impl<'r, 'a> Produce<'r, Option<$ty>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Option<$ty> {
                let (row, col) = self.next_loc()?;
                let cell = &self.rowbuf[row][col];

                match cell {
                    DataType::Null => None,
                    $(DataType::$var(inner) => Some(inner.clone() as $ty),)+
                    other => throw!(ConnectorXError::cannot_produce::<$ty>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }
    };
}
849
/// Implements `Produce` for array columns, yielding `Vec<Option<T>>`, and for the
/// nullable form `Option<Vec<Option<T>>>`, where `DataType::Null` becomes `None`.
///
/// The buffered vector is cloned out. Unlisted variants are reported via
/// `ConnectorXError::cannot_produce`.
macro_rules! impl_produce_vec {
    ($elem:ty, [$($var:ident),+]) => {
        impl<'r, 'a> Produce<'r, Vec<Option<$elem>>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Vec<Option<$elem>> {
                let (row, col) = self.next_loc()?;
                let cell = &self.rowbuf[row][col];

                match cell {
                    $(DataType::$var(items) => items.clone(),)+
                    other => throw!(ConnectorXError::cannot_produce::<Vec<Option<$elem>>>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }

        impl<'r, 'a> Produce<'r, Option<Vec<Option<$elem>>>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Option<Vec<Option<$elem>>> {
                let (row, col) = self.next_loc()?;
                let cell = &self.rowbuf[row][col];

                match cell {
                    DataType::Null => None,
                    $(DataType::$var(items) => Some(items.clone()),)+
                    other => throw!(ConnectorXError::cannot_produce::<Option<Vec<Option<$elem>>>>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }
    };
}
888
889impl_produce!(i8, [Int8]);
890impl_produce!(i16, [Int16]);
891impl_produce!(i32, [Int32]);
892impl_produce!(i64, [Int64]);
893impl_produce!(u8, [UInt8]);
894impl_produce!(u16, [UInt16]);
895impl_produce!(u32, [UInt32]);
896impl_produce!(u64, [UInt64]);
897impl_produce!(f32, [Float32]);
898impl_produce!(f64, [Float64]);
899impl_produce!(Decimal, [Decimal]);
900impl_produce_with_clone!(String, [String, Enum8, Enum16]);
901impl_produce_with_clone!(Vec<u8>, [FixedString]);
902impl_produce!(NaiveDate, [Date, Date32]);
903impl_produce!(DateTime<Utc>, [DateTime, DateTime64]);
904impl_produce!(NaiveTime, [Time, Time64]);
905impl_produce!(Uuid, [UUID]);
906impl_produce!(IpAddr, [IPv4, IPv6]);
907impl_produce!(bool, [Bool]);
908impl_produce_vec!(bool, [ArrayBool]);
909impl_produce_vec!(String, [ArrayString]);
910impl_produce_vec!(i8, [ArrayInt8]);
911impl_produce_vec!(i16, [ArrayInt16]);
912impl_produce_vec!(i32, [ArrayInt32]);
913impl_produce_vec!(i64, [ArrayInt64]);
914impl_produce_vec!(u8, [ArrayUInt8]);
915impl_produce_vec!(u16, [ArrayUInt16]);
916impl_produce_vec!(u32, [ArrayUInt32]);
917impl_produce_vec!(u64, [ArrayUInt64]);
918impl_produce_vec!(f32, [ArrayFloat32]);
919impl_produce_vec!(f64, [ArrayFloat64]);
920impl_produce_vec!(Decimal, [ArrayDecimal]);