connectorx/
get_arrow.rs

1#[cfg(feature = "src_mysql")]
2use crate::sources::mysql::{BinaryProtocol as MySQLBinaryProtocol, TextProtocol};
3#[cfg(feature = "src_postgres")]
4use crate::sources::postgres::{
5    rewrite_tls_args, BinaryProtocol as PgBinaryProtocol, CSVProtocol, CursorProtocol,
6    SimpleProtocol,
7};
8use crate::{
9    arrow_batch_iter::{ArrowBatchIter, RecordBatchIterator},
10    prelude::*,
11    sql::CXQuery,
12};
13use fehler::{throw, throws};
14use log::debug;
15#[cfg(feature = "src_postgres")]
16use postgres::NoTls;
17#[cfg(feature = "src_postgres")]
18use postgres_openssl::MakeTlsConnector;
19#[allow(unused_imports)]
20use std::sync::Arc;
21
22#[allow(unreachable_code, unreachable_patterns, unused_variables, unused_mut)]
23#[throws(ConnectorXOutError)]
24pub fn get_arrow(
25    source_conn: &SourceConn,
26    origin_query: Option<String>,
27    queries: &[CXQuery<String>],
28    pre_execution_queries: Option<&[String]>,
29) -> ArrowDestination {
30    let mut destination = ArrowDestination::new();
31    let protocol = source_conn.proto.as_str();
32    debug!("Protocol: {}", protocol);
33
34    match source_conn.ty {
35        #[cfg(feature = "src_postgres")]
36        SourceType::Postgres => {
37            let (config, tls) = rewrite_tls_args(&source_conn.conn)?;
38            match (protocol, tls) {
39                ("csv", Some(tls_conn)) => {
40                    let source = PostgresSource::<CSVProtocol, MakeTlsConnector>::new(
41                        config,
42                        tls_conn,
43                        queries.len(),
44                    )?;
45                    let mut dispatcher = Dispatcher::<
46                        _,
47                        _,
48                        PostgresArrowTransport<CSVProtocol, MakeTlsConnector>,
49                    >::new(
50                        source, &mut destination, queries, origin_query
51                    );
52                    dispatcher.set_pre_execution_queries(pre_execution_queries);
53                    dispatcher.run()?;
54                }
55                ("csv", None) => {
56                    let source =
57                        PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, queries.len())?;
58                    let mut dispatcher = Dispatcher::<
59                        _,
60                        _,
61                        PostgresArrowTransport<CSVProtocol, NoTls>,
62                    >::new(
63                        source, &mut destination, queries, origin_query
64                    );
65                    dispatcher.set_pre_execution_queries(pre_execution_queries);
66                    dispatcher.run()?;
67                }
68                ("binary", Some(tls_conn)) => {
69                    let source = PostgresSource::<PgBinaryProtocol, MakeTlsConnector>::new(
70                        config,
71                        tls_conn,
72                        queries.len(),
73                    )?;
74                    let mut dispatcher = Dispatcher::<
75                        _,
76                        _,
77                        PostgresArrowTransport<PgBinaryProtocol, MakeTlsConnector>,
78                    >::new(
79                        source, &mut destination, queries, origin_query
80                    );
81                    dispatcher.set_pre_execution_queries(pre_execution_queries);
82                    dispatcher.run()?;
83                }
84                ("binary", None) => {
85                    let source = PostgresSource::<PgBinaryProtocol, NoTls>::new(
86                        config,
87                        NoTls,
88                        queries.len(),
89                    )?;
90                    let mut dispatcher = Dispatcher::<
91                        _,
92                        _,
93                        PostgresArrowTransport<PgBinaryProtocol, NoTls>,
94                    >::new(
95                        source, &mut destination, queries, origin_query
96                    );
97                    dispatcher.set_pre_execution_queries(pre_execution_queries);
98                    dispatcher.run()?;
99                }
100                ("cursor", Some(tls_conn)) => {
101                    let source = PostgresSource::<CursorProtocol, MakeTlsConnector>::new(
102                        config,
103                        tls_conn,
104                        queries.len(),
105                    )?;
106                    let mut dispatcher = Dispatcher::<
107                        _,
108                        _,
109                        PostgresArrowTransport<CursorProtocol, MakeTlsConnector>,
110                    >::new(
111                        source, &mut destination, queries, origin_query
112                    );
113                    dispatcher.set_pre_execution_queries(pre_execution_queries);
114                    dispatcher.run()?;
115                }
116                ("cursor", None) => {
117                    let source =
118                        PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, queries.len())?;
119                    let mut dispatcher = Dispatcher::<
120                        _,
121                        _,
122                        PostgresArrowTransport<CursorProtocol, NoTls>,
123                    >::new(
124                        source, &mut destination, queries, origin_query
125                    );
126                    dispatcher.set_pre_execution_queries(pre_execution_queries);
127                    dispatcher.run()?;
128                }
129                ("simple", Some(tls_conn)) => {
130                    let sb = PostgresSource::<SimpleProtocol, MakeTlsConnector>::new(
131                        config,
132                        tls_conn,
133                        queries.len(),
134                    )?;
135                    let mut dispatcher = Dispatcher::<
136                        _,
137                        _,
138                        PostgresArrowTransport<SimpleProtocol, MakeTlsConnector>,
139                    >::new(
140                        sb, &mut destination, queries, origin_query
141                    );
142                    debug!("Running dispatcher");
143                    dispatcher.set_pre_execution_queries(pre_execution_queries);
144                    dispatcher.run()?;
145                }
146                ("simple", None) => {
147                    let sb =
148                        PostgresSource::<SimpleProtocol, NoTls>::new(config, NoTls, queries.len())?;
149                    let mut dispatcher = Dispatcher::<
150                        _,
151                        _,
152                        PostgresArrowTransport<SimpleProtocol, NoTls>,
153                    >::new(
154                        sb, &mut destination, queries, origin_query
155                    );
156                    debug!("Running dispatcher");
157                    dispatcher.set_pre_execution_queries(pre_execution_queries);
158                    dispatcher.run()?;
159                }
160                _ => unimplemented!("{} protocol not supported", protocol),
161            }
162        }
163        #[cfg(feature = "src_mysql")]
164        SourceType::MySQL => match protocol {
165            "binary" => {
166                let source =
167                    MySQLSource::<MySQLBinaryProtocol>::new(&source_conn.conn[..], queries.len())?;
168                let mut dispatcher =
169                    Dispatcher::<_, _, MySQLArrowTransport<MySQLBinaryProtocol>>::new(
170                        source,
171                        &mut destination,
172                        queries,
173                        origin_query,
174                    );
175                dispatcher.set_pre_execution_queries(pre_execution_queries);
176                dispatcher.run()?;
177            }
178            "text" => {
179                let source =
180                    MySQLSource::<TextProtocol>::new(&source_conn.conn[..], queries.len())?;
181                let mut dispatcher = Dispatcher::<_, _, MySQLArrowTransport<TextProtocol>>::new(
182                    source,
183                    &mut destination,
184                    queries,
185                    origin_query,
186                );
187                dispatcher.set_pre_execution_queries(pre_execution_queries);
188                dispatcher.run()?;
189            }
190            _ => unimplemented!("{} protocol not supported", protocol),
191        },
192        #[cfg(feature = "src_sqlite")]
193        SourceType::SQLite => {
194            // remove the first "sqlite://" manually since url.path is not correct for windows
195            let path = &source_conn.conn.as_str()[9..];
196            let source = SQLiteSource::new(path, queries.len())?;
197            let dispatcher = Dispatcher::<_, _, SQLiteArrowTransport>::new(
198                source,
199                &mut destination,
200                queries,
201                origin_query,
202            );
203            dispatcher.run()?;
204        }
205        #[cfg(feature = "src_mssql")]
206        SourceType::MsSQL => {
207            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
208            let source = MsSQLSource::new(rt, &source_conn.conn[..], queries.len())?;
209            let dispatcher = Dispatcher::<_, _, MsSQLArrowTransport>::new(
210                source,
211                &mut destination,
212                queries,
213                origin_query,
214            );
215            dispatcher.run()?;
216        }
217        #[cfg(feature = "src_oracle")]
218        SourceType::Oracle => {
219            let source = OracleSource::new(&source_conn.conn[..], queries.len())?;
220            let dispatcher = Dispatcher::<_, _, OracleArrowTransport>::new(
221                source,
222                &mut destination,
223                queries,
224                origin_query,
225            );
226            dispatcher.run()?;
227        }
228        #[cfg(feature = "src_bigquery")]
229        SourceType::BigQuery => {
230            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
231            let source = BigQuerySource::new(rt, &source_conn.conn[..])?;
232            let dispatcher = Dispatcher::<_, _, BigQueryArrowTransport>::new(
233                source,
234                &mut destination,
235                queries,
236                origin_query,
237            );
238            dispatcher.run()?;
239        }
240        #[cfg(feature = "src_trino")]
241        SourceType::Trino => {
242            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
243            let source = TrinoSource::new(rt, &source_conn.conn[..])?;
244            let dispatcher = Dispatcher::<_, _, TrinoArrowTransport>::new(
245                source,
246                &mut destination,
247                queries,
248                origin_query,
249            );
250            dispatcher.run()?;
251        }
252        _ => throw!(ConnectorXOutError::SourceNotSupport(format!(
253            "{:?}",
254            source_conn.ty
255        ))),
256    }
257
258    destination
259}
260
261#[allow(unreachable_code, unreachable_patterns, unused_variables, unused_mut)]
262pub fn new_record_batch_iter(
263    source_conn: &SourceConn,
264    origin_query: Option<String>,
265    queries: &[CXQuery<String>],
266    batch_size: usize,
267    pre_execution_queries: Option<&[String]>,
268) -> Box<dyn RecordBatchIterator> {
269    let destination = ArrowStreamDestination::new_with_batch_size(batch_size);
270    let protocol = source_conn.proto.as_str();
271    debug!("Protocol: {}", protocol);
272
273    match source_conn.ty {
274        #[cfg(feature = "src_postgres")]
275        SourceType::Postgres => {
276            let (config, tls) = rewrite_tls_args(&source_conn.conn).unwrap();
277            match (protocol, tls) {
278                ("csv", Some(tls_conn)) => {
279                    let mut source = PostgresSource::<CSVProtocol, MakeTlsConnector>::new(
280                        config,
281                        tls_conn,
282                        queries.len(),
283                    )
284                    .unwrap();
285
286                    source.set_pre_execution_queries(pre_execution_queries);
287
288                    let batch_iter =
289                        ArrowBatchIter::<
290                            _,
291                            PostgresArrowStreamTransport<CSVProtocol, MakeTlsConnector>,
292                        >::new(source, destination, origin_query, queries)
293                        .unwrap();
294                    return Box::new(batch_iter);
295                }
296                ("csv", None) => {
297                    let mut source =
298                        PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, queries.len())
299                            .unwrap();
300
301                    source.set_pre_execution_queries(pre_execution_queries);
302
303                    let batch_iter = ArrowBatchIter::<
304                        _,
305                        PostgresArrowStreamTransport<CSVProtocol, NoTls>,
306                    >::new(
307                        source, destination, origin_query, queries
308                    )
309                    .unwrap();
310                    return Box::new(batch_iter);
311                }
312                ("binary", Some(tls_conn)) => {
313                    let mut source = PostgresSource::<PgBinaryProtocol, MakeTlsConnector>::new(
314                        config,
315                        tls_conn,
316                        queries.len(),
317                    )
318                    .unwrap();
319
320                    source.set_pre_execution_queries(pre_execution_queries);
321
322                    let batch_iter =
323                        ArrowBatchIter::<
324                            _,
325                            PostgresArrowStreamTransport<PgBinaryProtocol, MakeTlsConnector>,
326                        >::new(source, destination, origin_query, queries)
327                        .unwrap();
328                    return Box::new(batch_iter);
329                }
330                ("binary", None) => {
331                    let mut source = PostgresSource::<PgBinaryProtocol, NoTls>::new(
332                        config,
333                        NoTls,
334                        queries.len(),
335                    )
336                    .unwrap();
337
338                    source.set_pre_execution_queries(pre_execution_queries);
339
340                    let batch_iter = ArrowBatchIter::<
341                        _,
342                        PostgresArrowStreamTransport<PgBinaryProtocol, NoTls>,
343                    >::new(
344                        source, destination, origin_query, queries
345                    )
346                    .unwrap();
347                    return Box::new(batch_iter);
348                }
349                ("cursor", Some(tls_conn)) => {
350                    let mut source = PostgresSource::<CursorProtocol, MakeTlsConnector>::new(
351                        config,
352                        tls_conn,
353                        queries.len(),
354                    )
355                    .unwrap();
356
357                    source.set_pre_execution_queries(pre_execution_queries);
358
359                    let batch_iter =
360                        ArrowBatchIter::<
361                            _,
362                            PostgresArrowStreamTransport<CursorProtocol, MakeTlsConnector>,
363                        >::new(source, destination, origin_query, queries)
364                        .unwrap();
365                    return Box::new(batch_iter);
366                }
367                ("cursor", None) => {
368                    let mut source =
369                        PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, queries.len())
370                            .unwrap();
371
372                    source.set_pre_execution_queries(pre_execution_queries);
373
374                    let batch_iter = ArrowBatchIter::<
375                        _,
376                        PostgresArrowStreamTransport<CursorProtocol, NoTls>,
377                    >::new(
378                        source, destination, origin_query, queries
379                    )
380                    .unwrap();
381                    return Box::new(batch_iter);
382                }
383                _ => unimplemented!("{} protocol not supported", protocol),
384            }
385        }
386        #[cfg(feature = "src_mysql")]
387        SourceType::MySQL => match protocol {
388            "binary" => {
389                let mut source =
390                    MySQLSource::<MySQLBinaryProtocol>::new(&source_conn.conn[..], queries.len())
391                        .unwrap();
392
393                source.set_pre_execution_queries(pre_execution_queries);
394
395                let batch_iter =
396                    ArrowBatchIter::<_, MySQLArrowStreamTransport<MySQLBinaryProtocol>>::new(
397                        source,
398                        destination,
399                        origin_query,
400                        queries,
401                    )
402                    .unwrap();
403                return Box::new(batch_iter);
404            }
405            "text" => {
406                let mut source =
407                    MySQLSource::<TextProtocol>::new(&source_conn.conn[..], queries.len()).unwrap();
408
409                source.set_pre_execution_queries(pre_execution_queries);
410
411                let batch_iter = ArrowBatchIter::<_, MySQLArrowStreamTransport<TextProtocol>>::new(
412                    source,
413                    destination,
414                    origin_query,
415                    queries,
416                )
417                .unwrap();
418                return Box::new(batch_iter);
419            }
420            _ => unimplemented!("{} protocol not supported", protocol),
421        },
422        #[cfg(feature = "src_sqlite")]
423        SourceType::SQLite => {
424            // remove the first "sqlite://" manually since url.path is not correct for windows
425            let path = &source_conn.conn.as_str()[9..];
426            let source = SQLiteSource::new(path, queries.len()).unwrap();
427            let batch_iter = ArrowBatchIter::<_, SQLiteArrowStreamTransport>::new(
428                source,
429                destination,
430                origin_query,
431                queries,
432            )
433            .unwrap();
434            return Box::new(batch_iter);
435        }
436        #[cfg(feature = "src_mssql")]
437        SourceType::MsSQL => {
438            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
439            let source = MsSQLSource::new(rt, &source_conn.conn[..], queries.len()).unwrap();
440            let batch_iter = ArrowBatchIter::<_, MsSQLArrowStreamTransport>::new(
441                source,
442                destination,
443                origin_query,
444                queries,
445            )
446            .unwrap();
447            return Box::new(batch_iter);
448        }
449        #[cfg(feature = "src_oracle")]
450        SourceType::Oracle => {
451            let source = OracleSource::new(&source_conn.conn[..], queries.len()).unwrap();
452            let batch_iter = ArrowBatchIter::<_, OracleArrowStreamTransport>::new(
453                source,
454                destination,
455                origin_query,
456                queries,
457            )
458            .unwrap();
459            return Box::new(batch_iter);
460        }
461        #[cfg(feature = "src_bigquery")]
462        SourceType::BigQuery => {
463            let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
464            let source = BigQuerySource::new(rt, &source_conn.conn[..]).unwrap();
465            let batch_iter = ArrowBatchIter::<_, BigQueryArrowStreamTransport>::new(
466                source,
467                destination,
468                origin_query,
469                queries,
470            )
471            .unwrap();
472            return Box::new(batch_iter);
473        }
474        _ => {}
475    }
476    panic!("not supported!");
477}