1mod errors;
4mod typesystem;
5
6pub use self::errors::BigQuerySourceError;
7use crate::{
8 data_order::DataOrder,
9 errors::ConnectorXError,
10 sources::{PartitionParser, Produce, Source, SourcePartition},
11 sql::{count_query, limit1_query, CXQuery},
12};
13use anyhow::anyhow;
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
15use fehler::{throw, throws};
16use gcp_bigquery_client::{
17 model::{
18 get_query_results_parameters::GetQueryResultsParameters,
19 get_query_results_response::GetQueryResultsResponse, query_request::QueryRequest,
20 query_response::ResultSet,
21 },
22 Client,
23};
24use sqlparser::dialect::Dialect;
25use std::sync::Arc;
26use tokio::runtime::Runtime;
27pub use typesystem::BigQueryTypeSystem;
28use url::Url;
29
/// `sqlparser` dialect describing the lexical rules of BigQuery SQL.
///
/// It is passed to `limit1_query` / `count_query` when user queries are rewritten.
/// The type carries no state, so it is cheap to copy and can be default-constructed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BigQueryDialect {}
32
33impl Dialect for BigQueryDialect {
34 fn is_delimited_identifier_start(&self, ch: char) -> bool {
36 ch == '`'
37 }
38
39 fn is_identifier_start(&self, ch: char) -> bool {
40 ch.is_ascii_lowercase() || ch.is_ascii_uppercase() || ch == '_' || ch == '-'
41 }
42
43 fn is_identifier_part(&self, ch: char) -> bool {
44 self.is_identifier_start(ch) || ch.is_ascii_digit()
45 }
46}
47
48pub struct BigQuerySource {
49 rt: Arc<Runtime>,
50 client: Arc<Client>,
51 project_id: String,
52 origin_query: Option<String>,
53 queries: Vec<CXQuery<String>>,
54 names: Vec<String>,
55 schema: Vec<BigQueryTypeSystem>,
56}
57
58impl BigQuerySource {
59 #[throws(BigQuerySourceError)]
60 pub fn new(rt: Arc<Runtime>, conn: &str) -> Self {
61 let url = Url::parse(conn)?;
62 let sa_key_path = url.path();
63 let client = Arc::new(rt.block_on(
64 gcp_bigquery_client::Client::from_service_account_key_file(sa_key_path),
65 )?);
66 let auth_data = std::fs::read_to_string(sa_key_path)?;
67 let auth_json: serde_json::Value = serde_json::from_str(&auth_data)?;
68 let project_id = auth_json
69 .get("project_id")
70 .ok_or_else(|| anyhow!("Cannot get project_id from auth file"))?
71 .as_str()
72 .ok_or_else(|| anyhow!("Cannot get project_id as string from auth file"))?
73 .to_string();
74 Self {
75 rt,
76 client,
77 project_id,
78 origin_query: None,
79 queries: vec![],
80 names: vec![],
81 schema: vec![],
82 }
83 }
84}
85
86impl Source for BigQuerySource
87where
88 BigQuerySourcePartition:
89 SourcePartition<TypeSystem = BigQueryTypeSystem, Error = BigQuerySourceError>,
90{
91 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
92 type Partition = BigQuerySourcePartition;
93 type TypeSystem = BigQueryTypeSystem;
94 type Error = BigQuerySourceError;
95
96 #[throws(BigQuerySourceError)]
97 fn set_data_order(&mut self, data_order: DataOrder) {
98 if !matches!(data_order, DataOrder::RowMajor) {
99 throw!(ConnectorXError::UnsupportedDataOrder(data_order));
100 }
101 }
102
103 fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
104 self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
105 }
106
107 fn set_origin_query(&mut self, query: Option<String>) {
108 self.origin_query = query;
109 }
110
111 #[throws(BigQuerySourceError)]
112 fn fetch_metadata(&mut self) {
113 assert!(!self.queries.is_empty());
114 let job = self.client.job();
115 for (_, query) in self.queries.iter().enumerate() {
116 let l1query = limit1_query(query, &BigQueryDialect {})?;
117 let rs = self.rt.block_on(job.query(
118 self.project_id.as_str(),
119 QueryRequest::new(l1query.as_str()),
120 ))?;
121 let (names, types) = rs
122 .schema
123 .as_ref()
124 .ok_or_else(|| anyhow!("TableSchema is none"))?
125 .fields
126 .as_ref()
127 .ok_or_else(|| anyhow!("TableFieldSchema is none"))?
128 .iter()
129 .map(|col| {
130 (
131 col.clone().name,
132 BigQueryTypeSystem::from(&col.clone().r#type),
133 )
134 })
135 .unzip();
136 self.names = names;
137 self.schema = types;
138 }
139 }
140
141 #[throws(BigQuerySourceError)]
142 fn result_rows(&mut self) -> Option<usize> {
143 match &self.origin_query {
144 Some(q) => {
145 let cxq = CXQuery::Naked(q.clone());
146 let cquery = count_query(&cxq, &BigQueryDialect {})?;
147 let job = self.client.job();
148 let mut rs = ResultSet::new_from_query_response(self.rt.block_on(
149 job.query(self.project_id.as_str(), QueryRequest::new(cquery.as_str())),
150 )?);
151 rs.next_row();
152 let nrows = rs
153 .get_i64(0)?
154 .ok_or_else(|| anyhow!("cannot get row number"))?;
155 Some(nrows as usize)
156 }
157 None => None,
158 }
159 }
160
161 fn names(&self) -> Vec<String> {
162 self.names.clone()
163 }
164
165 fn schema(&self) -> Vec<Self::TypeSystem> {
166 self.schema.clone()
167 }
168
169 #[throws(BigQuerySourceError)]
170 fn partition(self) -> Vec<Self::Partition> {
171 let mut ret = vec![];
172 for query in self.queries {
173 ret.push(BigQuerySourcePartition::new(
174 self.rt.clone(),
175 self.client.clone(),
176 self.project_id.clone(),
177 &query,
178 &self.schema,
179 ));
180 }
181 ret
182 }
183}
184
185pub struct BigQuerySourcePartition {
186 rt: Arc<Runtime>,
187 client: Arc<Client>,
188 project_id: String,
189 query: CXQuery<String>,
190 schema: Vec<BigQueryTypeSystem>,
191 nrows: usize,
192 ncols: usize,
193}
194
195impl BigQuerySourcePartition {
196 pub fn new(
197 handle: Arc<Runtime>,
198 client: Arc<Client>,
199 project_id: String,
200 query: &CXQuery<String>,
201 schema: &[BigQueryTypeSystem],
202 ) -> Self {
203 Self {
204 rt: handle,
205 client,
206 project_id: project_id.clone(),
207 query: query.clone(),
208 schema: schema.to_vec(),
209 nrows: 0,
210 ncols: schema.len(),
211 }
212 }
213}
214
215impl SourcePartition for BigQuerySourcePartition {
216 type TypeSystem = BigQueryTypeSystem;
217 type Parser<'a> = BigQuerySourceParser;
218 type Error = BigQuerySourceError;
219
220 #[throws(BigQuerySourceError)]
221 fn result_rows(&mut self) {
222 let cquery = count_query(&self.query, &BigQueryDialect {})?;
223 let job = self.client.job();
224 let mut rs =
225 ResultSet::new_from_query_response(self.rt.block_on(
226 job.query(self.project_id.as_str(), QueryRequest::new(cquery.as_str())),
227 )?);
228 rs.next_row();
229 let nrows = rs
230 .get_i64(0)?
231 .ok_or_else(|| anyhow!("cannot get row number"))?;
232 self.nrows = nrows as usize;
233 }
234
235 #[throws(BigQuerySourceError)]
236 fn parser(&mut self) -> Self::Parser<'_> {
237 let job = self.client.job();
238 let qry = self.rt.block_on(job.query(
239 self.project_id.as_str(),
240 QueryRequest::new(self.query.as_str()),
241 ))?;
242 let job_info = qry
243 .job_reference
244 .as_ref()
245 .ok_or_else(|| anyhow!("job_reference is none"))?;
246 let params = GetQueryResultsParameters {
247 format_options: None,
248 location: job_info.location.clone(),
249 max_results: None,
250 page_token: None,
251 start_index: None,
252 timeout_ms: None,
253 };
254 let rs = self.rt.block_on(
255 job.get_query_results(
256 self.project_id.as_str(),
257 job_info
258 .job_id
259 .as_ref()
260 .ok_or_else(|| anyhow!("job_id is none"))?
261 .as_str(),
262 params,
263 ),
264 )?;
265 BigQuerySourceParser::new(self.rt.clone(), self.client.clone(), rs, &self.schema)
266 }
267
268 fn nrows(&self) -> usize {
269 self.nrows
270 }
271
272 fn ncols(&self) -> usize {
273 self.ncols
274 }
275}
276
277pub struct BigQuerySourceParser {
278 rt: Arc<Runtime>,
279 client: Arc<Client>,
280 response: GetQueryResultsResponse,
281 ncols: usize,
282 current_col: usize,
283 current_row: usize,
284 nrows: Option<usize>,
285}
286
287impl<'a> BigQuerySourceParser {
288 fn new(
289 rt: Arc<Runtime>,
290 client: Arc<Client>,
291 response: GetQueryResultsResponse,
292 schema: &[BigQueryTypeSystem],
293 ) -> Self {
294 Self {
295 rt,
296 client,
297 response,
298 ncols: schema.len(),
299 current_row: 0,
300 current_col: 0,
301 nrows: None,
302 }
303 }
304
305 #[throws(BigQuerySourceError)]
306 fn next_loc(&mut self) -> (usize, usize) {
307 let ret = (self.current_row, self.current_col);
308 self.current_row += (self.current_col + 1) / self.ncols;
309 self.current_col = (self.current_col + 1) % self.ncols;
310 ret
311 }
312}
313
314impl<'a> PartitionParser<'a> for BigQuerySourceParser {
315 type TypeSystem = BigQueryTypeSystem;
316 type Error = BigQuerySourceError;
317
318 #[throws(BigQuerySourceError)]
319 fn fetch_next(&mut self) -> (usize, bool) {
320 assert!(self.current_col == 0);
321 match self.nrows {
322 Some(total_rows) => (total_rows - self.current_row, true),
323 None => {
324 let total_rows = self
326 .response
327 .total_rows
328 .as_ref()
329 .ok_or_else(|| anyhow!("total_rows is none"))?
330 .parse::<usize>()?;
331 self.nrows = Some(total_rows);
332 (total_rows, true)
333 }
334 }
335 }
336}
337
338macro_rules! impl_produce {
339 ($($t: ty,)+) => {
340 $(
341 impl<'r> Produce<'r, $t> for BigQuerySourceParser {
342 type Error = BigQuerySourceError;
343
344 #[throws(BigQuerySourceError)]
345 fn produce(&'r mut self) -> $t {
346 let (mut ridx, cidx) = self.next_loc()?;
347 if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
348 let job = self.client.job();
349 let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
350 let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
351 self.response = self.rt.block_on(
352 job.get_query_results(
353 job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
354 job_info.job_id.as_ref().ok_or_else(|| anyhow!("job_id is none"))?.as_str(),
355 params,
356 ),
357 )?;
358 self.current_row = 0;
359 ridx = 0;
360 }
361 let rows = self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?;
362 let columns = rows[ridx].columns.as_ref().ok_or_else(|| anyhow!("columns is none"))?;
363 let v = columns.get(cidx).ok_or_else(|| anyhow!("Table Cell is none"))?.value.as_ref().ok_or_else(|| anyhow!("value is none"))?;
364 let s = v
365 .as_str()
366 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
367 s.parse()
368 .map_err(|_| {
369 ConnectorXError::cannot_produce::<$t>(Some(s.into()))
370 })?
371 }
372 }
373
374 impl<'r> Produce<'r, Option<$t>> for BigQuerySourceParser {
375 type Error = BigQuerySourceError;
376
377 #[throws(BigQuerySourceError)]
378 fn produce(&'r mut self) -> Option<$t> {
379 let (mut ridx, cidx) = self.next_loc()?;
380 if ridx == (self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?.len()) {
381 let job = self.client.job();
382 let job_info = self.response.job_reference.as_ref().ok_or_else(|| anyhow!("job_reference is none"))?;
383 let params = GetQueryResultsParameters { format_options: None, location: job_info.location.clone(), max_results: None, page_token: self.response.page_token.clone(), start_index: None, timeout_ms: None };
384 self.response = self.rt.block_on(
385 job.get_query_results(
386 job_info.project_id.as_ref().ok_or_else(|| anyhow!("project_id is none"))?.as_str(),
387 job_info.job_id.as_ref().ok_or_else(|| anyhow!("job_id is none"))?.as_str(),
388 params,
389 ),
390 )?;
391 self.current_row = 0;
392 ridx = 0;
393 }
394 let rows = self.response.rows.as_ref().ok_or_else(|| anyhow!("rows is none"))?;
395 let columns = rows[ridx].columns.as_ref().ok_or_else(|| anyhow!("columns is none"))?;
396 match &columns.get(cidx).ok_or_else(|| anyhow!("Table Cell is none"))?.value {
397 None => None,
398 Some(v) => {
399 let s = v.as_str().ok_or_else(|| anyhow!("cannot get str from json value"))?;
400 Some(s.parse().map_err(|_| {
401 ConnectorXError::cannot_produce::<$t>(Some(s.into()))
402 })?)},
403 }
404 }
405 }
406 )+
407 };
408}
409
410impl_produce!(i64, f64, String,);
411
412impl<'r, 'a> Produce<'r, bool> for BigQuerySourceParser {
413 type Error = BigQuerySourceError;
414
415 #[throws(BigQuerySourceError)]
416 fn produce(&mut self) -> bool {
417 let (mut ridx, cidx) = self.next_loc()?;
418 if ridx
419 == (self
420 .response
421 .rows
422 .as_ref()
423 .ok_or_else(|| anyhow!("rows is none"))?
424 .len())
425 {
426 let job = self.client.job();
427 let job_info = self
428 .response
429 .job_reference
430 .as_ref()
431 .ok_or_else(|| anyhow!("job_reference is none"))?;
432 let params = GetQueryResultsParameters {
433 format_options: None,
434 location: job_info.location.clone(),
435 max_results: None,
436 page_token: self.response.page_token.clone(),
437 start_index: None,
438 timeout_ms: None,
439 };
440 self.response = self.rt.block_on(
441 job.get_query_results(
442 job_info
443 .project_id
444 .as_ref()
445 .ok_or_else(|| anyhow!("project_id is none"))?
446 .as_str(),
447 job_info
448 .job_id
449 .as_ref()
450 .ok_or_else(|| anyhow!("job_id is none"))?
451 .as_str(),
452 params,
453 ),
454 )?;
455 self.current_row = 0;
456 ridx = 0;
457 }
458 let rows = self
459 .response
460 .rows
461 .as_ref()
462 .ok_or_else(|| anyhow!("rows is none"))?;
463 let columns = rows[ridx]
464 .columns
465 .as_ref()
466 .ok_or_else(|| anyhow!("columns is none"))?;
467 let v = columns
468 .get(cidx)
469 .ok_or_else(|| anyhow!("Table Cell is none"))?
470 .value
471 .as_ref()
472 .ok_or_else(|| anyhow!("value is none"))?;
473 let s = v
474 .as_str()
475 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
476
477 let ret = match s {
478 "true" => true,
479 "false" => false,
480 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
481 };
482 ret
483 }
484}
485
486impl<'r, 'a> Produce<'r, Option<bool>> for BigQuerySourceParser {
487 type Error = BigQuerySourceError;
488
489 #[throws(BigQuerySourceError)]
490 fn produce(&mut self) -> Option<bool> {
491 let (mut ridx, cidx) = self.next_loc()?;
492 if ridx
493 == (self
494 .response
495 .rows
496 .as_ref()
497 .ok_or_else(|| anyhow!("rows is none"))?
498 .len())
499 {
500 let job = self.client.job();
501 let job_info = self
502 .response
503 .job_reference
504 .as_ref()
505 .ok_or_else(|| anyhow!("job_reference is none"))?;
506 let params = GetQueryResultsParameters {
507 format_options: None,
508 location: job_info.location.clone(),
509 max_results: None,
510 page_token: self.response.page_token.clone(),
511 start_index: None,
512 timeout_ms: None,
513 };
514 self.response = self.rt.block_on(
515 job.get_query_results(
516 job_info
517 .project_id
518 .as_ref()
519 .ok_or_else(|| anyhow!("project_id is none"))?
520 .as_str(),
521 job_info
522 .job_id
523 .as_ref()
524 .ok_or_else(|| anyhow!("job_id is none"))?
525 .as_str(),
526 params,
527 ),
528 )?;
529 self.current_row = 0;
530 ridx = 0;
531 }
532 let rows = self
533 .response
534 .rows
535 .as_ref()
536 .ok_or_else(|| anyhow!("rows is none"))?;
537 let columns = rows[ridx]
538 .columns
539 .as_ref()
540 .ok_or_else(|| anyhow!("columns is none"))?;
541 let ret = match &columns
542 .get(cidx)
543 .ok_or_else(|| anyhow!("Table Cell is none"))?
544 .value
545 {
546 None => None,
547 Some(v) => {
548 let s = v
549 .as_str()
550 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
551 match s {
552 "true" => Some(true),
553 "false" => Some(false),
554 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
555 }
556 }
557 };
558 ret
559 }
560}
561
562impl<'r, 'a> Produce<'r, NaiveDate> for BigQuerySourceParser {
563 type Error = BigQuerySourceError;
564
565 #[throws(BigQuerySourceError)]
566 fn produce(&mut self) -> NaiveDate {
567 let (mut ridx, cidx) = self.next_loc()?;
568 if ridx
569 == (self
570 .response
571 .rows
572 .as_ref()
573 .ok_or_else(|| anyhow!("rows is none"))?
574 .len())
575 {
576 let job = self.client.job();
577 let job_info = self
578 .response
579 .job_reference
580 .as_ref()
581 .ok_or_else(|| anyhow!("job_reference is none"))?;
582 let params = GetQueryResultsParameters {
583 format_options: None,
584 location: job_info.location.clone(),
585 max_results: None,
586 page_token: self.response.page_token.clone(),
587 start_index: None,
588 timeout_ms: None,
589 };
590 self.response = self.rt.block_on(
591 job.get_query_results(
592 job_info
593 .project_id
594 .as_ref()
595 .ok_or_else(|| anyhow!("project_id is none"))?
596 .as_str(),
597 job_info
598 .job_id
599 .as_ref()
600 .ok_or_else(|| anyhow!("job_id is none"))?
601 .as_str(),
602 params,
603 ),
604 )?;
605 self.current_row = 0;
606 ridx = 0;
607 }
608 let rows = self
609 .response
610 .rows
611 .as_ref()
612 .ok_or_else(|| anyhow!("rows is none"))?;
613 let columns = rows[ridx]
614 .columns
615 .as_ref()
616 .ok_or_else(|| anyhow!("columns is none"))?;
617 let v = columns
618 .get(cidx)
619 .ok_or_else(|| anyhow!("Table Cell is none"))?
620 .value
621 .as_ref()
622 .ok_or_else(|| anyhow!("value is none"))?;
623 let s = v
624 .as_str()
625 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
626 NaiveDate::parse_from_str(s, "%Y-%m-%d")
627 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into())))?
628 }
629}
630
631impl<'r, 'a> Produce<'r, Option<NaiveDate>> for BigQuerySourceParser {
632 type Error = BigQuerySourceError;
633
634 #[throws(BigQuerySourceError)]
635 fn produce(&mut self) -> Option<NaiveDate> {
636 let (mut ridx, cidx) = self.next_loc()?;
637 if ridx
638 == (self
639 .response
640 .rows
641 .as_ref()
642 .ok_or_else(|| anyhow!("rows is none"))?
643 .len())
644 {
645 let job = self.client.job();
646 let job_info = self
647 .response
648 .job_reference
649 .as_ref()
650 .ok_or_else(|| anyhow!("job_reference is none"))?;
651 let params = GetQueryResultsParameters {
652 format_options: None,
653 location: job_info.location.clone(),
654 max_results: None,
655 page_token: self.response.page_token.clone(),
656 start_index: None,
657 timeout_ms: None,
658 };
659 self.response = self.rt.block_on(
660 job.get_query_results(
661 job_info
662 .project_id
663 .as_ref()
664 .ok_or_else(|| anyhow!("project_id is none"))?
665 .as_str(),
666 job_info
667 .job_id
668 .as_ref()
669 .ok_or_else(|| anyhow!("job_id is none"))?
670 .as_str(),
671 params,
672 ),
673 )?;
674 self.current_row = 0;
675 ridx = 0;
676 }
677 let rows = self
678 .response
679 .rows
680 .as_ref()
681 .ok_or_else(|| anyhow!("rows is none"))?;
682 let columns = rows[ridx]
683 .columns
684 .as_ref()
685 .ok_or_else(|| anyhow!("columns is none"))?;
686 match &columns
687 .get(cidx)
688 .ok_or_else(|| anyhow!("Table Cell is none"))?
689 .value
690 {
691 None => None,
692 Some(v) => {
693 let s = v
694 .as_str()
695 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
696 Some(
697 NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
698 ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
699 })?,
700 )
701 }
702 }
703 }
704}
705
706impl<'r, 'a> Produce<'r, NaiveDateTime> for BigQuerySourceParser {
707 type Error = BigQuerySourceError;
708
709 #[throws(BigQuerySourceError)]
710 fn produce(&mut self) -> NaiveDateTime {
711 let (mut ridx, cidx) = self.next_loc()?;
712 if ridx
713 == (self
714 .response
715 .rows
716 .as_ref()
717 .ok_or_else(|| anyhow!("rows is none"))?
718 .len())
719 {
720 let job = self.client.job();
721 let job_info = self
722 .response
723 .job_reference
724 .as_ref()
725 .ok_or_else(|| anyhow!("job_reference is none"))?;
726 let params = GetQueryResultsParameters {
727 format_options: None,
728 location: job_info.location.clone(),
729 max_results: None,
730 page_token: self.response.page_token.clone(),
731 start_index: None,
732 timeout_ms: None,
733 };
734 self.response = self.rt.block_on(
735 job.get_query_results(
736 job_info
737 .project_id
738 .as_ref()
739 .ok_or_else(|| anyhow!("project_id is none"))?
740 .as_str(),
741 job_info
742 .job_id
743 .as_ref()
744 .ok_or_else(|| anyhow!("job_id is none"))?
745 .as_str(),
746 params,
747 ),
748 )?;
749 self.current_row = 0;
750 ridx = 0;
751 }
752 let rows = self
753 .response
754 .rows
755 .as_ref()
756 .ok_or_else(|| anyhow!("rows is none"))?;
757 let columns = rows[ridx]
758 .columns
759 .as_ref()
760 .ok_or_else(|| anyhow!("columns is none"))?;
761 let v = columns
762 .get(cidx)
763 .ok_or_else(|| anyhow!("Table Cell is none"))?
764 .value
765 .as_ref()
766 .ok_or_else(|| anyhow!("value is none"))?;
767 let s = v
768 .as_str()
769 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
770 NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
771 .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f"))
772 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())))?
773 }
774}
775
776impl<'r, 'a> Produce<'r, Option<NaiveDateTime>> for BigQuerySourceParser {
777 type Error = BigQuerySourceError;
778
779 #[throws(BigQuerySourceError)]
780 fn produce(&mut self) -> Option<NaiveDateTime> {
781 let (mut ridx, cidx) = self.next_loc()?;
782 if ridx
783 == (self
784 .response
785 .rows
786 .as_ref()
787 .ok_or_else(|| anyhow!("rows is none"))?
788 .len())
789 {
790 let job = self.client.job();
791 let job_info = self
792 .response
793 .job_reference
794 .as_ref()
795 .ok_or_else(|| anyhow!("job_reference is none"))?;
796 let params = GetQueryResultsParameters {
797 format_options: None,
798 location: job_info.location.clone(),
799 max_results: None,
800 page_token: self.response.page_token.clone(),
801 start_index: None,
802 timeout_ms: None,
803 };
804 self.response = self.rt.block_on(
805 job.get_query_results(
806 job_info
807 .project_id
808 .as_ref()
809 .ok_or_else(|| anyhow!("project_id is none"))?
810 .as_str(),
811 job_info
812 .job_id
813 .as_ref()
814 .ok_or_else(|| anyhow!("job_id is none"))?
815 .as_str(),
816 params,
817 ),
818 )?;
819 self.current_row = 0;
820 ridx = 0;
821 }
822 let rows = self
823 .response
824 .rows
825 .as_ref()
826 .ok_or_else(|| anyhow!("rows is none"))?;
827 let columns = rows[ridx]
828 .columns
829 .as_ref()
830 .ok_or_else(|| anyhow!("columns is none"))?;
831 match &columns
832 .get(cidx)
833 .ok_or_else(|| anyhow!("Table Cell is none"))?
834 .value
835 {
836 None => None,
837 Some(v) => {
838 let s = v
839 .as_str()
840 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
841 Some(
842 NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
843 .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f"))
844 .map_err(|_| {
845 ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into()))
846 })?,
847 )
848 }
849 }
850 }
851}
852
853impl<'r, 'a> Produce<'r, NaiveTime> for BigQuerySourceParser {
854 type Error = BigQuerySourceError;
855
856 #[throws(BigQuerySourceError)]
857 fn produce(&mut self) -> NaiveTime {
858 let (mut ridx, cidx) = self.next_loc()?;
859 if ridx
860 == (self
861 .response
862 .rows
863 .as_ref()
864 .ok_or_else(|| anyhow!("rows is none"))?
865 .len())
866 {
867 let job = self.client.job();
868 let job_info = self
869 .response
870 .job_reference
871 .as_ref()
872 .ok_or_else(|| anyhow!("job_reference is none"))?;
873 let params = GetQueryResultsParameters {
874 format_options: None,
875 location: job_info.location.clone(),
876 max_results: None,
877 page_token: self.response.page_token.clone(),
878 start_index: None,
879 timeout_ms: None,
880 };
881 self.response = self.rt.block_on(
882 job.get_query_results(
883 job_info
884 .project_id
885 .as_ref()
886 .ok_or_else(|| anyhow!("project_id is none"))?
887 .as_str(),
888 job_info
889 .job_id
890 .as_ref()
891 .ok_or_else(|| anyhow!("job_id is none"))?
892 .as_str(),
893 params,
894 ),
895 )?;
896 self.current_row = 0;
897 ridx = 0;
898 }
899 let rows = self
900 .response
901 .rows
902 .as_ref()
903 .ok_or_else(|| anyhow!("rows is none"))?;
904 let columns = rows[ridx]
905 .columns
906 .as_ref()
907 .ok_or_else(|| anyhow!("columns is none"))?;
908 let v = columns
909 .get(cidx)
910 .ok_or_else(|| anyhow!("Table Cell is none"))?
911 .value
912 .as_ref()
913 .ok_or_else(|| anyhow!("value is none"))?;
914 let s = v
915 .as_str()
916 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
917 NaiveTime::parse_from_str(s, "%H:%M:%S")
918 .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?
919 }
920}
921
922impl<'r, 'a> Produce<'r, Option<NaiveTime>> for BigQuerySourceParser {
923 type Error = BigQuerySourceError;
924
925 #[throws(BigQuerySourceError)]
926 fn produce(&mut self) -> Option<NaiveTime> {
927 let (mut ridx, cidx) = self.next_loc()?;
928 if ridx
929 == (self
930 .response
931 .rows
932 .as_ref()
933 .ok_or_else(|| anyhow!("rows is none"))?
934 .len())
935 {
936 let job = self.client.job();
937 let job_info = self
938 .response
939 .job_reference
940 .as_ref()
941 .ok_or_else(|| anyhow!("job_reference is none"))?;
942 let params = GetQueryResultsParameters {
943 format_options: None,
944 location: job_info.location.clone(),
945 max_results: None,
946 page_token: self.response.page_token.clone(),
947 start_index: None,
948 timeout_ms: None,
949 };
950 self.response = self.rt.block_on(
951 job.get_query_results(
952 job_info
953 .project_id
954 .as_ref()
955 .ok_or_else(|| anyhow!("project_id is none"))?
956 .as_str(),
957 job_info
958 .job_id
959 .as_ref()
960 .ok_or_else(|| anyhow!("job_id is none"))?
961 .as_str(),
962 params,
963 ),
964 )?;
965 self.current_row = 0;
966 ridx = 0;
967 }
968 let rows = self
969 .response
970 .rows
971 .as_ref()
972 .ok_or_else(|| anyhow!("rows is none"))?;
973 let columns = rows[ridx]
974 .columns
975 .as_ref()
976 .ok_or_else(|| anyhow!("columns is none"))?;
977 match &columns
978 .get(cidx)
979 .ok_or_else(|| anyhow!("Table Cell is none"))?
980 .value
981 {
982 None => None,
983 Some(v) => {
984 let s = v
985 .as_str()
986 .ok_or_else(|| anyhow!("cannot get str from json value"))?;
987 Some(
988 NaiveTime::parse_from_str(s, "%H:%M:%S").map_err(|_| {
989 ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into()))
990 })?,
991 )
992 }
993 }
994 }
995}
996
997impl<'r, 'a> Produce<'r, DateTime<Utc>> for BigQuerySourceParser {
998 type Error = BigQuerySourceError;
999
1000 #[throws(BigQuerySourceError)]
1001 fn produce(&mut self) -> DateTime<Utc> {
1002 let (mut ridx, cidx) = self.next_loc()?;
1003 if ridx
1004 == (self
1005 .response
1006 .rows
1007 .as_ref()
1008 .ok_or_else(|| anyhow!("rows is none"))?
1009 .len())
1010 {
1011 let job = self.client.job();
1012 let job_info = self
1013 .response
1014 .job_reference
1015 .as_ref()
1016 .ok_or_else(|| anyhow!("job_reference is none"))?;
1017 let params = GetQueryResultsParameters {
1018 format_options: None,
1019 location: job_info.location.clone(),
1020 max_results: None,
1021 page_token: self.response.page_token.clone(),
1022 start_index: None,
1023 timeout_ms: None,
1024 };
1025 self.response = self.rt.block_on(
1026 job.get_query_results(
1027 job_info
1028 .project_id
1029 .as_ref()
1030 .ok_or_else(|| anyhow!("project_id is none"))?
1031 .as_str(),
1032 job_info
1033 .job_id
1034 .as_ref()
1035 .ok_or_else(|| anyhow!("job_id is none"))?
1036 .as_str(),
1037 params,
1038 ),
1039 )?;
1040 self.current_row = 0;
1041 ridx = 0;
1042 }
1043 let rows = self
1044 .response
1045 .rows
1046 .as_ref()
1047 .ok_or_else(|| anyhow!("rows is none"))?;
1048 let columns = rows[ridx]
1049 .columns
1050 .as_ref()
1051 .ok_or_else(|| anyhow!("columns is none"))?;
1052 let v = columns
1053 .get(cidx)
1054 .ok_or_else(|| anyhow!("Table Cell is none"))?
1055 .value
1056 .as_ref()
1057 .ok_or_else(|| anyhow!("value is none"))?;
1058 let timestamp_ns = (v
1059 .as_str()
1060 .ok_or_else(|| anyhow!("cannot get str from json value"))?
1061 .parse::<f64>()?
1062 * 1e9) as i64;
1063 let secs = timestamp_ns / 1000000000;
1064 let nsecs = (timestamp_ns % 1000000000) as u32;
1065 DateTime::from_timestamp(secs, nsecs)
1066 .unwrap_or_else(|| panic!("out of range number: {} {}", secs, nsecs))
1067 }
1068}
1069
1070impl<'r, 'a> Produce<'r, Option<DateTime<Utc>>> for BigQuerySourceParser {
1071 type Error = BigQuerySourceError;
1072
1073 #[throws(BigQuerySourceError)]
1074 fn produce(&mut self) -> Option<DateTime<Utc>> {
1075 let (mut ridx, cidx) = self.next_loc()?;
1076 if ridx
1077 == (self
1078 .response
1079 .rows
1080 .as_ref()
1081 .ok_or_else(|| anyhow!("rows is none"))?
1082 .len())
1083 {
1084 let job = self.client.job();
1085 let job_info = self
1086 .response
1087 .job_reference
1088 .as_ref()
1089 .ok_or_else(|| anyhow!("job_reference is none"))?;
1090 let params = GetQueryResultsParameters {
1091 format_options: None,
1092 location: job_info.location.clone(),
1093 max_results: None,
1094 page_token: self.response.page_token.clone(),
1095 start_index: None,
1096 timeout_ms: None,
1097 };
1098 self.response = self.rt.block_on(
1099 job.get_query_results(
1100 job_info
1101 .project_id
1102 .as_ref()
1103 .ok_or_else(|| anyhow!("project_id is none"))?
1104 .as_str(),
1105 job_info
1106 .job_id
1107 .as_ref()
1108 .ok_or_else(|| anyhow!("job_id is none"))?
1109 .as_str(),
1110 params,
1111 ),
1112 )?;
1113 self.current_row = 0;
1114 ridx = 0;
1115 }
1116 let rows = self
1117 .response
1118 .rows
1119 .as_ref()
1120 .ok_or_else(|| anyhow!("rows is none"))?;
1121 let columns = rows[ridx]
1122 .columns
1123 .as_ref()
1124 .ok_or_else(|| anyhow!("columns is none"))?;
1125 match &columns
1126 .get(cidx)
1127 .ok_or_else(|| anyhow!("Table Cell is none"))?
1128 .value
1129 {
1130 None => None,
1131 Some(v) => {
1132 let timestamp_ns = (v
1133 .as_str()
1134 .ok_or_else(|| anyhow!("cannot get str from json value"))?
1135 .parse::<f64>()?
1136 * 1e9) as i64;
1137 let secs = timestamp_ns / 1000000000;
1138 let nsecs = (timestamp_ns % 1000000000) as u32;
1139 DateTime::from_timestamp(secs, nsecs)
1140 }
1141 }
1142 }
1143}