1mod connection;
4mod errors;
5mod typesystem;
6
7pub use self::errors::PostgresSourceError;
8use cidr_02::IpInet;
9pub use connection::rewrite_tls_args;
10pub use typesystem::{PostgresTypePairs, PostgresTypeSystem};
11
12use crate::constants::DB_BUFFER_SIZE;
13use crate::{
14 data_order::DataOrder,
15 errors::ConnectorXError,
16 sources::{PartitionParser, Produce, Source, SourcePartition},
17 sql::{count_query, CXQuery},
18};
19use anyhow::anyhow;
20use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc};
21use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter};
22use fehler::{throw, throws};
23use hex::decode;
24use postgres::{
25 binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow},
26 fallible_iterator::FallibleIterator,
27 tls::{MakeTlsConnect, TlsConnect},
28 Config, CopyOutReader, Row, RowIter, SimpleQueryMessage, Socket,
29};
30use r2d2::{Pool, PooledConnection};
31use r2d2_postgres::PostgresConnectionManager;
32use rust_decimal::Decimal;
33use serde_json::{from_str, Value};
34use sqlparser::dialect::PostgreSqlDialect;
35use std::collections::HashMap;
36use std::convert::TryFrom;
37use std::marker::PhantomData;
38use uuid::Uuid;
39
/// Type-level marker selecting the binary `COPY` protocol.
///
/// The enum has no variants, so it is never instantiated; partitions tagged
/// with it read their data through `COPY (...) TO STDOUT WITH BINARY`.
pub enum BinaryProtocol {}
42
/// Type-level marker selecting the CSV `COPY` protocol.
///
/// The enum has no variants, so it is never instantiated; partitions tagged
/// with it read their data through `COPY (...) TO STDOUT WITH CSV`.
pub enum CSVProtocol {}
45
/// Type-level marker selecting the cursor protocol.
///
/// The enum has no variants, so it is never instantiated; partitions tagged
/// with it run their query through `query_raw` and iterate the returned rows.
pub enum CursorProtocol {}
48
/// Type-level marker for a fourth protocol; the enum has no variants and is
/// never instantiated.
///
/// NOTE(review): the implementation is not visible in this part of the file;
/// presumably this selects PostgreSQL's simple query protocol — confirm.
pub enum SimpleProtocol {}
51
/// r2d2 connection manager for PostgreSQL using the TLS connector `C`.
type PgManager<C> = PostgresConnectionManager<C>;
/// A connection checked out of a pool that is managed by [`PgManager`].
type PgConn<C> = PooledConnection<PgManager<C>>;
54
55fn convert_row<'b, R: TryFrom<usize> + postgres::types::FromSql<'b> + Clone>(row: &'b Row) -> R {
57 let nrows: Option<R> = row.get(0);
58 nrows.expect("Could not parse int result from count_query")
59}
60
61#[throws(PostgresSourceError)]
62fn get_total_rows<C>(conn: &mut PgConn<C>, query: &CXQuery<String>) -> usize
63where
64 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
65 C::TlsConnect: Send,
66 C::Stream: Send,
67 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
68{
69 let dialect = PostgreSqlDialect {};
70
71 let row = conn.query_one(count_query(query, &dialect)?.as_str(), &[])?;
72 let col_type = PostgresTypeSystem::from(row.columns()[0].type_());
73 match col_type {
74 PostgresTypeSystem::Int2(_) => convert_row::<i16>(&row) as usize,
75 PostgresTypeSystem::Int4(_) => convert_row::<i32>(&row) as usize,
76 PostgresTypeSystem::Int8(_) => convert_row::<i64>(&row) as usize,
77 _ => throw!(anyhow!(
78 "The result of the count query was not an int, aborting."
79 )),
80 }
81}
82
/// PostgreSQL data source backed by a connection pool, parameterised by the
/// protocol marker `P` and the TLS connector `C`.
pub struct PostgresSource<P, C>
where
    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
    C::TlsConnect: Send,
    C::Stream: Send,
    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Pool that every connection used by the source is checked out from.
    pool: Pool<PgManager<C>>,
    /// Optional original query; when set, `result_rows` counts its rows.
    origin_query: Option<String>,
    /// Queries to run; `partition` creates one partition per entry.
    queries: Vec<CXQuery<String>>,
    /// Column names, populated by `fetch_metadata`.
    names: Vec<String>,
    /// Column types in the connector's type system, populated by `fetch_metadata`.
    schema: Vec<PostgresTypeSystem>,
    /// PostgreSQL column types derived through `PostgresTypePairs`, populated
    /// by `fetch_metadata` and copied into every partition.
    pg_schema: Vec<postgres::types::Type>,
    /// Optional statements executed on each partition's connection before the
    /// partition is created.
    pre_execution_queries: Option<Vec<String>>,
    /// Binds the protocol marker `P` to the type without storing a value.
    _protocol: PhantomData<P>,
}
99
100impl<P, C> PostgresSource<P, C>
101where
102 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
103 C::TlsConnect: Send,
104 C::Stream: Send,
105 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
106{
107 #[throws(PostgresSourceError)]
108 pub fn new(config: Config, tls: C, nconn: usize) -> Self {
109 let manager = PostgresConnectionManager::new(config, tls);
110 let pool = Pool::builder().max_size(nconn as u32).build(manager)?;
111
112 Self {
113 pool,
114 origin_query: None,
115 queries: vec![],
116 names: vec![],
117 schema: vec![],
118 pg_schema: vec![],
119 pre_execution_queries: None,
120 _protocol: PhantomData,
121 }
122 }
123}
124
125impl<P, C> Source for PostgresSource<P, C>
126where
127 PostgresSourcePartition<P, C>:
128 SourcePartition<TypeSystem = PostgresTypeSystem, Error = PostgresSourceError>,
129 P: Send,
130 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
131 C::TlsConnect: Send,
132 C::Stream: Send,
133 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
134{
135 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
136 type Partition = PostgresSourcePartition<P, C>;
137 type TypeSystem = PostgresTypeSystem;
138 type Error = PostgresSourceError;
139
140 #[throws(PostgresSourceError)]
141 fn set_data_order(&mut self, data_order: DataOrder) {
142 if !matches!(data_order, DataOrder::RowMajor) {
143 throw!(ConnectorXError::UnsupportedDataOrder(data_order));
144 }
145 }
146
147 fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
148 self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
149 }
150
151 fn set_origin_query(&mut self, query: Option<String>) {
152 self.origin_query = query;
153 }
154
155 fn set_pre_execution_queries(&mut self, pre_execution_queries: Option<&[String]>) {
156 self.pre_execution_queries = pre_execution_queries.map(|s| s.to_vec());
157 }
158
159 #[throws(PostgresSourceError)]
160 fn fetch_metadata(&mut self) {
161 assert!(!self.queries.is_empty());
162
163 let mut conn = self.pool.get()?;
164 let first_query = &self.queries[0];
165
166 let stmt = conn.prepare(first_query.as_str())?;
167
168 let (names, pg_types): (Vec<String>, Vec<postgres::types::Type>) = stmt
169 .columns()
170 .iter()
171 .map(|col| (col.name().to_string(), col.type_().clone()))
172 .unzip();
173
174 self.names = names;
175 self.schema = pg_types.iter().map(PostgresTypeSystem::from).collect();
176 self.pg_schema = self
177 .schema
178 .iter()
179 .zip(pg_types.iter())
180 .map(|(t1, t2)| PostgresTypePairs(t2, t1).into())
181 .collect();
182 }
183
184 #[throws(PostgresSourceError)]
185 fn result_rows(&mut self) -> Option<usize> {
186 match &self.origin_query {
187 Some(q) => {
188 let cxq = CXQuery::Naked(q.clone());
189 let mut conn = self.pool.get()?;
190 let nrows = get_total_rows(&mut conn, &cxq)?;
191 Some(nrows)
192 }
193 None => None,
194 }
195 }
196
197 fn names(&self) -> Vec<String> {
198 self.names.clone()
199 }
200
201 fn schema(&self) -> Vec<Self::TypeSystem> {
202 self.schema.clone()
203 }
204
205 #[throws(PostgresSourceError)]
206 fn partition(self) -> Vec<Self::Partition> {
207 let mut ret = vec![];
208 for query in self.queries {
209 let mut conn = self.pool.get()?;
210
211 if let Some(pre_queries) = &self.pre_execution_queries {
212 for pre_query in pre_queries {
213 conn.query(pre_query, &[])?;
214 }
215 }
216
217 ret.push(PostgresSourcePartition::<P, C>::new(
218 conn,
219 &query,
220 &self.schema,
221 &self.pg_schema,
222 ));
223 }
224 ret
225 }
226}
227
/// One partition of a PostgreSQL source: a single query together with the
/// pooled connection it will run on.
pub struct PostgresSourcePartition<P, C>
where
    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
    C::TlsConnect: Send,
    C::Stream: Send,
    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Connection used for both the count query and the data query.
    conn: PgConn<C>,
    /// Query whose result this partition produces.
    query: CXQuery<String>,
    /// Column types in the connector's type system.
    schema: Vec<PostgresTypeSystem>,
    /// PostgreSQL column types (passed to the binary COPY reader).
    pg_schema: Vec<postgres::types::Type>,
    /// Row count; 0 until `result_rows` has been called.
    nrows: usize,
    /// Number of columns, i.e. `schema.len()` at construction time.
    ncols: usize,
    /// Binds the protocol marker `P` to the type without storing a value.
    _protocol: PhantomData<P>,
}
243
244impl<P, C> PostgresSourcePartition<P, C>
245where
246 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
247 C::TlsConnect: Send,
248 C::Stream: Send,
249 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
250{
251 pub fn new(
252 conn: PgConn<C>,
253 query: &CXQuery<String>,
254 schema: &[PostgresTypeSystem],
255 pg_schema: &[postgres::types::Type],
256 ) -> Self {
257 Self {
258 conn,
259 query: query.clone(),
260 schema: schema.to_vec(),
261 pg_schema: pg_schema.to_vec(),
262 nrows: 0,
263 ncols: schema.len(),
264 _protocol: PhantomData,
265 }
266 }
267}
268
269impl<C> SourcePartition for PostgresSourcePartition<BinaryProtocol, C>
270where
271 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
272 C::TlsConnect: Send,
273 C::Stream: Send,
274 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
275{
276 type TypeSystem = PostgresTypeSystem;
277 type Parser<'a> = PostgresBinarySourcePartitionParser<'a>;
278 type Error = PostgresSourceError;
279
280 #[throws(PostgresSourceError)]
281 fn result_rows(&mut self) -> () {
282 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
283 }
284
285 #[throws(PostgresSourceError)]
286 fn parser(&mut self) -> Self::Parser<'_> {
287 let query = format!("COPY ({}) TO STDOUT WITH BINARY", self.query);
288 let reader = self.conn.copy_out(&*query)?; let iter = BinaryCopyOutIter::new(reader, &self.pg_schema);
290
291 PostgresBinarySourcePartitionParser::new(iter, &self.schema)
292 }
293
294 fn nrows(&self) -> usize {
295 self.nrows
296 }
297
298 fn ncols(&self) -> usize {
299 self.ncols
300 }
301}
302
303impl<C> SourcePartition for PostgresSourcePartition<CSVProtocol, C>
304where
305 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
306 C::TlsConnect: Send,
307 C::Stream: Send,
308 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
309{
310 type TypeSystem = PostgresTypeSystem;
311 type Parser<'a> = PostgresCSVSourceParser<'a>;
312 type Error = PostgresSourceError;
313
314 #[throws(PostgresSourceError)]
315 fn result_rows(&mut self) {
316 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
317 }
318
319 #[throws(PostgresSourceError)]
320 fn parser(&mut self) -> Self::Parser<'_> {
321 let query = format!("COPY ({}) TO STDOUT WITH CSV", self.query);
322 let reader = self.conn.copy_out(&*query)?; let iter = ReaderBuilder::new()
324 .has_headers(false)
325 .from_reader(reader)
326 .into_records();
327
328 PostgresCSVSourceParser::new(iter, &self.schema)
329 }
330
331 fn nrows(&self) -> usize {
332 self.nrows
333 }
334
335 fn ncols(&self) -> usize {
336 self.ncols
337 }
338}
339
340impl<C> SourcePartition for PostgresSourcePartition<CursorProtocol, C>
341where
342 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
343 C::TlsConnect: Send,
344 C::Stream: Send,
345 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
346{
347 type TypeSystem = PostgresTypeSystem;
348 type Parser<'a> = PostgresRawSourceParser<'a>;
349 type Error = PostgresSourceError;
350
351 #[throws(PostgresSourceError)]
352 fn result_rows(&mut self) {
353 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
354 }
355
356 #[throws(PostgresSourceError)]
357 fn parser(&mut self) -> Self::Parser<'_> {
358 let iter = self
359 .conn
360 .query_raw::<_, bool, _>(self.query.as_str(), vec![])?; PostgresRawSourceParser::new(iter, &self.schema)
362 }
363
364 fn nrows(&self) -> usize {
365 self.nrows
366 }
367
368 fn ncols(&self) -> usize {
369 self.ncols
370 }
371}
/// Buffered parser over the rows of a binary `COPY` stream.
pub struct PostgresBinarySourcePartitionParser<'a> {
    /// Fallible iterator yielding the decoded COPY rows.
    iter: BinaryCopyOutIter<'a>,
    /// Rows fetched by the latest `fetch_next` call (at most `DB_BUFFER_SIZE`).
    rowbuf: Vec<BinaryCopyOutRow>,
    /// Number of columns per row.
    ncols: usize,
    /// Column index of the next cell to produce.
    current_col: usize,
    /// Index into `rowbuf` of the next cell to produce.
    current_row: usize,
    /// Set once `iter` has reported the end of the stream.
    is_finished: bool,
}
380
381impl<'a> PostgresBinarySourcePartitionParser<'a> {
382 pub fn new(iter: BinaryCopyOutIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
383 Self {
384 iter,
385 rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
386 ncols: schema.len(),
387 current_row: 0,
388 current_col: 0,
389 is_finished: false,
390 }
391 }
392
393 #[throws(PostgresSourceError)]
394 fn next_loc(&mut self) -> (usize, usize) {
395 let ret = (self.current_row, self.current_col);
396 self.current_row += (self.current_col + 1) / self.ncols;
397 self.current_col = (self.current_col + 1) % self.ncols;
398 ret
399 }
400}
401
402impl<'a> PartitionParser<'a> for PostgresBinarySourcePartitionParser<'a> {
403 type TypeSystem = PostgresTypeSystem;
404 type Error = PostgresSourceError;
405
406 #[throws(PostgresSourceError)]
407 fn fetch_next(&mut self) -> (usize, bool) {
408 assert!(self.current_col == 0);
409 let remaining_rows = self.rowbuf.len() - self.current_row;
410 if remaining_rows > 0 {
411 return (remaining_rows, self.is_finished);
412 } else if self.is_finished {
413 return (0, self.is_finished);
414 }
415
416 if !self.rowbuf.is_empty() {
418 self.rowbuf.drain(..);
419 }
420 for _ in 0..DB_BUFFER_SIZE {
421 match self.iter.next()? {
422 Some(row) => {
423 self.rowbuf.push(row);
424 }
425 None => {
426 self.is_finished = true;
427 break;
428 }
429 }
430 }
431
432 self.current_row = 0;
434 self.current_col = 0;
435
436 (self.rowbuf.len(), self.is_finished)
437 }
438}
439
440macro_rules! impl_produce {
441 ($($t: ty,)+) => {
442 $(
443 impl<'r, 'a> Produce<'r, $t> for PostgresBinarySourcePartitionParser<'a> {
444 type Error = PostgresSourceError;
445
446 #[throws(PostgresSourceError)]
447 fn produce(&'r mut self) -> $t {
448 let (ridx, cidx) = self.next_loc()?;
449 let row = &self.rowbuf[ridx];
450 let val = row.try_get(cidx)?;
451 val
452 }
453 }
454
455 impl<'r, 'a> Produce<'r, Option<$t>> for PostgresBinarySourcePartitionParser<'a> {
456 type Error = PostgresSourceError;
457
458 #[throws(PostgresSourceError)]
459 fn produce(&'r mut self) -> Option<$t> {
460 let (ridx, cidx) = self.next_loc()?;
461 let row = &self.rowbuf[ridx];
462 let val = row.try_get(cidx)?;
463 val
464 }
465 }
466 )+
467 };
468}
469
470impl_produce!(
471 i8,
472 i16,
473 i32,
474 i64,
475 f32,
476 f64,
477 Decimal,
478 bool,
479 &'r str,
480 Vec<u8>,
481 NaiveTime,
482 Uuid,
483 Value,
484 IpInet,
485 Vec<Option<bool>>,
486 Vec<Option<i16>>,
487 Vec<Option<i32>>,
488 Vec<Option<i64>>,
489 Vec<Option<Decimal>>,
490 Vec<Option<f32>>,
491 Vec<Option<f64>>,
492 Vec<Option<String>>,
493);
494
495impl<'r> Produce<'r, NaiveDateTime> for PostgresBinarySourcePartitionParser<'_> {
496 type Error = PostgresSourceError;
497
498 #[throws(PostgresSourceError)]
499 fn produce(&'r mut self) -> NaiveDateTime {
500 let (ridx, cidx) = self.next_loc()?;
501 let row = &self.rowbuf[ridx];
502 let val = row.try_get(cidx)?;
503 match val {
504 postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
505 postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
506 postgres::types::Timestamp::Value(t) => t,
507 }
508 }
509}
510
511impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresBinarySourcePartitionParser<'_> {
512 type Error = PostgresSourceError;
513
514 #[throws(PostgresSourceError)]
515 fn produce(&'r mut self) -> Option<NaiveDateTime> {
516 let (ridx, cidx) = self.next_loc()?;
517 let row = &self.rowbuf[ridx];
518 let val = row.try_get(cidx)?;
519 match val {
520 Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
521 Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
522 Some(postgres::types::Timestamp::Value(t)) => t,
523 None => None,
524 }
525 }
526}
527
528impl<'r> Produce<'r, DateTime<Utc>> for PostgresBinarySourcePartitionParser<'_> {
529 type Error = PostgresSourceError;
530
531 #[throws(PostgresSourceError)]
532 fn produce(&'r mut self) -> DateTime<Utc> {
533 let (ridx, cidx) = self.next_loc()?;
534 let row = &self.rowbuf[ridx];
535 let val = row.try_get(cidx)?;
536 match val {
537 postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
538 postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
539 postgres::types::Timestamp::Value(t) => t,
540 }
541 }
542}
543
544impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresBinarySourcePartitionParser<'_> {
545 type Error = PostgresSourceError;
546
547 #[throws(PostgresSourceError)]
548 fn produce(&'r mut self) -> Option<DateTime<Utc>> {
549 let (ridx, cidx) = self.next_loc()?;
550 let row = &self.rowbuf[ridx];
551 let val = row.try_get(cidx)?;
552 match val {
553 Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
554 Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
555 Some(postgres::types::Timestamp::Value(t)) => t,
556 None => None,
557 }
558 }
559}
560
561impl<'r> Produce<'r, NaiveDate> for PostgresBinarySourcePartitionParser<'_> {
562 type Error = PostgresSourceError;
563
564 #[throws(PostgresSourceError)]
565 fn produce(&'r mut self) -> NaiveDate {
566 let (ridx, cidx) = self.next_loc()?;
567 let row = &self.rowbuf[ridx];
568 let val = row.try_get(cidx)?;
569 match val {
570 postgres::types::Date::PosInfinity => NaiveDate::MAX,
571 postgres::types::Date::NegInfinity => NaiveDate::MIN,
572 postgres::types::Date::Value(t) => t,
573 }
574 }
575}
576
577impl<'r> Produce<'r, Option<NaiveDate>> for PostgresBinarySourcePartitionParser<'_> {
578 type Error = PostgresSourceError;
579
580 #[throws(PostgresSourceError)]
581 fn produce(&'r mut self) -> Option<NaiveDate> {
582 let (ridx, cidx) = self.next_loc()?;
583 let row = &self.rowbuf[ridx];
584 let val = row.try_get(cidx)?;
585 match val {
586 Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
587 Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
588 Some(postgres::types::Date::Value(t)) => t,
589 None => None,
590 }
591 }
592}
593
594impl Produce<'_, HashMap<String, Option<String>>> for PostgresBinarySourcePartitionParser<'_> {
595 type Error = PostgresSourceError;
596 #[throws(PostgresSourceError)]
597 fn produce(&mut self) -> HashMap<String, Option<String>> {
598 unimplemented!("Please use `cursor` protocol for hstore type");
599 }
600}
601
602impl Produce<'_, Option<HashMap<String, Option<String>>>>
603 for PostgresBinarySourcePartitionParser<'_>
604{
605 type Error = PostgresSourceError;
606 #[throws(PostgresSourceError)]
607 fn produce(&mut self) -> Option<HashMap<String, Option<String>>> {
608 unimplemented!("Please use `cursor` protocol for hstore type");
609 }
610}
611
/// Buffered parser over the records of a CSV `COPY` stream.
pub struct PostgresCSVSourceParser<'a> {
    /// Iterator yielding one CSV record per row of the COPY output.
    iter: StringRecordsIntoIter<CopyOutReader<'a>>,
    /// Records fetched by the latest `fetch_next` call (at most `DB_BUFFER_SIZE`).
    rowbuf: Vec<StringRecord>,
    /// Number of columns per record.
    ncols: usize,
    /// Column index of the next cell to produce.
    current_col: usize,
    /// Index into `rowbuf` of the next cell to produce.
    current_row: usize,
    /// Set once `iter` has been exhausted.
    is_finished: bool,
}
620
621impl<'a> PostgresCSVSourceParser<'a> {
622 pub fn new(
623 iter: StringRecordsIntoIter<CopyOutReader<'a>>,
624 schema: &[PostgresTypeSystem],
625 ) -> Self {
626 Self {
627 iter,
628 rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
629 ncols: schema.len(),
630 current_row: 0,
631 current_col: 0,
632 is_finished: false,
633 }
634 }
635
636 #[throws(PostgresSourceError)]
637 fn next_loc(&mut self) -> (usize, usize) {
638 let ret = (self.current_row, self.current_col);
639 self.current_row += (self.current_col + 1) / self.ncols;
640 self.current_col = (self.current_col + 1) % self.ncols;
641 ret
642 }
643}
644
645impl<'a> PartitionParser<'a> for PostgresCSVSourceParser<'a> {
646 type Error = PostgresSourceError;
647 type TypeSystem = PostgresTypeSystem;
648
649 #[throws(PostgresSourceError)]
650 fn fetch_next(&mut self) -> (usize, bool) {
651 assert!(self.current_col == 0);
652 let remaining_rows = self.rowbuf.len() - self.current_row;
653 if remaining_rows > 0 {
654 return (remaining_rows, self.is_finished);
655 } else if self.is_finished {
656 return (0, self.is_finished);
657 }
658
659 if !self.rowbuf.is_empty() {
660 self.rowbuf.drain(..);
661 }
662 for _ in 0..DB_BUFFER_SIZE {
663 if let Some(row) = self.iter.next() {
664 self.rowbuf.push(row?);
665 } else {
666 self.is_finished = true;
667 break;
668 }
669 }
670 self.current_row = 0;
671 self.current_col = 0;
672 (self.rowbuf.len(), self.is_finished)
673 }
674}
675
676macro_rules! impl_csv_produce {
677 ($($t: ty,)+) => {
678 $(
679 impl<'r, 'a> Produce<'r, $t> for PostgresCSVSourceParser<'a> {
680 type Error = PostgresSourceError;
681
682 #[throws(PostgresSourceError)]
683 fn produce(&'r mut self) -> $t {
684 let (ridx, cidx) = self.next_loc()?;
685 self.rowbuf[ridx][cidx].parse().map_err(|_| {
686 ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
687 })?
688 }
689 }
690
691 impl<'r, 'a> Produce<'r, Option<$t>> for PostgresCSVSourceParser<'a> {
692 type Error = PostgresSourceError;
693
694 #[throws(PostgresSourceError)]
695 fn produce(&'r mut self) -> Option<$t> {
696 let (ridx, cidx) = self.next_loc()?;
697 match &self.rowbuf[ridx][cidx][..] {
698 "" => None,
699 v => Some(v.parse().map_err(|_| {
700 ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
701 })?),
702 }
703 }
704 }
705 )+
706 };
707}
708
709impl_csv_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);
710
711macro_rules! impl_csv_vec_produce {
712 ($($t: ty,)+) => {
713 $(
714 impl<'r, 'a> Produce<'r, Vec<Option<$t>>> for PostgresCSVSourceParser<'a> {
715 type Error = PostgresSourceError;
716
717 #[throws(PostgresSourceError)]
718 fn produce(&mut self) -> Vec<Option<$t>> {
719 let (ridx, cidx) = self.next_loc()?;
720 let s = &self.rowbuf[ridx][cidx][..];
721 match s {
722 "{}" => vec![],
723 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
724 s => s[1..s.len() - 1]
725 .split(",")
726 .map(|v| {
727 if v == "NULL" {
728 Ok(None)
729 } else {
730 match v.parse() {
731 Ok(v) => Ok(Some(v)),
732 Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
733 }
734 }
735 })
736 .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
737 }
738 }
739 }
740
741 impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresCSVSourceParser<'a> {
742 type Error = PostgresSourceError;
743
744 #[throws(PostgresSourceError)]
745 fn produce(&mut self) -> Option<Vec<Option<$t>>> {
746 let (ridx, cidx) = self.next_loc()?;
747 let s = &self.rowbuf[ridx][cidx][..];
748 match s {
749 "" => None,
750 "{}" => Some(vec![]),
751 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
752 s => Some(
753 s[1..s.len() - 1]
754 .split(",")
755 .map(|v| {
756 if v == "NULL" {
757 Ok(None)
758 } else {
759 match v.parse() {
760 Ok(v) => Ok(Some(v)),
761 Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
762 }
763 }
764 })
765 .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
766 ),
767 }
768 }
769 }
770 )+
771 };
772}
773
774impl_csv_vec_produce!(i8, i16, i32, i64, f32, f64, Decimal, String,);
775
776impl Produce<'_, HashMap<String, Option<String>>> for PostgresCSVSourceParser<'_> {
777 type Error = PostgresSourceError;
778 #[throws(PostgresSourceError)]
779 fn produce(&mut self) -> HashMap<String, Option<String>> {
780 unimplemented!("Please use `cursor` protocol for hstore type");
781 }
782}
783
784impl Produce<'_, Option<HashMap<String, Option<String>>>> for PostgresCSVSourceParser<'_> {
785 type Error = PostgresSourceError;
786 #[throws(PostgresSourceError)]
787 fn produce(&mut self) -> Option<HashMap<String, Option<String>>> {
788 unimplemented!("Please use `cursor` protocol for hstore type");
789 }
790}
791
792impl Produce<'_, bool> for PostgresCSVSourceParser<'_> {
793 type Error = PostgresSourceError;
794
795 #[throws(PostgresSourceError)]
796 fn produce(&mut self) -> bool {
797 let (ridx, cidx) = self.next_loc()?;
798 let ret = match &self.rowbuf[ridx][cidx][..] {
799 "t" => true,
800 "f" => false,
801 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
802 self.rowbuf[ridx][cidx].into()
803 ))),
804 };
805 ret
806 }
807}
808
809impl Produce<'_, Option<bool>> for PostgresCSVSourceParser<'_> {
810 type Error = PostgresSourceError;
811
812 #[throws(PostgresSourceError)]
813 fn produce(&mut self) -> Option<bool> {
814 let (ridx, cidx) = self.next_loc()?;
815 let ret = match &self.rowbuf[ridx][cidx][..] {
816 "" => None,
817 "t" => Some(true),
818 "f" => Some(false),
819 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
820 self.rowbuf[ridx][cidx].into()
821 ))),
822 };
823 ret
824 }
825}
826
827impl Produce<'_, Vec<Option<bool>>> for PostgresCSVSourceParser<'_> {
828 type Error = PostgresSourceError;
829
830 #[throws(PostgresSourceError)]
831 fn produce(&mut self) -> Vec<Option<bool>> {
832 let (ridx, cidx) = self.next_loc()?;
833 let s = &self.rowbuf[ridx][cidx][..];
834 match s {
835 "{}" => vec![],
836 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
837 s => s[1..s.len() - 1]
838 .split(',')
839 .map(|v| match v {
840 "NULL" => Ok(None),
841 "t" => Ok(Some(true)),
842 "f" => Ok(Some(false)),
843 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
844 })
845 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
846 }
847 }
848}
849
850impl Produce<'_, Option<Vec<Option<bool>>>> for PostgresCSVSourceParser<'_> {
851 type Error = PostgresSourceError;
852
853 #[throws(PostgresSourceError)]
854 fn produce(&mut self) -> Option<Vec<Option<bool>>> {
855 let (ridx, cidx) = self.next_loc()?;
856 let s = &self.rowbuf[ridx][cidx][..];
857 match s {
858 "" => None,
859 "{}" => Some(vec![]),
860 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
861 s => Some(
862 s[1..s.len() - 1]
863 .split(',')
864 .map(|v| match v {
865 "NULL" => Ok(None),
866 "t" => Ok(Some(true)),
867 "f" => Ok(Some(false)),
868 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
869 })
870 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
871 ),
872 }
873 }
874}
875
876impl<'r> Produce<'r, Decimal> for PostgresCSVSourceParser<'_> {
877 type Error = PostgresSourceError;
878
879 #[throws(PostgresSourceError)]
880 fn produce(&'r mut self) -> Decimal {
881 let (ridx, cidx) = self.next_loc()?;
882 match &self.rowbuf[ridx][cidx][..] {
883 "Infinity" => Decimal::MAX,
884 "-Infinity" => Decimal::MIN,
885 v => v
886 .parse()
887 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
888 }
889 }
890}
891
892impl<'r> Produce<'r, Option<Decimal>> for PostgresCSVSourceParser<'_> {
893 type Error = PostgresSourceError;
894
895 #[throws(PostgresSourceError)]
896 fn produce(&'r mut self) -> Option<Decimal> {
897 let (ridx, cidx) = self.next_loc()?;
898 match &self.rowbuf[ridx][cidx][..] {
899 "" => None,
900 "Infinity" => Some(Decimal::MAX),
901 "-Infinity" => Some(Decimal::MIN),
902 v => Some(
903 v.parse()
904 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
905 ),
906 }
907 }
908}
909
910impl Produce<'_, DateTime<Utc>> for PostgresCSVSourceParser<'_> {
911 type Error = PostgresSourceError;
912
913 #[throws(PostgresSourceError)]
914 fn produce(&mut self) -> DateTime<Utc> {
915 let (ridx, cidx) = self.next_loc()?;
916 match &self.rowbuf[ridx][cidx][..] {
917 "infinity" => DateTime::<Utc>::MAX_UTC,
918 "-infinity" => DateTime::<Utc>::MIN_UTC,
919 v => format!("{}:00", v)
921 .parse()
922 .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?,
923 }
924 }
925}
926
927impl Produce<'_, Option<DateTime<Utc>>> for PostgresCSVSourceParser<'_> {
928 type Error = PostgresSourceError;
929
930 #[throws(PostgresSourceError)]
931 fn produce(&mut self) -> Option<DateTime<Utc>> {
932 let (ridx, cidx) = self.next_loc()?;
933 match &self.rowbuf[ridx][cidx][..] {
934 "" => None,
935 "infinity" => Some(DateTime::<Utc>::MAX_UTC),
936 "-infinity" => Some(DateTime::<Utc>::MIN_UTC),
937 v => {
938 Some(format!("{}:00", v).parse().map_err(|_| {
940 ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into()))
941 })?)
942 }
943 }
944 }
945}
946
947impl Produce<'_, NaiveDate> for PostgresCSVSourceParser<'_> {
948 type Error = PostgresSourceError;
949
950 #[throws(PostgresSourceError)]
951 fn produce(&mut self) -> NaiveDate {
952 let (ridx, cidx) = self.next_loc()?;
953 match &self.rowbuf[ridx][cidx][..] {
954 "infinity" => NaiveDate::MAX,
955 "-infinity" => NaiveDate::MIN,
956 v => NaiveDate::parse_from_str(v, "%Y-%m-%d")
957 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
958 }
959 }
960}
961
962impl Produce<'_, Option<NaiveDate>> for PostgresCSVSourceParser<'_> {
963 type Error = PostgresSourceError;
964
965 #[throws(PostgresSourceError)]
966 fn produce(&mut self) -> Option<NaiveDate> {
967 let (ridx, cidx) = self.next_loc()?;
968 match &self.rowbuf[ridx][cidx][..] {
969 "" => None,
970 "infinity" => Some(NaiveDate::MAX),
971 "-infinity" => Some(NaiveDate::MIN),
972 v => Some(
973 NaiveDate::parse_from_str(v, "%Y-%m-%d")
974 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
975 ),
976 }
977 }
978}
979
980impl Produce<'_, NaiveDateTime> for PostgresCSVSourceParser<'_> {
981 type Error = PostgresSourceError;
982
983 #[throws(PostgresSourceError)]
984 fn produce(&mut self) -> NaiveDateTime {
985 let (ridx, cidx) = self.next_loc()?;
986 match &self.rowbuf[ridx][cidx] {
987 "infinity" => NaiveDateTime::MAX,
988 "-infinity" => NaiveDateTime::MIN,
989 v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f")
990 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into())))?,
991 }
992 }
993}
994
995impl Produce<'_, Option<NaiveDateTime>> for PostgresCSVSourceParser<'_> {
996 type Error = PostgresSourceError;
997
998 #[throws(PostgresSourceError)]
999 fn produce(&mut self) -> Option<NaiveDateTime> {
1000 let (ridx, cidx) = self.next_loc()?;
1001 match &self.rowbuf[ridx][cidx][..] {
1002 "" => None,
1003 "infinity" => Some(NaiveDateTime::MAX),
1004 "-infinity" => Some(NaiveDateTime::MIN),
1005 v => Some(
1006 NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
1007 ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into()))
1008 })?,
1009 ),
1010 }
1011 }
1012}
1013
1014impl Produce<'_, NaiveTime> for PostgresCSVSourceParser<'_> {
1015 type Error = PostgresSourceError;
1016
1017 #[throws(PostgresSourceError)]
1018 fn produce(&mut self) -> NaiveTime {
1019 let (ridx, cidx) = self.next_loc()?;
1020 NaiveTime::parse_from_str(&self.rowbuf[ridx][cidx], "%H:%M:%S%.f").map_err(|_| {
1021 ConnectorXError::cannot_produce::<NaiveTime>(Some(self.rowbuf[ridx][cidx].into()))
1022 })?
1023 }
1024}
1025
1026impl Produce<'_, Option<NaiveTime>> for PostgresCSVSourceParser<'_> {
1027 type Error = PostgresSourceError;
1028
1029 #[throws(PostgresSourceError)]
1030 fn produce(&mut self) -> Option<NaiveTime> {
1031 let (ridx, cidx) = self.next_loc()?;
1032 match &self.rowbuf[ridx][cidx][..] {
1033 "" => None,
1034 v => Some(
1035 NaiveTime::parse_from_str(v, "%H:%M:%S%.f")
1036 .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(v.into())))?,
1037 ),
1038 }
1039 }
1040}
1041
1042impl<'r> Produce<'r, &'r str> for PostgresCSVSourceParser<'_> {
1043 type Error = PostgresSourceError;
1044
1045 #[throws(PostgresSourceError)]
1046 fn produce(&'r mut self) -> &'r str {
1047 let (ridx, cidx) = self.next_loc()?;
1048 &self.rowbuf[ridx][cidx]
1049 }
1050}
1051
1052impl<'r> Produce<'r, Option<&'r str>> for PostgresCSVSourceParser<'_> {
1053 type Error = PostgresSourceError;
1054
1055 #[throws(PostgresSourceError)]
1056 fn produce(&'r mut self) -> Option<&'r str> {
1057 let (ridx, cidx) = self.next_loc()?;
1058 match &self.rowbuf[ridx][cidx][..] {
1059 "" => None,
1060 v => Some(v),
1061 }
1062 }
1063}
1064
1065impl<'r> Produce<'r, Vec<u8>> for PostgresCSVSourceParser<'_> {
1066 type Error = PostgresSourceError;
1067
1068 #[throws(PostgresSourceError)]
1069 fn produce(&'r mut self) -> Vec<u8> {
1070 let (ridx, cidx) = self.next_loc()?;
1071 decode(&self.rowbuf[ridx][cidx][2..])? }
1073}
1074
1075impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresCSVSourceParser<'_> {
1076 type Error = PostgresSourceError;
1077
1078 #[throws(PostgresSourceError)]
1079 fn produce(&'r mut self) -> Option<Vec<u8>> {
1080 let (ridx, cidx) = self.next_loc()?;
1081 match &self.rowbuf[ridx][cidx] {
1082 "" => None,
1084 v => Some(decode(&v[2..])?),
1085 }
1086 }
1087}
1088
1089impl<'r> Produce<'r, Value> for PostgresCSVSourceParser<'_> {
1090 type Error = PostgresSourceError;
1091
1092 #[throws(PostgresSourceError)]
1093 fn produce(&'r mut self) -> Value {
1094 let (ridx, cidx) = self.next_loc()?;
1095 let v = &self.rowbuf[ridx][cidx];
1096 from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1097 }
1098}
1099
1100impl<'r> Produce<'r, Option<Value>> for PostgresCSVSourceParser<'_> {
1101 type Error = PostgresSourceError;
1102
1103 #[throws(PostgresSourceError)]
1104 fn produce(&'r mut self) -> Option<Value> {
1105 let (ridx, cidx) = self.next_loc()?;
1106
1107 match &self.rowbuf[ridx][cidx][..] {
1108 "" => None,
1109 v => {
1110 from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1111 }
1112 }
1113 }
1114}
1115
/// Buffered parser over the rows returned by a `query_raw` call.
pub struct PostgresRawSourceParser<'a> {
    /// Fallible iterator yielding the query's rows.
    iter: RowIter<'a>,
    /// Rows fetched by the latest `fetch_next` call (at most `DB_BUFFER_SIZE`).
    rowbuf: Vec<Row>,
    /// Number of columns per row.
    ncols: usize,
    /// Column index of the next cell to produce.
    current_col: usize,
    /// Index into `rowbuf` of the next cell to produce.
    current_row: usize,
    /// Set once `iter` has reported the end of the result set.
    is_finished: bool,
}
1124
1125impl<'a> PostgresRawSourceParser<'a> {
1126 pub fn new(iter: RowIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
1127 Self {
1128 iter,
1129 rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
1130 ncols: schema.len(),
1131 current_row: 0,
1132 current_col: 0,
1133 is_finished: false,
1134 }
1135 }
1136
1137 #[throws(PostgresSourceError)]
1138 fn next_loc(&mut self) -> (usize, usize) {
1139 let ret = (self.current_row, self.current_col);
1140 self.current_row += (self.current_col + 1) / self.ncols;
1141 self.current_col = (self.current_col + 1) % self.ncols;
1142 ret
1143 }
1144}
1145
1146impl<'a> PartitionParser<'a> for PostgresRawSourceParser<'a> {
1147 type TypeSystem = PostgresTypeSystem;
1148 type Error = PostgresSourceError;
1149
1150 #[throws(PostgresSourceError)]
1151 fn fetch_next(&mut self) -> (usize, bool) {
1152 assert!(self.current_col == 0);
1153 let remaining_rows = self.rowbuf.len() - self.current_row;
1154 if remaining_rows > 0 {
1155 return (remaining_rows, self.is_finished);
1156 } else if self.is_finished {
1157 return (0, self.is_finished);
1158 }
1159
1160 if !self.rowbuf.is_empty() {
1161 self.rowbuf.drain(..);
1162 }
1163 for _ in 0..DB_BUFFER_SIZE {
1164 if let Some(row) = self.iter.next()? {
1165 self.rowbuf.push(row);
1166 } else {
1167 self.is_finished = true;
1168 break;
1169 }
1170 }
1171 self.current_row = 0;
1172 self.current_col = 0;
1173 (self.rowbuf.len(), self.is_finished)
1174 }
1175}
1176
/// Implements `Produce<T>` and `Produce<Option<T>>` on
/// `PostgresRawSourceParser` for each listed type by reading the current cell
/// with `Row::try_get`.
macro_rules! impl_produce {
    ($($t: ty,)+) => {
        $(
            impl<'r, 'a> Produce<'r, $t> for PostgresRawSourceParser<'a> {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> $t {
                    let (row_idx, col_idx) = self.next_loc()?;
                    self.rowbuf[row_idx].try_get(col_idx)?
                }
            }

            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresRawSourceParser<'a> {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> Option<$t> {
                    let (row_idx, col_idx) = self.next_loc()?;
                    self.rowbuf[row_idx].try_get(col_idx)?
                }
            }
        )+
    };
}
1206
// Types the raw parser reads straight from the row via `Row::try_get`; each
// entry gets both a plain and an `Option` producer.
impl_produce!(
    i8,
    i16,
    i32,
    i64,
    f32,
    f64,
    Decimal,
    bool,
    &'r str,
    Vec<u8>,
    NaiveTime,
    Uuid,
    Value,
    IpInet,
    HashMap<String, Option<String>>,
    Vec<Option<bool>>,
    Vec<Option<String>>,
    Vec<Option<i16>>,
    Vec<Option<i32>>,
    Vec<Option<i64>>,
    Vec<Option<f32>>,
    Vec<Option<f64>>,
    Vec<Option<Decimal>>,
);
1232
1233impl<'r> Produce<'r, DateTime<Utc>> for PostgresRawSourceParser<'_> {
1234 type Error = PostgresSourceError;
1235
1236 #[throws(PostgresSourceError)]
1237 fn produce(&'r mut self) -> DateTime<Utc> {
1238 let (ridx, cidx) = self.next_loc()?;
1239 let row = &self.rowbuf[ridx];
1240 let val: postgres::types::Timestamp<DateTime<Utc>> = row.try_get(cidx)?;
1241 match val {
1242 postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
1243 postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
1244 postgres::types::Timestamp::Value(t) => t,
1245 }
1246 }
1247}
1248
1249impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresRawSourceParser<'_> {
1250 type Error = PostgresSourceError;
1251
1252 #[throws(PostgresSourceError)]
1253 fn produce(&'r mut self) -> Option<DateTime<Utc>> {
1254 let (ridx, cidx) = self.next_loc()?;
1255 let row = &self.rowbuf[ridx];
1256 let val = row.try_get(cidx)?;
1257 match val {
1258 Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
1259 Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
1260 Some(postgres::types::Timestamp::Value(t)) => t,
1261 None => None,
1262 }
1263 }
1264}
1265
1266impl<'r> Produce<'r, NaiveDateTime> for PostgresRawSourceParser<'_> {
1267 type Error = PostgresSourceError;
1268
1269 #[throws(PostgresSourceError)]
1270 fn produce(&'r mut self) -> NaiveDateTime {
1271 let (ridx, cidx) = self.next_loc()?;
1272 let row = &self.rowbuf[ridx];
1273 let val: postgres::types::Timestamp<NaiveDateTime> = row.try_get(cidx)?;
1274 match val {
1275 postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
1276 postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
1277 postgres::types::Timestamp::Value(t) => t,
1278 }
1279 }
1280}
1281
1282impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresRawSourceParser<'_> {
1283 type Error = PostgresSourceError;
1284
1285 #[throws(PostgresSourceError)]
1286 fn produce(&'r mut self) -> Option<NaiveDateTime> {
1287 let (ridx, cidx) = self.next_loc()?;
1288 let row = &self.rowbuf[ridx];
1289 let val = row.try_get(cidx)?;
1290 match val {
1291 Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
1292 Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
1293 Some(postgres::types::Timestamp::Value(t)) => t,
1294 None => None,
1295 }
1296 }
1297}
1298
1299impl<'r> Produce<'r, NaiveDate> for PostgresRawSourceParser<'_> {
1300 type Error = PostgresSourceError;
1301
1302 #[throws(PostgresSourceError)]
1303 fn produce(&'r mut self) -> NaiveDate {
1304 let (ridx, cidx) = self.next_loc()?;
1305 let row = &self.rowbuf[ridx];
1306 let val: postgres::types::Date<NaiveDate> = row.try_get(cidx)?;
1307 match val {
1308 postgres::types::Date::PosInfinity => NaiveDate::MAX,
1309 postgres::types::Date::NegInfinity => NaiveDate::MIN,
1310 postgres::types::Date::Value(t) => t,
1311 }
1312 }
1313}
1314
1315impl<'r> Produce<'r, Option<NaiveDate>> for PostgresRawSourceParser<'_> {
1316 type Error = PostgresSourceError;
1317
1318 #[throws(PostgresSourceError)]
1319 fn produce(&'r mut self) -> Option<NaiveDate> {
1320 let (ridx, cidx) = self.next_loc()?;
1321 let row = &self.rowbuf[ridx];
1322 let val = row.try_get(cidx)?;
1323 match val {
1324 Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
1325 Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
1326 Some(postgres::types::Date::Value(t)) => t,
1327 None => None,
1328 }
1329 }
1330}
1331
1332impl<C> SourcePartition for PostgresSourcePartition<SimpleProtocol, C>
1333where
1334 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
1335 C::TlsConnect: Send,
1336 C::Stream: Send,
1337 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
1338{
1339 type TypeSystem = PostgresTypeSystem;
1340 type Parser<'a> = PostgresSimpleSourceParser;
1341 type Error = PostgresSourceError;
1342
1343 #[throws(PostgresSourceError)]
1344 fn result_rows(&mut self) {
1345 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
1346 }
1347
1348 #[throws(PostgresSourceError)]
1349 fn parser(&mut self) -> Self::Parser<'_> {
1350 let rows = self.conn.simple_query(self.query.as_str())?; PostgresSimpleSourceParser::new(rows, &self.schema)
1352 }
1353
1354 fn nrows(&self) -> usize {
1355 self.nrows
1356 }
1357
1358 fn ncols(&self) -> usize {
1359 self.ncols
1360 }
1361}
1362
/// Parser over the complete list of messages returned by a simple-protocol
/// query; cell values are read as text from the `Row` messages.
pub struct PostgresSimpleSourceParser {
    /// Every message returned by the query, in order.
    rows: Vec<SimpleQueryMessage>,
    /// Number of columns per row (the schema length given to `new`).
    ncols: usize,
    /// Column index of the next cell to hand out.
    current_col: usize,
    /// Index into `rows` of the message holding the next cell to hand out.
    current_row: usize,
}
1369impl PostgresSimpleSourceParser {
1370 pub fn new(rows: Vec<SimpleQueryMessage>, schema: &[PostgresTypeSystem]) -> Self {
1371 Self {
1372 rows,
1373 ncols: schema.len(),
1374 current_row: 0,
1375 current_col: 0,
1376 }
1377 }
1378
1379 #[throws(PostgresSourceError)]
1380 fn next_loc(&mut self) -> (usize, usize) {
1381 let ret = (self.current_row, self.current_col);
1382 self.current_row += (self.current_col + 1) / self.ncols;
1383 self.current_col = (self.current_col + 1) % self.ncols;
1384 ret
1385 }
1386}
1387
1388impl PartitionParser<'_> for PostgresSimpleSourceParser {
1389 type TypeSystem = PostgresTypeSystem;
1390 type Error = PostgresSourceError;
1391
1392 #[throws(PostgresSourceError)]
1393 fn fetch_next(&mut self) -> (usize, bool) {
1394 self.current_row = 0;
1395 self.current_col = 0;
1396 if !self.rows.is_empty() {
1397 if let SimpleQueryMessage::RowDescription(_) = &self.rows[0] {
1398 self.current_row = 1;
1399 }
1400 }
1401
1402 (self.rows.len() - 1 - self.current_row, true) }
1404}
1405
/// Generates placeholder `Produce<T>` / `Produce<Option<T>>` impls on
/// `PostgresSimpleSourceParser` for each listed type.
///
/// The generated `produce` methods panic via `unimplemented!` when called.
macro_rules! impl_simple_produce_unimplemented {
    ($($t: ty,)+) => {
        $(
            impl<'r, 'a> Produce<'r, $t> for PostgresSimpleSourceParser {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> $t {
                    // Stub: always panics.
                    unimplemented!("not implemented!");
                }
            }

            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> Option<$t> {
                    // Stub: always panics.
                    unimplemented!("not implemented!");
                }
            }
        )+
    };
}
1429
/// Implements `Produce<T>` and `Produce<Option<T>>` on
/// `PostgresSimpleSourceParser` for each listed type by parsing the cell text
/// with `str::parse`.
///
/// The plain producer rejects NULL with an error; the `Option` producer maps
/// NULL to `None`. Both panic if the cursor points at a message that is not a
/// data row.
macro_rules! impl_simple_produce {
    ($($t: ty,)+) => {
        $(
            impl<'r> Produce<'r, $t> for PostgresSimpleSourceParser {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> $t {
                    let (row_idx, col_idx) = self.next_loc()?;
                    let row = match &self.rows[row_idx] {
                        SimpleQueryMessage::Row(row) => row,
                        SimpleQueryMessage::CommandComplete(c) => panic!("get command: {}", c),
                        _ => panic!("what?"),
                    };
                    match row.try_get(col_idx)? {
                        Some(text) => text
                            .parse::<$t>()
                            .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(text.into())))?,
                        None => throw!(anyhow!(
                            "Cannot parse NULL in NOT NULL column."
                        )),
                    }
                }
            }

            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> Option<$t> {
                    let (row_idx, col_idx) = self.next_loc()?;
                    let row = match &self.rows[row_idx] {
                        SimpleQueryMessage::Row(row) => row,
                        SimpleQueryMessage::CommandComplete(c) => panic!("get command: {}", c),
                        _ => panic!("what?"),
                    };
                    match row.try_get(col_idx)? {
                        Some(text) => Some(
                            text.parse::<$t>()
                                .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(text.into())))?,
                        ),
                        None => None,
                    }
                }
            }
        )+
    };
}
1486
// Types whose simple-protocol text form is parsed with `str::parse`.
impl_simple_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);
1488
1489impl<'r> Produce<'r, bool> for PostgresSimpleSourceParser {
1490 type Error = PostgresSourceError;
1491
1492 #[throws(PostgresSourceError)]
1493 fn produce(&'r mut self) -> bool {
1494 let (ridx, cidx) = self.next_loc()?;
1495 let val = match &self.rows[ridx] {
1496 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1497 Some(s) => match s {
1498 "t" => true,
1499 "f" => false,
1500 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1501 },
1502 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1503 },
1504 SimpleQueryMessage::CommandComplete(c) => {
1505 panic!("get command: {}", c);
1506 }
1507 _ => {
1508 panic!("what?");
1509 }
1510 };
1511 val
1512 }
1513}
1514
1515impl<'r> Produce<'r, Option<bool>> for PostgresSimpleSourceParser {
1516 type Error = PostgresSourceError;
1517
1518 #[throws(PostgresSourceError)]
1519 fn produce(&'r mut self) -> Option<bool> {
1520 let (ridx, cidx) = self.next_loc()?;
1521 let val = match &self.rows[ridx] {
1522 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1523 Some(s) => match s {
1524 "t" => Some(true),
1525 "f" => Some(false),
1526 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1527 },
1528 None => None,
1529 },
1530 SimpleQueryMessage::CommandComplete(c) => {
1531 panic!("get command: {}", c);
1532 }
1533 _ => {
1534 panic!("what?");
1535 }
1536 };
1537 val
1538 }
1539}
1540
1541impl<'r> Produce<'r, Decimal> for PostgresSimpleSourceParser {
1542 type Error = PostgresSourceError;
1543
1544 #[throws(PostgresSourceError)]
1545 fn produce(&'r mut self) -> Decimal {
1546 let (ridx, cidx) = self.next_loc()?;
1547 let val = match &self.rows[ridx] {
1548 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1549 Some("Infinity") => Decimal::MAX,
1550 Some("-Infinity") => Decimal::MIN,
1551 Some(s) => s
1552 .parse()
1553 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1554 None => throw!(anyhow!("Cannot parse NULL in NOT NULL column.")),
1555 },
1556 SimpleQueryMessage::CommandComplete(c) => {
1557 panic!("get command: {}", c);
1558 }
1559 _ => {
1560 panic!("what?");
1561 }
1562 };
1563 val
1564 }
1565}
1566
1567impl<'r> Produce<'r, Option<Decimal>> for PostgresSimpleSourceParser {
1568 type Error = PostgresSourceError;
1569
1570 #[throws(PostgresSourceError)]
1571 fn produce(&'r mut self) -> Option<Decimal> {
1572 let (ridx, cidx) = self.next_loc()?;
1573 let val = match &self.rows[ridx] {
1574 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1575 Some("Infinity") => Some(Decimal::MAX),
1576 Some("-Infinity") => Some(Decimal::MIN),
1577 Some(s) => Some(
1578 s.parse()
1579 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1580 ),
1581 None => None,
1582 },
1583 SimpleQueryMessage::CommandComplete(c) => {
1584 panic!("get command: {}", c);
1585 }
1586 _ => {
1587 panic!("what?");
1588 }
1589 };
1590 val
1591 }
1592}
1593
// Types the simple-protocol parser has no real producer for yet; the generated
// `produce` methods panic when called.
impl_simple_produce_unimplemented!(
    Value,
    HashMap<String, Option<String>>,);
1597
1598impl<'r> Produce<'r, &'r str> for PostgresSimpleSourceParser {
1599 type Error = PostgresSourceError;
1600
1601 #[throws(PostgresSourceError)]
1602 fn produce(&'r mut self) -> &'r str {
1603 let (ridx, cidx) = self.next_loc()?;
1604 let val = match &self.rows[ridx] {
1605 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1606 Some(s) => s,
1607 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1608 },
1609 SimpleQueryMessage::CommandComplete(c) => {
1610 panic!("get command: {}", c);
1611 }
1612 _ => {
1613 panic!("what?");
1614 }
1615 };
1616 val
1617 }
1618}
1619
1620impl<'r> Produce<'r, Option<&'r str>> for PostgresSimpleSourceParser {
1621 type Error = PostgresSourceError;
1622
1623 #[throws(PostgresSourceError)]
1624 fn produce(&'r mut self) -> Option<&'r str> {
1625 let (ridx, cidx) = self.next_loc()?;
1626 let val = match &self.rows[ridx] {
1627 SimpleQueryMessage::Row(row) => row.try_get(cidx)?,
1628 SimpleQueryMessage::CommandComplete(c) => {
1629 panic!("get command: {}", c);
1630 }
1631 _ => {
1632 panic!("what?");
1633 }
1634 };
1635 val
1636 }
1637}
1638
1639impl<'r> Produce<'r, Vec<u8>> for PostgresSimpleSourceParser {
1640 type Error = PostgresSourceError;
1641
1642 #[throws(PostgresSourceError)]
1643 fn produce(&'r mut self) -> Vec<u8> {
1644 let (ridx, cidx) = self.next_loc()?;
1645 let val = match &self.rows[ridx] {
1646 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1647 Some(s) => {
1648 let mut res = s.chars();
1649 res.next();
1650 res.next();
1651 decode(
1652 res.enumerate()
1653 .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1654 .chars()
1655 .map(|c| c as u8)
1656 .collect::<Vec<u8>>(),
1657 )?
1658 }
1659 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1660 },
1661 SimpleQueryMessage::CommandComplete(c) => {
1662 panic!("get command: {}", c);
1663 }
1664 _ => {
1665 panic!("what?");
1666 }
1667 };
1668 val
1669 }
1670}
1671
1672impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresSimpleSourceParser {
1673 type Error = PostgresSourceError;
1674
1675 #[throws(PostgresSourceError)]
1676 fn produce(&'r mut self) -> Option<Vec<u8>> {
1677 let (ridx, cidx) = self.next_loc()?;
1678 let val = match &self.rows[ridx] {
1679 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1680 Some(s) => {
1681 let mut res = s.chars();
1682 res.next();
1683 res.next();
1684 Some(decode(
1685 res.enumerate()
1686 .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1687 .chars()
1688 .map(|c| c as u8)
1689 .collect::<Vec<u8>>(),
1690 )?)
1691 }
1692 None => None,
1693 },
1694 SimpleQueryMessage::CommandComplete(c) => {
1695 panic!("get command: {}", c);
1696 }
1697 _ => {
1698 panic!("what?");
1699 }
1700 };
1701 val
1702 }
1703}
1704
/// Returns `value` without its first and last characters.
///
/// Strings of zero or one character yield the empty string. Characters are
/// handled as whole Unicode scalar values, not bytes.
fn rem_first_and_last(value: &str) -> &str {
    // Drop the leading character, if there is one.
    let tail = match value.chars().next() {
        Some(first) => &value[first.len_utf8()..],
        None => value,
    };
    // Drop the trailing character of what is left, if there is one.
    match tail.chars().next_back() {
        Some(last) => &tail[..tail.len() - last.len_utf8()],
        None => tail,
    }
}
1711
/// Implements `Produce<Vec<Option<T>>>` and `Produce<Option<Vec<Option<T>>>>`
/// on `PostgresSimpleSourceParser` for each listed element type.
///
/// The cell text is expected in the `{a,b,NULL}` array form: the outer
/// characters are stripped, the rest is split on `,`, `NULL` items become
/// `None` and every other item is parsed with `str::parse`. `{}` is the empty
/// array. An empty string is treated like NULL.
macro_rules! impl_simple_vec_produce {
    ($($t: ty,)+) => {
        $(
            impl<'r> Produce<'r, Vec<Option<$t>>> for PostgresSimpleSourceParser {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> Vec<Option<$t>> {
                    let (row_idx, col_idx) = self.next_loc()?;
                    let row = match &self.rows[row_idx] {
                        SimpleQueryMessage::Row(row) => row,
                        SimpleQueryMessage::CommandComplete(c) => panic!("get command: {}", c),
                        _ => panic!("what?"),
                    };
                    match row.try_get(col_idx)? {
                        None | Some("") => {
                            throw!(anyhow!("Cannot parse NULL in non-NULL column."))
                        }
                        Some("{}") => vec![],
                        Some(text) => rem_first_and_last(text)
                            .split(',')
                            .map(|item| match item {
                                "NULL" => Ok(None),
                                other => other.parse::<$t>().map(Some).map_err(|_| {
                                    ConnectorXError::cannot_produce::<Vec<$t>>(Some(text.into()))
                                }),
                            })
                            .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
                    }
                }
            }

            impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresSimpleSourceParser {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> Option<Vec<Option<$t>>> {
                    let (row_idx, col_idx) = self.next_loc()?;
                    let row = match &self.rows[row_idx] {
                        SimpleQueryMessage::Row(row) => row,
                        SimpleQueryMessage::CommandComplete(c) => panic!("get command: {}", c),
                        _ => panic!("what?"),
                    };
                    match row.try_get(col_idx)? {
                        None | Some("") => None,
                        Some("{}") => Some(vec![]),
                        Some(text) => {
                            let items = rem_first_and_last(text)
                                .split(',')
                                .map(|item| match item {
                                    "NULL" => Ok(None),
                                    other => other.parse::<$t>().map(Some).map_err(|_| {
                                        ConnectorXError::cannot_produce::<Vec<$t>>(Some(text.into()))
                                    }),
                                })
                                .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?;
                            Some(items)
                        }
                    }
                }
            }
        )+
    };
}
// Array element types whose items are parsed with `str::parse`.
impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,);
1790
1791impl<'r> Produce<'r, Vec<Option<bool>>> for PostgresSimpleSourceParser {
1792 type Error = PostgresSourceError;
1793
1794 #[throws(PostgresSourceError)]
1795 fn produce(&'r mut self) -> Vec<Option<bool>> {
1796 let (ridx, cidx) = self.next_loc()?;
1797 let val = match &self.rows[ridx] {
1798 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1799 Some(s) => match s {
1800 "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1801 "{}" => vec![],
1802 _ => rem_first_and_last(s)
1803 .split(',')
1804 .map(|token| match token {
1805 "NULL" => Ok(None),
1806 "t" => Ok(Some(true)),
1807 "f" => Ok(Some(false)),
1808 _ => {
1809 throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(s.into())))
1810 }
1811 })
1812 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1813 },
1814 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1815 },
1816 SimpleQueryMessage::CommandComplete(c) => {
1817 panic!("get command: {}", c);
1818 }
1819 _ => {
1820 panic!("what?");
1821 }
1822 };
1823 val
1824 }
1825}
1826
1827impl<'r> Produce<'r, Option<Vec<Option<bool>>>> for PostgresSimpleSourceParser {
1828 type Error = PostgresSourceError;
1829
1830 #[throws(PostgresSourceError)]
1831 fn produce(&'r mut self) -> Option<Vec<Option<bool>>> {
1832 let (ridx, cidx) = self.next_loc()?;
1833 let val = match &self.rows[ridx] {
1834 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1835 Some(s) => match s {
1836 "" => None,
1837 "{}" => Some(vec![]),
1838 _ => Some(
1839 rem_first_and_last(s)
1840 .split(',')
1841 .map(|token| match token {
1842 "NULL" => Ok(None),
1843 "t" => Ok(Some(true)),
1844 "f" => Ok(Some(false)),
1845 _ => {
1846 throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(
1847 s.into()
1848 )))
1849 }
1850 })
1851 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1852 ),
1853 },
1854 None => None,
1855 },
1856 SimpleQueryMessage::CommandComplete(c) => {
1857 panic!("get command: {}", c);
1858 }
1859 _ => {
1860 panic!("what?");
1861 }
1862 };
1863 val
1864 }
1865}
1866
1867impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser {
1868 type Error = PostgresSourceError;
1869
1870 #[throws(PostgresSourceError)]
1871 fn produce(&'r mut self) -> NaiveDate {
1872 let (ridx, cidx) = self.next_loc()?;
1873 let val = match &self.rows[ridx] {
1874 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1875 Some(s) => match s {
1876 "infinity" => NaiveDate::MAX,
1877 "-infinity" => NaiveDate::MIN,
1878 s => NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1879 ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
1880 })?,
1881 },
1882 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1883 },
1884 SimpleQueryMessage::CommandComplete(c) => {
1885 panic!("get command: {}", c);
1886 }
1887 _ => {
1888 panic!("what?");
1889 }
1890 };
1891 val
1892 }
1893}
1894
1895impl<'r> Produce<'r, Option<NaiveDate>> for PostgresSimpleSourceParser {
1896 type Error = PostgresSourceError;
1897
1898 #[throws(PostgresSourceError)]
1899 fn produce(&'r mut self) -> Option<NaiveDate> {
1900 let (ridx, cidx) = self.next_loc()?;
1901 let val = match &self.rows[ridx] {
1902 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1903 Some(s) => match s {
1904 "infinity" => Some(NaiveDate::MAX),
1905 "-infinity" => Some(NaiveDate::MIN),
1906 s => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1907 ConnectorXError::cannot_produce::<Option<NaiveDate>>(Some(s.into()))
1908 })?),
1909 },
1910 None => None,
1911 },
1912 SimpleQueryMessage::CommandComplete(c) => {
1913 panic!("get command: {}", c);
1914 }
1915 _ => {
1916 panic!("what?");
1917 }
1918 };
1919 val
1920 }
1921}
1922
1923impl<'r> Produce<'r, NaiveTime> for PostgresSimpleSourceParser {
1924 type Error = PostgresSourceError;
1925
1926 #[throws(PostgresSourceError)]
1927 fn produce(&'r mut self) -> NaiveTime {
1928 let (ridx, cidx) = self.next_loc()?;
1929 let val = match &self.rows[ridx] {
1930 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1931 Some(s) => NaiveTime::parse_from_str(s, "%H:%M:%S%.f")
1932 .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?,
1933 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1934 },
1935 SimpleQueryMessage::CommandComplete(c) => {
1936 panic!("get command: {}", c);
1937 }
1938 _ => {
1939 panic!("what?");
1940 }
1941 };
1942 val
1943 }
1944}
1945
1946impl<'r> Produce<'r, Option<NaiveTime>> for PostgresSimpleSourceParser {
1947 type Error = PostgresSourceError;
1948
1949 #[throws(PostgresSourceError)]
1950 fn produce(&'r mut self) -> Option<NaiveTime> {
1951 let (ridx, cidx) = self.next_loc()?;
1952 let val = match &self.rows[ridx] {
1953 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1954 Some(s) => Some(NaiveTime::parse_from_str(s, "%H:%M:%S%.f").map_err(|_| {
1955 ConnectorXError::cannot_produce::<Option<NaiveTime>>(Some(s.into()))
1956 })?),
1957 None => None,
1958 },
1959 SimpleQueryMessage::CommandComplete(c) => {
1960 panic!("get command: {}", c);
1961 }
1962 _ => {
1963 panic!("what?");
1964 }
1965 };
1966 val
1967 }
1968}
1969
1970impl<'r> Produce<'r, NaiveDateTime> for PostgresSimpleSourceParser {
1971 type Error = PostgresSourceError;
1972
1973 #[throws(PostgresSourceError)]
1974 fn produce(&'r mut self) -> NaiveDateTime {
1975 let (ridx, cidx) = self.next_loc()?;
1976 let val =
1977 match &self.rows[ridx] {
1978 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1979 Some(s) => match s {
1980 "infinity" => NaiveDateTime::MAX,
1981 "-infinity" => NaiveDateTime::MIN,
1982 s => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(
1983 |_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())),
1984 )?,
1985 },
1986 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1987 },
1988 SimpleQueryMessage::CommandComplete(c) => {
1989 panic!("get command: {}", c);
1990 }
1991 _ => {
1992 panic!("what?");
1993 }
1994 };
1995 val
1996 }
1997}
1998
1999impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresSimpleSourceParser {
2000 type Error = PostgresSourceError;
2001
2002 #[throws(PostgresSourceError)]
2003 fn produce(&'r mut self) -> Option<NaiveDateTime> {
2004 let (ridx, cidx) = self.next_loc()?;
2005 let val = match &self.rows[ridx] {
2006 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2007 Some(s) => match s {
2008 "infinity" => Some(NaiveDateTime::MAX),
2009 "-infinity" => Some(NaiveDateTime::MIN),
2010 s => Some(
2011 NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
2012 ConnectorXError::cannot_produce::<Option<NaiveDateTime>>(Some(s.into()))
2013 })?,
2014 ),
2015 },
2016 None => None,
2017 },
2018 SimpleQueryMessage::CommandComplete(c) => {
2019 panic!("get command: {}", c);
2020 }
2021 _ => {
2022 panic!("what?");
2023 }
2024 };
2025 val
2026 }
2027}
2028
2029impl<'r> Produce<'r, DateTime<Utc>> for PostgresSimpleSourceParser {
2030 type Error = PostgresSourceError;
2031
2032 #[throws(PostgresSourceError)]
2033 fn produce(&'r mut self) -> DateTime<Utc> {
2034 let (ridx, cidx) = self.next_loc()?;
2035 let val = match &self.rows[ridx] {
2036 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2037 Some("infinity") => DateTime::<Utc>::MAX_UTC,
2038 Some("-infinity") => DateTime::<Utc>::MIN_UTC,
2039 Some(s) => {
2040 let time_string = format!("{}:00", s).to_owned();
2041 let slice: &str = &time_string[..];
2042 let time: DateTime<FixedOffset> =
2043 DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2044
2045 time.with_timezone(&Utc)
2046 }
2047 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
2048 },
2049 SimpleQueryMessage::CommandComplete(c) => {
2050 panic!("get command: {}", c);
2051 }
2052 _ => {
2053 panic!("what?");
2054 }
2055 };
2056 val
2057 }
2058}
2059
2060impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresSimpleSourceParser {
2061 type Error = PostgresSourceError;
2062
2063 #[throws(PostgresSourceError)]
2064 fn produce(&'r mut self) -> Option<DateTime<Utc>> {
2065 let (ridx, cidx) = self.next_loc()?;
2066 let val = match &self.rows[ridx] {
2067 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2068 Some("infinity") => Some(DateTime::<Utc>::MAX_UTC),
2069 Some("-infinity") => Some(DateTime::<Utc>::MIN_UTC),
2070 Some(s) => {
2071 let time_string = format!("{}:00", s).to_owned();
2072 let slice: &str = &time_string[..];
2073 let time: DateTime<FixedOffset> =
2074 DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2075
2076 Some(time.with_timezone(&Utc))
2077 }
2078 None => None,
2079 },
2080 SimpleQueryMessage::CommandComplete(c) => {
2081 panic!("get command: {}", c);
2082 }
2083 _ => {
2084 panic!("what?");
2085 }
2086 };
2087 val
2088 }
2089}