connectorx/sources/bigquery/
mod.rs

1//! Source implementation for Google BigQuery
2
3mod errors;
4mod typesystem;
5
6pub use self::errors::BigQuerySourceError;
7use crate::{
8    data_order::DataOrder,
9    errors::ConnectorXError,
10    sources::{PartitionParser, Produce, Source, SourcePartition},
11    sql::{count_query, limit1_query, CXQuery},
12};
13use anyhow::anyhow;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use fehler::{throw, throws};
16use gcp_bigquery_client::{
17    model::{
18        get_query_results_parameters::GetQueryResultsParameters,
19        get_query_results_response::GetQueryResultsResponse, query_request::QueryRequest,
20        query_response::ResultSet,
21    },
22    Client,
23};
24use sqlparser::dialect::Dialect;
25use std::sync::Arc;
26use tokio::runtime::Runtime;
27pub use typesystem::BigQueryTypeSystem;
28use url::Url;
29
30#[derive(Debug)]
31pub struct BigQueryDialect {}
32
33impl Dialect for BigQueryDialect {
34    // See https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical
35    fn is_delimited_identifier_start(&self, ch: char) -> bool {
36        ch == '`'
37    }
38
39    fn is_identifier_start(&self, ch: char) -> bool {
40        ch.is_ascii_lowercase() || ch.is_ascii_uppercase() || ch == '_' || ch == '-'
41    }
42
43    fn is_identifier_part(&self, ch: char) -> bool {
44        self.is_identifier_start(ch) || ch.is_ascii_digit()
45    }
46}
47
48pub struct BigQuerySource {
49    rt: Arc<Runtime>,
50    client: Arc<Client>,
51    project_id: String,
52    origin_query: Option<String>,
53    queries: Vec<CXQuery<String>>,
54    names: Vec<String>,
55    schema: Vec<BigQueryTypeSystem>,
56}
57
58impl BigQuerySource {
59    #[throws(BigQuerySourceError)]
60    pub fn new(rt: Arc<Runtime>, conn: &str) -> Self {
61        let url = Url::parse(conn)?;
62        let sa_key_path = url.path();
63        let client = Arc::new(rt.block_on(
64            gcp_bigquery_client::Client::from_service_account_key_file(sa_key_path),
65        )?);
66        let auth_data = std::fs::read_to_string(sa_key_path)?;
67        let auth_json: serde_json::Value = serde_json::from_str(&auth_data)?;
68        let project_id = auth_json
69            .get("project_id")
70            .ok_or_else(|| anyhow!("Cannot get project_id from auth file"))?
71            .as_str()
72            .ok_or_else(|| anyhow!("Cannot get project_id as string from auth file"))?
73            .to_string();
74        Self {
75            rt,
76            client,
77            project_id,
78            origin_query: None,
79            queries: vec![],
80            names: vec![],
81            schema: vec![],
82        }
83    }
84}
85
86impl Source for BigQuerySource
87where
88    BigQuerySourcePartition:
89        SourcePartition<TypeSystem = BigQueryTypeSystem, Error = BigQuerySourceError>,
90{
91    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
92    type Partition = BigQuerySourcePartition;
93    type TypeSystem = BigQueryTypeSystem;
94    type Error = BigQuerySourceError;
95
96    #[throws(BigQuerySourceError)]
97    fn set_data_order(&mut self, data_order: DataOrder) {
98        if !matches!(data_order, DataOrder::RowMajor) {
99            throw!(ConnectorXError::UnsupportedDataOrder(data_order));
100        }
101    }
102
103    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
104        self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
105    }
106
107    fn set_origin_query(&mut self, query: Option<String>) {
108        self.origin_query = query;
109    }
110
111    #[throws(BigQuerySourceError)]
112    fn fetch_metadata(&mut self) {
113        assert!(!self.queries.is_empty());
114        let job = self.client.job();
115        for (_, query) in self.queries.iter().enumerate() {
116            let l1query = limit1_query(query, &BigQueryDialect {})?;
117            let rs = self.rt.block_on(job.query(
118                self.project_id.as_str(),
119                QueryRequest::new(l1query.as_str()),
120            ))?;
121            let (names, types) = rs
122                .schema
123                .as_ref()
124                .ok_or_else(|| anyhow!("TableSchema is none"))?
125                .fields
126                .as_ref()
127                .ok_or_else(|| anyhow!("TableFieldSchema is none"))?
128                .iter()
129                .map(|col| {
130                    (
131                        col.clone().name,
132                        BigQueryTypeSystem::from(&col.clone().r#type),
133                    )
134                })
135                .unzip();
136            self.names = names;
137            self.schema = types;
138        }
139    }
140
141    #[throws(BigQuerySourceError)]
142    fn result_rows(&mut self) -> Option<usize> {
143        match &self.origin_query {
144            Some(q) => {
145                let cxq = CXQuery::Naked(q.clone());
146                let cquery = count_query(&cxq, &BigQueryDialect {})?;
147                let job = self.client.job();
148                let mut rs = ResultSet::new_from_query_response(self.rt.block_on(
149                    job.query(self.project_id.as_str(), QueryRequest::new(cquery.as_str())),
150                )?);
151                rs.next_row();
152                let nrows = rs
153                    .get_i64(0)?
154                    .ok_or_else(|| anyhow!("cannot get row number"))?;
155                Some(nrows as usize)
156            }
157            None => None,
158        }
159    }
160
161    fn names(&self) -> Vec<String> {
162        self.names.clone()
163    }
164
165    fn schema(&self) -> Vec<Self::TypeSystem> {
166        self.schema.clone()
167    }
168
169    #[throws(BigQuerySourceError)]
170    fn partition(self) -> Vec<Self::Partition> {
171        let mut ret = vec![];
172        for query in self.queries {
173            ret.push(BigQuerySourcePartition::new(
174                self.rt.clone(),
175                self.client.clone(),
176                self.project_id.clone(),
177                &query,
178                &self.schema,
179            ));
180        }
181        ret
182    }
183}
184
185pub struct BigQuerySourcePartition {
186    rt: Arc<Runtime>,
187    client: Arc<Client>,
188    project_id: String,
189    query: CXQuery<String>,
190    schema: Vec<BigQueryTypeSystem>,
191    nrows: usize,
192    ncols: usize,
193}
194
195impl BigQuerySourcePartition {
196    pub fn new(
197        handle: Arc<Runtime>,
198        client: Arc<Client>,
199        project_id: String,
200        query: &CXQuery<String>,
201        schema: &[BigQueryTypeSystem],
202    ) -> Self {
203        Self {
204            rt: handle,
205            client,
206            project_id: project_id.clone(),
207            query: query.clone(),
208            schema: schema.to_vec(),
209            nrows: 0,
210            ncols: schema.len(),
211        }
212    }
213}
214
215impl SourcePartition for BigQuerySourcePartition {
216    type TypeSystem = BigQueryTypeSystem;
217    type Parser<'a> = BigQuerySourceParser;
218    type Error = BigQuerySourceError;
219
220    #[throws(BigQuerySourceError)]
221    fn result_rows(&mut self) {
222        let cquery = count_query(&self.query, &BigQueryDialect {})?;
223        let job = self.client.job();
224        let mut rs =
225            ResultSet::new_from_query_response(self.rt.block_on(
226                job.query(self.project_id.as_str(), QueryRequest::new(cquery.as_str())),
227            )?);
228        rs.next_row();
229        let nrows = rs
230            .get_i64(0)?
231            .ok_or_else(|| anyhow!("cannot get row number"))?;
232        self.nrows = nrows as usize;
233    }
234
235    #[throws(BigQuerySourceError)]
236    fn parser(&mut self) -> Self::Parser<'_> {
237        let job = self.client.job();
238        let qry = self.rt.block_on(job.query(
239            self.project_id.as_str(),
240            QueryRequest::new(self.query.as_str()),
241        ))?;
242        let job_info = qry
243            .job_reference
244            .as_ref()
245            .ok_or_else(|| anyhow!("job_reference is none"))?;
246        let params = GetQueryResultsParameters {
247            format_options: None,
248            location: job_info.location.clone(),
249            max_results: None,
250            page_token: None,
251            start_index: None,
252            timeout_ms: None,
253        };
254        let rs = self.rt.block_on(
255            job.get_query_results(
256                self.project_id.as_str(),
257                job_info
258                    .job_id
259                    .as_ref()
260                    .ok_or_else(|| anyhow!("job_id is none"))?
261                    .as_str(),
262                params,
263            ),
264        )?;
265        BigQuerySourceParser::new(self.rt.clone(), self.client.clone(), rs, &self.schema)
266    }
267
268    fn nrows(&self) -> usize {
269        self.nrows
270    }
271
272    fn ncols(&self) -> usize {
273        self.ncols
274    }
275}
276
277pub struct BigQuerySourceParser {
278    rt: Arc<Runtime>,
279    client: Arc<Client>,
280    response: GetQueryResultsResponse,
281    ncols: usize,
282    current_col: usize,
283    current_row: usize,
284    nrows: Option<usize>,
285}
286
287impl<'a> BigQuerySourceParser {
288    fn new(
289        rt: Arc<Runtime>,
290        client: Arc<Client>,
291        response: GetQueryResultsResponse,
292        schema: &[BigQueryTypeSystem],
293    ) -> Self {
294        Self {
295            rt,
296            client,
297            response,
298            ncols: schema.len(),
299            current_row: 0,
300            current_col: 0,
301            nrows: None,
302        }
303    }
304
305    #[throws(BigQuerySourceError)]
306    fn next_loc(&mut self) -> (usize, usize) {
307        let ret = (self.current_row, self.current_col);
308        self.current_row += (self.current_col + 1) / self.ncols;
309        self.current_col = (self.current_col + 1) % self.ncols;
310        ret
311    }
312}
313
314impl<'a> PartitionParser<'a> for BigQuerySourceParser {
315    type TypeSystem = BigQueryTypeSystem;
316    type Error = BigQuerySourceError;
317
318    #[throws(BigQuerySourceError)]
319    fn fetch_next(&mut self) -> (usize, bool) {
320        assert!(self.current_col == 0);
321        match self.nrows {
322            Some(total_rows) => (total_rows - self.current_row, true),
323            None => {
324                // Get all number of rows
325                let total_rows = self
326                    .response
327                    .total_rows
328                    .as_ref()
329                    .ok_or_else(|| anyhow!("total_rows is none"))?
330                    .parse::<usize>()?;
331                self.nrows = Some(total_rows);
332                (total_rows, true)
333            }
334        }
335    }
336}
337
338macro_rules! impl_produce {
339    ($($t: ty,)+) => {
340        $(
341            impl<'r> Produce<'r, $t> for BigQuerySourceParser {
342                type Error = BigQuerySourceError;
343
344                #[throws(BigQuerySourceError)]
345                fn produce(&'r mut self) -> $t {
346                    let (mut ridx, cidx) = self.next_loc()?;
347                    if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
348                        let job = self.client.job();
349                        let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
350                        let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
351                        self.response = self.rt.block_on(
352                            job.get_query_results(
353                                job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
354                                job_info.job_id.as_ref().ok_or_else(|| anyhow!("job_id is none"))?.as_str(),
355                                params,
356                            ),
357                        )?;
358                        self.current_row = 0;
359                        ridx = 0;
360                    }
361                    let rows = self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?;
362                    let columns = rows[ridx].columns.as_ref().ok_or_else(|| anyhow!("columns is none"))?;
363                    let v = columns.get(cidx).ok_or_else(|| anyhow!("Table Cell is none"))?.value.as_ref().ok_or_else(|| anyhow!("value is none"))?;
364                    let s = v
365                        .as_str()
366                        .ok_or_else(|| anyhow!("cannot get str from json value"))?;
367                    s.parse()
368                        .map_err(|_| {
369                            ConnectorXError::cannot_produce::<$t>(Some(s.into()))
370                        })?
371                }
372            }
373
374            impl<'r> Produce<'r, Option<$t>> for BigQuerySourceParser {
375                type Error = BigQuerySourceError;
376
377                #[throws(BigQuerySourceError)]
378                fn produce(&'r mut self) -> Option<$t> {
379                    let (mut ridx, cidx) = self.next_loc()?;
380                    if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
381                        let job = self.client.job();
382                        let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
383                        let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
384                        self.response = self.rt.block_on(
385                            job.get_query_results(
386                                job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
387                                job_info.job_id.as_ref().ok_or_else(|| anyhow!("job_id is none"))?.as_str(),
388                                params,
389                            ),
390                        )?;
391                        self.current_row = 0;
392                        ridx = 0;
393                    }
394                    let rows = self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?;
395                    let columns = rows[ridx].columns.as_ref().ok_or_else(|| anyhow!("columns is none"))?;
396                    match &columns.get(cidx).ok_or_else(|| anyhow!("Table Cell is none"))?.value {
397                        None => None,
398                        Some(v) => {
399                            let s = v.as_str().ok_or_else(|| anyhow!("cannot get str from json value"))?;
400                            Some(s.parse().map_err(|_| {
401                            ConnectorXError::cannot_produce::<$t>(Some(s.into()))
402                        })?)},
403                    }
404                }
405            }
406        )+
407    };
408}
409
410impl_produce!(i64, f64, String,);
411
412impl<'r, 'a> Produce<'r, bool> for BigQuerySourceParser {
413    type Error = BigQuerySourceError;
414
415    #[throws(BigQuerySourceError)]
416    fn produce(&mut self) -> bool {
417        let (mut ridx, cidx) = self.next_loc()?;
418        if ridx
419            == (self
420                .response
421                .rows
422                .as_ref()
423                .ok_or_else(|| anyhow!("rows is none"))?
424                .len())
425        {
426            let job = self.client.job();
427            let job_info = self
428                .response
429                .job_reference
430                .as_ref()
431                .ok_or_else(|| anyhow!("job_reference is none"))?;
432            let params = GetQueryResultsParameters {
433                format_options: None,
434                location: job_info.location.clone(),
435                max_results: None,
436                page_token: self.response.page_token.clone(),
437                start_index: None,
438                timeout_ms: None,
439            };
440            self.response = self.rt.block_on(
441                job.get_query_results(
442                    job_info
443                        .project_id
444                        .as_ref()
445                        .ok_or_else(|| anyhow!("project_id is none"))?
446                        .as_str(),
447                    job_info
448                        .job_id
449                        .as_ref()
450                        .ok_or_else(|| anyhow!("job_id is none"))?
451                        .as_str(),
452                    params,
453                ),
454            )?;
455            self.current_row = 0;
456            ridx = 0;
457        }
458        let rows = self
459            .response
460            .rows
461            .as_ref()
462            .ok_or_else(|| anyhow!("rows is none"))?;
463        let columns = rows[ridx]
464            .columns
465            .as_ref()
466            .ok_or_else(|| anyhow!("columns is none"))?;
467        let v = columns
468            .get(cidx)
469            .ok_or_else(|| anyhow!("Table Cell is none"))?
470            .value
471            .as_ref()
472            .ok_or_else(|| anyhow!("value is none"))?;
473        let s = v
474            .as_str()
475            .ok_or_else(|| anyhow!("cannot get str from json value"))?;
476
477        let ret = match s {
478            "true" => true,
479            "false" => false,
480            _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
481        };
482        ret
483    }
484}
485
486impl<'r, 'a> Produce<'r, Option<bool>> for BigQuerySourceParser {
487    type Error = BigQuerySourceError;
488
489    #[throws(BigQuerySourceError)]
490    fn produce(&mut self) -> Option<bool> {
491        let (mut ridx, cidx) = self.next_loc()?;
492        if ridx
493            == (self
494                .response
495                .rows
496                .as_ref()
497                .ok_or_else(|| anyhow!("rows is none"))?
498                .len())
499        {
500            let job = self.client.job();
501            let job_info = self
502                .response
503                .job_reference
504                .as_ref()
505                .ok_or_else(|| anyhow!("job_reference is none"))?;
506            let params = GetQueryResultsParameters {
507                format_options: None,
508                location: job_info.location.clone(),
509                max_results: None,
510                page_token: self.response.page_token.clone(),
511                start_index: None,
512                timeout_ms: None,
513            };
514            self.response = self.rt.block_on(
515                job.get_query_results(
516                    job_info
517                        .project_id
518                        .as_ref()
519                        .ok_or_else(|| anyhow!("project_id is none"))?
520                        .as_str(),
521                    job_info
522                        .job_id
523                        .as_ref()
524                        .ok_or_else(|| anyhow!("job_id is none"))?
525                        .as_str(),
526                    params,
527                ),
528            )?;
529            self.current_row = 0;
530            ridx = 0;
531        }
532        let rows = self
533            .response
534            .rows
535            .as_ref()
536            .ok_or_else(|| anyhow!("rows is none"))?;
537        let columns = rows[ridx]
538            .columns
539            .as_ref()
540            .ok_or_else(|| anyhow!("columns is none"))?;
541        let ret = match &columns
542            .get(cidx)
543            .ok_or_else(|| anyhow!("Table Cell is none"))?
544            .value
545        {
546            None => None,
547            Some(v) => {
548                let s = v
549                    .as_str()
550                    .ok_or_else(|| anyhow!("cannot get str from json value"))?;
551                match s {
552                    "true" => Some(true),
553                    "false" => Some(false),
554                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
555                }
556            }
557        };
558        ret
559    }
560}
561
562impl<'r, 'a> Produce<'r, NaiveDate> for BigQuerySourceParser {
563    type Error = BigQuerySourceError;
564
565    #[throws(BigQuerySourceError)]
566    fn produce(&mut self) -> NaiveDate {
567        let (mut ridx, cidx) = self.next_loc()?;
568        if ridx
569            == (self
570                .response
571                .rows
572                .as_ref()
573                .ok_or_else(|| anyhow!("rows is none"))?
574                .len())
575        {
576            let job = self.client.job();
577            let job_info = self
578                .response
579                .job_reference
580                .as_ref()
581                .ok_or_else(|| anyhow!("job_reference is none"))?;
582            let params = GetQueryResultsParameters {
583                format_options: None,
584                location: job_info.location.clone(),
585                max_results: None,
586                page_token: self.response.page_token.clone(),
587                start_index: None,
588                timeout_ms: None,
589            };
590            self.response = self.rt.block_on(
591                job.get_query_results(
592                    job_info
593                        .project_id
594                        .as_ref()
595                        .ok_or_else(|| anyhow!("project_id is none"))?
596                        .as_str(),
597                    job_info
598                        .job_id
599                        .as_ref()
600                        .ok_or_else(|| anyhow!("job_id is none"))?
601                        .as_str(),
602                    params,
603                ),
604            )?;
605            self.current_row = 0;
606            ridx = 0;
607        }
608        let rows = self
609            .response
610            .rows
611            .as_ref()
612            .ok_or_else(|| anyhow!("rows is none"))?;
613        let columns = rows[ridx]
614            .columns
615            .as_ref()
616            .ok_or_else(|| anyhow!("columns is none"))?;
617        let v = columns
618            .get(cidx)
619            .ok_or_else(|| anyhow!("Table Cell is none"))?
620            .value
621            .as_ref()
622            .ok_or_else(|| anyhow!("value is none"))?;
623        let s = v
624            .as_str()
625            .ok_or_else(|| anyhow!("cannot get str from json value"))?;
626        NaiveDate::parse_from_str(s, "%Y-%m-%d")
627            .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into())))?
628    }
629}
630
631impl<'r, 'a> Produce<'r, Option<NaiveDate>> for BigQuerySourceParser {
632    type Error = BigQuerySourceError;
633
634    #[throws(BigQuerySourceError)]
635    fn produce(&mut self) -> Option<NaiveDate> {
636        let (mut ridx, cidx) = self.next_loc()?;
637        if ridx
638            == (self
639                .response
640                .rows
641                .as_ref()
642                .ok_or_else(|| anyhow!("rows is none"))?
643                .len())
644        {
645            let job = self.client.job();
646            let job_info = self
647                .response
648                .job_reference
649                .as_ref()
650                .ok_or_else(|| anyhow!("job_reference is none"))?;
651            let params = GetQueryResultsParameters {
652                format_options: None,
653                location: job_info.location.clone(),
654                max_results: None,
655                page_token: self.response.page_token.clone(),
656                start_index: None,
657                timeout_ms: None,
658            };
659            self.response = self.rt.block_on(
660                job.get_query_results(
661                    job_info
662                        .project_id
663                        .as_ref()
664                        .ok_or_else(|| anyhow!("project_id is none"))?
665                        .as_str(),
666                    job_info
667                        .job_id
668                        .as_ref()
669                        .ok_or_else(|| anyhow!("job_id is none"))?
670                        .as_str(),
671                    params,
672                ),
673            )?;
674            self.current_row = 0;
675            ridx = 0;
676        }
677        let rows = self
678            .response
679            .rows
680            .as_ref()
681            .ok_or_else(|| anyhow!("rows is none"))?;
682        let columns = rows[ridx]
683            .columns
684            .as_ref()
685            .ok_or_else(|| anyhow!("columns is none"))?;
686        match &columns
687            .get(cidx)
688            .ok_or_else(|| anyhow!("Table Cell is none"))?
689            .value
690        {
691            None => None,
692            Some(v) => {
693                let s = v
694                    .as_str()
695                    .ok_or_else(|| anyhow!("cannot get str from json value"))?;
696                Some(
697                    NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
698                        ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
699                    })?,
700                )
701            }
702        }
703    }
704}
705
706impl<'r, 'a> Produce<'r, NaiveDateTime> for BigQuerySourceParser {
707    type Error = BigQuerySourceError;
708
709    #[throws(BigQuerySourceError)]
710    fn produce(&mut self) -> NaiveDateTime {
711        let (mut ridx, cidx) = self.next_loc()?;
712        if ridx
713            == (self
714                .response
715                .rows
716                .as_ref()
717                .ok_or_else(|| anyhow!("rows is none"))?
718                .len())
719        {
720            let job = self.client.job();
721            let job_info = self
722                .response
723                .job_reference
724                .as_ref()
725                .ok_or_else(|| anyhow!("job_reference is none"))?;
726            let params = GetQueryResultsParameters {
727                format_options: None,
728                location: job_info.location.clone(),
729                max_results: None,
730                page_token: self.response.page_token.clone(),
731                start_index: None,
732                timeout_ms: None,
733            };
734            self.response = self.rt.block_on(
735                job.get_query_results(
736                    job_info
737                        .project_id
738                        .as_ref()
739                        .ok_or_else(|| anyhow!("project_id is none"))?
740                        .as_str(),
741                    job_info
742                        .job_id
743                        .as_ref()
744                        .ok_or_else(|| anyhow!("job_id is none"))?
745                        .as_str(),
746                    params,
747                ),
748            )?;
749            self.current_row = 0;
750            ridx = 0;
751        }
752        let rows = self
753            .response
754            .rows
755            .as_ref()
756            .ok_or_else(|| anyhow!("rows is none"))?;
757        let columns = rows[ridx]
758            .columns
759            .as_ref()
760            .ok_or_else(|| anyhow!("columns is none"))?;
761        let v = columns
762            .get(cidx)
763            .ok_or_else(|| anyhow!("Table Cell is none"))?
764            .value
765            .as_ref()
766            .ok_or_else(|| anyhow!("value is none"))?;
767        let s = v
768            .as_str()
769            .ok_or_else(|| anyhow!("cannot get str from json value"))?;
770        NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
771            .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())))?
772    }
773}
774
775impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for BigQuerySourceParser {
776    type Error = BigQuerySourceError;
777
778    #[throws(BigQuerySourceError)]
779    fn produce(&mut self) -> Option<NaiveDateTime> {
780        let (mut ridx, cidx) = self.next_loc()?;
781        if ridx
782            == (self
783                .response
784                .rows
785                .as_ref()
786                .ok_or_else(|| anyhow!("rows is none"))?
787                .len())
788        {
789            let job = self.client.job();
790            let job_info = self
791                .response
792                .job_reference
793                .as_ref()
794                .ok_or_else(|| anyhow!("job_reference is none"))?;
795            let params = GetQueryResultsParameters {
796                format_options: None,
797                location: job_info.location.clone(),
798                max_results: None,
799                page_token: self.response.page_token.clone(),
800                start_index: None,
801                timeout_ms: None,
802            };
803            self.response = self.rt.block_on(
804                job.get_query_results(
805                    job_info
806                        .project_id
807                        .as_ref()
808                        .ok_or_else(|| anyhow!("project_id is none"))?
809                        .as_str(),
810                    job_info
811                        .job_id
812                        .as_ref()
813                        .ok_or_else(|| anyhow!("job_id is none"))?
814                        .as_str(),
815                    params,
816                ),
817            )?;
818            self.current_row = 0;
819            ridx = 0;
820        }
821        let rows = self
822            .response
823            .rows
824            .as_ref()
825            .ok_or_else(|| anyhow!("rows is none"))?;
826        let columns = rows[ridx]
827            .columns
828            .as_ref()
829            .ok_or_else(|| anyhow!("columns is none"))?;
830        match &columns
831            .get(cidx)
832            .ok_or_else(|| anyhow!("Table Cell is none"))?
833            .value
834        {
835            None => None,
836            Some(v) => {
837                let s = v
838                    .as_str()
839                    .ok_or_else(|| anyhow!("cannot get str from json value"))?;
840                Some(
841                    NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S").map_err(|_| {
842                        ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into()))
843                    })?,
844                )
845            }
846        }
847    }
848}
849
850impl<'r, 'a> Produce<'r, NaiveTime> for BigQuerySourceParser {
851    type Error = BigQuerySourceError;
852
853    #[throws(BigQuerySourceError)]
854    fn produce(&mut self) -> NaiveTime {
855        let (mut ridx, cidx) = self.next_loc()?;
856        if ridx
857            == (self
858                .response
859                .rows
860                .as_ref()
861                .ok_or_else(|| anyhow!("rows is none"))?
862                .len())
863        {
864            let job = self.client.job();
865            let job_info = self
866                .response
867                .job_reference
868                .as_ref()
869                .ok_or_else(|| anyhow!("job_reference is none"))?;
870            let params = GetQueryResultsParameters {
871                format_options: None,
872                location: job_info.location.clone(),
873                max_results: None,
874                page_token: self.response.page_token.clone(),
875                start_index: None,
876                timeout_ms: None,
877            };
878            self.response = self.rt.block_on(
879                job.get_query_results(
880                    job_info
881                        .project_id
882                        .as_ref()
883                        .ok_or_else(|| anyhow!("project_id is none"))?
884                        .as_str(),
885                    job_info
886                        .job_id
887                        .as_ref()
888                        .ok_or_else(|| anyhow!("job_id is none"))?
889                        .as_str(),
890                    params,
891                ),
892            )?;
893            self.current_row = 0;
894            ridx = 0;
895        }
896        let rows = self
897            .response
898            .rows
899            .as_ref()
900            .ok_or_else(|| anyhow!("rows is none"))?;
901        let columns = rows[ridx]
902            .columns
903            .as_ref()
904            .ok_or_else(|| anyhow!("columns is none"))?;
905        let v = columns
906            .get(cidx)
907            .ok_or_else(|| anyhow!("Table Cell is none"))?
908            .value
909            .as_ref()
910            .ok_or_else(|| anyhow!("value is none"))?;
911        let s = v
912            .as_str()
913            .ok_or_else(|| anyhow!("cannot get str from json value"))?;
914        NaiveTime::parse_from_str(s, "%H:%M:%S")
915            .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?
916    }
917}
918
919impl<'r, 'a> Produce<'r, Option<NaiveTime>> for BigQuerySourceParser {
920    type Error = BigQuerySourceError;
921
922    #[throws(BigQuerySourceError)]
923    fn produce(&mut self) -> Option<NaiveTime> {
924        let (mut ridx, cidx) = self.next_loc()?;
925        if ridx
926            == (self
927                .response
928                .rows
929                .as_ref()
930                .ok_or_else(|| anyhow!("rows is none"))?
931                .len())
932        {
933            let job = self.client.job();
934            let job_info = self
935                .response
936                .job_reference
937                .as_ref()
938                .ok_or_else(|| anyhow!("job_reference is none"))?;
939            let params = GetQueryResultsParameters {
940                format_options: None,
941                location: job_info.location.clone(),
942                max_results: None,
943                page_token: self.response.page_token.clone(),
944                start_index: None,
945                timeout_ms: None,
946            };
947            self.response = self.rt.block_on(
948                job.get_query_results(
949                    job_info
950                        .project_id
951                        .as_ref()
952                        .ok_or_else(|| anyhow!("project_id is none"))?
953                        .as_str(),
954                    job_info
955                        .job_id
956                        .as_ref()
957                        .ok_or_else(|| anyhow!("job_id is none"))?
958                        .as_str(),
959                    params,
960                ),
961            )?;
962            self.current_row = 0;
963            ridx = 0;
964        }
965        let rows = self
966            .response
967            .rows
968            .as_ref()
969            .ok_or_else(|| anyhow!("rows is none"))?;
970        let columns = rows[ridx]
971            .columns
972            .as_ref()
973            .ok_or_else(|| anyhow!("columns is none"))?;
974        match &columns
975            .get(cidx)
976            .ok_or_else(|| anyhow!("Table Cell is none"))?
977            .value
978        {
979            None => None,
980            Some(v) => {
981                let s = v
982                    .as_str()
983                    .ok_or_else(|| anyhow!("cannot get str from json value"))?;
984                Some(
985                    NaiveTime::parse_from_str(s, "%H:%M:%S").map_err(|_| {
986                        ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into()))
987                    })?,
988                )
989            }
990        }
991    }
992}
993
994impl<'r, 'a> Produce<'r, DateTime<Utc>> for BigQuerySourceParser {
995    type Error = BigQuerySourceError;
996
997    #[throws(BigQuerySourceError)]
998    fn produce(&mut self) -> DateTime<Utc> {
999        let (mut ridx, cidx) = self.next_loc()?;
1000        if ridx
1001            == (self
1002                .response
1003                .rows
1004                .as_ref()
1005                .ok_or_else(|| anyhow!("rows is none"))?
1006                .len())
1007        {
1008            let job = self.client.job();
1009            let job_info = self
1010                .response
1011                .job_reference
1012                .as_ref()
1013                .ok_or_else(|| anyhow!("job_reference is none"))?;
1014            let params = GetQueryResultsParameters {
1015                format_options: None,
1016                location: job_info.location.clone(),
1017                max_results: None,
1018                page_token: self.response.page_token.clone(),
1019                start_index: None,
1020                timeout_ms: None,
1021            };
1022            self.response = self.rt.block_on(
1023                job.get_query_results(
1024                    job_info
1025                        .project_id
1026                        .as_ref()
1027                        .ok_or_else(|| anyhow!("project_id is none"))?
1028                        .as_str(),
1029                    job_info
1030                        .job_id
1031                        .as_ref()
1032                        .ok_or_else(|| anyhow!("job_id is none"))?
1033                        .as_str(),
1034                    params,
1035                ),
1036            )?;
1037            self.current_row = 0;
1038            ridx = 0;
1039        }
1040        let rows = self
1041            .response
1042            .rows
1043            .as_ref()
1044            .ok_or_else(|| anyhow!("rows is none"))?;
1045        let columns = rows[ridx]
1046            .columns
1047            .as_ref()
1048            .ok_or_else(|| anyhow!("columns is none"))?;
1049        let v = columns
1050            .get(cidx)
1051            .ok_or_else(|| anyhow!("Table Cell is none"))?
1052            .value
1053            .as_ref()
1054            .ok_or_else(|| anyhow!("value is none"))?;
1055        let timestamp_ns = (v
1056            .as_str()
1057            .ok_or_else(|| anyhow!("cannot get str from json value"))?
1058            .parse::<f64>()?
1059            * 1e9) as i64;
1060        let secs = timestamp_ns / 1000000000;
1061        let nsecs = (timestamp_ns % 1000000000) as u32;
1062        DateTime::from_timestamp(secs, nsecs)
1063            .unwrap_or_else(|| panic!("out of range number: {} {}", secs, nsecs))
1064    }
1065}
1066
1067impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for BigQuerySourceParser {
1068    type Error = BigQuerySourceError;
1069
1070    #[throws(BigQuerySourceError)]
1071    fn produce(&mut self) -> Option<DateTime<Utc>> {
1072        let (mut ridx, cidx) = self.next_loc()?;
1073        if ridx
1074            == (self
1075                .response
1076                .rows
1077                .as_ref()
1078                .ok_or_else(|| anyhow!("rows is none"))?
1079                .len())
1080        {
1081            let job = self.client.job();
1082            let job_info = self
1083                .response
1084                .job_reference
1085                .as_ref()
1086                .ok_or_else(|| anyhow!("job_reference is none"))?;
1087            let params = GetQueryResultsParameters {
1088                format_options: None,
1089                location: job_info.location.clone(),
1090                max_results: None,
1091                page_token: self.response.page_token.clone(),
1092                start_index: None,
1093                timeout_ms: None,
1094            };
1095            self.response = self.rt.block_on(
1096                job.get_query_results(
1097                    job_info
1098                        .project_id
1099                        .as_ref()
1100                        .ok_or_else(|| anyhow!("project_id is none"))?
1101                        .as_str(),
1102                    job_info
1103                        .job_id
1104                        .as_ref()
1105                        .ok_or_else(|| anyhow!("job_id is none"))?
1106                        .as_str(),
1107                    params,
1108                ),
1109            )?;
1110            self.current_row = 0;
1111            ridx = 0;
1112        }
1113        let rows = self
1114            .response
1115            .rows
1116            .as_ref()
1117            .ok_or_else(|| anyhow!("rows is none"))?;
1118        let columns = rows[ridx]
1119            .columns
1120            .as_ref()
1121            .ok_or_else(|| anyhow!("columns is none"))?;
1122        match &columns
1123            .get(cidx)
1124            .ok_or_else(|| anyhow!("Table Cell is none"))?
1125            .value
1126        {
1127            None => None,
1128            Some(v) => {
1129                let timestamp_ns = (v
1130                    .as_str()
1131                    .ok_or_else(|| anyhow!("cannot get str from json value"))?
1132                    .parse::<f64>()?
1133                    * 1e9) as i64;
1134                let secs = timestamp_ns / 1000000000;
1135                let nsecs = (timestamp_ns % 1000000000) as u32;
1136                DateTime::from_timestamp(secs, nsecs)
1137            }
1138        }
1139    }
1140}