connectorx/
get_arrow.rs

1#[cfg(feature = "src_mysql")]
2use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol};
3#[cfg(feature = "src_postgres")]
4use crate::sources::postgres::{
5    rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol,
6    SimpleProtocol,
7};
8use crate::{
9    arrow_batch_iter::{ArrowBatchIter, RecordBatchIterator},
10    prelude::*,
11    sql::CXQuery,
12};
13use fehler::{throw, throws};
14use log::debug;
15#[cfg(feature = "src_postgres")]
16use postgres::NoTls;
17#[cfg(feature = "src_postgres")]
18use postgres_openssl::MakeTlsConnector;
19#[allow(unused_imports)]
20use std::sync::Arc;
21
22#[allow(unreachable_code, unreachable_patterns, unused_variables, unused_mut)]
23#[throws(ConnectorXOutError)]
24pub fn get_arrow(
25    source_conn: &SourceConn,
26    origin_query: Option<String>,
27    queries: &[CXQuery<String>],
28    pre_execution_queries: Option<&[String]>,
29) -> ArrowDestination {
30    let mut destination = ArrowDestination::new();
31    let protocol = source_conn.proto.as_str();
32    debug!("Protocol: {}", protocol);
33
34    match source_conn.ty {
35        #[cfg(feature = "src_postgres")]
36        SourceType::Postgres => {
37            let (config, tls) = rewrite_tls_args(&source_conn.conn)?;
38            match (protocol, tls) {
39                ("csv", Some(tls_conn)) => {
40                    let source = PostgresSource::<CSVProtocol, MakeTlsConnector>::new(
41                        config,
42                        tls_conn,
43                        queries.len(),
44                    )?;
45                    let mut dispatcher = Dispatcher::<
46                        _,
47                        _,
48                        PostgresArrowTransport<CSVProtocol, MakeTlsConnector>,
49                    >::new(
50                        source, &mut destination, queries, origin_query
51                    );
52                    dispatcher.set_pre_execution_queries(pre_execution_queries);
53                    dispatcher.run()?;
54                }
55                ("csv", None) => {
56                    let source =
57                        PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, queries.len())?;
58                    let mut dispatcher = Dispatcher::<
59                        _,
60                        _,
61                        PostgresArrowTransport<CSVProtocol, NoTls>,
62                    >::new(
63                        source, &mut destination, queries, origin_query
64                    );
65                    dispatcher.set_pre_execution_queries(pre_execution_queries);
66                    dispatcher.run()?;
67                }
68                ("binary", Some(tls_conn)) => {
69                    let source = PostgresSource::<PgBinaryProtocol, MakeTlsConnector>::new(
70                        config,
71                        tls_conn,
72                        queries.len(),
73                    )?;
74                    let mut dispatcher = Dispatcher::<
75                        _,
76                        _,
77                        PostgresArrowTransport<PgBinaryProtocol, MakeTlsConnector>,
78                    >::new(
79                        source, &mut destination, queries, origin_query
80                    );
81                    dispatcher.set_pre_execution_queries(pre_execution_queries);
82                    dispatcher.run()?;
83                }
84                ("binary", None) => {
85                    let source = PostgresSource::<PgBinaryProtocol, NoTls>::new(
86                        config,
87                        NoTls,
88                        queries.len(),
89                    )?;
90                    let mut dispatcher = Dispatcher::<
91                        _,
92                        _,
93                        PostgresArrowTransport<PgBinaryProtocol, NoTls>,
94                    >::new(
95                        source, &mut destination, queries, origin_query
96                    );
97                    dispatcher.set_pre_execution_queries(pre_execution_queries);
98                    dispatcher.run()?;
99                }
100                ("cursor", Some(tls_conn)) => {
101                    let source = PostgresSource::<CursorProtocol, MakeTlsConnector>::new(
102                        config,
103                        tls_conn,
104                        queries.len(),
105                    )?;
106                    let mut dispatcher = Dispatcher::<
107                        _,
108                        _,
109                        PostgresArrowTransport<CursorProtocol, MakeTlsConnector>,
110                    >::new(
111                        source, &mut destination, queries, origin_query
112                    );
113                    dispatcher.set_pre_execution_queries(pre_execution_queries);
114                    dispatcher.run()?;
115                }
116                ("cursor", None) => {
117                    let source =
118                        PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, queries.len())?;
119                    let mut dispatcher = Dispatcher::<
120                        _,
121                        _,
122                        PostgresArrowTransport<CursorProtocol, NoTls>,
123                    >::new(
124                        source, &mut destination, queries, origin_query
125                    );
126                    dispatcher.set_pre_execution_queries(pre_execution_queries);
127                    dispatcher.run()?;
128                }
129                ("simple", Some(tls_conn)) => {
130                    let sb = PostgresSource::<SimpleProtocol, MakeTlsConnector>::new(
131                        config,
132                        tls_conn,
133                        queries.len(),
134                    )?;
135                    let mut dispatcher = Dispatcher::<
136                        _,
137                        _,
138                        PostgresArrowTransport<SimpleProtocol, MakeTlsConnector>,
139                    >::new(
140                        sb, &mut destination, queries, origin_query
141                    );
142                    debug!("Running dispatcher");
143                    dispatcher.set_pre_execution_queries(pre_execution_queries);
144                    dispatcher.run()?;
145                }
146                ("simple", None) => {
147                    let sb =
148                        PostgresSource::<SimpleProtocol, NoTls>::new(config, NoTls, queries.len())?;
149                    let mut dispatcher = Dispatcher::<
150                        _,
151                        _,
152                        PostgresArrowTransport<SimpleProtocol, NoTls>,
153                    >::new(
154                        sb, &mut destination, queries, origin_query
155                    );
156                    debug!("Running dispatcher");
157                    dispatcher.set_pre_execution_queries(pre_execution_queries);
158                    dispatcher.run()?;
159                }
160                _ => unimplemented!("{} protocol not supported", protocol),
161            }
162        }
163        #[cfg(feature = "src_mysql")]
164        SourceType::MySQL => match protocol {
165            "binary" => {
166                let source =
167                    MySQLSource::<MySQLBinaryProtocol>::new(&source_conn.conn[..], queries.len())?;
168                let mut dispatcher =
169                    Dispatcher::<_, _, MySQLArrowTransport<MySQLBinaryProtocol>>::new(
170                        source,
171                        &mut destination,
172                        queries,
173                        origin_query,
174                    );
175                dispatcher.set_pre_execution_queries(pre_execution_queries);
176                dispatcher.run()?;
177            }
178            "text" => {
179                let source =
180                    MySQLSource::<TextProtocol>::new(&source_conn.conn[..], queries.len())?;
181                let mut dispatcher = Dispatcher::<_, _, MySQLArrowTransport<TextProtocol>>::new(
182                    source,
183                    &mut destination,
184                    queries,
185                    origin_query,
186                );
187                dispatcher.set_pre_execution_queries(pre_execution_queries);
188                dispatcher.run()?;
189            }
190            _ => unimplemented!("{} protocol not supported", protocol),
191        },
192        #[cfg(feature = "src_sqlite")]
193        SourceType::SQLite => {
194            // remove the first "sqlite://" manually since url.path is not correct for windows
195            let path = &source_conn.conn.as_str()[9..];
196            let source = SQLiteSource::new(path, queries.len())?;
197            let dispatcher = Dispatcher::<_, _, SQLiteArrowTransport>::new(
198                source,
199                &mut destination,
200                queries,
201                origin_query,
202            );
203            dispatcher.run()?;
204        }
205        #[cfg(feature = "src_mssql")]
206        SourceType::MsSQL => {
207            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
208            let source = MsSQLSource::new(rt, &source_conn.conn[..], queries.len())?;
209            let dispatcher = Dispatcher::<_, _, MsSQLArrowTransport>::new(
210                source,
211                &mut destination,
212                queries,
213                origin_query,
214            );
215            dispatcher.run()?;
216        }
217        #[cfg(feature = "src_oracle")]
218        SourceType::Oracle => {
219            let source = OracleSource::new(&source_conn.conn[..], queries.len())?;
220            let dispatcher = Dispatcher::<_, _, OracleArrowTransport>::new(
221                source,
222                &mut destination,
223                queries,
224                origin_query,
225            );
226            dispatcher.run()?;
227        }
228        #[cfg(feature = "src_bigquery")]
229        SourceType::BigQuery => {
230            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
231            let source = BigQuerySource::new(rt, &source_conn.conn[..])?;
232            let dispatcher = Dispatcher::<_, _, BigQueryArrowTransport>::new(
233                source,
234                &mut destination,
235                queries,
236                origin_query,
237            );
238            dispatcher.run()?;
239        }
240        #[cfg(feature = "src_trino")]
241        SourceType::Trino => {
242            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
243            let source = TrinoSource::new(rt, &source_conn.conn[..])?;
244            let dispatcher = Dispatcher::<_, _, TrinoArrowTransport>::new(
245                source,
246                &mut destination,
247                queries,
248                origin_query,
249            );
250            dispatcher.run()?;
251        }
252        #[cfg(feature = "src_clickhouse")]
253        SourceType::ClickHouse => {
254            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
255            let source = ClickHouseSource::new(rt, &source_conn.conn[..])?;
256            let dispatcher = Dispatcher::<_, _, ClickHouseArrowTransport>::new(
257                source,
258                &mut destination,
259                queries,
260                origin_query,
261            );
262            dispatcher.run()?;
263        }
264        _ => throw!(ConnectorXOutError::SourceNotSupport(format!(
265            "{:?}",
266            source_conn.ty
267        ))),
268    }
269
270    destination
271}
272
273#[allow(unreachable_code, unreachable_patterns, unused_variables, unused_mut)]
274pub fn new_record_batch_iter(
275    source_conn: &SourceConn,
276    origin_query: Option<String>,
277    queries: &[CXQuery<String>],
278    batch_size: usize,
279    pre_execution_queries: Option<&[String]>,
280) -> Box<dyn RecordBatchIterator> {
281    let destination = ArrowStreamDestination::new_with_batch_size(batch_size);
282    let protocol = source_conn.proto.as_str();
283    debug!("Protocol: {}", protocol);
284
285    match source_conn.ty {
286        #[cfg(feature = "src_postgres")]
287        SourceType::Postgres => {
288            let (config, tls) = rewrite_tls_args(&source_conn.conn).unwrap();
289            match (protocol, tls) {
290                ("csv", Some(tls_conn)) => {
291                    let mut source = PostgresSource::<CSVProtocol, MakeTlsConnector>::new(
292                        config,
293                        tls_conn,
294                        queries.len(),
295                    )
296                    .unwrap();
297
298                    source.set_pre_execution_queries(pre_execution_queries);
299
300                    let batch_iter =
301                        ArrowBatchIter::<
302                            _,
303                            PostgresArrowStreamTransport<CSVProtocol, MakeTlsConnector>,
304                        >::new(source, destination, origin_query, queries)
305                        .unwrap();
306                    return Box::new(batch_iter);
307                }
308                ("csv", None) => {
309                    let mut source =
310                        PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, queries.len())
311                            .unwrap();
312
313                    source.set_pre_execution_queries(pre_execution_queries);
314
315                    let batch_iter = ArrowBatchIter::<
316                        _,
317                        PostgresArrowStreamTransport<CSVProtocol, NoTls>,
318                    >::new(
319                        source, destination, origin_query, queries
320                    )
321                    .unwrap();
322                    return Box::new(batch_iter);
323                }
324                ("binary", Some(tls_conn)) => {
325                    let mut source = PostgresSource::<PgBinaryProtocol, MakeTlsConnector>::new(
326                        config,
327                        tls_conn,
328                        queries.len(),
329                    )
330                    .unwrap();
331
332                    source.set_pre_execution_queries(pre_execution_queries);
333
334                    let batch_iter =
335                        ArrowBatchIter::<
336                            _,
337                            PostgresArrowStreamTransport<PgBinaryProtocol, MakeTlsConnector>,
338                        >::new(source, destination, origin_query, queries)
339                        .unwrap();
340                    return Box::new(batch_iter);
341                }
342                ("binary", None) => {
343                    let mut source = PostgresSource::<PgBinaryProtocol, NoTls>::new(
344                        config,
345                        NoTls,
346                        queries.len(),
347                    )
348                    .unwrap();
349
350                    source.set_pre_execution_queries(pre_execution_queries);
351
352                    let batch_iter = ArrowBatchIter::<
353                        _,
354                        PostgresArrowStreamTransport<PgBinaryProtocol, NoTls>,
355                    >::new(
356                        source, destination, origin_query, queries
357                    )
358                    .unwrap();
359                    return Box::new(batch_iter);
360                }
361                ("cursor", Some(tls_conn)) => {
362                    let mut source = PostgresSource::<CursorProtocol, MakeTlsConnector>::new(
363                        config,
364                        tls_conn,
365                        queries.len(),
366                    )
367                    .unwrap();
368
369                    source.set_pre_execution_queries(pre_execution_queries);
370
371                    let batch_iter =
372                        ArrowBatchIter::<
373                            _,
374                            PostgresArrowStreamTransport<CursorProtocol, MakeTlsConnector>,
375                        >::new(source, destination, origin_query, queries)
376                        .unwrap();
377                    return Box::new(batch_iter);
378                }
379                ("cursor", None) => {
380                    let mut source =
381                        PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, queries.len())
382                            .unwrap();
383
384                    source.set_pre_execution_queries(pre_execution_queries);
385
386                    let batch_iter = ArrowBatchIter::<
387                        _,
388                        PostgresArrowStreamTransport<CursorProtocol, NoTls>,
389                    >::new(
390                        source, destination, origin_query, queries
391                    )
392                    .unwrap();
393                    return Box::new(batch_iter);
394                }
395                _ => unimplemented!("{} protocol not supported", protocol),
396            }
397        }
398        #[cfg(feature = "src_mysql")]
399        SourceType::MySQL => match protocol {
400            "binary" => {
401                let mut source =
402                    MySQLSource::<MySQLBinaryProtocol>::new(&source_conn.conn[..], queries.len())
403                        .unwrap();
404
405                source.set_pre_execution_queries(pre_execution_queries);
406
407                let batch_iter =
408                    ArrowBatchIter::<_, MySQLArrowStreamTransport<MySQLBinaryProtocol>>::new(
409                        source,
410                        destination,
411                        origin_query,
412                        queries,
413                    )
414                    .unwrap();
415                return Box::new(batch_iter);
416            }
417            "text" => {
418                let mut source =
419                    MySQLSource::<TextProtocol>::new(&source_conn.conn[..], queries.len()).unwrap();
420
421                source.set_pre_execution_queries(pre_execution_queries);
422
423                let batch_iter = ArrowBatchIter::<_, MySQLArrowStreamTransport<TextProtocol>>::new(
424                    source,
425                    destination,
426                    origin_query,
427                    queries,
428                )
429                .unwrap();
430                return Box::new(batch_iter);
431            }
432            _ => unimplemented!("{} protocol not supported", protocol),
433        },
434        #[cfg(feature = "src_sqlite")]
435        SourceType::SQLite => {
436            // remove the first "sqlite://" manually since url.path is not correct for windows
437            let path = &source_conn.conn.as_str()[9..];
438            let source = SQLiteSource::new(path, queries.len()).unwrap();
439            let batch_iter = ArrowBatchIter::<_, SQLiteArrowStreamTransport>::new(
440                source,
441                destination,
442                origin_query,
443                queries,
444            )
445            .unwrap();
446            return Box::new(batch_iter);
447        }
448        #[cfg(feature = "src_mssql")]
449        SourceType::MsSQL => {
450            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
451            let source = MsSQLSource::new(rt, &source_conn.conn[..], queries.len()).unwrap();
452            let batch_iter = ArrowBatchIter::<_, MsSQLArrowStreamTransport>::new(
453                source,
454                destination,
455                origin_query,
456                queries,
457            )
458            .unwrap();
459            return Box::new(batch_iter);
460        }
461        #[cfg(feature = "src_oracle")]
462        SourceType::Oracle => {
463            let source = OracleSource::new(&source_conn.conn[..], queries.len()).unwrap();
464            let batch_iter = ArrowBatchIter::<_, OracleArrowStreamTransport>::new(
465                source,
466                destination,
467                origin_query,
468                queries,
469            )
470            .unwrap();
471            return Box::new(batch_iter);
472        }
473        #[cfg(feature = "src_bigquery")]
474        SourceType::BigQuery => {
475            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
476            let source = BigQuerySource::new(rt, &source_conn.conn[..]).unwrap();
477            let batch_iter = ArrowBatchIter::<_, BigQueryArrowStreamTransport>::new(
478                source,
479                destination,
480                origin_query,
481                queries,
482            )
483            .unwrap();
484            return Box::new(batch_iter);
485        }
486        #[cfg(feature = "src_clickhouse")]
487        SourceType::ClickHouse => {
488            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
489            let source = ClickHouseSource::new(rt, &source_conn.conn[..]).unwrap();
490            let batch_iter = ArrowBatchIter::<_, ClickHouseArrowStreamTransport>::new(
491                source,
492                destination,
493                origin_query,
494                queries,
495            )
496            .unwrap();
497            return Box::new(batch_iter);
498        }
499        _ => {}
500    }
501    panic!("not supported!");
502}