connectorx/sources/postgres/
mod.rs

1//! Source implementation for Postgres database, including the TLS support (client only).
2
3mod connection;
4mod errors;
5mod typesystem;
6
7pub use self::errors::PostgresSourceError;
8use cidr_02::IpInet;
9pub use connection::rewrite_tls_args;
10pub use typesystem::{PostgresTypePairs, PostgresTypeSystem};
11
12use crate::constants::DB_BUFFER_SIZE;
13use crate::{
14    data_order::DataOrder,
15    errors::ConnectorXError,
16    sources::{PartitionParser, Produce, Source, SourcePartition},
17    sql::{count_query, CXQuery},
18};
19use anyhow::anyhow;
20use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc};
21use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter};
22use fehler::{throw, throws};
23use hex::decode;
24use postgres::{
25    binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow},
26    fallible_iterator::FallibleIterator,
27    tls::{MakeTlsConnect, TlsConnect},
28    Config, CopyOutReader, Row, RowIter, SimpleQueryMessage, Socket,
29};
30use r2d2::{Pool, PooledConnection};
31use r2d2_postgres::PostgresConnectionManager;
32use rust_decimal::Decimal;
33use serde_json::{from_str, Value};
34use sqlparser::dialect::PostgreSqlDialect;
35use std::collections::HashMap;
36use std::convert::TryFrom;
37use std::marker::PhantomData;
38use uuid::Uuid;
39
/// Protocol marker: binary based bulk load.
///
/// Uninhabited type used only as the `P` type parameter; it selects the
/// `COPY (...) TO STDOUT WITH BINARY` implementation of `SourcePartition`.
pub enum BinaryProtocol {}
42
/// Protocol marker: CSV based bulk load.
///
/// Uninhabited type used only as the `P` type parameter; it selects the
/// `COPY (...) TO STDOUT WITH CSV` implementation of `SourcePartition`.
pub enum CSVProtocol {}
45
/// Protocol marker: use a cursor.
///
/// Uninhabited type used only as the `P` type parameter; it selects the
/// `query_raw` based implementation of `SourcePartition`.
pub enum CursorProtocol {}
48
/// Protocol marker: use the simple query protocol.
///
/// Uninhabited type intended to be used only as a `P` type parameter.
// NOTE(review): the `SourcePartition` impl for this marker is not visible in this
// part of the file — presumably it is defined further down.
pub enum SimpleProtocol {}
51
/// r2d2 connection manager for Postgres, parameterised by the TLS connector `C`.
type PgManager<C> = PostgresConnectionManager<C>;
/// A connection checked out of a `Pool<PgManager<C>>`.
type PgConn<C> = PooledConnection<PgManager<C>>;
54
55// take a row and unwrap the interior field from column 0
56fn convert_row<'b, R: TryFrom<usize> + postgres::types::FromSql<'b> + Clone>(row: &'b Row) -> R {
57    let nrows: Option<R> = row.get(0);
58    nrows.expect("Could not parse int result from count_query")
59}
60
61#[throws(PostgresSourceError)]
62fn get_total_rows<C>(conn: &mut PgConn<C>, query: &CXQuery<String>) -> usize
63where
64    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
65    C::TlsConnect: Send,
66    C::Stream: Send,
67    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
68{
69    let dialect = PostgreSqlDialect {};
70
71    let row = conn.query_one(count_query(query, &dialect)?.as_str(), &[])?;
72    let col_type = PostgresTypeSystem::from(row.columns()[0].type_());
73    match col_type {
74        PostgresTypeSystem::Int2(_) => convert_row::<i16>(&row) as usize,
75        PostgresTypeSystem::Int4(_) => convert_row::<i32>(&row) as usize,
76        PostgresTypeSystem::Int8(_) => convert_row::<i64>(&row) as usize,
77        _ => throw!(anyhow!(
78            "The result of the count query was not an int, aborting."
79        )),
80    }
81}
82
/// Postgres data source, generic over the transfer protocol marker `P` and the TLS
/// connector `C`.
pub struct PostgresSource<P, C>
where
    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
    C::TlsConnect: Send,
    C::Stream: Send,
    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Connection pool; one connection is checked out per partition.
    pool: Pool<PgManager<C>>,
    /// The original (unpartitioned) query, if set; used by `result_rows` to count rows.
    origin_query: Option<String>,
    /// One query per partition, set through `set_queries`.
    queries: Vec<CXQuery<String>>,
    /// Column names, filled in by `fetch_metadata`.
    names: Vec<String>,
    /// Column types in the connectorx Postgres type system, filled in by `fetch_metadata`.
    schema: Vec<PostgresTypeSystem>,
    /// Postgres-side column types derived in `fetch_metadata` from each
    /// (Postgres type, `PostgresTypeSystem`) pair.
    pg_schema: Vec<postgres::types::Type>,
    /// Statements run on every partition connection before it is handed out.
    pre_execution_queries: Option<Vec<String>>,
    /// Zero-sized marker carrying the protocol type parameter.
    _protocol: PhantomData<P>,
}
99
100impl<P, C> PostgresSource<P, C>
101where
102    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
103    C::TlsConnect: Send,
104    C::Stream: Send,
105    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
106{
107    #[throws(PostgresSourceError)]
108    pub fn new(config: Config, tls: C, nconn: usize) -> Self {
109        let manager = PostgresConnectionManager::new(config, tls);
110        let pool = Pool::builder().max_size(nconn as u32).build(manager)?;
111
112        Self {
113            pool,
114            origin_query: None,
115            queries: vec![],
116            names: vec![],
117            schema: vec![],
118            pg_schema: vec![],
119            pre_execution_queries: None,
120            _protocol: PhantomData,
121        }
122    }
123}
124
125impl<P, C> Source for PostgresSource<P, C>
126where
127    PostgresSourcePartition<P, C>:
128        SourcePartition<TypeSystem = PostgresTypeSystem, Error = PostgresSourceError>,
129    P: Send,
130    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
131    C::TlsConnect: Send,
132    C::Stream: Send,
133    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
134{
135    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
136    type Partition = PostgresSourcePartition<P, C>;
137    type TypeSystem = PostgresTypeSystem;
138    type Error = PostgresSourceError;
139
140    #[throws(PostgresSourceError)]
141    fn set_data_order(&mut self, data_order: DataOrder) {
142        if !matches!(data_order, DataOrder::RowMajor) {
143            throw!(ConnectorXError::UnsupportedDataOrder(data_order));
144        }
145    }
146
147    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
148        self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
149    }
150
151    fn set_origin_query(&mut self, query: Option<String>) {
152        self.origin_query = query;
153    }
154
155    fn set_pre_execution_queries(&mut self, pre_execution_queries: Option<&[String]>) {
156        self.pre_execution_queries = pre_execution_queries.map(|s| s.to_vec());
157    }
158
159    #[throws(PostgresSourceError)]
160    fn fetch_metadata(&mut self) {
161        assert!(!self.queries.is_empty());
162
163        let mut conn = self.pool.get()?;
164        let first_query = &self.queries[0];
165
166        let stmt = conn.prepare(first_query.as_str())?;
167
168        let (names, pg_types): (Vec<String>, Vec<postgres::types::Type>) = stmt
169            .columns()
170            .iter()
171            .map(|col| (col.name().to_string(), col.type_().clone()))
172            .unzip();
173
174        self.names = names;
175        self.schema = pg_types.iter().map(PostgresTypeSystem::from).collect();
176        self.pg_schema = self
177            .schema
178            .iter()
179            .zip(pg_types.iter())
180            .map(|(t1, t2)| PostgresTypePairs(t2, t1).into())
181            .collect();
182    }
183
184    #[throws(PostgresSourceError)]
185    fn result_rows(&mut self) -> Option<usize> {
186        match &self.origin_query {
187            Some(q) => {
188                let cxq = CXQuery::Naked(q.clone());
189                let mut conn = self.pool.get()?;
190                let nrows = get_total_rows(&mut conn, &cxq)?;
191                Some(nrows)
192            }
193            None => None,
194        }
195    }
196
197    fn names(&self) -> Vec<String> {
198        self.names.clone()
199    }
200
201    fn schema(&self) -> Vec<Self::TypeSystem> {
202        self.schema.clone()
203    }
204
205    #[throws(PostgresSourceError)]
206    fn partition(self) -> Vec<Self::Partition> {
207        let mut ret = vec![];
208        for query in self.queries {
209            let mut conn = self.pool.get()?;
210
211            if let Some(pre_queries) = &self.pre_execution_queries {
212                for pre_query in pre_queries {
213                    conn.query(pre_query, &[])?;
214                }
215            }
216
217            ret.push(PostgresSourcePartition::<P, C>::new(
218                conn,
219                &query,
220                &self.schema,
221                &self.pg_schema,
222            ));
223        }
224        ret
225    }
226}
227
/// One partition of a Postgres source: a dedicated connection plus the query to run on it.
pub struct PostgresSourcePartition<P, C>
where
    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
    C::TlsConnect: Send,
    C::Stream: Send,
    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Pooled connection owned by this partition.
    conn: PgConn<C>,
    /// The query this partition executes.
    query: CXQuery<String>,
    /// Column types in the connectorx Postgres type system.
    schema: Vec<PostgresTypeSystem>,
    /// Postgres-side column types (passed to the binary COPY decoder).
    pg_schema: Vec<postgres::types::Type>,
    /// Row count; 0 until `result_rows` has been called.
    nrows: usize,
    /// Number of columns (`schema.len()` at construction).
    ncols: usize,
    /// Zero-sized marker carrying the protocol type parameter.
    _protocol: PhantomData<P>,
}
243
244impl<P, C> PostgresSourcePartition<P, C>
245where
246    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
247    C::TlsConnect: Send,
248    C::Stream: Send,
249    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
250{
251    pub fn new(
252        conn: PgConn<C>,
253        query: &CXQuery<String>,
254        schema: &[PostgresTypeSystem],
255        pg_schema: &[postgres::types::Type],
256    ) -> Self {
257        Self {
258            conn,
259            query: query.clone(),
260            schema: schema.to_vec(),
261            pg_schema: pg_schema.to_vec(),
262            nrows: 0,
263            ncols: schema.len(),
264            _protocol: PhantomData,
265        }
266    }
267}
268
269impl<C> SourcePartition for PostgresSourcePartition<BinaryProtocol, C>
270where
271    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
272    C::TlsConnect: Send,
273    C::Stream: Send,
274    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
275{
276    type TypeSystem = PostgresTypeSystem;
277    type Parser<'a> = PostgresBinarySourcePartitionParser<'a>;
278    type Error = PostgresSourceError;
279
280    #[throws(PostgresSourceError)]
281    fn result_rows(&mut self) -> () {
282        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
283    }
284
285    #[throws(PostgresSourceError)]
286    fn parser(&mut self) -> Self::Parser<'_> {
287        let query = format!("COPY ({}) TO STDOUT WITH BINARY", self.query);
288        let reader = self.conn.copy_out(&*query)?; // unless reading the data, it seems like issue the query is fast
289        let iter = BinaryCopyOutIter::new(reader, &self.pg_schema);
290
291        PostgresBinarySourcePartitionParser::new(iter, &self.schema)
292    }
293
294    fn nrows(&self) -> usize {
295        self.nrows
296    }
297
298    fn ncols(&self) -> usize {
299        self.ncols
300    }
301}
302
303impl<C> SourcePartition for PostgresSourcePartition<CSVProtocol, C>
304where
305    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
306    C::TlsConnect: Send,
307    C::Stream: Send,
308    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
309{
310    type TypeSystem = PostgresTypeSystem;
311    type Parser<'a> = PostgresCSVSourceParser<'a>;
312    type Error = PostgresSourceError;
313
314    #[throws(PostgresSourceError)]
315    fn result_rows(&mut self) {
316        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
317    }
318
319    #[throws(PostgresSourceError)]
320    fn parser(&mut self) -> Self::Parser<'_> {
321        let query = format!("COPY ({}) TO STDOUT WITH CSV", self.query);
322        let reader = self.conn.copy_out(&*query)?; // unless reading the data, it seems like issue the query is fast
323        let iter = ReaderBuilder::new()
324            .has_headers(false)
325            .from_reader(reader)
326            .into_records();
327
328        PostgresCSVSourceParser::new(iter, &self.schema)
329    }
330
331    fn nrows(&self) -> usize {
332        self.nrows
333    }
334
335    fn ncols(&self) -> usize {
336        self.ncols
337    }
338}
339
340impl<C> SourcePartition for PostgresSourcePartition<CursorProtocol, C>
341where
342    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
343    C::TlsConnect: Send,
344    C::Stream: Send,
345    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
346{
347    type TypeSystem = PostgresTypeSystem;
348    type Parser<'a> = PostgresRawSourceParser<'a>;
349    type Error = PostgresSourceError;
350
351    #[throws(PostgresSourceError)]
352    fn result_rows(&mut self) {
353        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
354    }
355
356    #[throws(PostgresSourceError)]
357    fn parser(&mut self) -> Self::Parser<'_> {
358        let iter = self
359            .conn
360            .query_raw::<_, bool, _>(self.query.as_str(), vec![])?; // unless reading the data, it seems like issue the query is fast
361        PostgresRawSourceParser::new(iter, &self.schema)
362    }
363
364    fn nrows(&self) -> usize {
365        self.nrows
366    }
367
368    fn ncols(&self) -> usize {
369        self.ncols
370    }
371}
/// Parser over a binary `COPY` stream that buffers rows in batches and hands out
/// values one cell at a time.
pub struct PostgresBinarySourcePartitionParser<'a> {
    /// Underlying fallible iterator over decoded binary COPY rows.
    iter: BinaryCopyOutIter<'a>,
    /// Current batch of rows; refilled by `fetch_next` with up to `DB_BUFFER_SIZE` rows.
    rowbuf: Vec<BinaryCopyOutRow>,
    /// Number of columns per row.
    ncols: usize,
    /// Column index of the next cell to produce.
    current_col: usize,
    /// Index into `rowbuf` of the row holding the next cell to produce.
    current_row: usize,
    /// Set once `iter` has reported the end of the stream.
    is_finished: bool,
}
380
381impl<'a> PostgresBinarySourcePartitionParser<'a> {
382    pub fn new(iter: BinaryCopyOutIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
383        Self {
384            iter,
385            rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
386            ncols: schema.len(),
387            current_row: 0,
388            current_col: 0,
389            is_finished: false,
390        }
391    }
392
393    #[throws(PostgresSourceError)]
394    fn next_loc(&mut self) -> (usize, usize) {
395        let ret = (self.current_row, self.current_col);
396        self.current_row += (self.current_col + 1) / self.ncols;
397        self.current_col = (self.current_col + 1) % self.ncols;
398        ret
399    }
400}
401
402impl<'a> PartitionParser<'a> for PostgresBinarySourcePartitionParser<'a> {
403    type TypeSystem = PostgresTypeSystem;
404    type Error = PostgresSourceError;
405
406    #[throws(PostgresSourceError)]
407    fn fetch_next(&mut self) -> (usize, bool) {
408        assert!(self.current_col == 0);
409        let remaining_rows = self.rowbuf.len() - self.current_row;
410        if remaining_rows > 0 {
411            return (remaining_rows, self.is_finished);
412        } else if self.is_finished {
413            return (0, self.is_finished);
414        }
415
416        // clear the buffer
417        if !self.rowbuf.is_empty() {
418            self.rowbuf.drain(..);
419        }
420        for _ in 0..DB_BUFFER_SIZE {
421            match self.iter.next()? {
422                Some(row) => {
423                    self.rowbuf.push(row);
424                }
425                None => {
426                    self.is_finished = true;
427                    break;
428                }
429            }
430        }
431
432        // reset current cursor positions
433        self.current_row = 0;
434        self.current_col = 0;
435
436        (self.rowbuf.len(), self.is_finished)
437    }
438}
439
440macro_rules! impl_produce {
441    ($($t: ty,)+) => {
442        $(
443            impl<'r, 'a> Produce<'r, $t> for PostgresBinarySourcePartitionParser<'a> {
444                type Error = PostgresSourceError;
445
446                #[throws(PostgresSourceError)]
447                fn produce(&'r mut self) -> $t {
448                    let (ridx, cidx) = self.next_loc()?;
449                    let row = &self.rowbuf[ridx];
450                    let val = row.try_get(cidx)?;
451                    val
452                }
453            }
454
455            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresBinarySourcePartitionParser<'a> {
456                type Error = PostgresSourceError;
457
458                #[throws(PostgresSourceError)]
459                fn produce(&'r mut self) -> Option<$t> {
460                    let (ridx, cidx) = self.next_loc()?;
461                    let row = &self.rowbuf[ridx];
462                    let val = row.try_get(cidx)?;
463                    val
464                }
465            }
466        )+
467    };
468}
469
470impl_produce!(
471    i8,
472    i16,
473    i32,
474    i64,
475    f32,
476    f64,
477    Decimal,
478    bool,
479    &'r str,
480    Vec<u8>,
481    NaiveTime,
482    Uuid,
483    Value,
484    IpInet,
485    Vec<Option<bool>>,
486    Vec<Option<i16>>,
487    Vec<Option<i32>>,
488    Vec<Option<i64>>,
489    Vec<Option<Decimal>>,
490    Vec<Option<f32>>,
491    Vec<Option<f64>>,
492    Vec<Option<String>>,
493);
494
495impl<'r> Produce<'r, NaiveDateTime> for PostgresBinarySourcePartitionParser<'_> {
496    type Error = PostgresSourceError;
497
498    #[throws(PostgresSourceError)]
499    fn produce(&'r mut self) -> NaiveDateTime {
500        let (ridx, cidx) = self.next_loc()?;
501        let row = &self.rowbuf[ridx];
502        let val = row.try_get(cidx)?;
503        match val {
504            postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
505            postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
506            postgres::types::Timestamp::Value(t) => t,
507        }
508    }
509}
510
511impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresBinarySourcePartitionParser<'_> {
512    type Error = PostgresSourceError;
513
514    #[throws(PostgresSourceError)]
515    fn produce(&'r mut self) -> Option<NaiveDateTime> {
516        let (ridx, cidx) = self.next_loc()?;
517        let row = &self.rowbuf[ridx];
518        let val = row.try_get(cidx)?;
519        match val {
520            Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
521            Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
522            Some(postgres::types::Timestamp::Value(t)) => t,
523            None => None,
524        }
525    }
526}
527
528impl<'r> Produce<'r, DateTime<Utc>> for PostgresBinarySourcePartitionParser<'_> {
529    type Error = PostgresSourceError;
530
531    #[throws(PostgresSourceError)]
532    fn produce(&'r mut self) -> DateTime<Utc> {
533        let (ridx, cidx) = self.next_loc()?;
534        let row = &self.rowbuf[ridx];
535        let val = row.try_get(cidx)?;
536        match val {
537            postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
538            postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
539            postgres::types::Timestamp::Value(t) => t,
540        }
541    }
542}
543
544impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresBinarySourcePartitionParser<'_> {
545    type Error = PostgresSourceError;
546
547    #[throws(PostgresSourceError)]
548    fn produce(&'r mut self) -> Option<DateTime<Utc>> {
549        let (ridx, cidx) = self.next_loc()?;
550        let row = &self.rowbuf[ridx];
551        let val = row.try_get(cidx)?;
552        match val {
553            Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
554            Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
555            Some(postgres::types::Timestamp::Value(t)) => t,
556            None => None,
557        }
558    }
559}
560
561impl<'r> Produce<'r, NaiveDate> for PostgresBinarySourcePartitionParser<'_> {
562    type Error = PostgresSourceError;
563
564    #[throws(PostgresSourceError)]
565    fn produce(&'r mut self) -> NaiveDate {
566        let (ridx, cidx) = self.next_loc()?;
567        let row = &self.rowbuf[ridx];
568        let val = row.try_get(cidx)?;
569        match val {
570            postgres::types::Date::PosInfinity => NaiveDate::MAX,
571            postgres::types::Date::NegInfinity => NaiveDate::MIN,
572            postgres::types::Date::Value(t) => t,
573        }
574    }
575}
576
577impl<'r> Produce<'r, Option<NaiveDate>> for PostgresBinarySourcePartitionParser<'_> {
578    type Error = PostgresSourceError;
579
580    #[throws(PostgresSourceError)]
581    fn produce(&'r mut self) -> Option<NaiveDate> {
582        let (ridx, cidx) = self.next_loc()?;
583        let row = &self.rowbuf[ridx];
584        let val = row.try_get(cidx)?;
585        match val {
586            Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
587            Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
588            Some(postgres::types::Date::Value(t)) => t,
589            None => None,
590        }
591    }
592}
593
594impl Produce<'_, HashMap<String, Option<String>>> for PostgresBinarySourcePartitionParser<'_> {
595    type Error = PostgresSourceError;
596    #[throws(PostgresSourceError)]
597    fn produce(&mut self) -> HashMap<String, Option<String>> {
598        unimplemented!("Please use `cursor` protocol for hstore type");
599    }
600}
601
602impl Produce<'_, Option<HashMap<String, Option<String>>>>
603    for PostgresBinarySourcePartitionParser<'_>
604{
605    type Error = PostgresSourceError;
606    #[throws(PostgresSourceError)]
607    fn produce(&mut self) -> Option<HashMap<String, Option<String>>> {
608        unimplemented!("Please use `cursor` protocol for hstore type");
609    }
610}
611
/// Parser over a CSV `COPY` stream that buffers records in batches and hands out
/// values one cell at a time.
pub struct PostgresCSVSourceParser<'a> {
    /// Underlying CSV record iterator reading from the COPY stream.
    iter: StringRecordsIntoIter<CopyOutReader<'a>>,
    /// Current batch of records; refilled by `fetch_next` with up to `DB_BUFFER_SIZE` records.
    rowbuf: Vec<StringRecord>,
    /// Number of columns per record.
    ncols: usize,
    /// Column index of the next cell to produce.
    current_col: usize,
    /// Index into `rowbuf` of the record holding the next cell to produce.
    current_row: usize,
    /// Set once `iter` has reported the end of the stream.
    is_finished: bool,
}
620
621impl<'a> PostgresCSVSourceParser<'a> {
622    pub fn new(
623        iter: StringRecordsIntoIter<CopyOutReader<'a>>,
624        schema: &[PostgresTypeSystem],
625    ) -> Self {
626        Self {
627            iter,
628            rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
629            ncols: schema.len(),
630            current_row: 0,
631            current_col: 0,
632            is_finished: false,
633        }
634    }
635
636    #[throws(PostgresSourceError)]
637    fn next_loc(&mut self) -> (usize, usize) {
638        let ret = (self.current_row, self.current_col);
639        self.current_row += (self.current_col + 1) / self.ncols;
640        self.current_col = (self.current_col + 1) % self.ncols;
641        ret
642    }
643}
644
645impl<'a> PartitionParser<'a> for PostgresCSVSourceParser<'a> {
646    type Error = PostgresSourceError;
647    type TypeSystem = PostgresTypeSystem;
648
649    #[throws(PostgresSourceError)]
650    fn fetch_next(&mut self) -> (usize, bool) {
651        assert!(self.current_col == 0);
652        let remaining_rows = self.rowbuf.len() - self.current_row;
653        if remaining_rows > 0 {
654            return (remaining_rows, self.is_finished);
655        } else if self.is_finished {
656            return (0, self.is_finished);
657        }
658
659        if !self.rowbuf.is_empty() {
660            self.rowbuf.drain(..);
661        }
662        for _ in 0..DB_BUFFER_SIZE {
663            if let Some(row) = self.iter.next() {
664                self.rowbuf.push(row?);
665            } else {
666                self.is_finished = true;
667                break;
668            }
669        }
670        self.current_row = 0;
671        self.current_col = 0;
672        (self.rowbuf.len(), self.is_finished)
673    }
674}
675
676macro_rules! impl_csv_produce {
677    ($($t: ty,)+) => {
678        $(
679            impl<'r, 'a> Produce<'r, $t> for PostgresCSVSourceParser<'a> {
680                type Error = PostgresSourceError;
681
682                #[throws(PostgresSourceError)]
683                fn produce(&'r mut self) -> $t {
684                    let (ridx, cidx) = self.next_loc()?;
685                    self.rowbuf[ridx][cidx].parse().map_err(|_| {
686                        ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
687                    })?
688                }
689            }
690
691            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresCSVSourceParser<'a> {
692                type Error = PostgresSourceError;
693
694                #[throws(PostgresSourceError)]
695                fn produce(&'r mut self) -> Option<$t> {
696                    let (ridx, cidx) = self.next_loc()?;
697                    match &self.rowbuf[ridx][cidx][..] {
698                        "" => None,
699                        v => Some(v.parse().map_err(|_| {
700                            ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
701                        })?),
702                    }
703                }
704            }
705        )+
706    };
707}
708
709impl_csv_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);
710
711macro_rules! impl_csv_vec_produce {
712    ($($t: ty,)+) => {
713        $(
714            impl<'r, 'a> Produce<'r, Vec<Option<$t>>> for PostgresCSVSourceParser<'a> {
715                type Error = PostgresSourceError;
716
717                #[throws(PostgresSourceError)]
718                fn produce(&mut self) -> Vec<Option<$t>> {
719                    let (ridx, cidx) = self.next_loc()?;
720                    let s = &self.rowbuf[ridx][cidx][..];
721                    match s {
722                        "{}" => vec![],
723                        _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
724                        s => s[1..s.len() - 1]
725                            .split(",")
726                            .map(|v| {
727                                if v == "NULL" {
728                                    Ok(None)
729                                } else {
730                                    match v.parse() {
731                                        Ok(v) => Ok(Some(v)),
732                                        Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
733                                    }
734                                }
735                            })
736                            .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
737                    }
738                }
739            }
740
741            impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresCSVSourceParser<'a> {
742                type Error = PostgresSourceError;
743
744                #[throws(PostgresSourceError)]
745                fn produce(&mut self) -> Option<Vec<Option<$t>>> {
746                    let (ridx, cidx) = self.next_loc()?;
747                    let s = &self.rowbuf[ridx][cidx][..];
748                    match s {
749                        "" => None,
750                        "{}" => Some(vec![]),
751                        _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
752                        s => Some(
753                            s[1..s.len() - 1]
754                                .split(",")
755                                .map(|v| {
756                                    if v == "NULL" {
757                                        Ok(None)
758                                    } else {
759                                        match v.parse() {
760                                            Ok(v) => Ok(Some(v)),
761                                            Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
762                                        }
763                                    }
764                                })
765                                .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
766                        ),
767                    }
768                }
769            }
770        )+
771    };
772}
773
774impl_csv_vec_produce!(i8, i16, i32, i64, f32, f64, Decimal, String,);
775
776impl Produce<'_, HashMap<String, Option<String>>> for PostgresCSVSourceParser<'_> {
777    type Error = PostgresSourceError;
778    #[throws(PostgresSourceError)]
779    fn produce(&mut self) -> HashMap<String, Option<String>> {
780        unimplemented!("Please use `cursor` protocol for hstore type");
781    }
782}
783
784impl Produce<'_, Option<HashMap<String, Option<String>>>> for PostgresCSVSourceParser<'_> {
785    type Error = PostgresSourceError;
786    #[throws(PostgresSourceError)]
787    fn produce(&mut self) -> Option<HashMap<String, Option<String>>> {
788        unimplemented!("Please use `cursor` protocol for hstore type");
789    }
790}
791
792impl Produce<'_, bool> for PostgresCSVSourceParser<'_> {
793    type Error = PostgresSourceError;
794
795    #[throws(PostgresSourceError)]
796    fn produce(&mut self) -> bool {
797        let (ridx, cidx) = self.next_loc()?;
798        let ret = match &self.rowbuf[ridx][cidx][..] {
799            "t" => true,
800            "f" => false,
801            _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
802                self.rowbuf[ridx][cidx].into()
803            ))),
804        };
805        ret
806    }
807}
808
809impl Produce<'_, Option<bool>> for PostgresCSVSourceParser<'_> {
810    type Error = PostgresSourceError;
811
812    #[throws(PostgresSourceError)]
813    fn produce(&mut self) -> Option<bool> {
814        let (ridx, cidx) = self.next_loc()?;
815        let ret = match &self.rowbuf[ridx][cidx][..] {
816            "" => None,
817            "t" => Some(true),
818            "f" => Some(false),
819            _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
820                self.rowbuf[ridx][cidx].into()
821            ))),
822        };
823        ret
824    }
825}
826
827impl Produce<'_, Vec<Option<bool>>> for PostgresCSVSourceParser<'_> {
828    type Error = PostgresSourceError;
829
830    #[throws(PostgresSourceError)]
831    fn produce(&mut self) -> Vec<Option<bool>> {
832        let (ridx, cidx) = self.next_loc()?;
833        let s = &self.rowbuf[ridx][cidx][..];
834        match s {
835            "{}" => vec![],
836            _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
837            s => s[1..s.len() - 1]
838                .split(',')
839                .map(|v| match v {
840                    "NULL" => Ok(None),
841                    "t" => Ok(Some(true)),
842                    "f" => Ok(Some(false)),
843                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
844                })
845                .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
846        }
847    }
848}
849
850impl Produce<'_, Option<Vec<Option<bool>>>> for PostgresCSVSourceParser<'_> {
851    type Error = PostgresSourceError;
852
853    #[throws(PostgresSourceError)]
854    fn produce(&mut self) -> Option<Vec<Option<bool>>> {
855        let (ridx, cidx) = self.next_loc()?;
856        let s = &self.rowbuf[ridx][cidx][..];
857        match s {
858            "" => None,
859            "{}" => Some(vec![]),
860            _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
861            s => Some(
862                s[1..s.len() - 1]
863                    .split(',')
864                    .map(|v| match v {
865                        "NULL" => Ok(None),
866                        "t" => Ok(Some(true)),
867                        "f" => Ok(Some(false)),
868                        _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
869                    })
870                    .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
871            ),
872        }
873    }
874}
875
876impl<'r> Produce<'r, Decimal> for PostgresCSVSourceParser<'_> {
877    type Error = PostgresSourceError;
878
879    #[throws(PostgresSourceError)]
880    fn produce(&'r mut self) -> Decimal {
881        let (ridx, cidx) = self.next_loc()?;
882        match &self.rowbuf[ridx][cidx][..] {
883            "Infinity" => Decimal::MAX,
884            "-Infinity" => Decimal::MIN,
885            v => v
886                .parse()
887                .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
888        }
889    }
890}
891
892impl<'r> Produce<'r, Option<Decimal>> for PostgresCSVSourceParser<'_> {
893    type Error = PostgresSourceError;
894
895    #[throws(PostgresSourceError)]
896    fn produce(&'r mut self) -> Option<Decimal> {
897        let (ridx, cidx) = self.next_loc()?;
898        match &self.rowbuf[ridx][cidx][..] {
899            "" => None,
900            "Infinity" => Some(Decimal::MAX),
901            "-Infinity" => Some(Decimal::MIN),
902            v => Some(
903                v.parse()
904                    .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
905            ),
906        }
907    }
908}
909
910impl Produce<'_, DateTime<Utc>> for PostgresCSVSourceParser<'_> {
911    type Error = PostgresSourceError;
912
913    #[throws(PostgresSourceError)]
914    fn produce(&mut self) -> DateTime<Utc> {
915        let (ridx, cidx) = self.next_loc()?;
916        match &self.rowbuf[ridx][cidx][..] {
917            "infinity" => DateTime::<Utc>::MAX_UTC,
918            "-infinity" => DateTime::<Utc>::MIN_UTC,
919            // postgres csv return example: 1970-01-01 00:00:01+00
920            v => format!("{}:00", v)
921                .parse()
922                .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?,
923        }
924    }
925}
926
927impl Produce<'_, Option<DateTime<Utc>>> for PostgresCSVSourceParser<'_> {
928    type Error = PostgresSourceError;
929
930    #[throws(PostgresSourceError)]
931    fn produce(&mut self) -> Option<DateTime<Utc>> {
932        let (ridx, cidx) = self.next_loc()?;
933        match &self.rowbuf[ridx][cidx][..] {
934            "" => None,
935            "infinity" => Some(DateTime::<Utc>::MAX_UTC),
936            "-infinity" => Some(DateTime::<Utc>::MIN_UTC),
937            v => {
938                // postgres csv return example: 1970-01-01 00:00:01+00
939                Some(format!("{}:00", v).parse().map_err(|_| {
940                    ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into()))
941                })?)
942            }
943        }
944    }
945}
946
947impl Produce<'_, NaiveDate> for PostgresCSVSourceParser<'_> {
948    type Error = PostgresSourceError;
949
950    #[throws(PostgresSourceError)]
951    fn produce(&mut self) -> NaiveDate {
952        let (ridx, cidx) = self.next_loc()?;
953        match &self.rowbuf[ridx][cidx][..] {
954            "infinity" => NaiveDate::MAX,
955            "-infinity" => NaiveDate::MIN,
956            v => NaiveDate::parse_from_str(v, "%Y-%m-%d")
957                .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
958        }
959    }
960}
961
962impl Produce<'_, Option<NaiveDate>> for PostgresCSVSourceParser<'_> {
963    type Error = PostgresSourceError;
964
965    #[throws(PostgresSourceError)]
966    fn produce(&mut self) -> Option<NaiveDate> {
967        let (ridx, cidx) = self.next_loc()?;
968        match &self.rowbuf[ridx][cidx][..] {
969            "" => None,
970            "infinity" => Some(NaiveDate::MAX),
971            "-infinity" => Some(NaiveDate::MIN),
972            v => Some(
973                NaiveDate::parse_from_str(v, "%Y-%m-%d")
974                    .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
975            ),
976        }
977    }
978}
979
980impl Produce<'_, NaiveDateTime> for PostgresCSVSourceParser<'_> {
981    type Error = PostgresSourceError;
982
983    #[throws(PostgresSourceError)]
984    fn produce(&mut self) -> NaiveDateTime {
985        let (ridx, cidx) = self.next_loc()?;
986        match &self.rowbuf[ridx][cidx] {
987            "infinity" => NaiveDateTime::MAX,
988            "-infinity" => NaiveDateTime::MIN,
989            v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f")
990                .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into())))?,
991        }
992    }
993}
994
995impl Produce<'_, Option<NaiveDateTime>> for PostgresCSVSourceParser<'_> {
996    type Error = PostgresSourceError;
997
998    #[throws(PostgresSourceError)]
999    fn produce(&mut self) -> Option<NaiveDateTime> {
1000        let (ridx, cidx) = self.next_loc()?;
1001        match &self.rowbuf[ridx][cidx][..] {
1002            "" => None,
1003            "infinity" => Some(NaiveDateTime::MAX),
1004            "-infinity" => Some(NaiveDateTime::MIN),
1005            v => Some(
1006                NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
1007                    ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into()))
1008                })?,
1009            ),
1010        }
1011    }
1012}
1013
1014impl Produce<'_, NaiveTime> for PostgresCSVSourceParser<'_> {
1015    type Error = PostgresSourceError;
1016
1017    #[throws(PostgresSourceError)]
1018    fn produce(&mut self) -> NaiveTime {
1019        let (ridx, cidx) = self.next_loc()?;
1020        NaiveTime::parse_from_str(&self.rowbuf[ridx][cidx], "%H:%M:%S%.f").map_err(|_| {
1021            ConnectorXError::cannot_produce::<NaiveTime>(Some(self.rowbuf[ridx][cidx].into()))
1022        })?
1023    }
1024}
1025
1026impl Produce<'_, Option<NaiveTime>> for PostgresCSVSourceParser<'_> {
1027    type Error = PostgresSourceError;
1028
1029    #[throws(PostgresSourceError)]
1030    fn produce(&mut self) -> Option<NaiveTime> {
1031        let (ridx, cidx) = self.next_loc()?;
1032        match &self.rowbuf[ridx][cidx][..] {
1033            "" => None,
1034            v => Some(
1035                NaiveTime::parse_from_str(v, "%H:%M:%S%.f")
1036                    .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(v.into())))?,
1037            ),
1038        }
1039    }
1040}
1041
1042impl<'r> Produce<'r, &'r str> for PostgresCSVSourceParser<'_> {
1043    type Error = PostgresSourceError;
1044
1045    #[throws(PostgresSourceError)]
1046    fn produce(&'r mut self) -> &'r str {
1047        let (ridx, cidx) = self.next_loc()?;
1048        &self.rowbuf[ridx][cidx]
1049    }
1050}
1051
1052impl<'r> Produce<'r, Option<&'r str>> for PostgresCSVSourceParser<'_> {
1053    type Error = PostgresSourceError;
1054
1055    #[throws(PostgresSourceError)]
1056    fn produce(&'r mut self) -> Option<&'r str> {
1057        let (ridx, cidx) = self.next_loc()?;
1058        match &self.rowbuf[ridx][cidx][..] {
1059            "" => None,
1060            v => Some(v),
1061        }
1062    }
1063}
1064
1065impl<'r> Produce<'r, Vec<u8>> for PostgresCSVSourceParser<'_> {
1066    type Error = PostgresSourceError;
1067
1068    #[throws(PostgresSourceError)]
1069    fn produce(&'r mut self) -> Vec<u8> {
1070        let (ridx, cidx) = self.next_loc()?;
1071        decode(&self.rowbuf[ridx][cidx][2..])? // escape \x in the beginning
1072    }
1073}
1074
1075impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresCSVSourceParser<'_> {
1076    type Error = PostgresSourceError;
1077
1078    #[throws(PostgresSourceError)]
1079    fn produce(&'r mut self) -> Option<Vec<u8>> {
1080        let (ridx, cidx) = self.next_loc()?;
1081        match &self.rowbuf[ridx][cidx] {
1082            // escape \x in the beginning, empty if None
1083            "" => None,
1084            v => Some(decode(&v[2..])?),
1085        }
1086    }
1087}
1088
1089impl<'r> Produce<'r, Value> for PostgresCSVSourceParser<'_> {
1090    type Error = PostgresSourceError;
1091
1092    #[throws(PostgresSourceError)]
1093    fn produce(&'r mut self) -> Value {
1094        let (ridx, cidx) = self.next_loc()?;
1095        let v = &self.rowbuf[ridx][cidx];
1096        from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1097    }
1098}
1099
1100impl<'r> Produce<'r, Option<Value>> for PostgresCSVSourceParser<'_> {
1101    type Error = PostgresSourceError;
1102
1103    #[throws(PostgresSourceError)]
1104    fn produce(&'r mut self) -> Option<Value> {
1105        let (ridx, cidx) = self.next_loc()?;
1106
1107        match &self.rowbuf[ridx][cidx][..] {
1108            "" => None,
1109            v => {
1110                from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1111            }
1112        }
1113    }
1114}
1115
1116pub struct PostgresRawSourceParser<'a> {
1117    iter: RowIter<'a>,
1118    rowbuf: Vec<Row>,
1119    ncols: usize,
1120    current_col: usize,
1121    current_row: usize,
1122    is_finished: bool,
1123}
1124
1125impl<'a> PostgresRawSourceParser<'a> {
1126    pub fn new(iter: RowIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
1127        Self {
1128            iter,
1129            rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
1130            ncols: schema.len(),
1131            current_row: 0,
1132            current_col: 0,
1133            is_finished: false,
1134        }
1135    }
1136
1137    #[throws(PostgresSourceError)]
1138    fn next_loc(&mut self) -> (usize, usize) {
1139        let ret = (self.current_row, self.current_col);
1140        self.current_row += (self.current_col + 1) / self.ncols;
1141        self.current_col = (self.current_col + 1) % self.ncols;
1142        ret
1143    }
1144}
1145
1146impl<'a> PartitionParser<'a> for PostgresRawSourceParser<'a> {
1147    type TypeSystem = PostgresTypeSystem;
1148    type Error = PostgresSourceError;
1149
1150    #[throws(PostgresSourceError)]
1151    fn fetch_next(&mut self) -> (usize, bool) {
1152        assert!(self.current_col == 0);
1153        let remaining_rows = self.rowbuf.len() - self.current_row;
1154        if remaining_rows > 0 {
1155            return (remaining_rows, self.is_finished);
1156        } else if self.is_finished {
1157            return (0, self.is_finished);
1158        }
1159
1160        if !self.rowbuf.is_empty() {
1161            self.rowbuf.drain(..);
1162        }
1163        for _ in 0..DB_BUFFER_SIZE {
1164            if let Some(row) = self.iter.next()? {
1165                self.rowbuf.push(row);
1166            } else {
1167                self.is_finished = true;
1168                break;
1169            }
1170        }
1171        self.current_row = 0;
1172        self.current_col = 0;
1173        (self.rowbuf.len(), self.is_finished)
1174    }
1175}
1176
1177macro_rules! impl_produce {
1178    ($($t: ty,)+) => {
1179        $(
1180            impl<'r, 'a> Produce<'r, $t> for PostgresRawSourceParser<'a> {
1181                type Error = PostgresSourceError;
1182
1183                #[throws(PostgresSourceError)]
1184                fn produce(&'r mut self) -> $t {
1185                    let (ridx, cidx) = self.next_loc()?;
1186                    let row = &self.rowbuf[ridx];
1187                    let val = row.try_get(cidx)?;
1188                    val
1189                }
1190            }
1191
1192            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresRawSourceParser<'a> {
1193                type Error = PostgresSourceError;
1194
1195                #[throws(PostgresSourceError)]
1196                fn produce(&'r mut self) -> Option<$t> {
1197                    let (ridx, cidx) = self.next_loc()?;
1198                    let row = &self.rowbuf[ridx];
1199                    let val = row.try_get(cidx)?;
1200                    val
1201                }
1202            }
1203        )+
1204    };
1205}
1206
1207impl_produce!(
1208    i8,
1209    i16,
1210    i32,
1211    i64,
1212    f32,
1213    f64,
1214    Decimal,
1215    bool,
1216    &'r str,
1217    Vec<u8>,
1218    NaiveTime,
1219    Uuid,
1220    Value,
1221    IpInet,
1222    HashMap<String, Option<String>>,
1223    Vec<Option<bool>>,
1224    Vec<Option<String>>,
1225    Vec<Option<i16>>,
1226    Vec<Option<i32>>,
1227    Vec<Option<i64>>,
1228    Vec<Option<f32>>,
1229    Vec<Option<f64>>,
1230    Vec<Option<Decimal>>,
1231);
1232
1233impl<'r> Produce<'r, DateTime<Utc>> for PostgresRawSourceParser<'_> {
1234    type Error = PostgresSourceError;
1235
1236    #[throws(PostgresSourceError)]
1237    fn produce(&'r mut self) -> DateTime<Utc> {
1238        let (ridx, cidx) = self.next_loc()?;
1239        let row = &self.rowbuf[ridx];
1240        let val: postgres::types::Timestamp<DateTime<Utc>> = row.try_get(cidx)?;
1241        match val {
1242            postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
1243            postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
1244            postgres::types::Timestamp::Value(t) => t,
1245        }
1246    }
1247}
1248
1249impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresRawSourceParser<'_> {
1250    type Error = PostgresSourceError;
1251
1252    #[throws(PostgresSourceError)]
1253    fn produce(&'r mut self) -> Option<DateTime<Utc>> {
1254        let (ridx, cidx) = self.next_loc()?;
1255        let row = &self.rowbuf[ridx];
1256        let val = row.try_get(cidx)?;
1257        match val {
1258            Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
1259            Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
1260            Some(postgres::types::Timestamp::Value(t)) => t,
1261            None => None,
1262        }
1263    }
1264}
1265
1266impl<'r> Produce<'r, NaiveDateTime> for PostgresRawSourceParser<'_> {
1267    type Error = PostgresSourceError;
1268
1269    #[throws(PostgresSourceError)]
1270    fn produce(&'r mut self) -> NaiveDateTime {
1271        let (ridx, cidx) = self.next_loc()?;
1272        let row = &self.rowbuf[ridx];
1273        let val: postgres::types::Timestamp<NaiveDateTime> = row.try_get(cidx)?;
1274        match val {
1275            postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
1276            postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
1277            postgres::types::Timestamp::Value(t) => t,
1278        }
1279    }
1280}
1281
1282impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresRawSourceParser<'_> {
1283    type Error = PostgresSourceError;
1284
1285    #[throws(PostgresSourceError)]
1286    fn produce(&'r mut self) -> Option<NaiveDateTime> {
1287        let (ridx, cidx) = self.next_loc()?;
1288        let row = &self.rowbuf[ridx];
1289        let val = row.try_get(cidx)?;
1290        match val {
1291            Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
1292            Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
1293            Some(postgres::types::Timestamp::Value(t)) => t,
1294            None => None,
1295        }
1296    }
1297}
1298
1299impl<'r> Produce<'r, NaiveDate> for PostgresRawSourceParser<'_> {
1300    type Error = PostgresSourceError;
1301
1302    #[throws(PostgresSourceError)]
1303    fn produce(&'r mut self) -> NaiveDate {
1304        let (ridx, cidx) = self.next_loc()?;
1305        let row = &self.rowbuf[ridx];
1306        let val: postgres::types::Date<NaiveDate> = row.try_get(cidx)?;
1307        match val {
1308            postgres::types::Date::PosInfinity => NaiveDate::MAX,
1309            postgres::types::Date::NegInfinity => NaiveDate::MIN,
1310            postgres::types::Date::Value(t) => t,
1311        }
1312    }
1313}
1314
1315impl<'r> Produce<'r, Option<NaiveDate>> for PostgresRawSourceParser<'_> {
1316    type Error = PostgresSourceError;
1317
1318    #[throws(PostgresSourceError)]
1319    fn produce(&'r mut self) -> Option<NaiveDate> {
1320        let (ridx, cidx) = self.next_loc()?;
1321        let row = &self.rowbuf[ridx];
1322        let val = row.try_get(cidx)?;
1323        match val {
1324            Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
1325            Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
1326            Some(postgres::types::Date::Value(t)) => t,
1327            None => None,
1328        }
1329    }
1330}
1331
1332impl<C> SourcePartition for PostgresSourcePartition<SimpleProtocol, C>
1333where
1334    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
1335    C::TlsConnect: Send,
1336    C::Stream: Send,
1337    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
1338{
1339    type TypeSystem = PostgresTypeSystem;
1340    type Parser<'a> = PostgresSimpleSourceParser;
1341    type Error = PostgresSourceError;
1342
1343    #[throws(PostgresSourceError)]
1344    fn result_rows(&mut self) {
1345        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
1346    }
1347
1348    #[throws(PostgresSourceError)]
1349    fn parser(&mut self) -> Self::Parser<'_> {
1350        let rows = self.conn.simple_query(self.query.as_str())?; // unless reading the data, it seems like issue the query is fast
1351        PostgresSimpleSourceParser::new(rows, &self.schema)
1352    }
1353
1354    fn nrows(&self) -> usize {
1355        self.nrows
1356    }
1357
1358    fn ncols(&self) -> usize {
1359        self.ncols
1360    }
1361}
1362
1363pub struct PostgresSimpleSourceParser {
1364    rows: Vec<SimpleQueryMessage>,
1365    ncols: usize,
1366    current_col: usize,
1367    current_row: usize,
1368}
1369impl PostgresSimpleSourceParser {
1370    pub fn new(rows: Vec<SimpleQueryMessage>, schema: &[PostgresTypeSystem]) -> Self {
1371        Self {
1372            rows,
1373            ncols: schema.len(),
1374            current_row: 0,
1375            current_col: 0,
1376        }
1377    }
1378
1379    #[throws(PostgresSourceError)]
1380    fn next_loc(&mut self) -> (usize, usize) {
1381        let ret = (self.current_row, self.current_col);
1382        self.current_row += (self.current_col + 1) / self.ncols;
1383        self.current_col = (self.current_col + 1) % self.ncols;
1384        ret
1385    }
1386}
1387
1388impl PartitionParser<'_> for PostgresSimpleSourceParser {
1389    type TypeSystem = PostgresTypeSystem;
1390    type Error = PostgresSourceError;
1391
1392    #[throws(PostgresSourceError)]
1393    fn fetch_next(&mut self) -> (usize, bool) {
1394        self.current_row = 0;
1395        self.current_col = 0;
1396        if !self.rows.is_empty() {
1397            if let SimpleQueryMessage::RowDescription(_) = &self.rows[0] {
1398                self.current_row = 1;
1399            }
1400        }
1401
1402        (self.rows.len() - 1 - self.current_row, true) // last message is command complete
1403    }
1404}
1405
/// Generates placeholder `Produce<T>` and `Produce<Option<T>>` impls for
/// `PostgresSimpleSourceParser`; calling either `produce` panics via `unimplemented!`.
macro_rules! impl_simple_produce_unimplemented {
    ($($t: ty,)+) => {
        $(
            impl<'r> Produce<'r, $t> for PostgresSimpleSourceParser {
                type Error = PostgresSourceError;

                /// Not supported for this type: always panics.
                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> $t {
                    unimplemented!("not implemented!");
                }
            }

            impl<'r> Produce<'r, Option<$t>> for PostgresSimpleSourceParser {
                type Error = PostgresSourceError;

                /// Not supported for this type: always panics.
                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> Option<$t> {
                    unimplemented!("not implemented!");
                }
            }
        )+
    };
}
1429
1430macro_rules! impl_simple_produce {
1431    ($($t: ty,)+) => {
1432        $(
1433            impl<'r> Produce<'r, $t> for PostgresSimpleSourceParser {
1434                type Error = PostgresSourceError;
1435
1436                #[throws(PostgresSourceError)]
1437                fn produce(&'r mut self) -> $t {
1438                    let (ridx, cidx) = self.next_loc()?;
1439                    let val = match &self.rows[ridx] {
1440                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1441                            Some(s) => s
1442                                .parse()
1443                                .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1444                            None => throw!(anyhow!(
1445                                "Cannot parse NULL in NOT NULL column."
1446                            )),
1447                        },
1448                        SimpleQueryMessage::CommandComplete(c) => {
1449                            panic!("get command: {}", c);
1450                        }
1451                        _ => {
1452                            panic!("what?");
1453                        }
1454                    };
1455                    val
1456                }
1457            }
1458
1459            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser {
1460                type Error = PostgresSourceError;
1461
1462                #[throws(PostgresSourceError)]
1463                fn produce(&'r mut self) -> Option<$t> {
1464                    let (ridx, cidx) = self.next_loc()?;
1465                    let val = match &self.rows[ridx] {
1466                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1467                            Some(s) => Some(
1468                                s.parse()
1469                                    .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1470                            ),
1471                            None => None,
1472                        },
1473                        SimpleQueryMessage::CommandComplete(c) => {
1474                            panic!("get command: {}", c);
1475                        }
1476                        _ => {
1477                            panic!("what?");
1478                        }
1479                    };
1480                    val
1481                }
1482            }
1483        )+
1484    };
1485}
1486
1487impl_simple_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);
1488
1489impl<'r> Produce<'r, bool> for PostgresSimpleSourceParser {
1490    type Error = PostgresSourceError;
1491
1492    #[throws(PostgresSourceError)]
1493    fn produce(&'r mut self) -> bool {
1494        let (ridx, cidx) = self.next_loc()?;
1495        let val = match &self.rows[ridx] {
1496            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1497                Some(s) => match s {
1498                    "t" => true,
1499                    "f" => false,
1500                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1501                },
1502                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1503            },
1504            SimpleQueryMessage::CommandComplete(c) => {
1505                panic!("get command: {}", c);
1506            }
1507            _ => {
1508                panic!("what?");
1509            }
1510        };
1511        val
1512    }
1513}
1514
1515impl<'r> Produce<'r, Option<bool>> for PostgresSimpleSourceParser {
1516    type Error = PostgresSourceError;
1517
1518    #[throws(PostgresSourceError)]
1519    fn produce(&'r mut self) -> Option<bool> {
1520        let (ridx, cidx) = self.next_loc()?;
1521        let val = match &self.rows[ridx] {
1522            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1523                Some(s) => match s {
1524                    "t" => Some(true),
1525                    "f" => Some(false),
1526                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1527                },
1528                None => None,
1529            },
1530            SimpleQueryMessage::CommandComplete(c) => {
1531                panic!("get command: {}", c);
1532            }
1533            _ => {
1534                panic!("what?");
1535            }
1536        };
1537        val
1538    }
1539}
1540
1541impl<'r> Produce<'r, Decimal> for PostgresSimpleSourceParser {
1542    type Error = PostgresSourceError;
1543
1544    #[throws(PostgresSourceError)]
1545    fn produce(&'r mut self) -> Decimal {
1546        let (ridx, cidx) = self.next_loc()?;
1547        let val = match &self.rows[ridx] {
1548            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1549                Some("Infinity") => Decimal::MAX,
1550                Some("-Infinity") => Decimal::MIN,
1551                Some(s) => s
1552                    .parse()
1553                    .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1554                None => throw!(anyhow!("Cannot parse NULL in NOT NULL column.")),
1555            },
1556            SimpleQueryMessage::CommandComplete(c) => {
1557                panic!("get command: {}", c);
1558            }
1559            _ => {
1560                panic!("what?");
1561            }
1562        };
1563        val
1564    }
1565}
1566
1567impl<'r> Produce<'r, Option<Decimal>> for PostgresSimpleSourceParser {
1568    type Error = PostgresSourceError;
1569
1570    #[throws(PostgresSourceError)]
1571    fn produce(&'r mut self) -> Option<Decimal> {
1572        let (ridx, cidx) = self.next_loc()?;
1573        let val = match &self.rows[ridx] {
1574            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1575                Some("Infinity") => Some(Decimal::MAX),
1576                Some("-Infinity") => Some(Decimal::MIN),
1577                Some(s) => Some(
1578                    s.parse()
1579                        .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1580                ),
1581                None => None,
1582            },
1583            SimpleQueryMessage::CommandComplete(c) => {
1584                panic!("get command: {}", c);
1585            }
1586            _ => {
1587                panic!("what?");
1588            }
1589        };
1590        val
1591    }
1592}
1593
// `Value` and `HashMap<String, Option<String>>` get placeholder `Produce` impls for
// the simple-query parser: calling `produce` for them panics via `unimplemented!`.
impl_simple_produce_unimplemented!(
    Value,
    HashMap<String, Option<String>>,);
1597
1598impl<'r> Produce<'r, &'r str> for PostgresSimpleSourceParser {
1599    type Error = PostgresSourceError;
1600
1601    #[throws(PostgresSourceError)]
1602    fn produce(&'r mut self) -> &'r str {
1603        let (ridx, cidx) = self.next_loc()?;
1604        let val = match &self.rows[ridx] {
1605            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1606                Some(s) => s,
1607                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1608            },
1609            SimpleQueryMessage::CommandComplete(c) => {
1610                panic!("get command: {}", c);
1611            }
1612            _ => {
1613                panic!("what?");
1614            }
1615        };
1616        val
1617    }
1618}
1619
1620impl<'r> Produce<'r, Option<&'r str>> for PostgresSimpleSourceParser {
1621    type Error = PostgresSourceError;
1622
1623    #[throws(PostgresSourceError)]
1624    fn produce(&'r mut self) -> Option<&'r str> {
1625        let (ridx, cidx) = self.next_loc()?;
1626        let val = match &self.rows[ridx] {
1627            SimpleQueryMessage::Row(row) => row.try_get(cidx)?,
1628            SimpleQueryMessage::CommandComplete(c) => {
1629                panic!("get command: {}", c);
1630            }
1631            _ => {
1632                panic!("what?");
1633            }
1634        };
1635        val
1636    }
1637}
1638
1639impl<'r> Produce<'r, Vec<u8>> for PostgresSimpleSourceParser {
1640    type Error = PostgresSourceError;
1641
1642    #[throws(PostgresSourceError)]
1643    fn produce(&'r mut self) -> Vec<u8> {
1644        let (ridx, cidx) = self.next_loc()?;
1645        let val = match &self.rows[ridx] {
1646            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1647                Some(s) => {
1648                    let mut res = s.chars();
1649                    res.next();
1650                    res.next();
1651                    decode(
1652                        res.enumerate()
1653                            .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1654                            .chars()
1655                            .map(|c| c as u8)
1656                            .collect::<Vec<u8>>(),
1657                    )?
1658                }
1659                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1660            },
1661            SimpleQueryMessage::CommandComplete(c) => {
1662                panic!("get command: {}", c);
1663            }
1664            _ => {
1665                panic!("what?");
1666            }
1667        };
1668        val
1669    }
1670}
1671
1672impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresSimpleSourceParser {
1673    type Error = PostgresSourceError;
1674
1675    #[throws(PostgresSourceError)]
1676    fn produce(&'r mut self) -> Option<Vec<u8>> {
1677        let (ridx, cidx) = self.next_loc()?;
1678        let val = match &self.rows[ridx] {
1679            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1680                Some(s) => {
1681                    let mut res = s.chars();
1682                    res.next();
1683                    res.next();
1684                    Some(decode(
1685                        res.enumerate()
1686                            .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1687                            .chars()
1688                            .map(|c| c as u8)
1689                            .collect::<Vec<u8>>(),
1690                    )?)
1691                }
1692                None => None,
1693            },
1694            SimpleQueryMessage::CommandComplete(c) => {
1695                panic!("get command: {}", c);
1696            }
1697            _ => {
1698                panic!("what?");
1699            }
1700        };
1701        val
1702    }
1703}
1704
/// Returns `value` without its first and last characters.
///
/// Works on character (not byte) boundaries; inputs with fewer than two characters
/// yield an empty string.
fn rem_first_and_last(value: &str) -> &str {
    // Strip the leading character, if there is one.
    let tail = match value.chars().next() {
        Some(first) => &value[first.len_utf8()..],
        None => value,
    };
    // Strip the trailing character from whatever is left.
    match tail.chars().next_back() {
        Some(last) => &tail[..tail.len() - last.len_utf8()],
        None => tail,
    }
}
1711
1712macro_rules! impl_simple_vec_produce {
1713    ($($t: ty,)+) => {
1714        $(
1715            impl<'r> Produce<'r, Vec<Option<$t>>> for PostgresSimpleSourceParser {
1716                type Error = PostgresSourceError;
1717
1718                #[throws(PostgresSourceError)]
1719                fn produce(&'r mut self) -> Vec<Option<$t>> {
1720                    let (ridx, cidx) = self.next_loc()?;
1721                    let val = match &self.rows[ridx] {
1722                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1723                            Some(s) => match s{
1724                                "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1725                                "{}" => vec![],
1726                                _ => rem_first_and_last(s).split(",").map(|v| {
1727                                    if v == "NULL" {
1728                                        Ok(None)
1729                                    } else {
1730                                        match v.parse() {
1731                                            Ok(v) => Ok(Some(v)),
1732                                            Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1733                                        }
1734                                    }
1735                                }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?
1736                            },
1737                            None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1738                        },
1739                        SimpleQueryMessage::CommandComplete(c) => {
1740                            panic!("get command: {}", c);
1741                        }
1742                        _ => {
1743                            panic!("what?");
1744                        }
1745                    };
1746                    val
1747                }
1748            }
1749
1750            impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresSimpleSourceParser {
1751                type Error = PostgresSourceError;
1752
1753                #[throws(PostgresSourceError)]
1754                fn produce(&'r mut self) -> Option<Vec<Option<$t>>> {
1755                    let (ridx, cidx) = self.next_loc()?;
1756                    let val = match &self.rows[ridx] {
1757
1758                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1759                            Some(s) => match s{
1760                                "" => None,
1761                                "{}" => Some(vec![]),
1762                                _ => Some(rem_first_and_last(s).split(",").map(|v| {
1763                                    if v == "NULL" {
1764                                        Ok(None)
1765                                    } else {
1766                                        match v.parse() {
1767                                            Ok(v) => Ok(Some(v)),
1768                                            Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1769                                        }
1770                                    }
1771                                }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?)
1772                            },
1773                            None => None,
1774                        },
1775
1776                        SimpleQueryMessage::CommandComplete(c) => {
1777                            panic!("get command: {}", c);
1778                        }
1779                        _ => {
1780                            panic!("what?");
1781                        }
1782                    };
1783                    val
1784                }
1785            }
1786        )+
1787    };
1788}
// Instantiate `impl_simple_vec_produce!` once per listed element type; for each type
// the macro emits `Produce` impls on `PostgresSimpleSourceParser` for the array form
// and its nullable (`Option<Vec<Option<T>>>`) counterpart.
impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,);
1790
1791impl<'r> Produce<'r, Vec<Option<bool>>> for PostgresSimpleSourceParser {
1792    type Error = PostgresSourceError;
1793
1794    #[throws(PostgresSourceError)]
1795    fn produce(&'r mut self) -> Vec<Option<bool>> {
1796        let (ridx, cidx) = self.next_loc()?;
1797        let val = match &self.rows[ridx] {
1798            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1799                Some(s) => match s {
1800                    "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1801                    "{}" => vec![],
1802                    _ => rem_first_and_last(s)
1803                        .split(',')
1804                        .map(|token| match token {
1805                            "NULL" => Ok(None),
1806                            "t" => Ok(Some(true)),
1807                            "f" => Ok(Some(false)),
1808                            _ => {
1809                                throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(s.into())))
1810                            }
1811                        })
1812                        .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1813                },
1814                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1815            },
1816            SimpleQueryMessage::CommandComplete(c) => {
1817                panic!("get command: {}", c);
1818            }
1819            _ => {
1820                panic!("what?");
1821            }
1822        };
1823        val
1824    }
1825}
1826
1827impl<'r> Produce<'r, Option<Vec<Option<bool>>>> for PostgresSimpleSourceParser {
1828    type Error = PostgresSourceError;
1829
1830    #[throws(PostgresSourceError)]
1831    fn produce(&'r mut self) -> Option<Vec<Option<bool>>> {
1832        let (ridx, cidx) = self.next_loc()?;
1833        let val = match &self.rows[ridx] {
1834            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1835                Some(s) => match s {
1836                    "" => None,
1837                    "{}" => Some(vec![]),
1838                    _ => Some(
1839                        rem_first_and_last(s)
1840                            .split(',')
1841                            .map(|token| match token {
1842                                "NULL" => Ok(None),
1843                                "t" => Ok(Some(true)),
1844                                "f" => Ok(Some(false)),
1845                                _ => {
1846                                    throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(
1847                                        s.into()
1848                                    )))
1849                                }
1850                            })
1851                            .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1852                    ),
1853                },
1854                None => None,
1855            },
1856            SimpleQueryMessage::CommandComplete(c) => {
1857                panic!("get command: {}", c);
1858            }
1859            _ => {
1860                panic!("what?");
1861            }
1862        };
1863        val
1864    }
1865}
1866
1867impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser {
1868    type Error = PostgresSourceError;
1869
1870    #[throws(PostgresSourceError)]
1871    fn produce(&'r mut self) -> NaiveDate {
1872        let (ridx, cidx) = self.next_loc()?;
1873        let val = match &self.rows[ridx] {
1874            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1875                Some(s) => match s {
1876                    "infinity" => NaiveDate::MAX,
1877                    "-infinity" => NaiveDate::MIN,
1878                    s => NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1879                        ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
1880                    })?,
1881                },
1882                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1883            },
1884            SimpleQueryMessage::CommandComplete(c) => {
1885                panic!("get command: {}", c);
1886            }
1887            _ => {
1888                panic!("what?");
1889            }
1890        };
1891        val
1892    }
1893}
1894
1895impl<'r> Produce<'r, Option<NaiveDate>> for PostgresSimpleSourceParser {
1896    type Error = PostgresSourceError;
1897
1898    #[throws(PostgresSourceError)]
1899    fn produce(&'r mut self) -> Option<NaiveDate> {
1900        let (ridx, cidx) = self.next_loc()?;
1901        let val = match &self.rows[ridx] {
1902            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1903                Some(s) => match s {
1904                    "infinity" => Some(NaiveDate::MAX),
1905                    "-infinity" => Some(NaiveDate::MIN),
1906                    s => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1907                        ConnectorXError::cannot_produce::<Option<NaiveDate>>(Some(s.into()))
1908                    })?),
1909                },
1910                None => None,
1911            },
1912            SimpleQueryMessage::CommandComplete(c) => {
1913                panic!("get command: {}", c);
1914            }
1915            _ => {
1916                panic!("what?");
1917            }
1918        };
1919        val
1920    }
1921}
1922
1923impl<'r> Produce<'r, NaiveTime> for PostgresSimpleSourceParser {
1924    type Error = PostgresSourceError;
1925
1926    #[throws(PostgresSourceError)]
1927    fn produce(&'r mut self) -> NaiveTime {
1928        let (ridx, cidx) = self.next_loc()?;
1929        let val = match &self.rows[ridx] {
1930            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1931                Some(s) => NaiveTime::parse_from_str(s, "%H:%M:%S%.f")
1932                    .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?,
1933                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1934            },
1935            SimpleQueryMessage::CommandComplete(c) => {
1936                panic!("get command: {}", c);
1937            }
1938            _ => {
1939                panic!("what?");
1940            }
1941        };
1942        val
1943    }
1944}
1945
1946impl<'r> Produce<'r, Option<NaiveTime>> for PostgresSimpleSourceParser {
1947    type Error = PostgresSourceError;
1948
1949    #[throws(PostgresSourceError)]
1950    fn produce(&'r mut self) -> Option<NaiveTime> {
1951        let (ridx, cidx) = self.next_loc()?;
1952        let val = match &self.rows[ridx] {
1953            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1954                Some(s) => Some(NaiveTime::parse_from_str(s, "%H:%M:%S%.f").map_err(|_| {
1955                    ConnectorXError::cannot_produce::<Option<NaiveTime>>(Some(s.into()))
1956                })?),
1957                None => None,
1958            },
1959            SimpleQueryMessage::CommandComplete(c) => {
1960                panic!("get command: {}", c);
1961            }
1962            _ => {
1963                panic!("what?");
1964            }
1965        };
1966        val
1967    }
1968}
1969
1970impl<'r> Produce<'r, NaiveDateTime> for PostgresSimpleSourceParser {
1971    type Error = PostgresSourceError;
1972
1973    #[throws(PostgresSourceError)]
1974    fn produce(&'r mut self) -> NaiveDateTime {
1975        let (ridx, cidx) = self.next_loc()?;
1976        let val =
1977            match &self.rows[ridx] {
1978                SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1979                    Some(s) => match s {
1980                        "infinity" => NaiveDateTime::MAX,
1981                        "-infinity" => NaiveDateTime::MIN,
1982                        s => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(
1983                            |_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())),
1984                        )?,
1985                    },
1986                    None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1987                },
1988                SimpleQueryMessage::CommandComplete(c) => {
1989                    panic!("get command: {}", c);
1990                }
1991                _ => {
1992                    panic!("what?");
1993                }
1994            };
1995        val
1996    }
1997}
1998
1999impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresSimpleSourceParser {
2000    type Error = PostgresSourceError;
2001
2002    #[throws(PostgresSourceError)]
2003    fn produce(&'r mut self) -> Option<NaiveDateTime> {
2004        let (ridx, cidx) = self.next_loc()?;
2005        let val = match &self.rows[ridx] {
2006            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2007                Some(s) => match s {
2008                    "infinity" => Some(NaiveDateTime::MAX),
2009                    "-infinity" => Some(NaiveDateTime::MIN),
2010                    s => Some(
2011                        NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
2012                            ConnectorXError::cannot_produce::<Option<NaiveDateTime>>(Some(s.into()))
2013                        })?,
2014                    ),
2015                },
2016                None => None,
2017            },
2018            SimpleQueryMessage::CommandComplete(c) => {
2019                panic!("get command: {}", c);
2020            }
2021            _ => {
2022                panic!("what?");
2023            }
2024        };
2025        val
2026    }
2027}
2028
2029impl<'r> Produce<'r, DateTime<Utc>> for PostgresSimpleSourceParser {
2030    type Error = PostgresSourceError;
2031
2032    #[throws(PostgresSourceError)]
2033    fn produce(&'r mut self) -> DateTime<Utc> {
2034        let (ridx, cidx) = self.next_loc()?;
2035        let val = match &self.rows[ridx] {
2036            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2037                Some("infinity") => DateTime::<Utc>::MAX_UTC,
2038                Some("-infinity") => DateTime::<Utc>::MIN_UTC,
2039                Some(s) => {
2040                    let time_string = format!("{}:00", s).to_owned();
2041                    let slice: &str = &time_string[..];
2042                    let time: DateTime<FixedOffset> =
2043                        DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2044
2045                    time.with_timezone(&Utc)
2046                }
2047                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
2048            },
2049            SimpleQueryMessage::CommandComplete(c) => {
2050                panic!("get command: {}", c);
2051            }
2052            _ => {
2053                panic!("what?");
2054            }
2055        };
2056        val
2057    }
2058}
2059
2060impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresSimpleSourceParser {
2061    type Error = PostgresSourceError;
2062
2063    #[throws(PostgresSourceError)]
2064    fn produce(&'r mut self) -> Option<DateTime<Utc>> {
2065        let (ridx, cidx) = self.next_loc()?;
2066        let val = match &self.rows[ridx] {
2067            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2068                Some("infinity") => Some(DateTime::<Utc>::MAX_UTC),
2069                Some("-infinity") => Some(DateTime::<Utc>::MIN_UTC),
2070                Some(s) => {
2071                    let time_string = format!("{}:00", s).to_owned();
2072                    let slice: &str = &time_string[..];
2073                    let time: DateTime<FixedOffset> =
2074                        DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2075
2076                    Some(time.with_timezone(&Utc))
2077                }
2078                None => None,
2079            },
2080            SimpleQueryMessage::CommandComplete(c) => {
2081                panic!("get command: {}", c);
2082            }
2083            _ => {
2084                panic!("what?");
2085            }
2086        };
2087        val
2088    }
2089}