connectorx/sources/bigquery/
mod.rs

1//! Source implementation for Google BigQuery
2
3mod errors;
4mod typesystem;
5
6pub use self::errors::BigQuerySourceError;
7use crate::{
8    data_order::DataOrder,
9    errors::ConnectorXError,
10    sources::{PartitionParser, Produce, Source, SourcePartition},
11    sql::{count_query, limit1_query, CXQuery},
12};
13use anyhow::anyhow;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use fehler::{throw, throws};
16use gcp_bigquery_client::{
17    model::{
18        get_query_results_parameters::GetQueryResultsParameters,
19        get_query_results_response::GetQueryResultsResponse, query_request::QueryRequest,
20        query_response::ResultSet,
21    },
22    Client,
23};
24use sqlparser::dialect::Dialect;
25use std::sync::Arc;
26use tokio::runtime::Runtime;
27pub use typesystem::BigQueryTypeSystem;
28use url::Url;
29
30#[derive(Debug)]
31pub struct BigQueryDialect {}
32
33impl Dialect for BigQueryDialect {
34    // See https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical
35    fn is_delimited_identifier_start(&self, ch: char) -> bool {
36        ch == '`'
37    }
38
39    fn is_identifier_start(&self, ch: char) -> bool {
40        ch.is_ascii_lowercase() || ch.is_ascii_uppercase() || ch == '_' || ch == '-'
41    }
42
43    fn is_identifier_part(&self, ch: char) -> bool {
44        self.is_identifier_start(ch) || ch.is_ascii_digit()
45    }
46}
47
48pub struct BigQuerySource {
49    rt: Arc<Runtime>,
50    client: Arc<Client>,
51    project_id: String,
52    origin_query: Option<String>,
53    queries: Vec<CXQuery<String>>,
54    names: Vec<String>,
55    schema: Vec<BigQueryTypeSystem>,
56}
57
58impl BigQuerySource {
59    #[throws(BigQuerySourceError)]
60    pub fn new(rt: Arc<Runtime>, conn: &str) -> Self {
61        let url = Url::parse(conn)?;
62        let sa_key_path = url.path();
63        let client = Arc::new(rt.block_on(
64            gcp_bigquery_client::Client::from_service_account_key_file(sa_key_path),
65        )?);
66        let auth_data = std::fs::read_to_string(sa_key_path)?;
67        let auth_json: serde_json::Value = serde_json::from_str(&auth_data)?;
68        let project_id = auth_json
69            .get("project_id")
70            .ok_or_else(|| anyhow!("Cannot get project_id from auth file"))?
71            .as_str()
72            .ok_or_else(|| anyhow!("Cannot get project_id as string from auth file"))?
73            .to_string();
74        Self {
75            rt,
76            client,
77            project_id,
78            origin_query: None,
79            queries: vec![],
80            names: vec![],
81            schema: vec![],
82        }
83    }
84}
85
86impl Source for BigQuerySource
87where
88    BigQuerySourcePartition:
89        SourcePartition<TypeSystem = BigQueryTypeSystem, Error = BigQuerySourceError>,
90{
91    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
92    type Partition = BigQuerySourcePartition;
93    type TypeSystem = BigQueryTypeSystem;
94    type Error = BigQuerySourceError;
95
96    #[throws(BigQuerySourceError)]
97    fn set_data_order(&mut self, data_order: DataOrder) {
98        if !matches!(data_order, DataOrder::RowMajor) {
99            throw!(ConnectorXError::UnsupportedDataOrder(data_order));
100        }
101    }
102
103    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
104        self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
105    }
106
107    fn set_origin_query(&mut self, query: Option<String>) {
108        self.origin_query = query;
109    }
110
111    #[throws(BigQuerySourceError)]
112    fn fetch_metadata(&mut self) {
113        assert!(!self.queries.is_empty());
114        let job = self.client.job();
115        for (_, query) in self.queries.iter().enumerate() {
116            let l1query = limit1_query(query, &BigQueryDialect {})?;
117            let rs = self.rt.block_on(job.query(
118                self.project_id.as_str(),
119                QueryRequest::new(l1query.as_str()),
120            ))?;
121            let (names, types) = rs
122                .schema
123                .as_ref()
124                .ok_or_else(|| anyhow!("TableSchema is none"))?
125                .fields
126                .as_ref()
127                .ok_or_else(|| anyhow!("TableFieldSchema is none"))?
128                .iter()
129                .map(|col| {
130                    (
131                        col.clone().name,
132                        BigQueryTypeSystem::from(&col.clone().r#type),
133                    )
134                })
135                .unzip();
136            self.names = names;
137            self.schema = types;
138        }
139    }
140
141    #[throws(BigQuerySourceError)]
142    fn result_rows(&mut self) -> Option<usize> {
143        match &self.origin_query {
144            Some(q) => {
145                let cxq = CXQuery::Naked(q.clone());
146                let cquery = count_query(&cxq, &BigQueryDialect {})?;
147                let job = self.client.job();
148                let mut rs = ResultSet::new_from_query_response(self.rt.block_on(
149                    job.query(self.project_id.as_str(), QueryRequest::new(cquery.as_str())),
150                )?);
151                rs.next_row();
152                let nrows = rs
153                    .get_i64(0)?
154                    .ok_or_else(|| anyhow!("cannot get row number"))?;
155                Some(nrows as usize)
156            }
157            None => None,
158        }
159    }
160
161    fn names(&self) -> Vec<String> {
162        self.names.clone()
163    }
164
165    fn schema(&self) -> Vec<Self::TypeSystem> {
166        self.schema.clone()
167    }
168
169    #[throws(BigQuerySourceError)]
170    fn partition(self) -> Vec<Self::Partition> {
171        let mut ret = vec![];
172        for query in self.queries {
173            ret.push(BigQuerySourcePartition::new(
174                self.rt.clone(),
175                self.client.clone(),
176                self.project_id.clone(),
177                &query,
178                &self.schema,
179            ));
180        }
181        ret
182    }
183}
184
185pub struct BigQuerySourcePartition {
186    rt: Arc<Runtime>,
187    client: Arc<Client>,
188    project_id: String,
189    query: CXQuery<String>,
190    schema: Vec<BigQueryTypeSystem>,
191    nrows: usize,
192    ncols: usize,
193}
194
195impl BigQuerySourcePartition {
196    pub fn new(
197        handle: Arc<Runtime>,
198        client: Arc<Client>,
199        project_id: String,
200        query: &CXQuery<String>,
201        schema: &[BigQueryTypeSystem],
202    ) -> Self {
203        Self {
204            rt: handle,
205            client,
206            project_id: project_id.clone(),
207            query: query.clone(),
208            schema: schema.to_vec(),
209            nrows: 0,
210            ncols: schema.len(),
211        }
212    }
213}
214
215impl SourcePartition for BigQuerySourcePartition {
216    type TypeSystem = BigQueryTypeSystem;
217    type Parser<'a> = BigQuerySourceParser;
218    type Error = BigQuerySourceError;
219
220    #[throws(BigQuerySourceError)]
221    fn result_rows(&mut self) {
222        let cquery = count_query(&self.query, &BigQueryDialect {})?;
223        let job = self.client.job();
224        let mut rs =
225            ResultSet::new_from_query_response(self.rt.block_on(
226                job.query(self.project_id.as_str(), QueryRequest::new(cquery.as_str())),
227            )?);
228        rs.next_row();
229        let nrows = rs
230            .get_i64(0)?
231            .ok_or_else(|| anyhow!("cannot get row number"))?;
232        self.nrows = nrows as usize;
233    }
234
235    #[throws(BigQuerySourceError)]
236    fn parser(&mut self) -> Self::Parser<'_> {
237        let job = self.client.job();
238        let qry = self.rt.block_on(job.query(
239            self.project_id.as_str(),
240            QueryRequest::new(self.query.as_str()),
241        ))?;
242        let job_info = qry
243            .job_reference
244            .as_ref()
245            .ok_or_else(|| anyhow!("job_reference is none"))?;
246        let params = GetQueryResultsParameters {
247            format_options: None,
248            location: job_info.location.clone(),
249            max_results: None,
250            page_token: None,
251            start_index: None,
252            timeout_ms: None,
253        };
254        let rs = self.rt.block_on(
255            job.get_query_results(
256                self.project_id.as_str(),
257                job_info
258                    .job_id
259                    .as_ref()
260                    .ok_or_else(|| anyhow!("job_id is none"))?
261                    .as_str(),
262                params,
263            ),
264        )?;
265        BigQuerySourceParser::new(self.rt.clone(), self.client.clone(), rs, &self.schema)
266    }
267
268    fn nrows(&self) -> usize {
269        self.nrows
270    }
271
272    fn ncols(&self) -> usize {
273        self.ncols
274    }
275}
276
277pub struct BigQuerySourceParser {
278    rt: Arc<Runtime>,
279    client: Arc<Client>,
280    response: GetQueryResultsResponse,
281    ncols: usize,
282    current_col: usize,
283    current_row: usize,
284    nrows: Option<usize>,
285}
286
287impl<'a> BigQuerySourceParser {
288    fn new(
289        rt: Arc<Runtime>,
290        client: Arc<Client>,
291        response: GetQueryResultsResponse,
292        schema: &[BigQueryTypeSystem],
293    ) -> Self {
294        Self {
295            rt,
296            client,
297            response,
298            ncols: schema.len(),
299            current_row: 0,
300            current_col: 0,
301            nrows: None,
302        }
303    }
304
305    #[throws(BigQuerySourceError)]
306    fn next_loc(&mut self) -> (usize, usize) {
307        let ret = (self.current_row, self.current_col);
308        self.current_row += (self.current_col + 1) / self.ncols;
309        self.current_col = (self.current_col + 1) % self.ncols;
310        ret
311    }
312}
313
314impl<'a> PartitionParser<'a> for BigQuerySourceParser {
315    type TypeSystem = BigQueryTypeSystem;
316    type Error = BigQuerySourceError;
317
318    #[throws(BigQuerySourceError)]
319    fn fetch_next(&mut self) -> (usize, bool) {
320        assert!(self.current_col == 0);
321        match self.nrows {
322            Some(total_rows) => (total_rows - self.current_row, true),
323            None => {
324                // Get all number of rows
325                let total_rows = self
326                    .response
327                    .total_rows
328                    .as_ref()
329                    .ok_or_else(|| anyhow!("total_rows is none"))?
330                    .parse::<usize>()?;
331                self.nrows = Some(total_rows);
332                (total_rows, true)
333            }
334        }
335    }
336}
337
338macro_rules! impl_produce {
339    ($($t: ty,)+) => {
340        $(
341            impl<'r> Produce<'r, $t> for BigQuerySourceParser {
342                type Error = BigQuerySourceError;
343
344                #[throws(BigQuerySourceError)]
345                fn produce(&'r mut self) -> $t {
346                    let (mut ridx, cidx) = self.next_loc()?;
347                    if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
348                        let job = self.client.job();
349                        let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
350                        let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
351                        self.response = self.rt.block_on(
352                            job.get_query_results(
353                                job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
354                                job_info.job_id.as_ref().ok_or_else(|| anyhow!("job_id is none"))?.as_str(),
355                                params,
356                            ),
357                        )?;
358                        self.current_row = 0;
359                        ridx = 0;
360                    }
361                    let rows = self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?;
362                    let columns = rows[ridx].columns.as_ref().ok_or_else(|| anyhow!("columns is none"))?;
363                    let v = columns.get(cidx).ok_or_else(|| anyhow!("Table Cell is none"))?.value.as_ref().ok_or_else(|| anyhow!("value is none"))?;
364                    let s = v
365                        .as_str()
366                        .ok_or_else(|| anyhow!("cannot get str from json value"))?;
367                    s.parse()
368                        .map_err(|_| {
369                            ConnectorXError::cannot_produce::<$t>(Some(s.into()))
370                        })?
371                }
372            }
373
374            impl<'r> Produce<'r, Option<$t>> for BigQuerySourceParser {
375                type Error = BigQuerySourceError;
376
377                #[throws(BigQuerySourceError)]
378                fn produce(&'r mut self) -> Option<$t> {
379                    let (mut ridx, cidx) = self.next_loc()?;
380                    if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
381                        let job = self.client.job();
382                        let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
383                        let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
384                        self.response = self.rt.block_on(
385                            job.get_query_results(
386                                job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
387                                job_info.job_id.as_ref().ok_or_else(|| anyhow!("job_id is none"))?.as_str(),
388                                params,
389                            ),
390                        )?;
391                        self.current_row = 0;
392                        ridx = 0;
393                    }
394                    let rows = self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?;
395                    let columns = rows[ridx].columns.as_ref().ok_or_else(|| anyhow!("columns is none"))?;
396                    match &columns.get(cidx).ok_or_else(|| anyhow!("Table Cell is none"))?.value {
397                        None => None,
398                        Some(v) => {
399                            let s = v.as_str().ok_or_else(|| anyhow!("cannot get str from json value"))?;
400                            Some(s.parse().map_err(|_| {
401                            ConnectorXError::cannot_produce::<$t>(Some(s.into()))
402                        })?)},
403                    }
404                }
405            }
406        )+
407    };
408}
409
410impl_produce!(i64, f64, String,);
411
412impl<'r, 'a> Produce<'r, bool> for BigQuerySourceParser {
413    type Error = BigQuerySourceError;
414
415    #[throws(BigQuerySourceError)]
416    fn produce(&mut self) -> bool {
417        let (mut ridx, cidx) = self.next_loc()?;
418        if ridx
419            == (self
420                .response
421                .rows
422                .as_ref()
423                .ok_or_else(|| anyhow!("rows is none"))?
424                .len())
425        {
426            let job = self.client.job();
427            let job_info = self
428                .response
429                .job_reference
430                .as_ref()
431                .ok_or_else(|| anyhow!("job_reference is none"))?;
432            let params = GetQueryResultsParameters {
433                format_options: None,
434                location: job_info.location.clone(),
435                max_results: None,
436                page_token: self.response.page_token.clone(),
437                start_index: None,
438                timeout_ms: None,
439            };
440            self.response = self.rt.block_on(
441                job.get_query_results(
442                    job_info
443                        .project_id
444                        .as_ref()
445                        .ok_or_else(|| anyhow!("project_id is none"))?
446                        .as_str(),
447                    job_info
448                        .job_id
449                        .as_ref()
450                        .ok_or_else(|| anyhow!("job_id is none"))?
451                        .as_str(),
452                    params,
453                ),
454            )?;
455            self.current_row = 0;
456            ridx = 0;
457        }
458        let rows = self
459            .response
460            .rows
461            .as_ref()
462            .ok_or_else(|| anyhow!("rows is none"))?;
463        let columns = rows[ridx]
464            .columns
465            .as_ref()
466            .ok_or_else(|| anyhow!("columns is none"))?;
467        let v = columns
468            .get(cidx)
469            .ok_or_else(|| anyhow!("Table Cell is none"))?
470            .value
471            .as_ref()
472            .ok_or_else(|| anyhow!("value is none"))?;
473        let s = v
474            .as_str()
475            .ok_or_else(|| anyhow!("cannot get str from json value"))?;
476
477        let ret = match s {
478            "true" => true,
479            "false" => false,
480            _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
481        };
482        ret
483    }
484}
485
486impl<'r, 'a> Produce<'r, Option<bool>> for BigQuerySourceParser {
487    type Error = BigQuerySourceError;
488
489    #[throws(BigQuerySourceError)]
490    fn produce(&mut self) -> Option<bool> {
491        let (mut ridx, cidx) = self.next_loc()?;
492        if ridx
493            == (self
494                .response
495                .rows
496                .as_ref()
497                .ok_or_else(|| anyhow!("rows is none"))?
498                .len())
499        {
500            let job = self.client.job();
501            let job_info = self
502                .response
503                .job_reference
504                .as_ref()
505                .ok_or_else(|| anyhow!("job_reference is none"))?;
506            let params = GetQueryResultsParameters {
507                format_options: None,
508                location: job_info.location.clone(),
509                max_results: None,
510                page_token: self.response.page_token.clone(),
511                start_index: None,
512                timeout_ms: None,
513            };
514            self.response = self.rt.block_on(
515                job.get_query_results(
516                    job_info
517                        .project_id
518                        .as_ref()
519                        .ok_or_else(|| anyhow!("project_id is none"))?
520                        .as_str(),
521                    job_info
522                        .job_id
523                        .as_ref()
524                        .ok_or_else(|| anyhow!("job_id is none"))?
525                        .as_str(),
526                    params,
527                ),
528            )?;
529            self.current_row = 0;
530            ridx = 0;
531        }
532        let rows = self
533            .response
534            .rows
535            .as_ref()
536            .ok_or_else(|| anyhow!("rows is none"))?;
537        let columns = rows[ridx]
538            .columns
539            .as_ref()
540            .ok_or_else(|| anyhow!("columns is none"))?;
541        let ret = match &columns
542            .get(cidx)
543            .ok_or_else(|| anyhow!("Table Cell is none"))?
544            .value
545        {
546            None => None,
547            Some(v) => {
548                let s = v
549                    .as_str()
550                    .ok_or_else(|| anyhow!("cannot get str from json value"))?;
551                match s {
552                    "true" => Some(true),
553                    "false" => Some(false),
554                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
555                }
556            }
557        };
558        ret
559    }
560}
561
562impl<'r, 'a> Produce<'r, NaiveDate> for BigQuerySourceParser {
563    type Error = BigQuerySourceError;
564
565    #[throws(BigQuerySourceError)]
566    fn produce(&mut self) -> NaiveDate {
567        let (mut ridx, cidx) = self.next_loc()?;
568        if ridx
569            == (self
570                .response
571                .rows
572                .as_ref()
573                .ok_or_else(|| anyhow!("rows is none"))?
574                .len())
575        {
576            let job = self.client.job();
577            let job_info = self
578                .response
579                .job_reference
580                .as_ref()
581                .ok_or_else(|| anyhow!("job_reference is none"))?;
582            let params = GetQueryResultsParameters {
583                format_options: None,
584                location: job_info.location.clone(),
585                max_results: None,
586                page_token: self.response.page_token.clone(),
587                start_index: None,
588                timeout_ms: None,
589            };
590            self.response = self.rt.block_on(
591                job.get_query_results(
592                    job_info
593                        .project_id
594                        .as_ref()
595                        .ok_or_else(|| anyhow!("project_id is none"))?
596                        .as_str(),
597                    job_info
598                        .job_id
599                        .as_ref()
600                        .ok_or_else(|| anyhow!("job_id is none"))?
601                        .as_str(),
602                    params,
603                ),
604            )?;
605            self.current_row = 0;
606            ridx = 0;
607        }
608        let rows = self
609            .response
610            .rows
611            .as_ref()
612            .ok_or_else(|| anyhow!("rows is none"))?;
613        let columns = rows[ridx]
614            .columns
615            .as_ref()
616            .ok_or_else(|| anyhow!("columns is none"))?;
617        let v = columns
618            .get(cidx)
619            .ok_or_else(|| anyhow!("Table Cell is none"))?
620            .value
621            .as_ref()
622            .ok_or_else(|| anyhow!("value is none"))?;
623        let s = v
624            .as_str()
625            .ok_or_else(|| anyhow!("cannot get str from json value"))?;
626        NaiveDate::parse_from_str(s, "%Y-%m-%d")
627            .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into())))?
628    }
629}
630
631impl<'r, 'a> Produce<'r, Option<NaiveDate>> for BigQuerySourceParser {
632    type Error = BigQuerySourceError;
633
634    #[throws(BigQuerySourceError)]
635    fn produce(&mut self) -> Option<NaiveDate> {
636        let (mut ridx, cidx) = self.next_loc()?;
637        if ridx
638            == (self
639                .response
640                .rows
641                .as_ref()
642                .ok_or_else(|| anyhow!("rows is none"))?
643                .len())
644        {
645            let job = self.client.job();
646            let job_info = self
647                .response
648                .job_reference
649                .as_ref()
650                .ok_or_else(|| anyhow!("job_reference is none"))?;
651            let params = GetQueryResultsParameters {
652                format_options: None,
653                location: job_info.location.clone(),
654                max_results: None,
655                page_token: self.response.page_token.clone(),
656                start_index: None,
657                timeout_ms: None,
658            };
659            self.response = self.rt.block_on(
660                job.get_query_results(
661                    job_info
662                        .project_id
663                        .as_ref()
664                        .ok_or_else(|| anyhow!("project_id is none"))?
665                        .as_str(),
666                    job_info
667                        .job_id
668                        .as_ref()
669                        .ok_or_else(|| anyhow!("job_id is none"))?
670                        .as_str(),
671                    params,
672                ),
673            )?;
674            self.current_row = 0;
675            ridx = 0;
676        }
677        let rows = self
678            .response
679            .rows
680            .as_ref()
681            .ok_or_else(|| anyhow!("rows is none"))?;
682        let columns = rows[ridx]
683            .columns
684            .as_ref()
685            .ok_or_else(|| anyhow!("columns is none"))?;
686        match &columns
687            .get(cidx)
688            .ok_or_else(|| anyhow!("Table Cell is none"))?
689            .value
690        {
691            None => None,
692            Some(v) => {
693                let s = v
694                    .as_str()
695                    .ok_or_else(|| anyhow!("cannot get str from json value"))?;
696                Some(
697                    NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
698                        ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
699                    })?,
700                )
701            }
702        }
703    }
704}
705
706impl<'r, 'a> Produce<'r, NaiveDateTime> for BigQuerySourceParser {
707    type Error = BigQuerySourceError;
708
709    #[throws(BigQuerySourceError)]
710    fn produce(&mut self) -> NaiveDateTime {
711        let (mut ridx, cidx) = self.next_loc()?;
712        if ridx
713            == (self
714                .response
715                .rows
716                .as_ref()
717                .ok_or_else(|| anyhow!("rows is none"))?
718                .len())
719        {
720            let job = self.client.job();
721            let job_info = self
722                .response
723                .job_reference
724                .as_ref()
725                .ok_or_else(|| anyhow!("job_reference is none"))?;
726            let params = GetQueryResultsParameters {
727                format_options: None,
728                location: job_info.location.clone(),
729                max_results: None,
730                page_token: self.response.page_token.clone(),
731                start_index: None,
732                timeout_ms: None,
733            };
734            self.response = self.rt.block_on(
735                job.get_query_results(
736                    job_info
737                        .project_id
738                        .as_ref()
739                        .ok_or_else(|| anyhow!("project_id is none"))?
740                        .as_str(),
741                    job_info
742                        .job_id
743                        .as_ref()
744                        .ok_or_else(|| anyhow!("job_id is none"))?
745                        .as_str(),
746                    params,
747                ),
748            )?;
749            self.current_row = 0;
750            ridx = 0;
751        }
752        let rows = self
753            .response
754            .rows
755            .as_ref()
756            .ok_or_else(|| anyhow!("rows is none"))?;
757        let columns = rows[ridx]
758            .columns
759            .as_ref()
760            .ok_or_else(|| anyhow!("columns is none"))?;
761        let v = columns
762            .get(cidx)
763            .ok_or_else(|| anyhow!("Table Cell is none"))?
764            .value
765            .as_ref()
766            .ok_or_else(|| anyhow!("value is none"))?;
767        let s = v
768            .as_str()
769            .ok_or_else(|| anyhow!("cannot get str from json value"))?;
770        NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
771            .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f"))
772            .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())))?
773    }
774}
775
776impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for BigQuerySourceParser {
777    type Error = BigQuerySourceError;
778
779    #[throws(BigQuerySourceError)]
780    fn produce(&mut self) -> Option<NaiveDateTime> {
781        let (mut ridx, cidx) = self.next_loc()?;
782        if ridx
783            == (self
784                .response
785                .rows
786                .as_ref()
787                .ok_or_else(|| anyhow!("rows is none"))?
788                .len())
789        {
790            let job = self.client.job();
791            let job_info = self
792                .response
793                .job_reference
794                .as_ref()
795                .ok_or_else(|| anyhow!("job_reference is none"))?;
796            let params = GetQueryResultsParameters {
797                format_options: None,
798                location: job_info.location.clone(),
799                max_results: None,
800                page_token: self.response.page_token.clone(),
801                start_index: None,
802                timeout_ms: None,
803            };
804            self.response = self.rt.block_on(
805                job.get_query_results(
806                    job_info
807                        .project_id
808                        .as_ref()
809                        .ok_or_else(|| anyhow!("project_id is none"))?
810                        .as_str(),
811                    job_info
812                        .job_id
813                        .as_ref()
814                        .ok_or_else(|| anyhow!("job_id is none"))?
815                        .as_str(),
816                    params,
817                ),
818            )?;
819            self.current_row = 0;
820            ridx = 0;
821        }
822        let rows = self
823            .response
824            .rows
825            .as_ref()
826            .ok_or_else(|| anyhow!("rows is none"))?;
827        let columns = rows[ridx]
828            .columns
829            .as_ref()
830            .ok_or_else(|| anyhow!("columns is none"))?;
831        match &columns
832            .get(cidx)
833            .ok_or_else(|| anyhow!("Table Cell is none"))?
834            .value
835        {
836            None => None,
837            Some(v) => {
838                let s = v
839                    .as_str()
840                    .ok_or_else(|| anyhow!("cannot get str from json value"))?;
841                Some(
842                    NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
843                        .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f"))
844                        .map_err(|_| {
845                            ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into()))
846                        })?,
847                )
848            }
849        }
850    }
851}
852
853impl<'r, 'a> Produce<'r, NaiveTime> for BigQuerySourceParser {
854    type Error = BigQuerySourceError;
855
856    #[throws(BigQuerySourceError)]
857    fn produce(&mut self) -> NaiveTime {
858        let (mut ridx, cidx) = self.next_loc()?;
859        if ridx
860            == (self
861                .response
862                .rows
863                .as_ref()
864                .ok_or_else(|| anyhow!("rows is none"))?
865                .len())
866        {
867            let job = self.client.job();
868            let job_info = self
869                .response
870                .job_reference
871                .as_ref()
872                .ok_or_else(|| anyhow!("job_reference is none"))?;
873            let params = GetQueryResultsParameters {
874                format_options: None,
875                location: job_info.location.clone(),
876                max_results: None,
877                page_token: self.response.page_token.clone(),
878                start_index: None,
879                timeout_ms: None,
880            };
881            self.response = self.rt.block_on(
882                job.get_query_results(
883                    job_info
884                        .project_id
885                        .as_ref()
886                        .ok_or_else(|| anyhow!("project_id is none"))?
887                        .as_str(),
888                    job_info
889                        .job_id
890                        .as_ref()
891                        .ok_or_else(|| anyhow!("job_id is none"))?
892                        .as_str(),
893                    params,
894                ),
895            )?;
896            self.current_row = 0;
897            ridx = 0;
898        }
899        let rows = self
900            .response
901            .rows
902            .as_ref()
903            .ok_or_else(|| anyhow!("rows is none"))?;
904        let columns = rows[ridx]
905            .columns
906            .as_ref()
907            .ok_or_else(|| anyhow!("columns is none"))?;
908        let v = columns
909            .get(cidx)
910            .ok_or_else(|| anyhow!("Table Cell is none"))?
911            .value
912            .as_ref()
913            .ok_or_else(|| anyhow!("value is none"))?;
914        let s = v
915            .as_str()
916            .ok_or_else(|| anyhow!("cannot get str from json value"))?;
917        NaiveTime::parse_from_str(s, "%H:%M:%S")
918            .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?
919    }
920}
921
922impl<'r, 'a> Produce<'r, Option<NaiveTime>> for BigQuerySourceParser {
923    type Error = BigQuerySourceError;
924
925    #[throws(BigQuerySourceError)]
926    fn produce(&mut self) -> Option<NaiveTime> {
927        let (mut ridx, cidx) = self.next_loc()?;
928        if ridx
929            == (self
930                .response
931                .rows
932                .as_ref()
933                .ok_or_else(|| anyhow!("rows is none"))?
934                .len())
935        {
936            let job = self.client.job();
937            let job_info = self
938                .response
939                .job_reference
940                .as_ref()
941                .ok_or_else(|| anyhow!("job_reference is none"))?;
942            let params = GetQueryResultsParameters {
943                format_options: None,
944                location: job_info.location.clone(),
945                max_results: None,
946                page_token: self.response.page_token.clone(),
947                start_index: None,
948                timeout_ms: None,
949            };
950            self.response = self.rt.block_on(
951                job.get_query_results(
952                    job_info
953                        .project_id
954                        .as_ref()
955                        .ok_or_else(|| anyhow!("project_id is none"))?
956                        .as_str(),
957                    job_info
958                        .job_id
959                        .as_ref()
960                        .ok_or_else(|| anyhow!("job_id is none"))?
961                        .as_str(),
962                    params,
963                ),
964            )?;
965            self.current_row = 0;
966            ridx = 0;
967        }
968        let rows = self
969            .response
970            .rows
971            .as_ref()
972            .ok_or_else(|| anyhow!("rows is none"))?;
973        let columns = rows[ridx]
974            .columns
975            .as_ref()
976            .ok_or_else(|| anyhow!("columns is none"))?;
977        match &columns
978            .get(cidx)
979            .ok_or_else(|| anyhow!("Table Cell is none"))?
980            .value
981        {
982            None => None,
983            Some(v) => {
984                let s = v
985                    .as_str()
986                    .ok_or_else(|| anyhow!("cannot get str from json value"))?;
987                Some(
988                    NaiveTime::parse_from_str(s, "%H:%M:%S").map_err(|_| {
989                        ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into()))
990                    })?,
991                )
992            }
993        }
994    }
995}
996
997impl<'r, 'a> Produce<'r, DateTime<Utc>> for BigQuerySourceParser {
998    type Error = BigQuerySourceError;
999
1000    #[throws(BigQuerySourceError)]
1001    fn produce(&mut self) -> DateTime<Utc> {
1002        let (mut ridx, cidx) = self.next_loc()?;
1003        if ridx
1004            == (self
1005                .response
1006                .rows
1007                .as_ref()
1008                .ok_or_else(|| anyhow!("rows is none"))?
1009                .len())
1010        {
1011            let job = self.client.job();
1012            let job_info = self
1013                .response
1014                .job_reference
1015                .as_ref()
1016                .ok_or_else(|| anyhow!("job_reference is none"))?;
1017            let params = GetQueryResultsParameters {
1018                format_options: None,
1019                location: job_info.location.clone(),
1020                max_results: None,
1021                page_token: self.response.page_token.clone(),
1022                start_index: None,
1023                timeout_ms: None,
1024            };
1025            self.response = self.rt.block_on(
1026                job.get_query_results(
1027                    job_info
1028                        .project_id
1029                        .as_ref()
1030                        .ok_or_else(|| anyhow!("project_id is none"))?
1031                        .as_str(),
1032                    job_info
1033                        .job_id
1034                        .as_ref()
1035                        .ok_or_else(|| anyhow!("job_id is none"))?
1036                        .as_str(),
1037                    params,
1038                ),
1039            )?;
1040            self.current_row = 0;
1041            ridx = 0;
1042        }
1043        let rows = self
1044            .response
1045            .rows
1046            .as_ref()
1047            .ok_or_else(|| anyhow!("rows is none"))?;
1048        let columns = rows[ridx]
1049            .columns
1050            .as_ref()
1051            .ok_or_else(|| anyhow!("columns is none"))?;
1052        let v = columns
1053            .get(cidx)
1054            .ok_or_else(|| anyhow!("Table Cell is none"))?
1055            .value
1056            .as_ref()
1057            .ok_or_else(|| anyhow!("value is none"))?;
1058        let timestamp_ns = (v
1059            .as_str()
1060            .ok_or_else(|| anyhow!("cannot get str from json value"))?
1061            .parse::<f64>()?
1062            * 1e9) as i64;
1063        let secs = timestamp_ns / 1000000000;
1064        let nsecs = (timestamp_ns % 1000000000) as u32;
1065        DateTime::from_timestamp(secs, nsecs)
1066            .unwrap_or_else(|| panic!("out of range number: {} {}", secs, nsecs))
1067    }
1068}
1069
1070impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for BigQuerySourceParser {
1071    type Error = BigQuerySourceError;
1072
1073    #[throws(BigQuerySourceError)]
1074    fn produce(&mut self) -> Option<DateTime<Utc>> {
1075        let (mut ridx, cidx) = self.next_loc()?;
1076        if ridx
1077            == (self
1078                .response
1079                .rows
1080                .as_ref()
1081                .ok_or_else(|| anyhow!("rows is none"))?
1082                .len())
1083        {
1084            let job = self.client.job();
1085            let job_info = self
1086                .response
1087                .job_reference
1088                .as_ref()
1089                .ok_or_else(|| anyhow!("job_reference is none"))?;
1090            let params = GetQueryResultsParameters {
1091                format_options: None,
1092                location: job_info.location.clone(),
1093                max_results: None,
1094                page_token: self.response.page_token.clone(),
1095                start_index: None,
1096                timeout_ms: None,
1097            };
1098            self.response = self.rt.block_on(
1099                job.get_query_results(
1100                    job_info
1101                        .project_id
1102                        .as_ref()
1103                        .ok_or_else(|| anyhow!("project_id is none"))?
1104                        .as_str(),
1105                    job_info
1106                        .job_id
1107                        .as_ref()
1108                        .ok_or_else(|| anyhow!("job_id is none"))?
1109                        .as_str(),
1110                    params,
1111                ),
1112            )?;
1113            self.current_row = 0;
1114            ridx = 0;
1115        }
1116        let rows = self
1117            .response
1118            .rows
1119            .as_ref()
1120            .ok_or_else(|| anyhow!("rows is none"))?;
1121        let columns = rows[ridx]
1122            .columns
1123            .as_ref()
1124            .ok_or_else(|| anyhow!("columns is none"))?;
1125        match &columns
1126            .get(cidx)
1127            .ok_or_else(|| anyhow!("Table Cell is none"))?
1128            .value
1129        {
1130            None => None,
1131            Some(v) => {
1132                let timestamp_ns = (v
1133                    .as_str()
1134                    .ok_or_else(|| anyhow!("cannot get str from json value"))?
1135                    .parse::<f64>()?
1136                    * 1e9) as i64;
1137                let secs = timestamp_ns / 1000000000;
1138                let nsecs = (timestamp_ns % 1000000000) as u32;
1139                DateTime::from_timestamp(secs, nsecs)
1140            }
1141        }
1142    }
1143}