1use std::sync::Arc;
2
3use crate::errors::{ConnectorXOutError, OutResult};
4use crate::source_router::{SourceConn, SourceType};
5#[cfg(feature = "src_bigquery")]
6use crate::sources::bigquery::BigQueryDialect;
7#[cfg(feature = "src_clickhouse")]
8use crate::sources::clickhouse::{ClickHouseSource, ClickHouseSourceError};
9#[cfg(feature = "src_mssql")]
10use crate::sources::mssql::{mssql_config, FloatN, IntN, MsSQLTypeSystem};
11#[cfg(feature = "src_mysql")]
12use crate::sources::mysql::{MySQLSourceError, MySQLTypeSystem};
13#[cfg(feature = "src_oracle")]
14use crate::sources::oracle::{OracleDialect, OracleSource};
15#[cfg(feature = "src_postgres")]
16use crate::sources::postgres::{rewrite_tls_args, PostgresTypeSystem};
17#[cfg(feature = "src_trino")]
18use crate::sources::trino::TrinoDialect;
19#[cfg(feature = "src_sqlite")]
20use crate::sql::get_partition_range_query_sep;
21use crate::sql::{get_partition_range_query, single_col_partition_query, CXQuery};
22use anyhow::anyhow;
23use fehler::{throw, throws};
24#[cfg(feature = "src_bigquery")]
25use gcp_bigquery_client;
26#[cfg(feature = "src_mysql")]
27use r2d2_mysql::mysql::{prelude::Queryable, Opts, Pool, Row};
28#[cfg(feature = "src_sqlite")]
29use rusqlite::{types::Type, Connection};
30#[cfg(feature = "src_postgres")]
31use rust_decimal::{prelude::ToPrimitive, Decimal};
32#[cfg(feature = "src_postgres")]
33use rust_decimal_macros::dec;
34#[cfg(feature = "src_clickhouse")]
35use serde::Deserialize;
36#[cfg(feature = "src_clickhouse")]
37use serde_json::Value as JsonValue;
38#[cfg(feature = "src_clickhouse")]
39use sqlparser::dialect::ClickHouseDialect;
40#[cfg(feature = "src_mssql")]
41use sqlparser::dialect::MsSqlDialect;
42#[cfg(feature = "src_mysql")]
43use sqlparser::dialect::MySqlDialect;
44#[cfg(feature = "src_postgres")]
45use sqlparser::dialect::PostgreSqlDialect;
46#[cfg(feature = "src_sqlite")]
47use sqlparser::dialect::SQLiteDialect;
48#[cfg(feature = "src_mssql")]
49use tiberius::Client;
50#[cfg(any(feature = "src_bigquery", feature = "src_mssql", feature = "src_trino"))]
51use tokio::{net::TcpStream, runtime::Runtime};
52#[cfg(feature = "src_mssql")]
53use tokio_util::compat::TokioAsyncWriteCompatExt;
54use url::Url;
55
56pub struct PartitionQuery {
57 query: String,
58 column: String,
59 min: Option<i64>,
60 max: Option<i64>,
61 num: usize,
62}
63
64impl PartitionQuery {
65 pub fn new(query: &str, column: &str, min: Option<i64>, max: Option<i64>, num: usize) -> Self {
66 Self {
67 query: query.into(),
68 column: column.into(),
69 min,
70 max,
71 num,
72 }
73 }
74}
75
76pub fn partition(part: &PartitionQuery, source_conn: &SourceConn) -> OutResult<Vec<CXQuery>> {
77 let mut queries = vec![];
78 let num = part.num as i64;
79 let (min, max) = match (part.min, part.max) {
80 (None, None) => get_col_range(source_conn, &part.query, &part.column)?,
81 (Some(min), Some(max)) => (min, max),
82 _ => throw!(anyhow!(
83 "partition_query range can not be partially specified",
84 )),
85 };
86
87 let partition_size = (max - min + 1) / num;
88
89 for i in 0..num {
90 let lower = min + i * partition_size;
91 let upper = match i == num - 1 {
92 true => max + 1,
93 false => min + (i + 1) * partition_size,
94 };
95 let partition_query = get_part_query(source_conn, &part.query, &part.column, lower, upper)?;
96 queries.push(partition_query);
97 }
98 Ok(queries)
99}
100
101pub fn get_col_range(source_conn: &SourceConn, query: &str, col: &str) -> OutResult<(i64, i64)> {
102 match source_conn.ty {
103 #[cfg(feature = "src_postgres")]
104 SourceType::Postgres => pg_get_partition_range(&source_conn.conn, query, col),
105 #[cfg(feature = "src_sqlite")]
106 SourceType::SQLite => sqlite_get_partition_range(&source_conn.conn, query, col),
107 #[cfg(feature = "src_mysql")]
108 SourceType::MySQL => mysql_get_partition_range(&source_conn.conn, query, col),
109 #[cfg(feature = "src_mssql")]
110 SourceType::MsSQL => mssql_get_partition_range(&source_conn.conn, query, col),
111 #[cfg(feature = "src_oracle")]
112 SourceType::Oracle => oracle_get_partition_range(&source_conn.conn, query, col),
113 #[cfg(feature = "src_bigquery")]
114 SourceType::BigQuery => bigquery_get_partition_range(&source_conn.conn, query, col),
115 #[cfg(feature = "src_trino")]
116 SourceType::Trino => trino_get_partition_range(&source_conn.conn, query, col),
117 #[cfg(feature = "src_clickhouse")]
118 SourceType::ClickHouse => clickhouse_get_partition_range(&source_conn.conn, query, col),
119 _ => unimplemented!("{:?} not implemented!", source_conn.ty),
120 }
121}
122
123#[throws(ConnectorXOutError)]
124pub fn get_part_query(
125 source_conn: &SourceConn,
126 query: &str,
127 col: &str,
128 lower: i64,
129 upper: i64,
130) -> CXQuery<String> {
131 let query = match source_conn.ty {
132 #[cfg(feature = "src_postgres")]
133 SourceType::Postgres => {
134 single_col_partition_query(query, col, lower, upper, &PostgreSqlDialect {})?
135 }
136 #[cfg(feature = "src_sqlite")]
137 SourceType::SQLite => {
138 single_col_partition_query(query, col, lower, upper, &SQLiteDialect {})?
139 }
140 #[cfg(feature = "src_mysql")]
141 SourceType::MySQL => {
142 single_col_partition_query(query, col, lower, upper, &MySqlDialect {})?
143 }
144 #[cfg(feature = "src_mssql")]
145 SourceType::MsSQL => {
146 single_col_partition_query(query, col, lower, upper, &MsSqlDialect {})?
147 }
148 #[cfg(feature = "src_oracle")]
149 SourceType::Oracle => {
150 single_col_partition_query(query, col, lower, upper, &OracleDialect {})?
151 }
152 #[cfg(feature = "src_bigquery")]
153 SourceType::BigQuery => {
154 single_col_partition_query(query, col, lower, upper, &BigQueryDialect {})?
155 }
156 #[cfg(feature = "src_trino")]
157 SourceType::Trino => {
158 single_col_partition_query(query, col, lower, upper, &TrinoDialect {})?
159 }
160 #[cfg(feature = "src_clickhouse")]
161 SourceType::ClickHouse => {
162 single_col_partition_query(query, col, lower, upper, &ClickHouseDialect {})?
163 }
164 _ => unimplemented!("{:?} not implemented!", source_conn.ty),
165 };
166 CXQuery::Wrapped(query)
167}
168
169#[cfg(feature = "src_postgres")]
170#[throws(ConnectorXOutError)]
171fn pg_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
172 let (config, tls) = rewrite_tls_args(conn)?;
173 let mut client = match tls {
174 None => config.connect(postgres::NoTls)?,
175 Some(tls_conn) => config.connect(tls_conn)?,
176 };
177 let range_query = get_partition_range_query(query, col, &PostgreSqlDialect {})?;
178 let row = client.query_one(range_query.as_str(), &[])?;
179
180 let col_type = PostgresTypeSystem::from(row.columns()[0].type_());
181 let (min_v, max_v) = match col_type {
182 PostgresTypeSystem::Int2(_) => {
183 let min_v: Option<i16> = row.get(0);
184 let max_v: Option<i16> = row.get(1);
185 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
186 }
187 PostgresTypeSystem::Int4(_) => {
188 let min_v: Option<i32> = row.get(0);
189 let max_v: Option<i32> = row.get(1);
190 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
191 }
192 PostgresTypeSystem::Int8(_) => {
193 let min_v: Option<i64> = row.get(0);
194 let max_v: Option<i64> = row.get(1);
195 (min_v.unwrap_or(0), max_v.unwrap_or(0))
196 }
197 PostgresTypeSystem::Float4(_) => {
198 let min_v: Option<f32> = row.get(0);
199 let max_v: Option<f32> = row.get(1);
200 (min_v.unwrap_or(0.0) as i64, max_v.unwrap_or(0.0) as i64)
201 }
202 PostgresTypeSystem::Float8(_) => {
203 let min_v: Option<f64> = row.get(0);
204 let max_v: Option<f64> = row.get(1);
205 (min_v.unwrap_or(0.0) as i64, max_v.unwrap_or(0.0) as i64)
206 }
207 PostgresTypeSystem::Numeric(_) => {
208 let min_v: Option<Decimal> = row.get(0);
209 let max_v: Option<Decimal> = row.get(1);
210 (
211 min_v.unwrap_or(dec!(0.0)).to_i64().unwrap_or(0),
212 max_v.unwrap_or(dec!(0.0)).to_i64().unwrap_or(0),
213 )
214 }
215 _ => throw!(anyhow!(
216 "Partition can only be done on int or float columns"
217 )),
218 };
219
220 (min_v, max_v)
221}
222
223#[cfg(feature = "src_sqlite")]
224#[throws(ConnectorXOutError)]
225fn sqlite_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
226 let conn = Connection::open(&conn.as_str()[9..])?;
228 let (min_query, max_query) = get_partition_range_query_sep(query, col, &SQLiteDialect {})?;
231 let mut error = None;
232 let min_v = conn.query_row(min_query.as_str(), [], |row| {
233 let col_type = row.get_ref(0)?.data_type();
235 match col_type {
236 Type::Integer => row.get(0),
237 Type::Real => {
238 let v: f64 = row.get(0)?;
239 Ok(v as i64)
240 }
241 Type::Null => Ok(0),
242 _ => {
243 error = Some(anyhow!("Partition can only be done on integer columns"));
244 Ok(0)
245 }
246 }
247 })?;
248 match error {
249 None => {}
250 Some(e) => throw!(e),
251 }
252 let max_v = conn.query_row(max_query.as_str(), [], |row| {
253 let col_type = row.get_ref(0)?.data_type();
254 match col_type {
255 Type::Integer => row.get(0),
256 Type::Real => {
257 let v: f64 = row.get(0)?;
258 Ok(v as i64)
259 }
260 Type::Null => Ok(0),
261 _ => {
262 error = Some(anyhow!("Partition can only be done on integer columns"));
263 Ok(0)
264 }
265 }
266 })?;
267 match error {
268 None => {}
269 Some(e) => throw!(e),
270 }
271
272 (min_v, max_v)
273}
274
275#[cfg(feature = "src_mysql")]
276#[throws(ConnectorXOutError)]
277fn mysql_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
278 let pool = Pool::new(Opts::from_url(conn.as_str()).map_err(MySQLSourceError::MySQLUrlError)?)?;
279 let mut conn = pool.get_conn()?;
280 let range_query = get_partition_range_query(query, col, &MySqlDialect {})?;
281 let row: Row = conn
282 .query_first(range_query)?
283 .ok_or_else(|| anyhow!("mysql range: no row returns"))?;
284
285 let col_type =
286 MySQLTypeSystem::from((&row.columns()[0].column_type(), &row.columns()[0].flags()));
287
288 let (min_v, max_v) = match col_type {
289 MySQLTypeSystem::Tiny(_) => {
290 let min_v: Option<i8> = row
291 .get(0)
292 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
293 let max_v: Option<i8> = row
294 .get(1)
295 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
296 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
297 }
298 MySQLTypeSystem::Short(_) => {
299 let min_v: Option<i16> = row
300 .get(0)
301 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
302 let max_v: Option<i16> = row
303 .get(1)
304 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
305 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
306 }
307 MySQLTypeSystem::Int24(_) => {
308 let min_v: Option<i32> = row
309 .get(0)
310 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
311 let max_v: Option<i32> = row
312 .get(1)
313 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
314 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
315 }
316 MySQLTypeSystem::Long(_) => {
317 let min_v: Option<i64> = row
318 .get(0)
319 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
320 let max_v: Option<i64> = row
321 .get(1)
322 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
323 (min_v.unwrap_or(0), max_v.unwrap_or(0))
324 }
325 MySQLTypeSystem::LongLong(_) => {
326 let min_v: Option<i64> = row
327 .get(0)
328 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
329 let max_v: Option<i64> = row
330 .get(1)
331 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
332 (min_v.unwrap_or(0), max_v.unwrap_or(0))
333 }
334 MySQLTypeSystem::UTiny(_) => {
335 let min_v: Option<u8> = row
336 .get(0)
337 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
338 let max_v: Option<u8> = row
339 .get(1)
340 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
341 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
342 }
343 MySQLTypeSystem::UShort(_) => {
344 let min_v: Option<u16> = row
345 .get(0)
346 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
347 let max_v: Option<u16> = row
348 .get(1)
349 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
350 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
351 }
352 MySQLTypeSystem::UInt24(_) => {
353 let min_v: Option<u32> = row
354 .get(0)
355 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
356 let max_v: Option<u32> = row
357 .get(1)
358 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
359 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
360 }
361 MySQLTypeSystem::ULong(_) => {
362 let min_v: Option<u32> = row
363 .get(0)
364 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
365 let max_v: Option<u32> = row
366 .get(1)
367 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
368 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
369 }
370 MySQLTypeSystem::ULongLong(_) => {
371 let min_v: Option<u64> = row
372 .get(0)
373 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
374 let max_v: Option<u64> = row
375 .get(1)
376 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
377 (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
378 }
379 MySQLTypeSystem::Float(_) => {
380 let min_v: Option<f32> = row
381 .get(0)
382 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
383 let max_v: Option<f32> = row
384 .get(1)
385 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
386 (min_v.unwrap_or(0.0) as i64, max_v.unwrap_or(0.0) as i64)
387 }
388 MySQLTypeSystem::Double(_) => {
389 let min_v: Option<f64> = row
390 .get(0)
391 .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
392 let max_v: Option<f64> = row
393 .get(1)
394 .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
395 (min_v.unwrap_or(0.0) as i64, max_v.unwrap_or(0.0) as i64)
396 }
397 _ => throw!(anyhow!("Partition can only be done on int columns")),
398 };
399
400 (min_v, max_v)
401}
402
403#[cfg(feature = "src_mssql")]
404#[throws(ConnectorXOutError)]
405fn mssql_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
406 let rt = Runtime::new().expect("Failed to create runtime");
407 let config = mssql_config(conn)?;
408 let tcp = rt.block_on(TcpStream::connect(config.get_addr()))?;
409 tcp.set_nodelay(true)?;
410
411 let mut client = rt.block_on(Client::connect(config, tcp.compat_write()))?;
412
413 let range_query = get_partition_range_query(query, col, &MsSqlDialect {})?;
414 let query_result = rt.block_on(client.query(range_query.as_str(), &[]))?;
415 let row = rt.block_on(query_result.into_row())?.unwrap();
416
417 let col_type = MsSQLTypeSystem::from(&row.columns()[0].column_type());
418 let (min_v, max_v) = match col_type {
419 MsSQLTypeSystem::Tinyint(_) => {
420 let min_v: u8 = row.get(0).unwrap_or(0);
421 let max_v: u8 = row.get(1).unwrap_or(0);
422 (min_v as i64, max_v as i64)
423 }
424 MsSQLTypeSystem::Smallint(_) => {
425 let min_v: i16 = row.get(0).unwrap_or(0);
426 let max_v: i16 = row.get(1).unwrap_or(0);
427 (min_v as i64, max_v as i64)
428 }
429 MsSQLTypeSystem::Int(_) => {
430 let min_v: i32 = row.get(0).unwrap_or(0);
431 let max_v: i32 = row.get(1).unwrap_or(0);
432 (min_v as i64, max_v as i64)
433 }
434 MsSQLTypeSystem::Bigint(_) => {
435 let min_v: i64 = row.get(0).unwrap_or(0);
436 let max_v: i64 = row.get(1).unwrap_or(0);
437 (min_v, max_v)
438 }
439 MsSQLTypeSystem::Intn(_) => {
440 let min_v: IntN = row.get(0).unwrap_or(IntN(0));
441 let max_v: IntN = row.get(1).unwrap_or(IntN(0));
442 (min_v.0, max_v.0)
443 }
444 MsSQLTypeSystem::Float24(_) => {
445 let min_v: f32 = row.get(0).unwrap_or(0.0);
446 let max_v: f32 = row.get(1).unwrap_or(0.0);
447 (min_v as i64, max_v as i64)
448 }
449 MsSQLTypeSystem::Float53(_) => {
450 let min_v: f64 = row.get(0).unwrap_or(0.0);
451 let max_v: f64 = row.get(1).unwrap_or(0.0);
452 (min_v as i64, max_v as i64)
453 }
454 MsSQLTypeSystem::Floatn(_) => {
455 let min_v: FloatN = row.get(0).unwrap_or(FloatN(0.0));
456 let max_v: FloatN = row.get(1).unwrap_or(FloatN(0.0));
457 (min_v.0 as i64, max_v.0 as i64)
458 }
459 _ => throw!(anyhow!(
460 "Partition can only be done on int or float columns"
461 )),
462 };
463
464 (min_v, max_v)
465}
466
467#[cfg(feature = "src_oracle")]
468#[throws(ConnectorXOutError)]
469fn oracle_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
470 let source = OracleSource::new(conn.as_str(), 1)?;
471 let conn = source.get_conn()?;
472 let range_query = get_partition_range_query(query, col, &OracleDialect {})?;
473 let row = conn.query_row(range_query.as_str(), &[])?;
474 let min_v: i64 = row.get(0).unwrap_or(0);
475 let max_v: i64 = row.get(1).unwrap_or(0);
476 (min_v, max_v)
477}
478
479#[cfg(feature = "src_bigquery")]
480#[throws(ConnectorXOutError)] fn bigquery_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
482 let rt = Runtime::new().expect("Failed to create runtime");
483 let url = Url::parse(conn.as_str())?;
484 let sa_key_path = url.path();
485 let client = rt.block_on(gcp_bigquery_client::Client::from_service_account_key_file(
486 sa_key_path,
487 ))?;
488
489 let auth_data = std::fs::read_to_string(sa_key_path)?;
490 let auth_json: serde_json::Value = serde_json::from_str(&auth_data)?;
491 let project_id = auth_json
492 .get("project_id")
493 .ok_or_else(|| anyhow!("Cannot get project_id from auth file"))?
494 .as_str()
495 .ok_or_else(|| anyhow!("Cannot get project_id as string from auth file"))?;
496 let range_query = get_partition_range_query(query, col, &BigQueryDialect {})?;
497
498 let query_result = rt.block_on(client.job().query(
499 project_id,
500 gcp_bigquery_client::model::query_request::QueryRequest::new(range_query.as_str()),
501 ))?;
502 let mut rs = gcp_bigquery_client::model::query_response::ResultSet::new_from_query_response(
503 query_result,
504 );
505 rs.next_row();
506 let min_v = rs.get_i64(0)?.unwrap_or(0);
507 let max_v = rs.get_i64(1)?.unwrap_or(0);
508
509 (min_v, max_v)
510}
511
512#[cfg(feature = "src_trino")]
513#[throws(ConnectorXOutError)]
514fn trino_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
515 use prusto::{auth::Auth, ClientBuilder};
516
517 use crate::sources::trino::{TrinoDialect, TrinoPartitionQueryResult};
518
519 let rt = Runtime::new().expect("Failed to create runtime");
520
521 let username = match conn.username() {
522 "" => "connectorx",
523 username => username,
524 };
525
526 let builder = ClientBuilder::new(username, conn.host().unwrap().to_owned())
527 .port(conn.port().unwrap_or(8080))
528 .ssl(prusto::ssl::Ssl { root_cert: None })
529 .secure(conn.scheme() == "trino+https")
530 .catalog(conn.path_segments().unwrap().last().unwrap_or("hive"));
531
532 let builder = match conn.password() {
533 None => builder,
534 Some(password) => builder.auth(Auth::Basic(username.to_owned(), Some(password.to_owned()))),
535 };
536
537 let client = builder
538 .build()
539 .map_err(|e| anyhow!("Failed to build client: {}", e))?;
540
541 let range_query = get_partition_range_query(query, col, &TrinoDialect {})?;
542 let query_result = rt.block_on(client.get_all::<TrinoPartitionQueryResult>(range_query));
543
544 let query_result = match query_result {
545 Ok(query_result) => Ok(query_result.into_vec()),
546 Err(e) => match e {
547 prusto::error::Error::EmptyData => {
548 Ok(vec![TrinoPartitionQueryResult { _col0: 0, _col1: 0 }])
549 }
550 _ => Err(anyhow!("Failed to get query result: {}", e)),
551 },
552 }?;
553
554 let result = query_result
555 .first()
556 .unwrap_or(&TrinoPartitionQueryResult { _col0: 0, _col1: 0 });
557
558 (result._col0, result._col1)
559}
560
561#[cfg(feature = "src_clickhouse")]
562#[throws(ConnectorXOutError)]
563fn clickhouse_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
564 use sqlparser::dialect::ClickHouseDialect;
565
566 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
567 let clickhouse_source = ClickHouseSource::new(rt.clone(), conn.as_str())
568 .expect("Failed to create ClickHouse client");
569
570 let range_query = get_partition_range_query(query, col, &ClickHouseDialect {})?;
571
572 let response = rt.block_on(async {
573 let mut cursor = clickhouse_source
574 .client
575 .query(range_query.as_str())
576 .fetch_bytes("JSONCompact")
577 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
578 let bytes = cursor
579 .collect()
580 .await
581 .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
582 Ok::<_, ClickHouseSourceError>(bytes)
583 })?;
584
585 #[derive(Debug, Deserialize)]
586 struct MinMaxResponse {
587 data: Vec<Vec<JsonValue>>,
588 }
589
590 let parsed: MinMaxResponse = serde_json::from_slice(&response)
591 .map_err(|e| anyhow!("Failed to parse min max response: {}", e))?;
592
593 let (min_v, max_v) = if let Some(row) = parsed.data.first() {
594 let min_v = row.get(0).and_then(|v| v.as_i64()).unwrap_or(0);
595 let max_v = row.get(1).and_then(|v| v.as_i64()).unwrap_or(0);
596
597 (min_v, max_v)
598 } else {
599 (0, 0)
600 };
601
602 (min_v, max_v)
603}