1mod errors;
4mod typesystem;
5
6pub use self::errors::ClickHouseSourceError;
7pub use self::typesystem::{ClickHouseTypeSystem, TypeMetadata};
8
9use crate::{
10 data_order::DataOrder,
11 errors::ConnectorXError,
12 sources::{
13 clickhouse::typesystem::DataType, PartitionParser, Produce, Source, SourcePartition,
14 },
15 sql::{count_query, limit1_query, CXQuery},
16};
17use anyhow::anyhow;
18use chrono::{DateTime, Duration, NaiveDate, NaiveTime, Utc};
19use chrono_tz::Tz;
20use clickhouse::Client;
21use fehler::{throw, throws};
22use rust_decimal::Decimal;
23use serde::Deserialize;
24use serde_json::Value as JsonValue;
25use sqlparser::dialect::{ClickHouseDialect, GenericDialect};
26use std::io::{Cursor, Read};
27use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
28use std::sync::Arc;
29use tokio::runtime::Runtime;
30use uuid::Uuid;
31
32pub struct ClickHouseSource {
34 rt: Arc<Runtime>,
35 pub client: Client,
36 origin_query: Option<String>,
37 queries: Vec<CXQuery<String>>,
38 names: Vec<String>,
39 schema: Vec<ClickHouseTypeSystem>,
40 metadata: Vec<TypeMetadata>,
41}
42
43impl ClickHouseSource {
44 #[throws(ClickHouseSourceError)]
45 pub fn new(rt: Arc<Runtime>, conn: &str) -> Self {
46 let url = url::Url::parse(conn)?;
47
48 let use_https = url
49 .query_pairs()
50 .find(|(k, v)| k == "protocol" && v == "https")
51 .is_some();
52
53 let base_url = format!(
54 "{}://{}:{}",
55 if use_https { "https" } else { "http" },
56 url.host_str().unwrap_or("localhost"),
57 url.port().unwrap_or(8123)
58 );
59
60 let mut client = Client::default().with_url(&base_url);
61
62 let database = url.path().trim_start_matches('/');
63 if !database.is_empty() {
64 client = client.with_database(database);
65 }
66
67 let username = url.username();
68 if !username.is_empty() {
69 client = client.with_user(username);
70 }
71
72 let password = url.password().unwrap_or("");
73 if !password.is_empty() {
74 client = client.with_password(password);
75 }
76
77 Self {
78 rt,
79 client,
80 origin_query: None,
81 queries: vec![],
82 names: vec![],
83 schema: vec![],
84 metadata: vec![],
85 }
86 }
87}
88
89impl Source for ClickHouseSource
90where
91 ClickHouseSourcePartition:
92 SourcePartition<TypeSystem = ClickHouseTypeSystem, Error = ClickHouseSourceError>,
93{
94 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
95 type Partition = ClickHouseSourcePartition;
96 type TypeSystem = ClickHouseTypeSystem;
97 type Error = ClickHouseSourceError;
98
99 #[throws(ClickHouseSourceError)]
100 fn set_data_order(&mut self, data_order: DataOrder) {
101 if !matches!(data_order, DataOrder::RowMajor) {
102 throw!(ConnectorXError::UnsupportedDataOrder(data_order));
103 }
104 }
105
106 fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
107 self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
108 }
109
110 fn set_origin_query(&mut self, query: Option<String>) {
111 self.origin_query = query;
112 }
113
114 #[throws(ClickHouseSourceError)]
115 fn fetch_metadata(&mut self) {
116 assert!(!self.queries.is_empty());
117
118 let first_query = &self.queries[0];
119 let l1query = limit1_query(first_query, &ClickHouseDialect {})?;
120
121 let describe_query = format!("DESCRIBE ({})", l1query.as_str());
122
123 let response = self.rt.block_on(async {
124 let mut cursor = self
125 .client
126 .query(&describe_query)
127 .fetch_bytes("JSONCompact")
128 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
129 let bytes = cursor
130 .collect()
131 .await
132 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
133 Ok::<_, ClickHouseSourceError>(bytes)
134 })?;
135
136 #[derive(Debug, Deserialize)]
137 struct DescribeResponse {
138 data: Vec<Vec<JsonValue>>,
139 }
140
141 let parsed: DescribeResponse = serde_json::from_slice(&response)
142 .map_err(|e| anyhow!("Failed to parse DESCRIBE response: {}", e))?;
143
144 let mut names = Vec::new();
145 let mut types = Vec::new();
146 let mut metadata = Vec::new();
147
148 for row in parsed.data {
149 if row.len() >= 2 {
150 let name = row[0].as_str().unwrap_or("").to_string();
151 let type_str = row[1].as_str().unwrap_or("String");
152 let (ts, meta) = ClickHouseTypeSystem::from_type_str_with_metadata(type_str);
153 names.push(name);
154 types.push(ts);
155 metadata.push(meta);
156 }
157 }
158
159 self.names = names;
160 self.schema = types;
161 self.metadata = metadata;
162 }
163
164 #[throws(ClickHouseSourceError)]
165 fn result_rows(&mut self) -> Option<usize> {
166 match &self.origin_query {
167 Some(q) => {
168 let cxq = CXQuery::Naked(q.clone());
169 let cquery = count_query(&cxq, &ClickHouseDialect {})?;
170
171 let response = self.rt.block_on(async {
172 let mut cursor = self
173 .client
174 .query(cquery.as_str())
175 .fetch_bytes("JSONCompact")
176 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
177 let bytes = cursor
178 .collect()
179 .await
180 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
181 Ok::<_, ClickHouseSourceError>(bytes)
182 })?;
183
184 #[derive(Debug, Deserialize)]
185 struct CountResponse {
186 data: Vec<Vec<JsonValue>>,
187 }
188
189 let parsed: CountResponse = serde_json::from_slice(&response)
190 .map_err(|e| anyhow!("Failed to parse count response: {}", e))?;
191
192 if let Some(row) = parsed.data.first() {
193 if let Some(count_val) = row.first() {
194 let count = match count_val {
195 JsonValue::Number(n) => n.as_u64().unwrap_or(0),
196 JsonValue::String(s) => s.parse().unwrap_or(0),
197 _ => 0,
198 };
199 return Some(count as usize);
200 }
201 }
202 None
203 }
204 None => None,
205 }
206 }
207
208 fn names(&self) -> Vec<String> {
209 self.names.clone()
210 }
211
212 fn schema(&self) -> Vec<Self::TypeSystem> {
213 self.schema.clone()
214 }
215
216 #[throws(ClickHouseSourceError)]
217 fn partition(self) -> Vec<Self::Partition> {
218 let mut ret = vec![];
219 for query in self.queries {
220 ret.push(ClickHouseSourcePartition::new(
221 self.rt.clone(),
222 self.client.clone(),
223 &query,
224 &self.schema,
225 &self.metadata,
226 ));
227 }
228 ret
229 }
230}
231
232pub struct ClickHouseSourcePartition {
233 rt: Arc<Runtime>,
234 client: Client,
235 query: CXQuery<String>,
236 schema: Vec<ClickHouseTypeSystem>,
237 metadata: Vec<TypeMetadata>,
238 nrows: usize,
239 ncols: usize,
240}
241
242impl ClickHouseSourcePartition {
243 pub fn new(
244 rt: Arc<Runtime>,
245 client: Client,
246 query: &CXQuery<String>,
247 schema: &[ClickHouseTypeSystem],
248 metadata: &[TypeMetadata],
249 ) -> Self {
250 Self {
251 rt,
252 client,
253 query: query.clone(),
254 schema: schema.to_vec(),
255 metadata: metadata.to_vec(),
256 nrows: 0,
257 ncols: schema.len(),
258 }
259 }
260}
261
262impl SourcePartition for ClickHouseSourcePartition {
263 type TypeSystem = ClickHouseTypeSystem;
264 type Parser<'a> = ClickHouseSourceParser<'a>;
265 type Error = ClickHouseSourceError;
266
267 #[throws(ClickHouseSourceError)]
268 fn result_rows(&mut self) {
269 let cquery = count_query(&self.query, &GenericDialect {})?;
270
271 let response = self.rt.block_on(async {
272 let mut cursor = self
273 .client
274 .query(cquery.as_str())
275 .fetch_bytes("JSONCompact")
276 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
277 let bytes = cursor
278 .collect()
279 .await
280 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
281 Ok::<_, ClickHouseSourceError>(bytes)
282 })?;
283
284 #[derive(Debug, Deserialize)]
285 struct CountResponse {
286 data: Vec<Vec<JsonValue>>,
287 }
288
289 let parsed: CountResponse = serde_json::from_slice(&response)
290 .map_err(|e| anyhow!("Failed to parse count response: {}", e))?;
291
292 if let Some(row) = parsed.data.first() {
293 if let Some(count_val) = row.first() {
294 let count = match count_val {
295 JsonValue::Number(n) => n.as_u64().unwrap_or(0),
296 JsonValue::String(s) => s.parse().unwrap_or(0),
297 _ => 0,
298 };
299 self.nrows = count as usize;
300 }
301 }
302 }
303
304 #[throws(ClickHouseSourceError)]
305 fn parser(&mut self) -> Self::Parser<'_> {
306 ClickHouseSourceParser::new(
307 self.rt.clone(),
308 self.client.clone(),
309 self.query.clone(),
310 &self.schema,
311 &self.metadata,
312 )?
313 }
314
315 fn nrows(&self) -> usize {
316 self.nrows
317 }
318
319 fn ncols(&self) -> usize {
320 self.ncols
321 }
322}
323
324struct BinaryReader<'a> {
325 cursor: Cursor<&'a [u8]>,
326}
327
328impl<'a> BinaryReader<'a> {
329 fn new(data: &'a [u8]) -> Self {
330 Self {
331 cursor: Cursor::new(data),
332 }
333 }
334
335 fn read_bytes<const N: usize>(&mut self) -> Result<[u8; N], ClickHouseSourceError> {
336 let mut buf = [0u8; N];
337 self.cursor
338 .read_exact(&mut buf)
339 .map_err(|e| anyhow!("Failed to read {} bytes: {}", N, e))?;
340 Ok(buf)
341 }
342
343 fn read_u8(&mut self) -> Result<u8, ClickHouseSourceError> {
344 Ok(self.read_bytes::<1>()?[0])
345 }
346
347 fn read_i8(&mut self) -> Result<i8, ClickHouseSourceError> {
348 Ok(self.read_u8()? as i8)
349 }
350
351 fn read_u16(&mut self) -> Result<u16, ClickHouseSourceError> {
352 Ok(u16::from_le_bytes(self.read_bytes()?))
353 }
354
355 fn read_i16(&mut self) -> Result<i16, ClickHouseSourceError> {
356 Ok(i16::from_le_bytes(self.read_bytes()?))
357 }
358
359 fn read_u32(&mut self) -> Result<u32, ClickHouseSourceError> {
360 Ok(u32::from_le_bytes(self.read_bytes()?))
361 }
362
363 fn read_i32(&mut self) -> Result<i32, ClickHouseSourceError> {
364 Ok(i32::from_le_bytes(self.read_bytes()?))
365 }
366
367 fn read_u64(&mut self) -> Result<u64, ClickHouseSourceError> {
368 Ok(u64::from_le_bytes(self.read_bytes()?))
369 }
370
371 fn read_i64(&mut self) -> Result<i64, ClickHouseSourceError> {
372 Ok(i64::from_le_bytes(self.read_bytes()?))
373 }
374
375 fn read_f32(&mut self) -> Result<f32, ClickHouseSourceError> {
376 Ok(f32::from_le_bytes(self.read_bytes()?))
377 }
378
379 fn read_f64(&mut self) -> Result<f64, ClickHouseSourceError> {
380 Ok(f64::from_le_bytes(self.read_bytes()?))
381 }
382
383 fn read_varint(&mut self) -> Result<u64, ClickHouseSourceError> {
384 let mut result: u64 = 0;
385 let mut shift = 0;
386 loop {
387 let byte = self.read_u8()?;
388 result |= ((byte & 0x7f) as u64) << shift;
389 if byte & 0x80 == 0 {
390 break;
391 }
392 shift += 7;
393 if shift >= 64 {
394 return Err(anyhow!("Varint too long").into());
395 }
396 }
397 Ok(result)
398 }
399
400 fn read_fixed_string(&mut self, len: usize) -> Result<Vec<u8>, ClickHouseSourceError> {
401 let mut buf = vec![0u8; len];
402 self.cursor
403 .read_exact(&mut buf)
404 .map_err(|e| anyhow!("Failed to read FixedString: {}", e))?;
405 Ok(buf)
406 }
407
408 fn read_string(&mut self) -> Result<String, ClickHouseSourceError> {
409 let len = self.read_varint()? as usize;
410 let mut buf = vec![0u8; len];
411 self.cursor
412 .read_exact(&mut buf)
413 .map_err(|e| anyhow!("Failed to read string: {}", e))?;
414 String::from_utf8(buf).map_err(|e| anyhow!("Invalid UTF-8: {}", e).into())
415 }
416
417 fn read_uuid(&mut self) -> Result<Uuid, ClickHouseSourceError> {
418 let high = self.read_u64()?;
420 let low = self.read_u64()?;
421
422 Ok(Uuid::from_u64_pair(high, low))
423 }
424
425 fn read_bool(&mut self) -> Result<bool, ClickHouseSourceError> {
426 Ok(self.read_u8()? != 0)
427 }
428
429 fn read_date(&mut self) -> Result<NaiveDate, ClickHouseSourceError> {
430 let days = self.read_u16()? as i64;
432 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
433 epoch
434 .checked_add_signed(Duration::days(days))
435 .ok_or_else(|| anyhow!("Invalid date value: {} days since epoch", days).into())
436 }
437
438 fn read_date32(&mut self) -> Result<NaiveDate, ClickHouseSourceError> {
439 let days = self.read_i32()? as i64;
442 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
443 epoch
444 .checked_add_signed(Duration::days(days))
445 .ok_or_else(|| anyhow!("Invalid date32 value: {} days since epoch", days).into())
446 }
447
448 fn read_datetime(&mut self, tz: Option<&Tz>) -> Result<DateTime<Utc>, ClickHouseSourceError> {
449 let seconds = self.read_u32()? as i64;
450 chrono::DateTime::from_timestamp(seconds, 0)
451 .map(|dt| dt.with_timezone(tz.unwrap_or(&Tz::UTC)).to_utc())
452 .ok_or_else(|| {
453 anyhow!("Invalid datetime value: {} seconds since epoch", seconds).into()
454 })
455 }
456
457 fn read_datetime64(
458 &mut self,
459 precision: u8,
460 tz: Option<&Tz>,
461 ) -> Result<DateTime<Utc>, ClickHouseSourceError> {
462 if precision > 9 {
463 return Err(anyhow!("Unsupported DateTime64 precision: {}", precision).into());
464 }
465 let ticks = self.read_i64()?;
466 let nanos = ticks * 10_i64.pow(9 - precision as u32);
467
468 Ok(DateTime::from_timestamp_nanos(nanos)
469 .with_timezone(tz.unwrap_or(&Tz::UTC))
470 .to_utc())
471 }
472
473 fn read_decimal(&mut self, precision: u8, scale: u8) -> Result<Decimal, ClickHouseSourceError> {
474 match precision {
475 1..=9 => {
476 let value = self.read_i32()?;
477 Ok(Decimal::new(value as i64, scale as u32))
478 }
479 10..=18 => {
480 let value = self.read_i64()?;
481 Ok(Decimal::new(value, scale as u32))
482 }
483 _ => Err(anyhow!("Unsupported Decimal precision: {}", precision).into()),
484 }
485 }
486
487 fn read_time(&mut self) -> Result<NaiveTime, ClickHouseSourceError> {
488 let seconds = self.read_u32()? as i64;
489 Ok(
490 NaiveTime::from_num_seconds_from_midnight_opt(seconds as u32, 0)
491 .ok_or_else(|| anyhow!("Invalid time value: {} seconds since midnight", seconds))?,
492 )
493 }
494
495 fn read_time64(&mut self, precision: u8) -> Result<NaiveTime, ClickHouseSourceError> {
496 if precision > 9 {
497 return Err(anyhow!("Unsupported Time64 precision: {}", precision).into());
498 }
499 let ticks = self.read_i64()?;
500 let nanos = ticks * 10_i64.pow(9 - precision as u32);
501 Ok(NaiveTime::from_num_seconds_from_midnight_opt(
502 (nanos / 1_000_000_000) as u32,
503 (nanos % 1_000_000_000) as u32,
504 )
505 .ok_or_else(|| anyhow!("Invalid time64 value: {} ticks since midnight", ticks))?)
506 }
507
508 fn read_ipv4(&mut self) -> Result<IpAddr, ClickHouseSourceError> {
509 let bytes = self.read_u32()?;
510
511 Ok(IpAddr::V4(Ipv4Addr::from_bits(bytes)))
512 }
513
514 fn read_ipv6(&mut self) -> Result<IpAddr, ClickHouseSourceError> {
515 let seg1 = u16::from_be(self.read_u16()?);
516 let seg2 = u16::from_be(self.read_u16()?);
517 let seg3 = u16::from_be(self.read_u16()?);
518 let seg4 = u16::from_be(self.read_u16()?);
519 let seg5 = u16::from_be(self.read_u16()?);
520 let seg6 = u16::from_be(self.read_u16()?);
521 let seg7 = u16::from_be(self.read_u16()?);
522 let seg8 = u16::from_be(self.read_u16()?);
523
524 Ok(IpAddr::V6(Ipv6Addr::from_segments([
525 seg1, seg2, seg3, seg4, seg5, seg6, seg7, seg8,
526 ])))
527 }
528
529 fn read_enum8(&mut self) -> Result<i8, ClickHouseSourceError> {
530 self.read_i8()
531 }
532
533 fn read_enum16(&mut self) -> Result<i16, ClickHouseSourceError> {
534 self.read_i16()
535 }
536
537 fn read_array<T, F>(&mut self, read_elem: F) -> Result<Vec<Option<T>>, ClickHouseSourceError>
538 where
539 F: Fn(&mut Self) -> Result<T, ClickHouseSourceError>,
540 {
541 let len = self.read_varint()? as usize;
542 let mut result = Vec::with_capacity(len);
543 for _ in 0..len {
544 result.push(Some(read_elem(self)?));
545 }
546 Ok(result)
547 }
548
549 fn is_empty(&self) -> bool {
550 self.cursor.position() as usize >= self.cursor.get_ref().len()
551 }
552}
553
554pub struct ClickHouseSourceParser<'a> {
555 rt: Arc<Runtime>,
556 client: Client,
557 query: CXQuery<String>,
558 schema: Vec<ClickHouseTypeSystem>,
559 metadata: Vec<TypeMetadata>,
560 rowbuf: Vec<Vec<DataType>>,
561 ncols: usize,
562 current_row: usize,
563 current_col: usize,
564 is_finished: bool,
565 _phantom: std::marker::PhantomData<&'a ()>,
566}
567
568impl<'a> ClickHouseSourceParser<'a> {
569 #[throws(ClickHouseSourceError)]
570 pub fn new(
571 rt: Arc<Runtime>,
572 client: Client,
573 query: CXQuery<String>,
574 schema: &[ClickHouseTypeSystem],
575 metadata: &[TypeMetadata],
576 ) -> Self {
577 Self {
578 rt,
579 client,
580 query,
581 schema: schema.to_vec(),
582 metadata: metadata.to_vec(),
583 rowbuf: Vec::new(),
584 ncols: schema.len(),
585 current_row: 0,
586 current_col: 0,
587 is_finished: false,
588 _phantom: std::marker::PhantomData,
589 }
590 }
591
592 fn parse_row_binary(
593 &self,
594 reader: &mut BinaryReader,
595 ) -> Result<Vec<DataType>, ClickHouseSourceError> {
596 let mut row = Vec::with_capacity(self.ncols);
597
598 for (col_idx, col_type) in self.schema.iter().enumerate() {
599 let is_nullable = col_type.is_nullable();
600 let meta = &self.metadata[col_idx];
601
602 if is_nullable {
603 let null_flag = reader.read_u8()?;
604 if null_flag == 1 {
605 row.push(DataType::Null);
606 continue;
607 }
608 }
609
610 let value = match col_type {
611 ClickHouseTypeSystem::Int8(_) => DataType::Int8(reader.read_i8()?),
612 ClickHouseTypeSystem::Int16(_) => DataType::Int16(reader.read_i16()?),
613 ClickHouseTypeSystem::Int32(_) => DataType::Int32(reader.read_i32()?),
614 ClickHouseTypeSystem::Int64(_) => DataType::Int64(reader.read_i64()?),
615 ClickHouseTypeSystem::UInt8(_) => DataType::UInt8(reader.read_u8()?),
616 ClickHouseTypeSystem::UInt16(_) => DataType::UInt16(reader.read_u16()?),
617 ClickHouseTypeSystem::UInt32(_) => DataType::UInt32(reader.read_u32()?),
618 ClickHouseTypeSystem::UInt64(_) => DataType::UInt64(reader.read_u64()?),
619
620 ClickHouseTypeSystem::Float32(_) => DataType::Float32(reader.read_f32()?),
621 ClickHouseTypeSystem::Float64(_) => DataType::Float64(reader.read_f64()?),
622
623 ClickHouseTypeSystem::Decimal(_) => {
624 DataType::Decimal(reader.read_decimal(meta.precision, meta.scale)?)
625 }
626
627 ClickHouseTypeSystem::String(_) => DataType::String(reader.read_string()?),
628
629 ClickHouseTypeSystem::FixedString(_) => {
630 DataType::FixedString(reader.read_fixed_string(meta.length)?)
631 }
632
633 ClickHouseTypeSystem::Date(_) => DataType::Date(reader.read_date()?),
634 ClickHouseTypeSystem::Date32(_) => DataType::Date32(reader.read_date32()?),
635
636 ClickHouseTypeSystem::DateTime(_) => {
637 DataType::DateTime(reader.read_datetime(meta.timezone.as_ref())?)
638 }
639 ClickHouseTypeSystem::DateTime64(_) => DataType::DateTime64(
640 reader.read_datetime64(meta.precision, meta.timezone.as_ref())?,
641 ),
642
643 ClickHouseTypeSystem::Time(_) => DataType::Time(reader.read_time()?),
644 ClickHouseTypeSystem::Time64(_) => {
645 DataType::Time64(reader.read_time64(meta.precision)?)
646 }
647
648 ClickHouseTypeSystem::Enum8(_) => {
649 let enum_value = reader.read_enum8()?;
650 let enum_str = meta
651 .named_values
652 .as_ref()
653 .and_then(|h| h.get(&(enum_value as i16)));
654 DataType::Enum8(enum_str.cloned().unwrap_or_default())
655 }
656 ClickHouseTypeSystem::Enum16(_) => {
657 let enum_value = reader.read_enum16()?;
658 let enum_str = meta.named_values.as_ref().and_then(|h| h.get(&enum_value));
659 DataType::Enum16(enum_str.cloned().unwrap_or_default())
660 }
661
662 ClickHouseTypeSystem::UUID(_) => DataType::UUID(reader.read_uuid()?),
663
664 ClickHouseTypeSystem::IPv4(_) => DataType::IPv4(reader.read_ipv4()?),
665 ClickHouseTypeSystem::IPv6(_) => DataType::IPv6(reader.read_ipv6()?),
666
667 ClickHouseTypeSystem::Bool(_) => DataType::Bool(reader.read_bool()?),
668
669 ClickHouseTypeSystem::ArrayBool(_) => {
670 DataType::ArrayBool(reader.read_array(BinaryReader::read_bool)?)
671 }
672 ClickHouseTypeSystem::ArrayString(_) => {
673 DataType::ArrayString(reader.read_array(BinaryReader::read_string)?)
674 }
675 ClickHouseTypeSystem::ArrayInt8(_) => {
676 DataType::ArrayInt8(reader.read_array(BinaryReader::read_i8)?)
677 }
678 ClickHouseTypeSystem::ArrayInt16(_) => {
679 DataType::ArrayInt16(reader.read_array(BinaryReader::read_i16)?)
680 }
681 ClickHouseTypeSystem::ArrayInt32(_) => {
682 DataType::ArrayInt32(reader.read_array(BinaryReader::read_i32)?)
683 }
684 ClickHouseTypeSystem::ArrayInt64(_) => {
685 DataType::ArrayInt64(reader.read_array(BinaryReader::read_i64)?)
686 }
687 ClickHouseTypeSystem::ArrayUInt8(_) => {
688 DataType::ArrayUInt8(reader.read_array(BinaryReader::read_u8)?)
689 }
690 ClickHouseTypeSystem::ArrayUInt16(_) => {
691 DataType::ArrayUInt16(reader.read_array(BinaryReader::read_u16)?)
692 }
693 ClickHouseTypeSystem::ArrayUInt32(_) => {
694 DataType::ArrayUInt32(reader.read_array(BinaryReader::read_u32)?)
695 }
696 ClickHouseTypeSystem::ArrayUInt64(_) => {
697 DataType::ArrayUInt64(reader.read_array(BinaryReader::read_u64)?)
698 }
699 ClickHouseTypeSystem::ArrayFloat32(_) => {
700 DataType::ArrayFloat32(reader.read_array(BinaryReader::read_f32)?)
701 }
702 ClickHouseTypeSystem::ArrayFloat64(_) => {
703 DataType::ArrayFloat64(reader.read_array(BinaryReader::read_f64)?)
704 }
705 ClickHouseTypeSystem::ArrayDecimal(_) => {
706 let precision = meta.precision;
707 let scale = meta.scale;
708 DataType::ArrayDecimal(reader.read_array(|r| r.read_decimal(precision, scale))?)
709 }
710 };
711
712 row.push(value);
713 }
714
715 Ok(row)
716 }
717
718 #[throws(ClickHouseSourceError)]
719 fn next_loc(&mut self) -> (usize, usize) {
720 let ret = (self.current_row, self.current_col);
721 self.current_row += (self.current_col + 1) / self.ncols;
722 self.current_col = (self.current_col + 1) % self.ncols;
723 ret
724 }
725}
726
727impl<'a> PartitionParser<'a> for ClickHouseSourceParser<'a> {
728 type TypeSystem = ClickHouseTypeSystem;
729 type Error = ClickHouseSourceError;
730
731 #[throws(ClickHouseSourceError)]
732 fn fetch_next(&mut self) -> (usize, bool) {
733 assert!(self.current_col == 0);
734
735 if self.is_finished {
736 return (0, true);
737 }
738
739 let response = self.rt.block_on(async {
740 let mut cursor = self
741 .client
742 .query(self.query.as_str())
743 .fetch_bytes("RowBinary")
744 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
745 let bytes = cursor
746 .collect()
747 .await
748 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
749 Ok::<_, ClickHouseSourceError>(bytes)
750 })?;
751 let mut reader = BinaryReader::new(&response);
752 let mut rows = Vec::new();
753
754 while !reader.is_empty() {
755 match self.parse_row_binary(&mut reader) {
756 Ok(row) => rows.push(row),
757 Err(_) => break,
758 }
759 }
760
761 self.rowbuf = rows;
762 self.current_row = 0;
763 self.is_finished = true;
764
765 (self.rowbuf.len(), true)
766 }
767}
768
/// Implements `Produce<T>` and `Produce<Option<T>>` for a `Copy` Rust type `T`.
///
/// `$target` is the produced type. Each `$tag` names a `DataType` variant whose payload is
/// copied out and cast to `$target`. In the `Option` impl, `DataType::Null` becomes `None`.
/// Any other variant is reported as a `cannot_produce` error.
macro_rules! impl_produce {
    ($target:ty, [$($tag:ident),+]) => {
        impl<'r, 'a> Produce<'r, $target> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> $target {
                let (row, col) = self.next_loc()?;
                match &self.rowbuf[row][col] {
                    $(DataType::$tag(inner) => *inner as $target,)+
                    other => throw!(ConnectorXError::cannot_produce::<$target>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }

        impl<'r, 'a> Produce<'r, Option<$target>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Option<$target> {
                let (row, col) = self.next_loc()?;
                match &self.rowbuf[row][col] {
                    DataType::Null => None,
                    $(DataType::$tag(inner) => Some(*inner as $target),)+
                    other => throw!(ConnectorXError::cannot_produce::<$target>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }
    };
}
807
/// Like `impl_produce!`, but clones the payload, for types that are not `Copy`
/// (e.g. `String`, `Vec<u8>`).
///
/// The `Option` impl maps `DataType::Null` to `None`. Any variant not listed in `$tag` is
/// reported as a `cannot_produce` error.
macro_rules! impl_produce_with_clone {
    ($target:ty, [$($tag:ident),+]) => {
        impl<'r, 'a> Produce<'r, $target> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> $target {
                let (row, col) = self.next_loc()?;
                match &self.rowbuf[row][col] {
                    $(DataType::$tag(inner) => inner.clone() as $target,)+
                    other => throw!(ConnectorXError::cannot_produce::<$target>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }

        impl<'r, 'a> Produce<'r, Option<$target>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Option<$target> {
                let (row, col) = self.next_loc()?;
                match &self.rowbuf[row][col] {
                    DataType::Null => None,
                    $(DataType::$tag(inner) => Some(inner.clone() as $target),)+
                    other => throw!(ConnectorXError::cannot_produce::<$target>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }
    };
}
846
/// Implements `Produce<Vec<Option<T>>>` and `Produce<Option<Vec<Option<T>>>>` for array columns.
///
/// The matched `$tag` variant's element vector is cloned as-is. In the outer-`Option` impl,
/// `DataType::Null` becomes `None`. Any other variant is reported as a `cannot_produce` error.
macro_rules! impl_produce_vec {
    ($elem:ty, [$($tag:ident),+]) => {
        impl<'r, 'a> Produce<'r, Vec<Option<$elem>>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Vec<Option<$elem>> {
                let (row, col) = self.next_loc()?;
                match &self.rowbuf[row][col] {
                    $(DataType::$tag(items) => items.clone(),)+
                    other => throw!(ConnectorXError::cannot_produce::<Vec<Option<$elem>>>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }

        impl<'r, 'a> Produce<'r, Option<Vec<Option<$elem>>>> for ClickHouseSourceParser<'a> {
            type Error = ClickHouseSourceError;

            #[throws(ClickHouseSourceError)]
            fn produce(&'r mut self) -> Option<Vec<Option<$elem>>> {
                let (row, col) = self.next_loc()?;
                match &self.rowbuf[row][col] {
                    DataType::Null => None,
                    $(DataType::$tag(items) => Some(items.clone()),)+
                    other => throw!(ConnectorXError::cannot_produce::<Option<Vec<Option<$elem>>>>(Some(
                        format!("{:?}", other)
                    ))),
                }
            }
        }
    };
}
885
886impl_produce!(i8, [Int8]);
887impl_produce!(i16, [Int16]);
888impl_produce!(i32, [Int32]);
889impl_produce!(i64, [Int64]);
890impl_produce!(u8, [UInt8]);
891impl_produce!(u16, [UInt16]);
892impl_produce!(u32, [UInt32]);
893impl_produce!(u64, [UInt64]);
894impl_produce!(f32, [Float32]);
895impl_produce!(f64, [Float64]);
896impl_produce!(Decimal, [Decimal]);
897impl_produce_with_clone!(String, [String, Enum8, Enum16]);
898impl_produce_with_clone!(Vec<u8>, [FixedString]);
899impl_produce!(NaiveDate, [Date, Date32]);
900impl_produce!(DateTime<Utc>, [DateTime, DateTime64]);
901impl_produce!(NaiveTime, [Time, Time64]);
902impl_produce!(Uuid, [UUID]);
903impl_produce!(IpAddr, [IPv4, IPv6]);
904impl_produce!(bool, [Bool]);
905impl_produce_vec!(bool, [ArrayBool]);
906impl_produce_vec!(String, [ArrayString]);
907impl_produce_vec!(i8, [ArrayInt8]);
908impl_produce_vec!(i16, [ArrayInt16]);
909impl_produce_vec!(i32, [ArrayInt32]);
910impl_produce_vec!(i64, [ArrayInt64]);
911impl_produce_vec!(u8, [ArrayUInt8]);
912impl_produce_vec!(u16, [ArrayUInt16]);
913impl_produce_vec!(u32, [ArrayUInt32]);
914impl_produce_vec!(u64, [ArrayUInt64]);
915impl_produce_vec!(f32, [ArrayFloat32]);
916impl_produce_vec!(f64, [ArrayFloat64]);
917impl_produce_vec!(Decimal, [ArrayDecimal]);