1#[cfg(feature = "src_mysql")]
2use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol};
3#[cfg(feature = "src_postgres")]
4use crate::sources::postgres::{
5 rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol,
6 SimpleProtocol,
7};
8use crate::{
9 arrow_batch_iter::{ArrowBatchIter, RecordBatchIterator},
10 prelude::*,
11 sql::CXQuery,
12};
13use fehler::{throw, throws};
14use log::debug;
15#[cfg(feature = "src_postgres")]
16use postgres::NoTls;
17#[cfg(feature = "src_postgres")]
18use postgres_openssl::MakeTlsConnector;
19#[allow(unused_imports)]
20use std::sync::Arc;
21
22#[allow(unreachable_code, unreachable_patterns, unused_variables, unused_mut)]
23#[throws(ConnectorXOutError)]
24pub fn get_arrow(
25 source_conn: &SourceConn,
26 origin_query: Option<String>,
27 queries: &[CXQuery<String>],
28 pre_execution_queries: Option<&[String]>,
29) -> ArrowDestination {
30 let mut destination = ArrowDestination::new();
31 let protocol = source_conn.proto.as_str();
32 debug!("Protocol: {}", protocol);
33
34 match source_conn.ty {
35 #[cfg(feature = "src_postgres")]
36 SourceType::Postgres => {
37 let (config, tls) = rewrite_tls_args(&source_conn.conn)?;
38 match (protocol, tls) {
39 ("csv", Some(tls_conn)) => {
40 let source = PostgresSource::<CSVProtocol, MakeTlsConnector>::new(
41 config,
42 tls_conn,
43 queries.len(),
44 )?;
45 let mut dispatcher = Dispatcher::<
46 _,
47 _,
48 PostgresArrowTransport<CSVProtocol, MakeTlsConnector>,
49 >::new(
50 source, &mut destination, queries, origin_query
51 );
52 dispatcher.set_pre_execution_queries(pre_execution_queries);
53 dispatcher.run()?;
54 }
55 ("csv", None) => {
56 let source =
57 PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, queries.len())?;
58 let mut dispatcher = Dispatcher::<
59 _,
60 _,
61 PostgresArrowTransport<CSVProtocol, NoTls>,
62 >::new(
63 source, &mut destination, queries, origin_query
64 );
65 dispatcher.set_pre_execution_queries(pre_execution_queries);
66 dispatcher.run()?;
67 }
68 ("binary", Some(tls_conn)) => {
69 let source = PostgresSource::<PgBinaryProtocol, MakeTlsConnector>::new(
70 config,
71 tls_conn,
72 queries.len(),
73 )?;
74 let mut dispatcher = Dispatcher::<
75 _,
76 _,
77 PostgresArrowTransport<PgBinaryProtocol, MakeTlsConnector>,
78 >::new(
79 source, &mut destination, queries, origin_query
80 );
81 dispatcher.set_pre_execution_queries(pre_execution_queries);
82 dispatcher.run()?;
83 }
84 ("binary", None) => {
85 let source = PostgresSource::<PgBinaryProtocol, NoTls>::new(
86 config,
87 NoTls,
88 queries.len(),
89 )?;
90 let mut dispatcher = Dispatcher::<
91 _,
92 _,
93 PostgresArrowTransport<PgBinaryProtocol, NoTls>,
94 >::new(
95 source, &mut destination, queries, origin_query
96 );
97 dispatcher.set_pre_execution_queries(pre_execution_queries);
98 dispatcher.run()?;
99 }
100 ("cursor", Some(tls_conn)) => {
101 let source = PostgresSource::<CursorProtocol, MakeTlsConnector>::new(
102 config,
103 tls_conn,
104 queries.len(),
105 )?;
106 let mut dispatcher = Dispatcher::<
107 _,
108 _,
109 PostgresArrowTransport<CursorProtocol, MakeTlsConnector>,
110 >::new(
111 source, &mut destination, queries, origin_query
112 );
113 dispatcher.set_pre_execution_queries(pre_execution_queries);
114 dispatcher.run()?;
115 }
116 ("cursor", None) => {
117 let source =
118 PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, queries.len())?;
119 let mut dispatcher = Dispatcher::<
120 _,
121 _,
122 PostgresArrowTransport<CursorProtocol, NoTls>,
123 >::new(
124 source, &mut destination, queries, origin_query
125 );
126 dispatcher.set_pre_execution_queries(pre_execution_queries);
127 dispatcher.run()?;
128 }
129 ("simple", Some(tls_conn)) => {
130 let sb = PostgresSource::<SimpleProtocol, MakeTlsConnector>::new(
131 config,
132 tls_conn,
133 queries.len(),
134 )?;
135 let mut dispatcher = Dispatcher::<
136 _,
137 _,
138 PostgresArrowTransport<SimpleProtocol, MakeTlsConnector>,
139 >::new(
140 sb, &mut destination, queries, origin_query
141 );
142 debug!("Running dispatcher");
143 dispatcher.set_pre_execution_queries(pre_execution_queries);
144 dispatcher.run()?;
145 }
146 ("simple", None) => {
147 let sb =
148 PostgresSource::<SimpleProtocol, NoTls>::new(config, NoTls, queries.len())?;
149 let mut dispatcher = Dispatcher::<
150 _,
151 _,
152 PostgresArrowTransport<SimpleProtocol, NoTls>,
153 >::new(
154 sb, &mut destination, queries, origin_query
155 );
156 debug!("Running dispatcher");
157 dispatcher.set_pre_execution_queries(pre_execution_queries);
158 dispatcher.run()?;
159 }
160 _ => unimplemented!("{} protocol not supported", protocol),
161 }
162 }
163 #[cfg(feature = "src_mysql")]
164 SourceType::MySQL => match protocol {
165 "binary" => {
166 let source =
167 MySQLSource::<MySQLBinaryProtocol>::new(&source_conn.conn[..], queries.len())?;
168 let mut dispatcher =
169 Dispatcher::<_, _, MySQLArrowTransport<MySQLBinaryProtocol>>::new(
170 source,
171 &mut destination,
172 queries,
173 origin_query,
174 );
175 dispatcher.set_pre_execution_queries(pre_execution_queries);
176 dispatcher.run()?;
177 }
178 "text" => {
179 let source =
180 MySQLSource::<TextProtocol>::new(&source_conn.conn[..], queries.len())?;
181 let mut dispatcher = Dispatcher::<_, _, MySQLArrowTransport<TextProtocol>>::new(
182 source,
183 &mut destination,
184 queries,
185 origin_query,
186 );
187 dispatcher.set_pre_execution_queries(pre_execution_queries);
188 dispatcher.run()?;
189 }
190 _ => unimplemented!("{} protocol not supported", protocol),
191 },
192 #[cfg(feature = "src_sqlite")]
193 SourceType::SQLite => {
194 let path = &source_conn.conn.as_str()[9..];
196 let source = SQLiteSource::new(path, queries.len())?;
197 let dispatcher = Dispatcher::<_, _, SQLiteArrowTransport>::new(
198 source,
199 &mut destination,
200 queries,
201 origin_query,
202 );
203 dispatcher.run()?;
204 }
205 #[cfg(feature = "src_mssql")]
206 SourceType::MsSQL => {
207 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
208 let source = MsSQLSource::new(rt, &source_conn.conn[..], queries.len())?;
209 let dispatcher = Dispatcher::<_, _, MsSQLArrowTransport>::new(
210 source,
211 &mut destination,
212 queries,
213 origin_query,
214 );
215 dispatcher.run()?;
216 }
217 #[cfg(feature = "src_oracle")]
218 SourceType::Oracle => {
219 let source = OracleSource::new(&source_conn.conn[..], queries.len())?;
220 let dispatcher = Dispatcher::<_, _, OracleArrowTransport>::new(
221 source,
222 &mut destination,
223 queries,
224 origin_query,
225 );
226 dispatcher.run()?;
227 }
228 #[cfg(feature = "src_bigquery")]
229 SourceType::BigQuery => {
230 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
231 let source = BigQuerySource::new(rt, &source_conn.conn[..])?;
232 let dispatcher = Dispatcher::<_, _, BigQueryArrowTransport>::new(
233 source,
234 &mut destination,
235 queries,
236 origin_query,
237 );
238 dispatcher.run()?;
239 }
240 #[cfg(feature = "src_trino")]
241 SourceType::Trino => {
242 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
243 let source = TrinoSource::new(rt, &source_conn.conn[..])?;
244 let dispatcher = Dispatcher::<_, _, TrinoArrowTransport>::new(
245 source,
246 &mut destination,
247 queries,
248 origin_query,
249 );
250 dispatcher.run()?;
251 }
252 #[cfg(feature = "src_clickhouse")]
253 SourceType::ClickHouse => {
254 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
255 let source = ClickHouseSource::new(rt, &source_conn.conn[..])?;
256 let dispatcher = Dispatcher::<_, _, ClickHouseArrowTransport>::new(
257 source,
258 &mut destination,
259 queries,
260 origin_query,
261 );
262 dispatcher.run()?;
263 }
264 _ => throw!(ConnectorXOutError::SourceNotSupport(format!(
265 "{:?}",
266 source_conn.ty
267 ))),
268 }
269
270 destination
271}
272
273#[allow(unreachable_code, unreachable_patterns, unused_variables, unused_mut)]
274pub fn new_record_batch_iter(
275 source_conn: &SourceConn,
276 origin_query: Option<String>,
277 queries: &[CXQuery<String>],
278 batch_size: usize,
279 pre_execution_queries: Option<&[String]>,
280) -> Box<dyn RecordBatchIterator> {
281 let destination = ArrowStreamDestination::new_with_batch_size(batch_size);
282 let protocol = source_conn.proto.as_str();
283 debug!("Protocol: {}", protocol);
284
285 match source_conn.ty {
286 #[cfg(feature = "src_postgres")]
287 SourceType::Postgres => {
288 let (config, tls) = rewrite_tls_args(&source_conn.conn).unwrap();
289 match (protocol, tls) {
290 ("csv", Some(tls_conn)) => {
291 let mut source = PostgresSource::<CSVProtocol, MakeTlsConnector>::new(
292 config,
293 tls_conn,
294 queries.len(),
295 )
296 .unwrap();
297
298 source.set_pre_execution_queries(pre_execution_queries);
299
300 let batch_iter =
301 ArrowBatchIter::<
302 _,
303 PostgresArrowStreamTransport<CSVProtocol, MakeTlsConnector>,
304 >::new(source, destination, origin_query, queries)
305 .unwrap();
306 return Box::new(batch_iter);
307 }
308 ("csv", None) => {
309 let mut source =
310 PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, queries.len())
311 .unwrap();
312
313 source.set_pre_execution_queries(pre_execution_queries);
314
315 let batch_iter = ArrowBatchIter::<
316 _,
317 PostgresArrowStreamTransport<CSVProtocol, NoTls>,
318 >::new(
319 source, destination, origin_query, queries
320 )
321 .unwrap();
322 return Box::new(batch_iter);
323 }
324 ("binary", Some(tls_conn)) => {
325 let mut source = PostgresSource::<PgBinaryProtocol, MakeTlsConnector>::new(
326 config,
327 tls_conn,
328 queries.len(),
329 )
330 .unwrap();
331
332 source.set_pre_execution_queries(pre_execution_queries);
333
334 let batch_iter =
335 ArrowBatchIter::<
336 _,
337 PostgresArrowStreamTransport<PgBinaryProtocol, MakeTlsConnector>,
338 >::new(source, destination, origin_query, queries)
339 .unwrap();
340 return Box::new(batch_iter);
341 }
342 ("binary", None) => {
343 let mut source = PostgresSource::<PgBinaryProtocol, NoTls>::new(
344 config,
345 NoTls,
346 queries.len(),
347 )
348 .unwrap();
349
350 source.set_pre_execution_queries(pre_execution_queries);
351
352 let batch_iter = ArrowBatchIter::<
353 _,
354 PostgresArrowStreamTransport<PgBinaryProtocol, NoTls>,
355 >::new(
356 source, destination, origin_query, queries
357 )
358 .unwrap();
359 return Box::new(batch_iter);
360 }
361 ("cursor", Some(tls_conn)) => {
362 let mut source = PostgresSource::<CursorProtocol, MakeTlsConnector>::new(
363 config,
364 tls_conn,
365 queries.len(),
366 )
367 .unwrap();
368
369 source.set_pre_execution_queries(pre_execution_queries);
370
371 let batch_iter =
372 ArrowBatchIter::<
373 _,
374 PostgresArrowStreamTransport<CursorProtocol, MakeTlsConnector>,
375 >::new(source, destination, origin_query, queries)
376 .unwrap();
377 return Box::new(batch_iter);
378 }
379 ("cursor", None) => {
380 let mut source =
381 PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, queries.len())
382 .unwrap();
383
384 source.set_pre_execution_queries(pre_execution_queries);
385
386 let batch_iter = ArrowBatchIter::<
387 _,
388 PostgresArrowStreamTransport<CursorProtocol, NoTls>,
389 >::new(
390 source, destination, origin_query, queries
391 )
392 .unwrap();
393 return Box::new(batch_iter);
394 }
395 _ => unimplemented!("{} protocol not supported", protocol),
396 }
397 }
398 #[cfg(feature = "src_mysql")]
399 SourceType::MySQL => match protocol {
400 "binary" => {
401 let mut source =
402 MySQLSource::<MySQLBinaryProtocol>::new(&source_conn.conn[..], queries.len())
403 .unwrap();
404
405 source.set_pre_execution_queries(pre_execution_queries);
406
407 let batch_iter =
408 ArrowBatchIter::<_, MySQLArrowStreamTransport<MySQLBinaryProtocol>>::new(
409 source,
410 destination,
411 origin_query,
412 queries,
413 )
414 .unwrap();
415 return Box::new(batch_iter);
416 }
417 "text" => {
418 let mut source =
419 MySQLSource::<TextProtocol>::new(&source_conn.conn[..], queries.len()).unwrap();
420
421 source.set_pre_execution_queries(pre_execution_queries);
422
423 let batch_iter = ArrowBatchIter::<_, MySQLArrowStreamTransport<TextProtocol>>::new(
424 source,
425 destination,
426 origin_query,
427 queries,
428 )
429 .unwrap();
430 return Box::new(batch_iter);
431 }
432 _ => unimplemented!("{} protocol not supported", protocol),
433 },
434 #[cfg(feature = "src_sqlite")]
435 SourceType::SQLite => {
436 let path = &source_conn.conn.as_str()[9..];
438 let source = SQLiteSource::new(path, queries.len()).unwrap();
439 let batch_iter = ArrowBatchIter::<_, SQLiteArrowStreamTransport>::new(
440 source,
441 destination,
442 origin_query,
443 queries,
444 )
445 .unwrap();
446 return Box::new(batch_iter);
447 }
448 #[cfg(feature = "src_mssql")]
449 SourceType::MsSQL => {
450 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
451 let source = MsSQLSource::new(rt, &source_conn.conn[..], queries.len()).unwrap();
452 let batch_iter = ArrowBatchIter::<_, MsSQLArrowStreamTransport>::new(
453 source,
454 destination,
455 origin_query,
456 queries,
457 )
458 .unwrap();
459 return Box::new(batch_iter);
460 }
461 #[cfg(feature = "src_oracle")]
462 SourceType::Oracle => {
463 let source = OracleSource::new(&source_conn.conn[..], queries.len()).unwrap();
464 let batch_iter = ArrowBatchIter::<_, OracleArrowStreamTransport>::new(
465 source,
466 destination,
467 origin_query,
468 queries,
469 )
470 .unwrap();
471 return Box::new(batch_iter);
472 }
473 #[cfg(feature = "src_bigquery")]
474 SourceType::BigQuery => {
475 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
476 let source = BigQuerySource::new(rt, &source_conn.conn[..]).unwrap();
477 let batch_iter = ArrowBatchIter::<_, BigQueryArrowStreamTransport>::new(
478 source,
479 destination,
480 origin_query,
481 queries,
482 )
483 .unwrap();
484 return Box::new(batch_iter);
485 }
486 #[cfg(feature = "src_clickhouse")]
487 SourceType::ClickHouse => {
488 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
489 let source = ClickHouseSource::new(rt, &source_conn.conn[..]).unwrap();
490 let batch_iter = ArrowBatchIter::<_, ClickHouseArrowStreamTransport>::new(
491 source,
492 destination,
493 origin_query,
494 queries,
495 )
496 .unwrap();
497 return Box::new(batch_iter);
498 }
499 _ => {}
500 }
501 panic!("not supported!");
502}