connectorx/
partition.rs

1use std::sync::Arc;
2
3use crate::errors::{ConnectorXOutError, OutResult};
4use crate::source_router::{SourceConn, SourceType};
5#[cfg(feature = "src_bigquery")]
6use crate::sources::bigquery::BigQueryDialect;
7#[cfg(feature = "src_clickhouse")]
8use crate::sources::clickhouse::{ClickHouseSource, ClickHouseSourceError};
9#[cfg(feature = "src_mssql")]
10use crate::sources::mssql::{mssql_config, FloatN, IntN, MsSQLTypeSystem};
11#[cfg(feature = "src_mysql")]
12use crate::sources::mysql::{MySQLSourceError, MySQLTypeSystem};
13#[cfg(feature = "src_oracle")]
14use crate::sources::oracle::{OracleDialect, OracleSource};
15#[cfg(feature = "src_postgres")]
16use crate::sources::postgres::{rewrite_tls_args, PostgresTypeSystem};
17#[cfg(feature = "src_trino")]
18use crate::sources::trino::TrinoDialect;
19#[cfg(feature = "src_sqlite")]
20use crate::sql::get_partition_range_query_sep;
21use crate::sql::{get_partition_range_query, single_col_partition_query, CXQuery};
22use anyhow::anyhow;
23use fehler::{throw, throws};
24#[cfg(feature = "src_bigquery")]
25use gcp_bigquery_client;
26#[cfg(feature = "src_mysql")]
27use r2d2_mysql::mysql::{prelude::Queryable, Opts, Pool, Row};
28#[cfg(feature = "src_sqlite")]
29use rusqlite::{types::Type, Connection};
30#[cfg(feature = "src_postgres")]
31use rust_decimal::{prelude::ToPrimitive, Decimal};
32#[cfg(feature = "src_postgres")]
33use rust_decimal_macros::dec;
34#[cfg(feature = "src_clickhouse")]
35use serde::Deserialize;
36#[cfg(feature = "src_clickhouse")]
37use serde_json::Value as JsonValue;
38#[cfg(feature = "src_clickhouse")]
39use sqlparser::dialect::ClickHouseDialect;
40#[cfg(feature = "src_mssql")]
41use sqlparser::dialect::MsSqlDialect;
42#[cfg(feature = "src_mysql")]
43use sqlparser::dialect::MySqlDialect;
44#[cfg(feature = "src_postgres")]
45use sqlparser::dialect::PostgreSqlDialect;
46#[cfg(feature = "src_sqlite")]
47use sqlparser::dialect::SQLiteDialect;
48#[cfg(feature = "src_mssql")]
49use tiberius::Client;
50#[cfg(any(feature = "src_bigquery", feature = "src_mssql", feature = "src_trino"))]
51use tokio::{net::TcpStream, runtime::Runtime};
52#[cfg(feature = "src_mssql")]
53use tokio_util::compat::TokioAsyncWriteCompatExt;
54use url::Url;
55
/// Describes how a query should be split into range partitions over one column.
///
/// `min` and `max` must either both be supplied or both be `None`; when they
/// are absent, [`partition`] obtains the range through [`get_col_range`].
#[derive(Debug, Clone)]
pub struct PartitionQuery {
    /// SQL text of the query to be partitioned.
    query: String,
    /// Name of the column whose value range is split.
    column: String,
    /// Smallest column value to cover, if supplied by the caller.
    min: Option<i64>,
    /// Largest column value to cover (inclusive), if supplied by the caller.
    max: Option<i64>,
    /// Number of partitions to produce.
    num: usize,
}

impl PartitionQuery {
    /// Creates a partition request, copying `query` and `column` into owned strings.
    pub fn new(query: &str, column: &str, min: Option<i64>, max: Option<i64>, num: usize) -> Self {
        Self {
            query: query.to_owned(),
            column: column.to_owned(),
            min,
            max,
            num,
        }
    }
}
75
76pub fn partition(part: &PartitionQuery, source_conn: &SourceConn) -> OutResult<Vec<CXQuery>> {
77    let mut queries = vec![];
78    let num = part.num as i64;
79    let (min, max) = match (part.min, part.max) {
80        (None, None) => get_col_range(source_conn, &part.query, &part.column)?,
81        (Some(min), Some(max)) => (min, max),
82        _ => throw!(anyhow!(
83            "partition_query range can not be partially specified",
84        )),
85    };
86
87    let partition_size = (max - min + 1) / num;
88
89    for i in 0..num {
90        let lower = min + i * partition_size;
91        let upper = match i == num - 1 {
92            true => max + 1,
93            false => min + (i + 1) * partition_size,
94        };
95        let partition_query = get_part_query(source_conn, &part.query, &part.column, lower, upper)?;
96        queries.push(partition_query);
97    }
98    Ok(queries)
99}
100
101pub fn get_col_range(source_conn: &SourceConn, query: &str, col: &str) -> OutResult<(i64, i64)> {
102    match source_conn.ty {
103        #[cfg(feature = "src_postgres")]
104        SourceType::Postgres => pg_get_partition_range(&source_conn.conn, query, col),
105        #[cfg(feature = "src_sqlite")]
106        SourceType::SQLite => sqlite_get_partition_range(&source_conn.conn, query, col),
107        #[cfg(feature = "src_mysql")]
108        SourceType::MySQL => mysql_get_partition_range(&source_conn.conn, query, col),
109        #[cfg(feature = "src_mssql")]
110        SourceType::MsSQL => mssql_get_partition_range(&source_conn.conn, query, col),
111        #[cfg(feature = "src_oracle")]
112        SourceType::Oracle => oracle_get_partition_range(&source_conn.conn, query, col),
113        #[cfg(feature = "src_bigquery")]
114        SourceType::BigQuery => bigquery_get_partition_range(&source_conn.conn, query, col),
115        #[cfg(feature = "src_trino")]
116        SourceType::Trino => trino_get_partition_range(&source_conn.conn, query, col),
117        #[cfg(feature = "src_clickhouse")]
118        SourceType::ClickHouse => clickhouse_get_partition_range(&source_conn.conn, query, col),
119        _ => unimplemented!("{:?} not implemented!", source_conn.ty),
120    }
121}
122
123#[throws(ConnectorXOutError)]
124pub fn get_part_query(
125    source_conn: &SourceConn,
126    query: &str,
127    col: &str,
128    lower: i64,
129    upper: i64,
130) -> CXQuery<String> {
131    let query = match source_conn.ty {
132        #[cfg(feature = "src_postgres")]
133        SourceType::Postgres => {
134            single_col_partition_query(query, col, lower, upper, &PostgreSqlDialect {})?
135        }
136        #[cfg(feature = "src_sqlite")]
137        SourceType::SQLite => {
138            single_col_partition_query(query, col, lower, upper, &SQLiteDialect {})?
139        }
140        #[cfg(feature = "src_mysql")]
141        SourceType::MySQL => {
142            single_col_partition_query(query, col, lower, upper, &MySqlDialect {})?
143        }
144        #[cfg(feature = "src_mssql")]
145        SourceType::MsSQL => {
146            single_col_partition_query(query, col, lower, upper, &MsSqlDialect {})?
147        }
148        #[cfg(feature = "src_oracle")]
149        SourceType::Oracle => {
150            single_col_partition_query(query, col, lower, upper, &OracleDialect {})?
151        }
152        #[cfg(feature = "src_bigquery")]
153        SourceType::BigQuery => {
154            single_col_partition_query(query, col, lower, upper, &BigQueryDialect {})?
155        }
156        #[cfg(feature = "src_trino")]
157        SourceType::Trino => {
158            single_col_partition_query(query, col, lower, upper, &TrinoDialect {})?
159        }
160        #[cfg(feature = "src_clickhouse")]
161        SourceType::ClickHouse => {
162            single_col_partition_query(query, col, lower, upper, &ClickHouseDialect {})?
163        }
164        _ => unimplemented!("{:?} not implemented!", source_conn.ty),
165    };
166    CXQuery::Wrapped(query)
167}
168
/// Fetches the `(min, max)` of `col` over `query` from a PostgreSQL server.
///
/// Connects with the TLS settings derived from `conn`, runs the range query
/// and converts the result to `i64` based on the type of the first result
/// column. SQL `NULL` (e.g. an empty result set) is reported as `0`.
///
/// For float and numeric columns the minimum is rounded down (floor) instead
/// of being truncated toward zero: truncation would turn `-1.5` into `-1`,
/// which lies above the real minimum. NOTE(review): this matters if the
/// partition predicate is `col >= lower`, as the `max + 1` upper bound used by
/// `partition` suggests; confirm against `single_col_partition_query`.
///
/// # Errors
///
/// Fails when the connection or the query fails, or when the column type is
/// not one of int2/int4/int8/float4/float8/numeric.
#[cfg(feature = "src_postgres")]
#[throws(ConnectorXOutError)]
fn pg_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
    let (config, tls) = rewrite_tls_args(conn)?;
    let mut client = match tls {
        None => config.connect(postgres::NoTls)?,
        Some(tls_conn) => config.connect(tls_conn)?,
    };
    let range_query = get_partition_range_query(query, col, &PostgreSqlDialect {})?;
    let row = client.query_one(range_query.as_str(), &[])?;

    // Both result columns share the type of the aggregated column.
    let col_type = PostgresTypeSystem::from(row.columns()[0].type_());
    let (min_v, max_v) = match col_type {
        PostgresTypeSystem::Int2(_) => {
            let min_v: Option<i16> = row.get(0);
            let max_v: Option<i16> = row.get(1);
            (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
        }
        PostgresTypeSystem::Int4(_) => {
            let min_v: Option<i32> = row.get(0);
            let max_v: Option<i32> = row.get(1);
            (min_v.unwrap_or(0) as i64, max_v.unwrap_or(0) as i64)
        }
        PostgresTypeSystem::Int8(_) => {
            let min_v: Option<i64> = row.get(0);
            let max_v: Option<i64> = row.get(1);
            (min_v.unwrap_or(0), max_v.unwrap_or(0))
        }
        PostgresTypeSystem::Float4(_) => {
            let min_v: Option<f32> = row.get(0);
            let max_v: Option<f32> = row.get(1);
            (
                // floor keeps a negative fractional minimum inside the range
                min_v.unwrap_or(0.0).floor() as i64,
                max_v.unwrap_or(0.0) as i64,
            )
        }
        PostgresTypeSystem::Float8(_) => {
            let min_v: Option<f64> = row.get(0);
            let max_v: Option<f64> = row.get(1);
            (
                min_v.unwrap_or(0.0).floor() as i64,
                max_v.unwrap_or(0.0) as i64,
            )
        }
        PostgresTypeSystem::Numeric(_) => {
            let min_v: Option<Decimal> = row.get(0);
            let max_v: Option<Decimal> = row.get(1);
            (
                // `to_i64` truncates, so floor explicitly for the lower bound
                min_v.unwrap_or(dec!(0.0)).floor().to_i64().unwrap_or(0),
                max_v.unwrap_or(dec!(0.0)).to_i64().unwrap_or(0),
            )
        }
        _ => throw!(anyhow!(
            "Partition can only be done on int or float columns"
        )),
    };

    (min_v, max_v)
}
222
/// Fetches the `(min, max)` of `col` over `query` from a SQLite database.
///
/// The minimum and maximum are read with two separate queries. Integer values
/// are used as is, real values are converted to `i64`, and `NULL` is reported
/// as `0`.
///
/// A real-valued minimum is rounded down (floor) instead of being truncated
/// toward zero: truncation would turn `-1.5` into `-1`, which lies above the
/// real minimum. NOTE(review): this matters if the partition predicate is
/// `col >= lower`; confirm against `single_col_partition_query`.
///
/// # Errors
///
/// Fails when the URL is too short to contain a path after the `sqlite://`
/// prefix (previously a slicing panic), when the database cannot be opened or
/// queried, or when the value is neither integer, real nor `NULL`.
#[cfg(feature = "src_sqlite")]
#[throws(ConnectorXOutError)]
fn sqlite_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
    // remove the first "sqlite://" manually since url.path is not correct for windows and for relative path
    let db_path = conn
        .as_str()
        .get(9..)
        .ok_or_else(|| anyhow!("sqlite connection url is missing the database path"))?;
    let db = Connection::open(db_path)?;
    // SQLite only optimize min max queries when there is only one aggregation
    // https://www.sqlite.org/optoverview.html#minmax
    let (min_query, max_query) = get_partition_range_query_sep(query, col, &SQLiteDialect {})?;

    // Reads the single value of a result row as an integer bound. `None`
    // signals a value type that cannot be partitioned on.
    let read_bound = |row: &rusqlite::Row<'_>, round_down: bool| -> rusqlite::Result<Option<i64>> {
        // the declared type of an aggregate result is None, so only the type
        // of the returned value can be checked
        let bound = match row.get_ref(0)?.data_type() {
            Type::Integer => Some(row.get::<_, i64>(0)?),
            Type::Real => {
                let v: f64 = row.get(0)?;
                let v = if round_down { v.floor() } else { v };
                Some(v as i64)
            }
            Type::Null => Some(0),
            _ => None,
        };
        Ok(bound)
    };

    let min_v = db
        .query_row(min_query.as_str(), [], |row| read_bound(row, true))?
        .ok_or_else(|| anyhow!("Partition can only be done on integer columns"))?;
    let max_v = db
        .query_row(max_query.as_str(), [], |row| read_bound(row, false))?
        .ok_or_else(|| anyhow!("Partition can only be done on integer columns"))?;

    (min_v, max_v)
}
274
/// Fetches the `(min, max)` of `col` over `query` from a MySQL server.
///
/// Runs the range query and converts both result columns to `i64` according
/// to the type of the first result column. SQL `NULL` is reported as `0`.
///
/// Conversion details:
/// * Float/double minimums are rounded down (floor) instead of being
///   truncated toward zero, so a negative fractional minimum such as `-1.5`
///   does not become `-1`. NOTE(review): this matters if the partition
///   predicate is `col >= lower`; confirm against `single_col_partition_query`.
/// * Unsigned 64-bit values above `i64::MAX` saturate at `i64::MAX` instead of
///   wrapping around to a negative number.
///
/// # Errors
///
/// Fails when the connection or the query fails, when no row is returned,
/// when a value cannot be read, or when the column type is not numeric.
#[cfg(feature = "src_mysql")]
#[throws(ConnectorXOutError)]
fn mysql_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
    let pool = Pool::new(Opts::from_url(conn.as_str()).map_err(MySQLSourceError::MySQLUrlError)?)?;
    let mut conn = pool.get_conn()?;
    let range_query = get_partition_range_query(query, col, &MySqlDialect {})?;
    let row: Row = conn
        .query_first(range_query)?
        .ok_or_else(|| anyhow!("mysql range: no row returns"))?;

    let col_type =
        MySQLTypeSystem::from((&row.columns()[0].column_type(), &row.columns()[0].flags()));

    // Reads both result columns as `Option<$ty>` and replaces NULL with
    // `$null_default`; yields the pair still typed as `$ty`.
    macro_rules! read_range {
        ($ty:ty, $null_default:expr) => {{
            let min_v: Option<$ty> = row
                .get(0)
                .ok_or_else(|| anyhow!("mysql range: cannot get min value"))?;
            let max_v: Option<$ty> = row
                .get(1)
                .ok_or_else(|| anyhow!("mysql range: cannot get max value"))?;
            (min_v.unwrap_or($null_default), max_v.unwrap_or($null_default))
        }};
    }

    // Same as `read_range!`, for integer types that always fit into an `i64`.
    macro_rules! int_range {
        ($ty:ty) => {{
            let (lo, hi) = read_range!($ty, 0);
            (lo as i64, hi as i64)
        }};
    }

    let (min_v, max_v) = match col_type {
        MySQLTypeSystem::Tiny(_) => int_range!(i8),
        MySQLTypeSystem::Short(_) => int_range!(i16),
        MySQLTypeSystem::Int24(_) => int_range!(i32),
        MySQLTypeSystem::Long(_) => int_range!(i64),
        MySQLTypeSystem::LongLong(_) => int_range!(i64),
        MySQLTypeSystem::UTiny(_) => int_range!(u8),
        MySQLTypeSystem::UShort(_) => int_range!(u16),
        MySQLTypeSystem::UInt24(_) => int_range!(u32),
        MySQLTypeSystem::ULong(_) => int_range!(u32),
        MySQLTypeSystem::ULongLong(_) => {
            let (lo, hi) = read_range!(u64, 0);
            // a plain `as i64` would wrap values above i64::MAX to negatives
            let saturate = |v: u64| v.min(i64::MAX as u64) as i64;
            (saturate(lo), saturate(hi))
        }
        MySQLTypeSystem::Float(_) => {
            let (lo, hi) = read_range!(f32, 0.0);
            // floor keeps a negative fractional minimum inside the range
            (lo.floor() as i64, hi as i64)
        }
        MySQLTypeSystem::Double(_) => {
            let (lo, hi) = read_range!(f64, 0.0);
            (lo.floor() as i64, hi as i64)
        }
        _ => throw!(anyhow!("Partition can only be done on int columns")),
    };

    (min_v, max_v)
}
402
/// Fetches the `(min, max)` of `col` over `query` from a SQL Server instance.
///
/// A fresh tokio runtime drives the async client. Both result columns are
/// converted to `i64` according to the type of the first result column; a
/// value that cannot be read (e.g. SQL `NULL`) is reported as `0`.
///
/// Float minimums are rounded down (floor) instead of being truncated toward
/// zero, so a negative fractional minimum such as `-1.5` does not become `-1`.
/// NOTE(review): this matters if the partition predicate is `col >= lower`;
/// confirm against `single_col_partition_query`.
///
/// # Errors
///
/// Fails when the runtime cannot be created, when connecting or querying
/// fails, when the query yields no row (the last two cases previously
/// panicked), or when the column type is neither an integer nor a float type.
#[cfg(feature = "src_mssql")]
#[throws(ConnectorXOutError)]
fn mssql_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
    let rt = Runtime::new()?;
    let config = mssql_config(conn)?;
    let tcp = rt.block_on(TcpStream::connect(config.get_addr()))?;
    tcp.set_nodelay(true)?;

    let mut client = rt.block_on(Client::connect(config, tcp.compat_write()))?;

    let range_query = get_partition_range_query(query, col, &MsSqlDialect {})?;
    let query_result = rt.block_on(client.query(range_query.as_str(), &[]))?;
    let row = rt
        .block_on(query_result.into_row())?
        .ok_or_else(|| anyhow!("mssql range: no row returns"))?;

    let col_type = MsSQLTypeSystem::from(&row.columns()[0].column_type());
    let (min_v, max_v) = match col_type {
        MsSQLTypeSystem::Tinyint(_) => {
            let min_v: u8 = row.get(0).unwrap_or(0);
            let max_v: u8 = row.get(1).unwrap_or(0);
            (min_v as i64, max_v as i64)
        }
        MsSQLTypeSystem::Smallint(_) => {
            let min_v: i16 = row.get(0).unwrap_or(0);
            let max_v: i16 = row.get(1).unwrap_or(0);
            (min_v as i64, max_v as i64)
        }
        MsSQLTypeSystem::Int(_) => {
            let min_v: i32 = row.get(0).unwrap_or(0);
            let max_v: i32 = row.get(1).unwrap_or(0);
            (min_v as i64, max_v as i64)
        }
        MsSQLTypeSystem::Bigint(_) => {
            let min_v: i64 = row.get(0).unwrap_or(0);
            let max_v: i64 = row.get(1).unwrap_or(0);
            (min_v, max_v)
        }
        MsSQLTypeSystem::Intn(_) => {
            let min_v: IntN = row.get(0).unwrap_or(IntN(0));
            let max_v: IntN = row.get(1).unwrap_or(IntN(0));
            (min_v.0, max_v.0)
        }
        MsSQLTypeSystem::Float24(_) => {
            let min_v: f32 = row.get(0).unwrap_or(0.0);
            let max_v: f32 = row.get(1).unwrap_or(0.0);
            // floor keeps a negative fractional minimum inside the range
            (min_v.floor() as i64, max_v as i64)
        }
        MsSQLTypeSystem::Float53(_) => {
            let min_v: f64 = row.get(0).unwrap_or(0.0);
            let max_v: f64 = row.get(1).unwrap_or(0.0);
            (min_v.floor() as i64, max_v as i64)
        }
        MsSQLTypeSystem::Floatn(_) => {
            let min_v: FloatN = row.get(0).unwrap_or(FloatN(0.0));
            let max_v: FloatN = row.get(1).unwrap_or(FloatN(0.0));
            (min_v.0.floor() as i64, max_v.0 as i64)
        }
        _ => throw!(anyhow!(
            "Partition can only be done on int or float columns"
        )),
    };

    (min_v, max_v)
}
466
/// Fetches the `(min, max)` of `col` over `query` from an Oracle database.
///
/// Opens an `OracleSource` for `conn` (second constructor argument `1`), runs
/// the range query and reads both result columns as `i64`. A column that
/// cannot be read as `i64` is reported as `0`.
///
/// # Errors
///
/// Fails when the source cannot be created, no connection can be obtained,
/// the range query cannot be built, or the query itself fails.
#[cfg(feature = "src_oracle")]
#[throws(ConnectorXOutError)]
fn oracle_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
    let oracle = OracleSource::new(conn.as_str(), 1)?;
    let session = oracle.get_conn()?;

    let range_sql = get_partition_range_query(query, col, &OracleDialect {})?;
    let bounds = session.query_row(range_sql.as_str(), &[])?;

    let lower: i64 = bounds.get(0).unwrap_or(0);
    let upper: i64 = bounds.get(1).unwrap_or(0);
    (lower, upper)
}
478
/// Fetches the `(min, max)` of `col` over `query` from BigQuery.
///
/// The path component of `conn` is used as the location of a service account
/// key file. That file is used both to authenticate the client and to look up
/// the `project_id` under which the range query is submitted. A `NULL` result
/// value is reported as `0`.
///
/// # Errors
///
/// Fails when the URL cannot be parsed, the client cannot be created, the key
/// file cannot be read or parsed, `project_id` is missing or not a string, or
/// the query or the value extraction fails.
///
/// # Panics
///
/// Panics if the tokio runtime cannot be created.
#[cfg(feature = "src_bigquery")]
#[throws(ConnectorXOutError)] // TODO
fn bigquery_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
    use gcp_bigquery_client::model::{query_request::QueryRequest, query_response::ResultSet};

    let runtime = Runtime::new().expect("Failed to create runtime");
    let parsed_url = Url::parse(conn.as_str())?;
    let key_file = parsed_url.path();
    let bq_client = runtime.block_on(
        gcp_bigquery_client::Client::from_service_account_key_file(key_file),
    )?;

    // The project id is not part of the URL; it is taken from the key file.
    let key_text = std::fs::read_to_string(key_file)?;
    let key_json: serde_json::Value = serde_json::from_str(&key_text)?;
    let project_id = key_json
        .get("project_id")
        .ok_or_else(|| anyhow!("Cannot get project_id from auth file"))?
        .as_str()
        .ok_or_else(|| anyhow!("Cannot get project_id as string from auth file"))?;

    let range_sql = get_partition_range_query(query, col, &BigQueryDialect {})?;
    let request = QueryRequest::new(range_sql.as_str());
    let response = runtime.block_on(bq_client.job().query(project_id, request))?;

    let mut result_set = ResultSet::new_from_query_response(response);
    // advance to the first row before reading values
    result_set.next_row();
    let lower = result_set.get_i64(0)?.unwrap_or(0);
    let upper = result_set.get_i64(1)?.unwrap_or(0);

    (lower, upper)
}
511
/// Fetches the `(min, max)` of `col` over `query` from a Trino cluster.
///
/// Connection parameters are taken from `conn`:
/// * user name, defaulting to `"connectorx"` when empty,
/// * host (required) and port (default `8080`),
/// * TLS is enabled when the scheme is `trino+https`,
/// * the catalog is the last path segment, defaulting to `"hive"` when the
///   path is empty. (`Url::path_segments` yields an empty string for an empty
///   path, so without the emptiness check the default was never applied.)
/// * basic authentication is used when a password is present.
///
/// An empty result is reported as `(0, 0)`.
///
/// # Errors
///
/// Fails when the URL has no host (previously a panic), when the client
/// cannot be built, or when the query fails for a reason other than an empty
/// result.
///
/// # Panics
///
/// Panics if the tokio runtime cannot be created.
#[cfg(feature = "src_trino")]
#[throws(ConnectorXOutError)]
fn trino_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
    use prusto::{auth::Auth, ClientBuilder};

    use crate::sources::trino::{TrinoDialect, TrinoPartitionQueryResult};

    let rt = Runtime::new().expect("Failed to create runtime");

    let username = match conn.username() {
        "" => "connectorx",
        username => username,
    };

    let host = conn
        .host()
        .ok_or_else(|| anyhow!("Trino connection url has no host"))?
        .to_owned();
    let catalog = conn
        .path_segments()
        .and_then(|segments| segments.last())
        .filter(|segment| !segment.is_empty())
        .unwrap_or("hive");

    let builder = ClientBuilder::new(username, host)
        .port(conn.port().unwrap_or(8080))
        .ssl(prusto::ssl::Ssl { root_cert: None })
        .secure(conn.scheme() == "trino+https")
        .catalog(catalog);

    let builder = match conn.password() {
        None => builder,
        Some(password) => builder.auth(Auth::Basic(username.to_owned(), Some(password.to_owned()))),
    };

    let client = builder
        .build()
        .map_err(|e| anyhow!("Failed to build client: {}", e))?;

    let range_query = get_partition_range_query(query, col, &TrinoDialect {})?;
    let rows = match rt.block_on(client.get_all::<TrinoPartitionQueryResult>(range_query)) {
        Ok(data) => data.into_vec(),
        // an empty result is not an error here: it maps to the (0, 0) range
        Err(prusto::error::Error::EmptyData) => vec![],
        Err(e) => throw!(anyhow!("Failed to get query result: {}", e)),
    };

    let (min_v, max_v) = rows
        .first()
        .map_or((0, 0), |result| (result._col0, result._col1));

    (min_v, max_v)
}
560
/// Fetches the `(min, max)` of `col` over `query` from a ClickHouse server.
///
/// The range query is executed with the `JSONCompact` output format and the
/// first row of the `data` array is decoded. Each bound is accepted as
/// * a JSON integer,
/// * a JSON string holding an integer (ClickHouse can emit 64-bit integers
///   quoted in JSON formats, depending on
///   `output_format_json_quote_64bit_integers`), or
/// * a JSON float, where the minimum is rounded down so that a negative
///   fractional minimum is not moved above the real minimum.
///
/// Anything else, including `null` and a missing row, is reported as `0`.
/// Previously quoted integers and floats were silently turned into `0` too.
///
/// # Errors
///
/// Fails when the client cannot be created (previously a panic), when the
/// query fails, or when the response is not valid JSON of the expected shape.
///
/// # Panics
///
/// Panics if the tokio runtime cannot be created.
#[cfg(feature = "src_clickhouse")]
#[throws(ConnectorXOutError)]
fn clickhouse_get_partition_range(conn: &Url, query: &str, col: &str) -> (i64, i64) {
    use sqlparser::dialect::ClickHouseDialect;

    let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
    let clickhouse_source = ClickHouseSource::new(rt.clone(), conn.as_str())
        .map_err(|e| anyhow!("Failed to create ClickHouse client: {:?}", e))?;

    let range_query = get_partition_range_query(query, col, &ClickHouseDialect {})?;

    let response = rt.block_on(async {
        let mut cursor = clickhouse_source
            .client
            .query(range_query.as_str())
            .fetch_bytes("JSONCompact")
            .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
        let bytes = cursor
            .collect()
            .await
            .map_err(|e| anyhow!("ClickHouse error: {}", e))?;
        Ok::<_, ClickHouseSourceError>(bytes)
    })?;

    #[derive(Debug, Deserialize)]
    struct MinMaxResponse {
        data: Vec<Vec<JsonValue>>,
    }

    let parsed: MinMaxResponse = serde_json::from_slice(&response)
        .map_err(|e| anyhow!("Failed to parse min max response: {}", e))?;

    // Decodes one JSON cell into an integer bound; unusable cells become 0.
    let to_bound = |cell: Option<&JsonValue>, round_down: bool| -> i64 {
        cell.and_then(|v| {
            v.as_i64()
                .or_else(|| v.as_str().and_then(|s| s.parse::<i64>().ok()))
                .or_else(|| {
                    v.as_f64().map(|f| {
                        let f = if round_down { f.floor() } else { f };
                        f as i64
                    })
                })
        })
        .unwrap_or(0)
    };

    let (min_v, max_v) = match parsed.data.first() {
        Some(row) => (to_bound(row.get(0), true), to_bound(row.get(1), false)),
        None => (0, 0),
    };

    (min_v, max_v)
}