1mod errors;
4mod typesystem;
5
6pub use self::errors::BigQuerySourceError;
7use crate::{
8 data_order::DataOrder,
9 errors::ConnectorXError,
10 sources::{PartitionParser, Produce, Source, SourcePartition},
11 sql::{count_query, limit1_query, CXQuery},
12};
13use anyhow::anyhow;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use fehler::{throw, throws};
16use gcp_bigquery_client::{
17 model::{
18 get_query_results_parameters::GetQueryResultsParameters,
19 get_query_results_response::GetQueryResultsResponse, query_request::QueryRequest,
20 query_response::ResultSet,
21 },
22 Client,
23};
24use sqlparser::dialect::Dialect;
25use std::sync::Arc;
26use tokio::runtime::Runtime;
27pub use typesystem::BigQueryTypeSystem;
28use url::Url;
29
30#[derive(Debug)]
31pub struct BigQueryDialect {}
32
33impl Dialect for BigQueryDialect {
34 fn is_delimited_identifier_start(&self, ch: char) -> bool {
36 ch == '`'
37 }
38
39 fn is_identifier_start(&self, ch: char) -> bool {
40 ch.is_ascii_lowercase() || ch.is_ascii_uppercase() || ch == '_' || ch == '-'
41 }
42
43 fn is_identifier_part(&self, ch: char) -> bool {
44 self.is_identifier_start(ch) || ch.is_ascii_digit()
45 }
46}
47
48pub struct BigQuerySource {
49 rt: Arc<Runtime>,
50 client: Arc<Client>,
51 project_id: String,
52 origin_query: Option<String>,
53 queries: Vec<CXQuery<String>>,
54 names: Vec<String>,
55 schema: Vec<BigQueryTypeSystem>,
56}
57
58impl BigQuerySource {
59 #[throws(BigQuerySourceError)]
60 pub fn new(rt: Arc<Runtime>, conn: &str) -> Self {
61 let url = Url::parse(conn)?;
62 let sa_key_path = url.path();
63 let client = Arc::new(rt.block_on(
64 gcp_bigquery_client::Client::from_service_account_key_file(sa_key_path),
65 )?);
66 let auth_data = std::fs::read_to_string(sa_key_path)?;
67 let auth_json: serde_json::Value = serde_json::from_str(&auth_data)?;
68 let project_id = auth_json
69 .get("project_id")
70 .ok_or_else(|| anyhow!("Cannot get project_id from auth file"))?
71 .as_str()
72 .ok_or_else(|| anyhow!("Cannot get project_id as string from auth file"))?
73 .to_string();
74 Self {
75 rt,
76 client,
77 project_id,
78 origin_query: None,
79 queries: vec![],
80 names: vec![],
81 schema: vec![],
82 }
83 }
84}
85
86impl Source for BigQuerySource
87where
88 BigQuerySourcePartition:
89 SourcePartition<TypeSystem = BigQueryTypeSystem, Error = BigQuerySourceError>,
90{
91 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
92 type Partition = BigQuerySourcePartition;
93 type TypeSystem = BigQueryTypeSystem;
94 type Error = BigQuerySourceError;
95
96 #[throws(BigQuerySourceError)]
97 fn set_data_order(&mut self, data_order: DataOrder) {
98 if !matches!(data_order, DataOrder::RowMajor) {
99 throw!(ConnectorXError::UnsupportedDataOrder(data_order));
100 }
101 }
102
103 fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
104 self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
105 }
106
107 fn set_origin_query(&mut self, query: Option<String>) {
108 self.origin_query = query;
109 }
110
111 #[throws(BigQuerySourceError)]
112 fn fetch_metadata(&mut self) {
113 assert!(!self.queries.is_empty());
114 let job = self.client.job();
115 for (_, query) in self.queries.iter().enumerate() {
116 let l1query = limit1_query(query, &BigQueryDialect {})?;
117 let rs = self.rt.block_on(job.query(
118 self.project_id.as_str(),
119 QueryRequest::new(l1query.as_str()),
120 ))?;
121 let (names, types) = rs
122 .schema
123 .as_ref()
124 .ok_or_else(|| anyhow!("TableSchema is none"))?
125 .fields
126 .as_ref()
127 .ok_or_else(|| anyhow!("TableFieldSchema is none"))?
128 .iter()
129 .map(|col| {
130 (
131 col.clone().name,
132 BigQueryTypeSystem::from(&col.clone().r#type),
133 )
134 })
135 .unzip();
136 self.names = names;
137 self.schema = types;
138 }
139 }
140
141 #[throws(BigQuerySourceError)]
142 fn result_rows(&mut self) -> Option<usize> {
143 match &self.origin_query {
144 Some(q) => {
145 let cxq = CXQuery::Naked(q.clone());
146 let cquery = count_query(&cxq, &BigQueryDialect {})?;
147 let job = self.client.job();
148 let mut rs = ResultSet::new_from_query_response(self.rt.block_on(
149 job.query(self.project_id.as_str(), QueryRequest::new(cquery.as_str())),
150 )?);
151 rs.next_row();
152 let nrows = rs
153 .get_i64(0)?
154 .ok_or_else(|| anyhow!("cannot get row number"))?;
155 Some(nrows as usize)
156 }
157 None => None,
158 }
159 }
160
161 fn names(&self) -> Vec<String> {
162 self.names.clone()
163 }
164
165 fn schema(&self) -> Vec<Self::TypeSystem> {
166 self.schema.clone()
167 }
168
169 #[throws(BigQuerySourceError)]
170 fn partition(self) -> Vec<Self::Partition> {
171 let mut ret = vec![];
172 for query in self.queries {
173 ret.push(BigQuerySourcePartition::new(
174 self.rt.clone(),
175 self.client.clone(),
176 self.project_id.clone(),
177 &query,
178 &self.schema,
179 ));
180 }
181 ret
182 }
183}
184
185pub struct BigQuerySourcePartition {
186 rt: Arc<Runtime>,
187 client: Arc<Client>,
188 project_id: String,
189 query: CXQuery<String>,
190 schema: Vec<BigQueryTypeSystem>,
191 nrows: usize,
192 ncols: usize,
193}
194
195impl BigQuerySourcePartition {
196 pub fn new(
197 handle: Arc<Runtime>,
198 client: Arc<Client>,
199 project_id: String,
200 query: &CXQuery<String>,
201 schema: &[BigQueryTypeSystem],
202 ) -> Self {
203 Self {
204 rt: handle,
205 client,
206 project_id: project_id.clone(),
207 query: query.clone(),
208 schema: schema.to_vec(),
209 nrows: 0,
210 ncols: schema.len(),
211 }
212 }
213}
214
215impl SourcePartition for BigQuerySourcePartition {
216 type TypeSystem = BigQueryTypeSystem;
217 type Parser<'a> = BigQuerySourceParser;
218 type Error = BigQuerySourceError;
219
220 #[throws(BigQuerySourceError)]
221 fn result_rows(&mut self) {
222 let cquery = count_query(&self.query, &BigQueryDialect {})?;
223 let job = self.client.job();
224 let mut rs =
225 ResultSet::new_from_query_response(self.rt.block_on(
226 job.query(self.project_id.as_str(), QueryRequest::new(cquery.as_str())),
227 )?);
228 rs.next_row();
229 let nrows = rs
230 .get_i64(0)?
231 .ok_or_else(|| anyhow!("cannot get row number"))?;
232 self.nrows = nrows as usize;
233 }
234
235 #[throws(BigQuerySourceError)]
236 fn parser(&mut self) -> Self::Parser<'_> {
237 let job = self.client.job();
238 let qry = self.rt.block_on(job.query(
239 self.project_id.as_str(),
240 QueryRequest::new(self.query.as_str()),
241 ))?;
242 let job_info = qry
243 .job_reference
244 .as_ref()
245 .ok_or_else(|| anyhow!("job_reference is none"))?;
246 let params = GetQueryResultsParameters {
247 format_options: None,
248 location: job_info.location.clone(),
249 max_results: None,
250 page_token: None,
251 start_index: None,
252 timeout_ms: None,
253 };
254 let rs = self.rt.block_on(
255 job.get_query_results(
256 self.project_id.as_str(),
257 job_info
258 .job_id
259 .as_ref()
260 .ok_or_else(|| anyhow!("job_id is none"))?
261 .as_str(),
262 params,
263 ),
264 )?;
265 BigQuerySourceParser::new(self.rt.clone(), self.client.clone(), rs, &self.schema)
266 }
267
268 fn nrows(&self) -> usize {
269 self.nrows
270 }
271
272 fn ncols(&self) -> usize {
273 self.ncols
274 }
275}
276
277pub struct BigQuerySourceParser {
278 rt: Arc<Runtime>,
279 client: Arc<Client>,
280 response: GetQueryResultsResponse,
281 ncols: usize,
282 current_col: usize,
283 current_row: usize,
284 nrows: Option<usize>,
285}
286
287impl<'a> BigQuerySourceParser {
288 fn new(
289 rt: Arc<Runtime>,
290 client: Arc<Client>,
291 response: GetQueryResultsResponse,
292 schema: &[BigQueryTypeSystem],
293 ) -> Self {
294 Self {
295 rt,
296 client,
297 response,
298 ncols: schema.len(),
299 current_row: 0,
300 current_col: 0,
301 nrows: None,
302 }
303 }
304
305 #[throws(BigQuerySourceError)]
306 fn next_loc(&mut self) -> (usize, usize) {
307 let ret = (self.current_row, self.current_col);
308 self.current_row += (self.current_col + 1) / self.ncols;
309 self.current_col = (self.current_col + 1) % self.ncols;
310 ret
311 }
312}
313
314impl<'a> PartitionParser<'a> for BigQuerySourceParser {
315 type TypeSystem = BigQueryTypeSystem;
316 type Error = BigQuerySourceError;
317
318 #[throws(BigQuerySourceError)]
319 fn fetch_next(&mut self) -> (usize, bool) {
320 assert!(self.current_col == 0);
321 match self.nrows {
322 Some(total_rows) => (total_rows - self.current_row, true),
323 None => {
324 let total_rows = self
326 .response
327 .total_rows
328 .as_ref()
329 .ok_or_else(|| anyhow!("total_rows is none"))?
330 .parse::<usize>()?;
331 self.nrows = Some(total_rows);
332 (total_rows, true)
333 }
334 }
335 }
336}
337
338macro_rules! impl_produce {
339 ($($t: ty,)+) => {
340 $(
341 impl<'r> Produce<'r, $t> for BigQuerySourceParser {
342 type Error = BigQuerySourceError;
343
344 #[throws(BigQuerySourceError)]
345 fn produce(&'r mut self) -> $t {
346 let (mut ridx, cidx) = self.next_loc()?;
347 if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
348 let job = self.client.job();
349 let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
350 let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
351 self.response = self.rt.block_on(
352 job.get_query_results(
353 job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
354 job_info.job_id.as_ref().ok_or_else(|| anyhow!("job_id is none"))?.as_str(),
355 params,
356 ),
357 )?;
358 self.current_row = 0;
359 ridx = 0;
360 }
361 let rows = self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?;
362 let columns = rows[ridx].columns.as_ref().ok_or_else(|| anyhow!("columns is none"))?;
363 let v = columns.get(cidx).ok_or_else(|| anyhow!("Table Cell is none"))?.value.as_ref().ok_or_else(|| anyhow!("value is none"))?;
364 let s = v
365 .as_str()
366 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
367 s.parse()
368 .map_err(|_| {
369 ConnectorXError::cannot_produce::<$t>(Some(s.into()))
370 })?
371 }
372 }
373
374 impl<'r> Produce<'r, Option<$t>> for BigQuerySourceParser {
375 type Error = BigQuerySourceError;
376
377 #[throws(BigQuerySourceError)]
378 fn produce(&'r mut self) -> Option<$t> {
379 let (mut ridx, cidx) = self.next_loc()?;
380 if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
381 let job = self.client.job();
382 let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
383 let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
384 self.response = self.rt.block_on(
385 job.get_query_results(
386 job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
387 job_info.job_id.as_ref().ok_or_else(|| anyhow!("job_id is none"))?.as_str(),
388 params,
389 ),
390 )?;
391 self.current_row = 0;
392 ridx = 0;
393 }
394 let rows = self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?;
395 let columns = rows[ridx].columns.as_ref().ok_or_else(|| anyhow!("columns is none"))?;
396 match &columns.get(cidx).ok_or_else(|| anyhow!("Table Cell is none"))?.value {
397 None => None,
398 Some(v) => {
399 let s = v.as_str().ok_or_else(|| anyhow!("cannot get str from json value"))?;
400 Some(s.parse().map_err(|_| {
401 ConnectorXError::cannot_produce::<$t>(Some(s.into()))
402 })?)},
403 }
404 }
405 }
406 )+
407 };
408}
409
410impl_produce!(i64, f64, String,);
411
412impl<'r, 'a> Produce<'r, bool> for BigQuerySourceParser {
413 type Error = BigQuerySourceError;
414
415 #[throws(BigQuerySourceError)]
416 fn produce(&mut self) -> bool {
417 let (mut ridx, cidx) = self.next_loc()?;
418 if ridx
419 == (self
420 .response
421 .rows
422 .as_ref()
423 .ok_or_else(|| anyhow!("rows is none"))?
424 .len())
425 {
426 let job = self.client.job();
427 let job_info = self
428 .response
429 .job_reference
430 .as_ref()
431 .ok_or_else(|| anyhow!("job_reference is none"))?;
432 let params = GetQueryResultsParameters {
433 format_options: None,
434 location: job_info.location.clone(),
435 max_results: None,
436 page_token: self.response.page_token.clone(),
437 start_index: None,
438 timeout_ms: None,
439 };
440 self.response = self.rt.block_on(
441 job.get_query_results(
442 job_info
443 .project_id
444 .as_ref()
445 .ok_or_else(|| anyhow!("project_id is none"))?
446 .as_str(),
447 job_info
448 .job_id
449 .as_ref()
450 .ok_or_else(|| anyhow!("job_id is none"))?
451 .as_str(),
452 params,
453 ),
454 )?;
455 self.current_row = 0;
456 ridx = 0;
457 }
458 let rows = self
459 .response
460 .rows
461 .as_ref()
462 .ok_or_else(|| anyhow!("rows is none"))?;
463 let columns = rows[ridx]
464 .columns
465 .as_ref()
466 .ok_or_else(|| anyhow!("columns is none"))?;
467 let v = columns
468 .get(cidx)
469 .ok_or_else(|| anyhow!("Table Cell is none"))?
470 .value
471 .as_ref()
472 .ok_or_else(|| anyhow!("value is none"))?;
473 let s = v
474 .as_str()
475 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
476
477 let ret = match s {
478 "true" => true,
479 "false" => false,
480 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
481 };
482 ret
483 }
484}
485
486impl<'r, 'a> Produce<'r, Option<bool>> for BigQuerySourceParser {
487 type Error = BigQuerySourceError;
488
489 #[throws(BigQuerySourceError)]
490 fn produce(&mut self) -> Option<bool> {
491 let (mut ridx, cidx) = self.next_loc()?;
492 if ridx
493 == (self
494 .response
495 .rows
496 .as_ref()
497 .ok_or_else(|| anyhow!("rows is none"))?
498 .len())
499 {
500 let job = self.client.job();
501 let job_info = self
502 .response
503 .job_reference
504 .as_ref()
505 .ok_or_else(|| anyhow!("job_reference is none"))?;
506 let params = GetQueryResultsParameters {
507 format_options: None,
508 location: job_info.location.clone(),
509 max_results: None,
510 page_token: self.response.page_token.clone(),
511 start_index: None,
512 timeout_ms: None,
513 };
514 self.response = self.rt.block_on(
515 job.get_query_results(
516 job_info
517 .project_id
518 .as_ref()
519 .ok_or_else(|| anyhow!("project_id is none"))?
520 .as_str(),
521 job_info
522 .job_id
523 .as_ref()
524 .ok_or_else(|| anyhow!("job_id is none"))?
525 .as_str(),
526 params,
527 ),
528 )?;
529 self.current_row = 0;
530 ridx = 0;
531 }
532 let rows = self
533 .response
534 .rows
535 .as_ref()
536 .ok_or_else(|| anyhow!("rows is none"))?;
537 let columns = rows[ridx]
538 .columns
539 .as_ref()
540 .ok_or_else(|| anyhow!("columns is none"))?;
541 let ret = match &columns
542 .get(cidx)
543 .ok_or_else(|| anyhow!("Table Cell is none"))?
544 .value
545 {
546 None => None,
547 Some(v) => {
548 let s = v
549 .as_str()
550 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
551 match s {
552 "true" => Some(true),
553 "false" => Some(false),
554 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
555 }
556 }
557 };
558 ret
559 }
560}
561
562impl<'r, 'a> Produce<'r, NaiveDate> for BigQuerySourceParser {
563 type Error = BigQuerySourceError;
564
565 #[throws(BigQuerySourceError)]
566 fn produce(&mut self) -> NaiveDate {
567 let (mut ridx, cidx) = self.next_loc()?;
568 if ridx
569 == (self
570 .response
571 .rows
572 .as_ref()
573 .ok_or_else(|| anyhow!("rows is none"))?
574 .len())
575 {
576 let job = self.client.job();
577 let job_info = self
578 .response
579 .job_reference
580 .as_ref()
581 .ok_or_else(|| anyhow!("job_reference is none"))?;
582 let params = GetQueryResultsParameters {
583 format_options: None,
584 location: job_info.location.clone(),
585 max_results: None,
586 page_token: self.response.page_token.clone(),
587 start_index: None,
588 timeout_ms: None,
589 };
590 self.response = self.rt.block_on(
591 job.get_query_results(
592 job_info
593 .project_id
594 .as_ref()
595 .ok_or_else(|| anyhow!("project_id is none"))?
596 .as_str(),
597 job_info
598 .job_id
599 .as_ref()
600 .ok_or_else(|| anyhow!("job_id is none"))?
601 .as_str(),
602 params,
603 ),
604 )?;
605 self.current_row = 0;
606 ridx = 0;
607 }
608 let rows = self
609 .response
610 .rows
611 .as_ref()
612 .ok_or_else(|| anyhow!("rows is none"))?;
613 let columns = rows[ridx]
614 .columns
615 .as_ref()
616 .ok_or_else(|| anyhow!("columns is none"))?;
617 let v = columns
618 .get(cidx)
619 .ok_or_else(|| anyhow!("Table Cell is none"))?
620 .value
621 .as_ref()
622 .ok_or_else(|| anyhow!("value is none"))?;
623 let s = v
624 .as_str()
625 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
626 NaiveDate::parse_from_str(s, "%Y-%m-%d")
627 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into())))?
628 }
629}
630
631impl<'r, 'a> Produce<'r, Option<NaiveDate>> for BigQuerySourceParser {
632 type Error = BigQuerySourceError;
633
634 #[throws(BigQuerySourceError)]
635 fn produce(&mut self) -> Option<NaiveDate> {
636 let (mut ridx, cidx) = self.next_loc()?;
637 if ridx
638 == (self
639 .response
640 .rows
641 .as_ref()
642 .ok_or_else(|| anyhow!("rows is none"))?
643 .len())
644 {
645 let job = self.client.job();
646 let job_info = self
647 .response
648 .job_reference
649 .as_ref()
650 .ok_or_else(|| anyhow!("job_reference is none"))?;
651 let params = GetQueryResultsParameters {
652 format_options: None,
653 location: job_info.location.clone(),
654 max_results: None,
655 page_token: self.response.page_token.clone(),
656 start_index: None,
657 timeout_ms: None,
658 };
659 self.response = self.rt.block_on(
660 job.get_query_results(
661 job_info
662 .project_id
663 .as_ref()
664 .ok_or_else(|| anyhow!("project_id is none"))?
665 .as_str(),
666 job_info
667 .job_id
668 .as_ref()
669 .ok_or_else(|| anyhow!("job_id is none"))?
670 .as_str(),
671 params,
672 ),
673 )?;
674 self.current_row = 0;
675 ridx = 0;
676 }
677 let rows = self
678 .response
679 .rows
680 .as_ref()
681 .ok_or_else(|| anyhow!("rows is none"))?;
682 let columns = rows[ridx]
683 .columns
684 .as_ref()
685 .ok_or_else(|| anyhow!("columns is none"))?;
686 match &columns
687 .get(cidx)
688 .ok_or_else(|| anyhow!("Table Cell is none"))?
689 .value
690 {
691 None => None,
692 Some(v) => {
693 let s = v
694 .as_str()
695 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
696 Some(
697 NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
698 ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
699 })?,
700 )
701 }
702 }
703 }
704}
705
706impl<'r, 'a> Produce<'r, NaiveDateTime> for BigQuerySourceParser {
707 type Error = BigQuerySourceError;
708
709 #[throws(BigQuerySourceError)]
710 fn produce(&mut self) -> NaiveDateTime {
711 let (mut ridx, cidx) = self.next_loc()?;
712 if ridx
713 == (self
714 .response
715 .rows
716 .as_ref()
717 .ok_or_else(|| anyhow!("rows is none"))?
718 .len())
719 {
720 let job = self.client.job();
721 let job_info = self
722 .response
723 .job_reference
724 .as_ref()
725 .ok_or_else(|| anyhow!("job_reference is none"))?;
726 let params = GetQueryResultsParameters {
727 format_options: None,
728 location: job_info.location.clone(),
729 max_results: None,
730 page_token: self.response.page_token.clone(),
731 start_index: None,
732 timeout_ms: None,
733 };
734 self.response = self.rt.block_on(
735 job.get_query_results(
736 job_info
737 .project_id
738 .as_ref()
739 .ok_or_else(|| anyhow!("project_id is none"))?
740 .as_str(),
741 job_info
742 .job_id
743 .as_ref()
744 .ok_or_else(|| anyhow!("job_id is none"))?
745 .as_str(),
746 params,
747 ),
748 )?;
749 self.current_row = 0;
750 ridx = 0;
751 }
752 let rows = self
753 .response
754 .rows
755 .as_ref()
756 .ok_or_else(|| anyhow!("rows is none"))?;
757 let columns = rows[ridx]
758 .columns
759 .as_ref()
760 .ok_or_else(|| anyhow!("columns is none"))?;
761 let v = columns
762 .get(cidx)
763 .ok_or_else(|| anyhow!("Table Cell is none"))?
764 .value
765 .as_ref()
766 .ok_or_else(|| anyhow!("value is none"))?;
767 let s = v
768 .as_str()
769 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
770 NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
771 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())))?
772 }
773}
774
775impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for BigQuerySourceParser {
776 type Error = BigQuerySourceError;
777
778 #[throws(BigQuerySourceError)]
779 fn produce(&mut self) -> Option<NaiveDateTime> {
780 let (mut ridx, cidx) = self.next_loc()?;
781 if ridx
782 == (self
783 .response
784 .rows
785 .as_ref()
786 .ok_or_else(|| anyhow!("rows is none"))?
787 .len())
788 {
789 let job = self.client.job();
790 let job_info = self
791 .response
792 .job_reference
793 .as_ref()
794 .ok_or_else(|| anyhow!("job_reference is none"))?;
795 let params = GetQueryResultsParameters {
796 format_options: None,
797 location: job_info.location.clone(),
798 max_results: None,
799 page_token: self.response.page_token.clone(),
800 start_index: None,
801 timeout_ms: None,
802 };
803 self.response = self.rt.block_on(
804 job.get_query_results(
805 job_info
806 .project_id
807 .as_ref()
808 .ok_or_else(|| anyhow!("project_id is none"))?
809 .as_str(),
810 job_info
811 .job_id
812 .as_ref()
813 .ok_or_else(|| anyhow!("job_id is none"))?
814 .as_str(),
815 params,
816 ),
817 )?;
818 self.current_row = 0;
819 ridx = 0;
820 }
821 let rows = self
822 .response
823 .rows
824 .as_ref()
825 .ok_or_else(|| anyhow!("rows is none"))?;
826 let columns = rows[ridx]
827 .columns
828 .as_ref()
829 .ok_or_else(|| anyhow!("columns is none"))?;
830 match &columns
831 .get(cidx)
832 .ok_or_else(|| anyhow!("Table Cell is none"))?
833 .value
834 {
835 None => None,
836 Some(v) => {
837 let s = v
838 .as_str()
839 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
840 Some(
841 NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S").map_err(|_| {
842 ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into()))
843 })?,
844 )
845 }
846 }
847 }
848}
849
850impl<'r, 'a> Produce<'r, NaiveTime> for BigQuerySourceParser {
851 type Error = BigQuerySourceError;
852
853 #[throws(BigQuerySourceError)]
854 fn produce(&mut self) -> NaiveTime {
855 let (mut ridx, cidx) = self.next_loc()?;
856 if ridx
857 == (self
858 .response
859 .rows
860 .as_ref()
861 .ok_or_else(|| anyhow!("rows is none"))?
862 .len())
863 {
864 let job = self.client.job();
865 let job_info = self
866 .response
867 .job_reference
868 .as_ref()
869 .ok_or_else(|| anyhow!("job_reference is none"))?;
870 let params = GetQueryResultsParameters {
871 format_options: None,
872 location: job_info.location.clone(),
873 max_results: None,
874 page_token: self.response.page_token.clone(),
875 start_index: None,
876 timeout_ms: None,
877 };
878 self.response = self.rt.block_on(
879 job.get_query_results(
880 job_info
881 .project_id
882 .as_ref()
883 .ok_or_else(|| anyhow!("project_id is none"))?
884 .as_str(),
885 job_info
886 .job_id
887 .as_ref()
888 .ok_or_else(|| anyhow!("job_id is none"))?
889 .as_str(),
890 params,
891 ),
892 )?;
893 self.current_row = 0;
894 ridx = 0;
895 }
896 let rows = self
897 .response
898 .rows
899 .as_ref()
900 .ok_or_else(|| anyhow!("rows is none"))?;
901 let columns = rows[ridx]
902 .columns
903 .as_ref()
904 .ok_or_else(|| anyhow!("columns is none"))?;
905 let v = columns
906 .get(cidx)
907 .ok_or_else(|| anyhow!("Table Cell is none"))?
908 .value
909 .as_ref()
910 .ok_or_else(|| anyhow!("value is none"))?;
911 let s = v
912 .as_str()
913 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
914 NaiveTime::parse_from_str(s, "%H:%M:%S")
915 .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?
916 }
917}
918
919impl<'r, 'a> Produce<'r, Option<NaiveTime>> for BigQuerySourceParser {
920 type Error = BigQuerySourceError;
921
922 #[throws(BigQuerySourceError)]
923 fn produce(&mut self) -> Option<NaiveTime> {
924 let (mut ridx, cidx) = self.next_loc()?;
925 if ridx
926 == (self
927 .response
928 .rows
929 .as_ref()
930 .ok_or_else(|| anyhow!("rows is none"))?
931 .len())
932 {
933 let job = self.client.job();
934 let job_info = self
935 .response
936 .job_reference
937 .as_ref()
938 .ok_or_else(|| anyhow!("job_reference is none"))?;
939 let params = GetQueryResultsParameters {
940 format_options: None,
941 location: job_info.location.clone(),
942 max_results: None,
943 page_token: self.response.page_token.clone(),
944 start_index: None,
945 timeout_ms: None,
946 };
947 self.response = self.rt.block_on(
948 job.get_query_results(
949 job_info
950 .project_id
951 .as_ref()
952 .ok_or_else(|| anyhow!("project_id is none"))?
953 .as_str(),
954 job_info
955 .job_id
956 .as_ref()
957 .ok_or_else(|| anyhow!("job_id is none"))?
958 .as_str(),
959 params,
960 ),
961 )?;
962 self.current_row = 0;
963 ridx = 0;
964 }
965 let rows = self
966 .response
967 .rows
968 .as_ref()
969 .ok_or_else(|| anyhow!("rows is none"))?;
970 let columns = rows[ridx]
971 .columns
972 .as_ref()
973 .ok_or_else(|| anyhow!("columns is none"))?;
974 match &columns
975 .get(cidx)
976 .ok_or_else(|| anyhow!("Table Cell is none"))?
977 .value
978 {
979 None => None,
980 Some(v) => {
981 let s = v
982 .as_str()
983 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
984 Some(
985 NaiveTime::parse_from_str(s, "%H:%M:%S").map_err(|_| {
986 ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into()))
987 })?,
988 )
989 }
990 }
991 }
992}
993
994impl<'r, 'a> Produce<'r, DateTime<Utc>> for BigQuerySourceParser {
995 type Error = BigQuerySourceError;
996
997 #[throws(BigQuerySourceError)]
998 fn produce(&mut self) -> DateTime<Utc> {
999 let (mut ridx, cidx) = self.next_loc()?;
1000 if ridx
1001 == (self
1002 .response
1003 .rows
1004 .as_ref()
1005 .ok_or_else(|| anyhow!("rows is none"))?
1006 .len())
1007 {
1008 let job = self.client.job();
1009 let job_info = self
1010 .response
1011 .job_reference
1012 .as_ref()
1013 .ok_or_else(|| anyhow!("job_reference is none"))?;
1014 let params = GetQueryResultsParameters {
1015 format_options: None,
1016 location: job_info.location.clone(),
1017 max_results: None,
1018 page_token: self.response.page_token.clone(),
1019 start_index: None,
1020 timeout_ms: None,
1021 };
1022 self.response = self.rt.block_on(
1023 job.get_query_results(
1024 job_info
1025 .project_id
1026 .as_ref()
1027 .ok_or_else(|| anyhow!("project_id is none"))?
1028 .as_str(),
1029 job_info
1030 .job_id
1031 .as_ref()
1032 .ok_or_else(|| anyhow!("job_id is none"))?
1033 .as_str(),
1034 params,
1035 ),
1036 )?;
1037 self.current_row = 0;
1038 ridx = 0;
1039 }
1040 let rows = self
1041 .response
1042 .rows
1043 .as_ref()
1044 .ok_or_else(|| anyhow!("rows is none"))?;
1045 let columns = rows[ridx]
1046 .columns
1047 .as_ref()
1048 .ok_or_else(|| anyhow!("columns is none"))?;
1049 let v = columns
1050 .get(cidx)
1051 .ok_or_else(|| anyhow!("Table Cell is none"))?
1052 .value
1053 .as_ref()
1054 .ok_or_else(|| anyhow!("value is none"))?;
1055 let timestamp_ns = (v
1056 .as_str()
1057 .ok_or_else(|| anyhow!("cannot get str from json value"))?
1058 .parse::<f64>()?
1059 * 1e9) as i64;
1060 let secs = timestamp_ns / 1000000000;
1061 let nsecs = (timestamp_ns % 1000000000) as u32;
1062 DateTime::from_timestamp(secs, nsecs)
1063 .unwrap_or_else(|| panic!("out of range number: {} {}", secs, nsecs))
1064 }
1065}
1066
1067impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for BigQuerySourceParser {
1068 type Error = BigQuerySourceError;
1069
1070 #[throws(BigQuerySourceError)]
1071 fn produce(&mut self) -> Option<DateTime<Utc>> {
1072 let (mut ridx, cidx) = self.next_loc()?;
1073 if ridx
1074 == (self
1075 .response
1076 .rows
1077 .as_ref()
1078 .ok_or_else(|| anyhow!("rows is none"))?
1079 .len())
1080 {
1081 let job = self.client.job();
1082 let job_info = self
1083 .response
1084 .job_reference
1085 .as_ref()
1086 .ok_or_else(|| anyhow!("job_reference is none"))?;
1087 let params = GetQueryResultsParameters {
1088 format_options: None,
1089 location: job_info.location.clone(),
1090 max_results: None,
1091 page_token: self.response.page_token.clone(),
1092 start_index: None,
1093 timeout_ms: None,
1094 };
1095 self.response = self.rt.block_on(
1096 job.get_query_results(
1097 job_info
1098 .project_id
1099 .as_ref()
1100 .ok_or_else(|| anyhow!("project_id is none"))?
1101 .as_str(),
1102 job_info
1103 .job_id
1104 .as_ref()
1105 .ok_or_else(|| anyhow!("job_id is none"))?
1106 .as_str(),
1107 params,
1108 ),
1109 )?;
1110 self.current_row = 0;
1111 ridx = 0;
1112 }
1113 let rows = self
1114 .response
1115 .rows
1116 .as_ref()
1117 .ok_or_else(|| anyhow!("rows is none"))?;
1118 let columns = rows[ridx]
1119 .columns
1120 .as_ref()
1121 .ok_or_else(|| anyhow!("columns is none"))?;
1122 match &columns
1123 .get(cidx)
1124 .ok_or_else(|| anyhow!("Table Cell is none"))?
1125 .value
1126 {
1127 None => None,
1128 Some(v) => {
1129 let timestamp_ns = (v
1130 .as_str()
1131 .ok_or_else(|| anyhow!("cannot get str from json value"))?
1132 .parse::<f64>()?
1133 * 1e9) as i64;
1134 let secs = timestamp_ns / 1000000000;
1135 let nsecs = (timestamp_ns % 1000000000) as u32;
1136 DateTime::from_timestamp(secs, nsecs)
1137 }
1138 }
1139 }
1140}