1#[cfg(feature = "src_mysql")]
2use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol};
3#[cfg(feature = "src_postgres")]
4use crate::sources::postgres::{
5 rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol,
6 SimpleProtocol,
7};
8use crate::{
9 arrow_batch_iter::{ArrowBatchIter, RecordBatchIterator},
10 prelude::*,
11 sql::CXQuery,
12};
13use fehler::{throw, throws};
14use log::debug;
15#[cfg(feature = "src_postgres")]
16use postgres::NoTls;
17#[cfg(feature = "src_postgres")]
18use postgres_openssl::MakeTlsConnector;
19#[allow(unused_imports)]
20use std::sync::Arc;
21
22#[allow(unreachable_code, unreachable_patterns, unused_variables, unused_mut)]
23#[throws(ConnectorXOutError)]
24pub fn get_arrow(
25 source_conn: &SourceConn,
26 origin_query: Option<String>,
27 queries: &[CXQuery<String>],
28 pre_execution_queries: Option<&[String]>,
29) -> ArrowDestination {
30 let mut destination = ArrowDestination::new();
31 let protocol = source_conn.proto.as_str();
32 debug!("Protocol: {}", protocol);
33
34 match source_conn.ty {
35 #[cfg(feature = "src_postgres")]
36 SourceType::Postgres => {
37 let (config, tls) = rewrite_tls_args(&source_conn.conn)?;
38 match (protocol, tls) {
39 ("csv", Some(tls_conn)) => {
40 let source = PostgresSource::<CSVProtocol, MakeTlsConnector>::new(
41 config,
42 tls_conn,
43 queries.len(),
44 )?;
45 let mut dispatcher = Dispatcher::<
46 _,
47 _,
48 PostgresArrowTransport<CSVProtocol, MakeTlsConnector>,
49 >::new(
50 source, &mut destination, queries, origin_query
51 );
52 dispatcher.set_pre_execution_queries(pre_execution_queries);
53 dispatcher.run()?;
54 }
55 ("csv", None) => {
56 let source =
57 PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, queries.len())?;
58 let mut dispatcher = Dispatcher::<
59 _,
60 _,
61 PostgresArrowTransport<CSVProtocol, NoTls>,
62 >::new(
63 source, &mut destination, queries, origin_query
64 );
65 dispatcher.set_pre_execution_queries(pre_execution_queries);
66 dispatcher.run()?;
67 }
68 ("binary", Some(tls_conn)) => {
69 let source = PostgresSource::<PgBinaryProtocol, MakeTlsConnector>::new(
70 config,
71 tls_conn,
72 queries.len(),
73 )?;
74 let mut dispatcher = Dispatcher::<
75 _,
76 _,
77 PostgresArrowTransport<PgBinaryProtocol, MakeTlsConnector>,
78 >::new(
79 source, &mut destination, queries, origin_query
80 );
81 dispatcher.set_pre_execution_queries(pre_execution_queries);
82 dispatcher.run()?;
83 }
84 ("binary", None) => {
85 let source = PostgresSource::<PgBinaryProtocol, NoTls>::new(
86 config,
87 NoTls,
88 queries.len(),
89 )?;
90 let mut dispatcher = Dispatcher::<
91 _,
92 _,
93 PostgresArrowTransport<PgBinaryProtocol, NoTls>,
94 >::new(
95 source, &mut destination, queries, origin_query
96 );
97 dispatcher.set_pre_execution_queries(pre_execution_queries);
98 dispatcher.run()?;
99 }
100 ("cursor", Some(tls_conn)) => {
101 let source = PostgresSource::<CursorProtocol, MakeTlsConnector>::new(
102 config,
103 tls_conn,
104 queries.len(),
105 )?;
106 let mut dispatcher = Dispatcher::<
107 _,
108 _,
109 PostgresArrowTransport<CursorProtocol, MakeTlsConnector>,
110 >::new(
111 source, &mut destination, queries, origin_query
112 );
113 dispatcher.set_pre_execution_queries(pre_execution_queries);
114 dispatcher.run()?;
115 }
116 ("cursor", None) => {
117 let source =
118 PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, queries.len())?;
119 let mut dispatcher = Dispatcher::<
120 _,
121 _,
122 PostgresArrowTransport<CursorProtocol, NoTls>,
123 >::new(
124 source, &mut destination, queries, origin_query
125 );
126 dispatcher.set_pre_execution_queries(pre_execution_queries);
127 dispatcher.run()?;
128 }
129 ("simple", Some(tls_conn)) => {
130 let sb = PostgresSource::<SimpleProtocol, MakeTlsConnector>::new(
131 config,
132 tls_conn,
133 queries.len(),
134 )?;
135 let mut dispatcher = Dispatcher::<
136 _,
137 _,
138 PostgresArrowTransport<SimpleProtocol, MakeTlsConnector>,
139 >::new(
140 sb, &mut destination, queries, origin_query
141 );
142 debug!("Running dispatcher");
143 dispatcher.set_pre_execution_queries(pre_execution_queries);
144 dispatcher.run()?;
145 }
146 ("simple", None) => {
147 let sb =
148 PostgresSource::<SimpleProtocol, NoTls>::new(config, NoTls, queries.len())?;
149 let mut dispatcher = Dispatcher::<
150 _,
151 _,
152 PostgresArrowTransport<SimpleProtocol, NoTls>,
153 >::new(
154 sb, &mut destination, queries, origin_query
155 );
156 debug!("Running dispatcher");
157 dispatcher.set_pre_execution_queries(pre_execution_queries);
158 dispatcher.run()?;
159 }
160 _ => unimplemented!("{} protocol not supported", protocol),
161 }
162 }
163 #[cfg(feature = "src_mysql")]
164 SourceType::MySQL => match protocol {
165 "binary" => {
166 let source =
167 MySQLSource::<MySQLBinaryProtocol>::new(&source_conn.conn[..], queries.len())?;
168 let mut dispatcher =
169 Dispatcher::<_, _, MySQLArrowTransport<MySQLBinaryProtocol>>::new(
170 source,
171 &mut destination,
172 queries,
173 origin_query,
174 );
175 dispatcher.set_pre_execution_queries(pre_execution_queries);
176 dispatcher.run()?;
177 }
178 "text" => {
179 let source =
180 MySQLSource::<TextProtocol>::new(&source_conn.conn[..], queries.len())?;
181 let mut dispatcher = Dispatcher::<_, _, MySQLArrowTransport<TextProtocol>>::new(
182 source,
183 &mut destination,
184 queries,
185 origin_query,
186 );
187 dispatcher.set_pre_execution_queries(pre_execution_queries);
188 dispatcher.run()?;
189 }
190 _ => unimplemented!("{} protocol not supported", protocol),
191 },
192 #[cfg(feature = "src_sqlite")]
193 SourceType::SQLite => {
194 let path = &source_conn.conn.as_str()[9..];
196 let source = SQLiteSource::new(path, queries.len())?;
197 let dispatcher = Dispatcher::<_, _, SQLiteArrowTransport>::new(
198 source,
199 &mut destination,
200 queries,
201 origin_query,
202 );
203 dispatcher.run()?;
204 }
205 #[cfg(feature = "src_mssql")]
206 SourceType::MsSQL => {
207 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
208 let source = MsSQLSource::new(rt, &source_conn.conn[..], queries.len())?;
209 let dispatcher = Dispatcher::<_, _, MsSQLArrowTransport>::new(
210 source,
211 &mut destination,
212 queries,
213 origin_query,
214 );
215 dispatcher.run()?;
216 }
217 #[cfg(feature = "src_oracle")]
218 SourceType::Oracle => {
219 let source = OracleSource::new(&source_conn.conn[..], queries.len())?;
220 let dispatcher = Dispatcher::<_, _, OracleArrowTransport>::new(
221 source,
222 &mut destination,
223 queries,
224 origin_query,
225 );
226 dispatcher.run()?;
227 }
228 #[cfg(feature = "src_bigquery")]
229 SourceType::BigQuery => {
230 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
231 let source = BigQuerySource::new(rt, &source_conn.conn[..])?;
232 let dispatcher = Dispatcher::<_, _, BigQueryArrowTransport>::new(
233 source,
234 &mut destination,
235 queries,
236 origin_query,
237 );
238 dispatcher.run()?;
239 }
240 #[cfg(feature = "src_trino")]
241 SourceType::Trino => {
242 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
243 let source = TrinoSource::new(rt, &source_conn.conn[..])?;
244 let dispatcher = Dispatcher::<_, _, TrinoArrowTransport>::new(
245 source,
246 &mut destination,
247 queries,
248 origin_query,
249 );
250 dispatcher.run()?;
251 }
252 _ => throw!(ConnectorXOutError::SourceNotSupport(format!(
253 "{:?}",
254 source_conn.ty
255 ))),
256 }
257
258 destination
259}
260
261#[allow(unreachable_code, unreachable_patterns, unused_variables, unused_mut)]
262pub fn new_record_batch_iter(
263 source_conn: &SourceConn,
264 origin_query: Option<String>,
265 queries: &[CXQuery<String>],
266 batch_size: usize,
267 pre_execution_queries: Option<&[String]>,
268) -> Box<dyn RecordBatchIterator> {
269 let destination = ArrowStreamDestination::new_with_batch_size(batch_size);
270 let protocol = source_conn.proto.as_str();
271 debug!("Protocol: {}", protocol);
272
273 match source_conn.ty {
274 #[cfg(feature = "src_postgres")]
275 SourceType::Postgres => {
276 let (config, tls) = rewrite_tls_args(&source_conn.conn).unwrap();
277 match (protocol, tls) {
278 ("csv", Some(tls_conn)) => {
279 let mut source = PostgresSource::<CSVProtocol, MakeTlsConnector>::new(
280 config,
281 tls_conn,
282 queries.len(),
283 )
284 .unwrap();
285
286 source.set_pre_execution_queries(pre_execution_queries);
287
288 let batch_iter =
289 ArrowBatchIter::<
290 _,
291 PostgresArrowStreamTransport<CSVProtocol, MakeTlsConnector>,
292 >::new(source, destination, origin_query, queries)
293 .unwrap();
294 return Box::new(batch_iter);
295 }
296 ("csv", None) => {
297 let mut source =
298 PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, queries.len())
299 .unwrap();
300
301 source.set_pre_execution_queries(pre_execution_queries);
302
303 let batch_iter = ArrowBatchIter::<
304 _,
305 PostgresArrowStreamTransport<CSVProtocol, NoTls>,
306 >::new(
307 source, destination, origin_query, queries
308 )
309 .unwrap();
310 return Box::new(batch_iter);
311 }
312 ("binary", Some(tls_conn)) => {
313 let mut source = PostgresSource::<PgBinaryProtocol, MakeTlsConnector>::new(
314 config,
315 tls_conn,
316 queries.len(),
317 )
318 .unwrap();
319
320 source.set_pre_execution_queries(pre_execution_queries);
321
322 let batch_iter =
323 ArrowBatchIter::<
324 _,
325 PostgresArrowStreamTransport<PgBinaryProtocol, MakeTlsConnector>,
326 >::new(source, destination, origin_query, queries)
327 .unwrap();
328 return Box::new(batch_iter);
329 }
330 ("binary", None) => {
331 let mut source = PostgresSource::<PgBinaryProtocol, NoTls>::new(
332 config,
333 NoTls,
334 queries.len(),
335 )
336 .unwrap();
337
338 source.set_pre_execution_queries(pre_execution_queries);
339
340 let batch_iter = ArrowBatchIter::<
341 _,
342 PostgresArrowStreamTransport<PgBinaryProtocol, NoTls>,
343 >::new(
344 source, destination, origin_query, queries
345 )
346 .unwrap();
347 return Box::new(batch_iter);
348 }
349 ("cursor", Some(tls_conn)) => {
350 let mut source = PostgresSource::<CursorProtocol, MakeTlsConnector>::new(
351 config,
352 tls_conn,
353 queries.len(),
354 )
355 .unwrap();
356
357 source.set_pre_execution_queries(pre_execution_queries);
358
359 let batch_iter =
360 ArrowBatchIter::<
361 _,
362 PostgresArrowStreamTransport<CursorProtocol, MakeTlsConnector>,
363 >::new(source, destination, origin_query, queries)
364 .unwrap();
365 return Box::new(batch_iter);
366 }
367 ("cursor", None) => {
368 let mut source =
369 PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, queries.len())
370 .unwrap();
371
372 source.set_pre_execution_queries(pre_execution_queries);
373
374 let batch_iter = ArrowBatchIter::<
375 _,
376 PostgresArrowStreamTransport<CursorProtocol, NoTls>,
377 >::new(
378 source, destination, origin_query, queries
379 )
380 .unwrap();
381 return Box::new(batch_iter);
382 }
383 _ => unimplemented!("{} protocol not supported", protocol),
384 }
385 }
386 #[cfg(feature = "src_mysql")]
387 SourceType::MySQL => match protocol {
388 "binary" => {
389 let mut source =
390 MySQLSource::<MySQLBinaryProtocol>::new(&source_conn.conn[..], queries.len())
391 .unwrap();
392
393 source.set_pre_execution_queries(pre_execution_queries);
394
395 let batch_iter =
396 ArrowBatchIter::<_, MySQLArrowStreamTransport<MySQLBinaryProtocol>>::new(
397 source,
398 destination,
399 origin_query,
400 queries,
401 )
402 .unwrap();
403 return Box::new(batch_iter);
404 }
405 "text" => {
406 let mut source =
407 MySQLSource::<TextProtocol>::new(&source_conn.conn[..], queries.len()).unwrap();
408
409 source.set_pre_execution_queries(pre_execution_queries);
410
411 let batch_iter = ArrowBatchIter::<_, MySQLArrowStreamTransport<TextProtocol>>::new(
412 source,
413 destination,
414 origin_query,
415 queries,
416 )
417 .unwrap();
418 return Box::new(batch_iter);
419 }
420 _ => unimplemented!("{} protocol not supported", protocol),
421 },
422 #[cfg(feature = "src_sqlite")]
423 SourceType::SQLite => {
424 let path = &source_conn.conn.as_str()[9..];
426 let source = SQLiteSource::new(path, queries.len()).unwrap();
427 let batch_iter = ArrowBatchIter::<_, SQLiteArrowStreamTransport>::new(
428 source,
429 destination,
430 origin_query,
431 queries,
432 )
433 .unwrap();
434 return Box::new(batch_iter);
435 }
436 #[cfg(feature = "src_mssql")]
437 SourceType::MsSQL => {
438 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
439 let source = MsSQLSource::new(rt, &source_conn.conn[..], queries.len()).unwrap();
440 let batch_iter = ArrowBatchIter::<_, MsSQLArrowStreamTransport>::new(
441 source,
442 destination,
443 origin_query,
444 queries,
445 )
446 .unwrap();
447 return Box::new(batch_iter);
448 }
449 #[cfg(feature = "src_oracle")]
450 SourceType::Oracle => {
451 let source = OracleSource::new(&source_conn.conn[..], queries.len()).unwrap();
452 let batch_iter = ArrowBatchIter::<_, OracleArrowStreamTransport>::new(
453 source,
454 destination,
455 origin_query,
456 queries,
457 )
458 .unwrap();
459 return Box::new(batch_iter);
460 }
461 #[cfg(feature = "src_bigquery")]
462 SourceType::BigQuery => {
463 let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
464 let source = BigQuerySource::new(rt, &source_conn.conn[..]).unwrap();
465 let batch_iter = ArrowBatchIter::<_, BigQueryArrowStreamTransport>::new(
466 source,
467 destination,
468 origin_query,
469 queries,
470 )
471 .unwrap();
472 return Box::new(batch_iter);
473 }
474 _ => {}
475 }
476 panic!("not supported!");
477}