connectorx/sources/postgres/
mod.rs

1//! Source implementation for Postgres database, including the TLS support (client only).
2
3mod connection;
4mod errors;
5mod typesystem;
6
7pub use self::errors::PostgresSourceError;
8pub use cidr_02::IpInet;
9pub use connection::rewrite_tls_args;
10pub use pgvector::{Bit, HalfVector, SparseVector, Vector};
11pub use typesystem::{PostgresTypePairs, PostgresTypeSystem};
12
13use crate::constants::DB_BUFFER_SIZE;
14use crate::{
15    data_order::DataOrder,
16    errors::ConnectorXError,
17    sources::{PartitionParser, Produce, Source, SourcePartition},
18    sql::{count_query, CXQuery},
19};
20use anyhow::anyhow;
21use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc};
22use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter};
23use fehler::{throw, throws};
24use hex::decode;
25use postgres::{
26    binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow},
27    fallible_iterator::FallibleIterator,
28    tls::{MakeTlsConnect, TlsConnect},
29    Config, CopyOutReader, Row, RowIter, SimpleQueryMessage, Socket,
30};
31use r2d2::{Pool, PooledConnection};
32use r2d2_postgres::PostgresConnectionManager;
33use rust_decimal::Decimal;
34use serde_json::{from_str, Value};
35use sqlparser::dialect::PostgreSqlDialect;
36use std::collections::HashMap;
37use std::convert::TryFrom;
38use std::marker::PhantomData;
39use uuid::Uuid;
40
41/// Protocol - Binary based bulk load
42pub enum BinaryProtocol {}
43
44/// Protocol - CSV based bulk load
45pub enum CSVProtocol {}
46
47/// Protocol - use Cursor
48pub enum CursorProtocol {}
49
50/// Protocol - use Simple Query
51pub enum SimpleProtocol {}
52
53type PgManager<C> = PostgresConnectionManager<C>;
54type PgConn<C> = PooledConnection<PgManager<C>>;
55
56macro_rules! impl_produce_unimplemented {
57    ($(($protocol: ty, $t: ty, $msg: expr),)+) => {
58        $(
59            impl<'r> Produce<'r, $t> for $protocol {
60                type Error = PostgresSourceError;
61
62                #[throws(PostgresSourceError)]
63                fn produce(&'r mut self) -> $t {
64                   unimplemented!($msg);
65                }
66            }
67
68            impl<'r> Produce<'r, Option<$t>> for $protocol {
69                type Error = PostgresSourceError;
70
71                #[throws(PostgresSourceError)]
72                fn produce(&'r mut self) -> Option<$t> {
73                   unimplemented!($msg);
74                }
75            }
76        )+
77    };
78}
79
80impl_produce_unimplemented!(
81    (PostgresCSVSourceParser<'_>, HashMap<String, Option<String>>, "Please use `cursor` protocol for hstore type"),
82    (PostgresCSVSourceParser<'_>, Vector, "Please use `binary` protocol for vector type"),
83    (PostgresCSVSourceParser<'_>, HalfVector, "Please use `binary` protocol for halfvector type"),
84    (PostgresCSVSourceParser<'_>, Bit, "Please use `binary` protocol for bit type"),
85    (PostgresCSVSourceParser<'_>, SparseVector, "Please use `binary` protocol for sparsevector type"),
86
87
88    (PostgresSimpleSourceParser,HashMap<String, Option<String>>, "unimplemented"),
89    (PostgresSimpleSourceParser,Value, "unimplemented"),
90    (PostgresSimpleSourceParser, Vector, "Please use `binary` protocol for vector type"),
91    (PostgresSimpleSourceParser, HalfVector, "Please use `binary` protocol for halfvector type"),
92    (PostgresSimpleSourceParser, Bit, "Please use `binary` protocol for bit type"),
93    (PostgresSimpleSourceParser, SparseVector, "Please use `binary` protocol for sparsevector type"),
94
95);
96
97// take a row and unwrap the interior field from column 0
98fn convert_row<'b, R: TryFrom<usize> + postgres::types::FromSql<'b> + Clone>(row: &'b Row) -> R {
99    let nrows: Option<R> = row.get(0);
100    nrows.expect("Could not parse int result from count_query")
101}
102
103#[throws(PostgresSourceError)]
104fn get_total_rows<C>(conn: &mut PgConn<C>, query: &CXQuery<String>) -> usize
105where
106    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
107    C::TlsConnect: Send,
108    C::Stream: Send,
109    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
110{
111    let dialect = PostgreSqlDialect {};
112
113    let row = conn.query_one(count_query(query, &dialect)?.as_str(), &[])?;
114    let col_type = PostgresTypeSystem::from(row.columns()[0].type_());
115    match col_type {
116        PostgresTypeSystem::Int2(_) => convert_row::<i16>(&row) as usize,
117        PostgresTypeSystem::Int4(_) => convert_row::<i32>(&row) as usize,
118        PostgresTypeSystem::Int8(_) => convert_row::<i64>(&row) as usize,
119        _ => throw!(anyhow!(
120            "The result of the count query was not an int, aborting."
121        )),
122    }
123}
124
125pub struct PostgresSource<P, C>
126where
127    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
128    C::TlsConnect: Send,
129    C::Stream: Send,
130    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
131{
132    pool: Pool<PgManager<C>>,
133    origin_query: Option<String>,
134    queries: Vec<CXQuery<String>>,
135    names: Vec<String>,
136    schema: Vec<PostgresTypeSystem>,
137    pg_schema: Vec<postgres::types::Type>,
138    pre_execution_queries: Option<Vec<String>>,
139    _protocol: PhantomData<P>,
140}
141
142impl<P, C> PostgresSource<P, C>
143where
144    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
145    C::TlsConnect: Send,
146    C::Stream: Send,
147    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
148{
149    #[throws(PostgresSourceError)]
150    pub fn new(config: Config, tls: C, nconn: usize) -> Self {
151        let manager = PostgresConnectionManager::new(config, tls);
152        let pool = Pool::builder().max_size(nconn as u32).build(manager)?;
153
154        Self {
155            pool,
156            origin_query: None,
157            queries: vec![],
158            names: vec![],
159            schema: vec![],
160            pg_schema: vec![],
161            pre_execution_queries: None,
162            _protocol: PhantomData,
163        }
164    }
165}
166
167impl<P, C> Source for PostgresSource<P, C>
168where
169    PostgresSourcePartition<P, C>:
170        SourcePartition<TypeSystem = PostgresTypeSystem, Error = PostgresSourceError>,
171    P: Send,
172    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
173    C::TlsConnect: Send,
174    C::Stream: Send,
175    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
176{
177    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
178    type Partition = PostgresSourcePartition<P, C>;
179    type TypeSystem = PostgresTypeSystem;
180    type Error = PostgresSourceError;
181
182    #[throws(PostgresSourceError)]
183    fn set_data_order(&mut self, data_order: DataOrder) {
184        if !matches!(data_order, DataOrder::RowMajor) {
185            throw!(ConnectorXError::UnsupportedDataOrder(data_order));
186        }
187    }
188
189    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
190        self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
191    }
192
193    fn set_origin_query(&mut self, query: Option<String>) {
194        self.origin_query = query;
195    }
196
197    fn set_pre_execution_queries(&mut self, pre_execution_queries: Option<&[String]>) {
198        self.pre_execution_queries = pre_execution_queries.map(|s| s.to_vec());
199    }
200
201    #[throws(PostgresSourceError)]
202    fn fetch_metadata(&mut self) {
203        assert!(!self.queries.is_empty());
204
205        let mut conn = self.pool.get()?;
206        let first_query = &self.queries[0];
207
208        let stmt = conn.prepare(first_query.as_str())?;
209
210        let (names, pg_types): (Vec<String>, Vec<postgres::types::Type>) = stmt
211            .columns()
212            .iter()
213            .map(|col| (col.name().to_string(), col.type_().clone()))
214            .unzip();
215
216        self.names = names;
217        self.schema = pg_types.iter().map(PostgresTypeSystem::from).collect();
218        self.pg_schema = self
219            .schema
220            .iter()
221            .zip(pg_types.iter())
222            .map(|(t1, t2)| PostgresTypePairs(t2, t1).into())
223            .collect();
224    }
225
226    #[throws(PostgresSourceError)]
227    fn result_rows(&mut self) -> Option<usize> {
228        match &self.origin_query {
229            Some(q) => {
230                let cxq = CXQuery::Naked(q.clone());
231                let mut conn = self.pool.get()?;
232                let nrows = get_total_rows(&mut conn, &cxq)?;
233                Some(nrows)
234            }
235            None => None,
236        }
237    }
238
239    fn names(&self) -> Vec<String> {
240        self.names.clone()
241    }
242
243    fn schema(&self) -> Vec<Self::TypeSystem> {
244        self.schema.clone()
245    }
246
247    #[throws(PostgresSourceError)]
248    fn partition(self) -> Vec<Self::Partition> {
249        let mut ret = vec![];
250        for query in self.queries {
251            let mut conn = self.pool.get()?;
252
253            if let Some(pre_queries) = &self.pre_execution_queries {
254                for pre_query in pre_queries {
255                    conn.query(pre_query, &[])?;
256                }
257            }
258
259            ret.push(PostgresSourcePartition::<P, C>::new(
260                conn,
261                &query,
262                &self.schema,
263                &self.pg_schema,
264            ));
265        }
266        ret
267    }
268}
269
270pub struct PostgresSourcePartition<P, C>
271where
272    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
273    C::TlsConnect: Send,
274    C::Stream: Send,
275    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
276{
277    conn: PgConn<C>,
278    query: CXQuery<String>,
279    schema: Vec<PostgresTypeSystem>,
280    pg_schema: Vec<postgres::types::Type>,
281    nrows: usize,
282    ncols: usize,
283    _protocol: PhantomData<P>,
284}
285
286impl<P, C> PostgresSourcePartition<P, C>
287where
288    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
289    C::TlsConnect: Send,
290    C::Stream: Send,
291    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
292{
293    pub fn new(
294        conn: PgConn<C>,
295        query: &CXQuery<String>,
296        schema: &[PostgresTypeSystem],
297        pg_schema: &[postgres::types::Type],
298    ) -> Self {
299        Self {
300            conn,
301            query: query.clone(),
302            schema: schema.to_vec(),
303            pg_schema: pg_schema.to_vec(),
304            nrows: 0,
305            ncols: schema.len(),
306            _protocol: PhantomData,
307        }
308    }
309}
310
311impl<C> SourcePartition for PostgresSourcePartition<BinaryProtocol, C>
312where
313    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
314    C::TlsConnect: Send,
315    C::Stream: Send,
316    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
317{
318    type TypeSystem = PostgresTypeSystem;
319    type Parser<'a> = PostgresBinarySourcePartitionParser<'a>;
320    type Error = PostgresSourceError;
321
322    #[throws(PostgresSourceError)]
323    fn result_rows(&mut self) -> () {
324        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
325    }
326
327    #[throws(PostgresSourceError)]
328    fn parser(&mut self) -> Self::Parser<'_> {
329        let query = format!("COPY ({}) TO STDOUT WITH BINARY", self.query);
330        let reader = self.conn.copy_out(&*query)?; // unless reading the data, it seems like issue the query is fast
331        let iter = BinaryCopyOutIter::new(reader, &self.pg_schema);
332
333        PostgresBinarySourcePartitionParser::new(iter, &self.schema)
334    }
335
336    fn nrows(&self) -> usize {
337        self.nrows
338    }
339
340    fn ncols(&self) -> usize {
341        self.ncols
342    }
343}
344
345impl<C> SourcePartition for PostgresSourcePartition<CSVProtocol, C>
346where
347    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
348    C::TlsConnect: Send,
349    C::Stream: Send,
350    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
351{
352    type TypeSystem = PostgresTypeSystem;
353    type Parser<'a> = PostgresCSVSourceParser<'a>;
354    type Error = PostgresSourceError;
355
356    #[throws(PostgresSourceError)]
357    fn result_rows(&mut self) {
358        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
359    }
360
361    #[throws(PostgresSourceError)]
362    fn parser(&mut self) -> Self::Parser<'_> {
363        let query = format!("COPY ({}) TO STDOUT WITH CSV", self.query);
364        let reader = self.conn.copy_out(&*query)?; // unless reading the data, it seems like issue the query is fast
365        let iter = ReaderBuilder::new()
366            .has_headers(false)
367            .from_reader(reader)
368            .into_records();
369
370        PostgresCSVSourceParser::new(iter, &self.schema)
371    }
372
373    fn nrows(&self) -> usize {
374        self.nrows
375    }
376
377    fn ncols(&self) -> usize {
378        self.ncols
379    }
380}
381
382impl<C> SourcePartition for PostgresSourcePartition<CursorProtocol, C>
383where
384    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
385    C::TlsConnect: Send,
386    C::Stream: Send,
387    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
388{
389    type TypeSystem = PostgresTypeSystem;
390    type Parser<'a> = PostgresRawSourceParser<'a>;
391    type Error = PostgresSourceError;
392
393    #[throws(PostgresSourceError)]
394    fn result_rows(&mut self) {
395        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
396    }
397
398    #[throws(PostgresSourceError)]
399    fn parser(&mut self) -> Self::Parser<'_> {
400        let iter = self
401            .conn
402            .query_raw::<_, bool, _>(self.query.as_str(), vec![])?; // unless reading the data, it seems like issue the query is fast
403        PostgresRawSourceParser::new(iter, &self.schema)
404    }
405
406    fn nrows(&self) -> usize {
407        self.nrows
408    }
409
410    fn ncols(&self) -> usize {
411        self.ncols
412    }
413}
414pub struct PostgresBinarySourcePartitionParser<'a> {
415    iter: BinaryCopyOutIter<'a>,
416    rowbuf: Vec<BinaryCopyOutRow>,
417    ncols: usize,
418    current_col: usize,
419    current_row: usize,
420    is_finished: bool,
421}
422
423impl<'a> PostgresBinarySourcePartitionParser<'a> {
424    pub fn new(iter: BinaryCopyOutIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
425        Self {
426            iter,
427            rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
428            ncols: schema.len(),
429            current_row: 0,
430            current_col: 0,
431            is_finished: false,
432        }
433    }
434
435    #[throws(PostgresSourceError)]
436    fn next_loc(&mut self) -> (usize, usize) {
437        let ret = (self.current_row, self.current_col);
438        self.current_row += (self.current_col + 1) / self.ncols;
439        self.current_col = (self.current_col + 1) % self.ncols;
440        ret
441    }
442}
443
444impl<'a> PartitionParser<'a> for PostgresBinarySourcePartitionParser<'a> {
445    type TypeSystem = PostgresTypeSystem;
446    type Error = PostgresSourceError;
447
448    #[throws(PostgresSourceError)]
449    fn fetch_next(&mut self) -> (usize, bool) {
450        assert!(self.current_col == 0);
451        let remaining_rows = self.rowbuf.len() - self.current_row;
452        if remaining_rows > 0 {
453            return (remaining_rows, self.is_finished);
454        } else if self.is_finished {
455            return (0, self.is_finished);
456        }
457
458        // clear the buffer
459        if !self.rowbuf.is_empty() {
460            self.rowbuf.drain(..);
461        }
462        for _ in 0..DB_BUFFER_SIZE {
463            match self.iter.next()? {
464                Some(row) => {
465                    self.rowbuf.push(row);
466                }
467                None => {
468                    self.is_finished = true;
469                    break;
470                }
471            }
472        }
473
474        // reset current cursor positions
475        self.current_row = 0;
476        self.current_col = 0;
477
478        (self.rowbuf.len(), self.is_finished)
479    }
480}
481
482macro_rules! impl_produce {
483    ($($t: ty,)+) => {
484        $(
485            impl<'r, 'a> Produce<'r, $t> for PostgresBinarySourcePartitionParser<'a> {
486                type Error = PostgresSourceError;
487
488                #[throws(PostgresSourceError)]
489                fn produce(&'r mut self) -> $t {
490                    let (ridx, cidx) = self.next_loc()?;
491                    let row = &self.rowbuf[ridx];
492                    let val = row.try_get(cidx)?;
493                    val
494                }
495            }
496
497            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresBinarySourcePartitionParser<'a> {
498                type Error = PostgresSourceError;
499
500                #[throws(PostgresSourceError)]
501                fn produce(&'r mut self) -> Option<$t> {
502                    let (ridx, cidx) = self.next_loc()?;
503                    let row = &self.rowbuf[ridx];
504                    let val = row.try_get(cidx)?;
505                    val
506                }
507            }
508        )+
509    };
510}
511
512impl_produce!(
513    i8,
514    i16,
515    i32,
516    i64,
517    u32,
518    f32,
519    f64,
520    Decimal,
521    bool,
522    &'r str,
523    Vec<u8>,
524    NaiveTime,
525    Uuid,
526    Value,
527    IpInet,
528    Vector,
529    HalfVector,
530    Bit,
531    SparseVector,
532    Vec<Option<bool>>,
533    Vec<Option<i16>>,
534    Vec<Option<i32>>,
535    Vec<Option<i64>>,
536    Vec<Option<Decimal>>,
537    Vec<Option<f32>>,
538    Vec<Option<f64>>,
539    Vec<Option<String>>,
540);
541
542impl<'r> Produce<'r, NaiveDateTime> for PostgresBinarySourcePartitionParser<'_> {
543    type Error = PostgresSourceError;
544
545    #[throws(PostgresSourceError)]
546    fn produce(&'r mut self) -> NaiveDateTime {
547        let (ridx, cidx) = self.next_loc()?;
548        let row = &self.rowbuf[ridx];
549        let val = row.try_get(cidx)?;
550        match val {
551            postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
552            postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
553            postgres::types::Timestamp::Value(t) => t,
554        }
555    }
556}
557
558impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresBinarySourcePartitionParser<'_> {
559    type Error = PostgresSourceError;
560
561    #[throws(PostgresSourceError)]
562    fn produce(&'r mut self) -> Option<NaiveDateTime> {
563        let (ridx, cidx) = self.next_loc()?;
564        let row = &self.rowbuf[ridx];
565        let val = row.try_get(cidx)?;
566        match val {
567            Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
568            Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
569            Some(postgres::types::Timestamp::Value(t)) => t,
570            None => None,
571        }
572    }
573}
574
575impl<'r> Produce<'r, DateTime<Utc>> for PostgresBinarySourcePartitionParser<'_> {
576    type Error = PostgresSourceError;
577
578    #[throws(PostgresSourceError)]
579    fn produce(&'r mut self) -> DateTime<Utc> {
580        let (ridx, cidx) = self.next_loc()?;
581        let row = &self.rowbuf[ridx];
582        let val = row.try_get(cidx)?;
583        match val {
584            postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
585            postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
586            postgres::types::Timestamp::Value(t) => t,
587        }
588    }
589}
590
591impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresBinarySourcePartitionParser<'_> {
592    type Error = PostgresSourceError;
593
594    #[throws(PostgresSourceError)]
595    fn produce(&'r mut self) -> Option<DateTime<Utc>> {
596        let (ridx, cidx) = self.next_loc()?;
597        let row = &self.rowbuf[ridx];
598        let val = row.try_get(cidx)?;
599        match val {
600            Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
601            Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
602            Some(postgres::types::Timestamp::Value(t)) => t,
603            None => None,
604        }
605    }
606}
607
608impl<'r> Produce<'r, NaiveDate> for PostgresBinarySourcePartitionParser<'_> {
609    type Error = PostgresSourceError;
610
611    #[throws(PostgresSourceError)]
612    fn produce(&'r mut self) -> NaiveDate {
613        let (ridx, cidx) = self.next_loc()?;
614        let row = &self.rowbuf[ridx];
615        let val = row.try_get(cidx)?;
616        match val {
617            postgres::types::Date::PosInfinity => NaiveDate::MAX,
618            postgres::types::Date::NegInfinity => NaiveDate::MIN,
619            postgres::types::Date::Value(t) => t,
620        }
621    }
622}
623
624impl<'r> Produce<'r, Option<NaiveDate>> for PostgresBinarySourcePartitionParser<'_> {
625    type Error = PostgresSourceError;
626
627    #[throws(PostgresSourceError)]
628    fn produce(&'r mut self) -> Option<NaiveDate> {
629        let (ridx, cidx) = self.next_loc()?;
630        let row = &self.rowbuf[ridx];
631        let val = row.try_get(cidx)?;
632        match val {
633            Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
634            Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
635            Some(postgres::types::Date::Value(t)) => t,
636            None => None,
637        }
638    }
639}
640
641impl Produce<'_, HashMap<String, Option<String>>> for PostgresBinarySourcePartitionParser<'_> {
642    type Error = PostgresSourceError;
643    #[throws(PostgresSourceError)]
644    fn produce(&mut self) -> HashMap<String, Option<String>> {
645        unimplemented!("Please use `cursor` protocol for hstore type");
646    }
647}
648
649impl Produce<'_, Option<HashMap<String, Option<String>>>>
650    for PostgresBinarySourcePartitionParser<'_>
651{
652    type Error = PostgresSourceError;
653    #[throws(PostgresSourceError)]
654    fn produce(&mut self) -> Option<HashMap<String, Option<String>>> {
655        unimplemented!("Please use `cursor` protocol for hstore type");
656    }
657}
658
659pub struct PostgresCSVSourceParser<'a> {
660    iter: StringRecordsIntoIter<CopyOutReader<'a>>,
661    rowbuf: Vec<StringRecord>,
662    ncols: usize,
663    current_col: usize,
664    current_row: usize,
665    is_finished: bool,
666}
667
668impl<'a> PostgresCSVSourceParser<'a> {
669    pub fn new(
670        iter: StringRecordsIntoIter<CopyOutReader<'a>>,
671        schema: &[PostgresTypeSystem],
672    ) -> Self {
673        Self {
674            iter,
675            rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
676            ncols: schema.len(),
677            current_row: 0,
678            current_col: 0,
679            is_finished: false,
680        }
681    }
682
683    #[throws(PostgresSourceError)]
684    fn next_loc(&mut self) -> (usize, usize) {
685        let ret = (self.current_row, self.current_col);
686        self.current_row += (self.current_col + 1) / self.ncols;
687        self.current_col = (self.current_col + 1) % self.ncols;
688        ret
689    }
690}
691
692impl<'a> PartitionParser<'a> for PostgresCSVSourceParser<'a> {
693    type Error = PostgresSourceError;
694    type TypeSystem = PostgresTypeSystem;
695
696    #[throws(PostgresSourceError)]
697    fn fetch_next(&mut self) -> (usize, bool) {
698        assert!(self.current_col == 0);
699        let remaining_rows = self.rowbuf.len() - self.current_row;
700        if remaining_rows > 0 {
701            return (remaining_rows, self.is_finished);
702        } else if self.is_finished {
703            return (0, self.is_finished);
704        }
705
706        if !self.rowbuf.is_empty() {
707            self.rowbuf.drain(..);
708        }
709        for _ in 0..DB_BUFFER_SIZE {
710            if let Some(row) = self.iter.next() {
711                self.rowbuf.push(row?);
712            } else {
713                self.is_finished = true;
714                break;
715            }
716        }
717        self.current_row = 0;
718        self.current_col = 0;
719        (self.rowbuf.len(), self.is_finished)
720    }
721}
722
723macro_rules! impl_csv_produce {
724    ($($t: ty,)+) => {
725        $(
726            impl<'r, 'a> Produce<'r, $t> for PostgresCSVSourceParser<'a> {
727                type Error = PostgresSourceError;
728
729                #[throws(PostgresSourceError)]
730                fn produce(&'r mut self) -> $t {
731                    let (ridx, cidx) = self.next_loc()?;
732                    self.rowbuf[ridx][cidx].parse().map_err(|_| {
733                        ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
734                    })?
735                }
736            }
737
738            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresCSVSourceParser<'a> {
739                type Error = PostgresSourceError;
740
741                #[throws(PostgresSourceError)]
742                fn produce(&'r mut self) -> Option<$t> {
743                    let (ridx, cidx) = self.next_loc()?;
744                    match &self.rowbuf[ridx][cidx][..] {
745                        "" => None,
746                        v => Some(v.parse().map_err(|_| {
747                            ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
748                        })?),
749                    }
750                }
751            }
752        )+
753    };
754}
755
756impl_csv_produce!(i8, i16, i32, i64, u32, f32, f64, Uuid, IpInet,);
757
758macro_rules! impl_csv_vec_produce {
759    ($($t: ty,)+) => {
760        $(
761            impl<'r, 'a> Produce<'r, Vec<Option<$t>>> for PostgresCSVSourceParser<'a> {
762                type Error = PostgresSourceError;
763
764                #[throws(PostgresSourceError)]
765                fn produce(&mut self) -> Vec<Option<$t>> {
766                    let (ridx, cidx) = self.next_loc()?;
767                    let s = &self.rowbuf[ridx][cidx][..];
768                    match s {
769                        "{}" => vec![],
770                        _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
771                        s => s[1..s.len() - 1]
772                            .split(",")
773                            .map(|v| {
774                                if v == "NULL" {
775                                    Ok(None)
776                                } else {
777                                    match v.parse() {
778                                        Ok(v) => Ok(Some(v)),
779                                        Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
780                                    }
781                                }
782                            })
783                            .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
784                    }
785                }
786            }
787
788            impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresCSVSourceParser<'a> {
789                type Error = PostgresSourceError;
790
791                #[throws(PostgresSourceError)]
792                fn produce(&mut self) -> Option<Vec<Option<$t>>> {
793                    let (ridx, cidx) = self.next_loc()?;
794                    let s = &self.rowbuf[ridx][cidx][..];
795                    match s {
796                        "" => None,
797                        "{}" => Some(vec![]),
798                        _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
799                        s => Some(
800                            s[1..s.len() - 1]
801                                .split(",")
802                                .map(|v| {
803                                    if v == "NULL" {
804                                        Ok(None)
805                                    } else {
806                                        match v.parse() {
807                                            Ok(v) => Ok(Some(v)),
808                                            Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
809                                        }
810                                    }
811                                })
812                                .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
813                        ),
814                    }
815                }
816            }
817        )+
818    };
819}
820
821impl_csv_vec_produce!(i8, i16, i32, i64, f32, f64, Decimal, String,);
822
823impl Produce<'_, bool> for PostgresCSVSourceParser<'_> {
824    type Error = PostgresSourceError;
825
826    #[throws(PostgresSourceError)]
827    fn produce(&mut self) -> bool {
828        let (ridx, cidx) = self.next_loc()?;
829        let ret = match &self.rowbuf[ridx][cidx][..] {
830            "t" => true,
831            "f" => false,
832            _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
833                self.rowbuf[ridx][cidx].into()
834            ))),
835        };
836        ret
837    }
838}
839
840impl Produce<'_, Option<bool>> for PostgresCSVSourceParser<'_> {
841    type Error = PostgresSourceError;
842
843    #[throws(PostgresSourceError)]
844    fn produce(&mut self) -> Option<bool> {
845        let (ridx, cidx) = self.next_loc()?;
846        let ret = match &self.rowbuf[ridx][cidx][..] {
847            "" => None,
848            "t" => Some(true),
849            "f" => Some(false),
850            _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
851                self.rowbuf[ridx][cidx].into()
852            ))),
853        };
854        ret
855    }
856}
857
858impl Produce<'_, Vec<Option<bool>>> for PostgresCSVSourceParser<'_> {
859    type Error = PostgresSourceError;
860
861    #[throws(PostgresSourceError)]
862    fn produce(&mut self) -> Vec<Option<bool>> {
863        let (ridx, cidx) = self.next_loc()?;
864        let s = &self.rowbuf[ridx][cidx][..];
865        match s {
866            "{}" => vec![],
867            _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
868            s => s[1..s.len() - 1]
869                .split(',')
870                .map(|v| match v {
871                    "NULL" => Ok(None),
872                    "t" => Ok(Some(true)),
873                    "f" => Ok(Some(false)),
874                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
875                })
876                .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
877        }
878    }
879}
880
881impl Produce<'_, Option<Vec<Option<bool>>>> for PostgresCSVSourceParser<'_> {
882    type Error = PostgresSourceError;
883
884    #[throws(PostgresSourceError)]
885    fn produce(&mut self) -> Option<Vec<Option<bool>>> {
886        let (ridx, cidx) = self.next_loc()?;
887        let s = &self.rowbuf[ridx][cidx][..];
888        match s {
889            "" => None,
890            "{}" => Some(vec![]),
891            _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
892            s => Some(
893                s[1..s.len() - 1]
894                    .split(',')
895                    .map(|v| match v {
896                        "NULL" => Ok(None),
897                        "t" => Ok(Some(true)),
898                        "f" => Ok(Some(false)),
899                        _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
900                    })
901                    .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
902            ),
903        }
904    }
905}
906
907impl<'r> Produce<'r, Decimal> for PostgresCSVSourceParser<'_> {
908    type Error = PostgresSourceError;
909
910    #[throws(PostgresSourceError)]
911    fn produce(&'r mut self) -> Decimal {
912        let (ridx, cidx) = self.next_loc()?;
913        match &self.rowbuf[ridx][cidx][..] {
914            "Infinity" => Decimal::MAX,
915            "-Infinity" => Decimal::MIN,
916            v => v
917                .parse()
918                .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
919        }
920    }
921}
922
923impl<'r> Produce<'r, Option<Decimal>> for PostgresCSVSourceParser<'_> {
924    type Error = PostgresSourceError;
925
926    #[throws(PostgresSourceError)]
927    fn produce(&'r mut self) -> Option<Decimal> {
928        let (ridx, cidx) = self.next_loc()?;
929        match &self.rowbuf[ridx][cidx][..] {
930            "" => None,
931            "Infinity" => Some(Decimal::MAX),
932            "-Infinity" => Some(Decimal::MIN),
933            v => Some(
934                v.parse()
935                    .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
936            ),
937        }
938    }
939}
940
941impl Produce<'_, DateTime<Utc>> for PostgresCSVSourceParser<'_> {
942    type Error = PostgresSourceError;
943
944    #[throws(PostgresSourceError)]
945    fn produce(&mut self) -> DateTime<Utc> {
946        let (ridx, cidx) = self.next_loc()?;
947        match &self.rowbuf[ridx][cidx][..] {
948            "infinity" => DateTime::<Utc>::MAX_UTC,
949            "-infinity" => DateTime::<Utc>::MIN_UTC,
950            // postgres csv return example: 1970-01-01 00:00:01+00
951            v => format!("{}:00", v)
952                .parse()
953                .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?,
954        }
955    }
956}
957
958impl Produce<'_, Option<DateTime<Utc>>> for PostgresCSVSourceParser<'_> {
959    type Error = PostgresSourceError;
960
961    #[throws(PostgresSourceError)]
962    fn produce(&mut self) -> Option<DateTime<Utc>> {
963        let (ridx, cidx) = self.next_loc()?;
964        match &self.rowbuf[ridx][cidx][..] {
965            "" => None,
966            "infinity" => Some(DateTime::<Utc>::MAX_UTC),
967            "-infinity" => Some(DateTime::<Utc>::MIN_UTC),
968            v => {
969                // postgres csv return example: 1970-01-01 00:00:01+00
970                Some(format!("{}:00", v).parse().map_err(|_| {
971                    ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into()))
972                })?)
973            }
974        }
975    }
976}
977
978impl Produce<'_, NaiveDate> for PostgresCSVSourceParser<'_> {
979    type Error = PostgresSourceError;
980
981    #[throws(PostgresSourceError)]
982    fn produce(&mut self) -> NaiveDate {
983        let (ridx, cidx) = self.next_loc()?;
984        match &self.rowbuf[ridx][cidx][..] {
985            "infinity" => NaiveDate::MAX,
986            "-infinity" => NaiveDate::MIN,
987            v => NaiveDate::parse_from_str(v, "%Y-%m-%d")
988                .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
989        }
990    }
991}
992
993impl Produce<'_, Option<NaiveDate>> for PostgresCSVSourceParser<'_> {
994    type Error = PostgresSourceError;
995
996    #[throws(PostgresSourceError)]
997    fn produce(&mut self) -> Option<NaiveDate> {
998        let (ridx, cidx) = self.next_loc()?;
999        match &self.rowbuf[ridx][cidx][..] {
1000            "" => None,
1001            "infinity" => Some(NaiveDate::MAX),
1002            "-infinity" => Some(NaiveDate::MIN),
1003            v => Some(
1004                NaiveDate::parse_from_str(v, "%Y-%m-%d")
1005                    .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
1006            ),
1007        }
1008    }
1009}
1010
1011impl Produce<'_, NaiveDateTime> for PostgresCSVSourceParser<'_> {
1012    type Error = PostgresSourceError;
1013
1014    #[throws(PostgresSourceError)]
1015    fn produce(&mut self) -> NaiveDateTime {
1016        let (ridx, cidx) = self.next_loc()?;
1017        match &self.rowbuf[ridx][cidx] {
1018            "infinity" => NaiveDateTime::MAX,
1019            "-infinity" => NaiveDateTime::MIN,
1020            v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f")
1021                .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into())))?,
1022        }
1023    }
1024}
1025
1026impl Produce<'_, Option<NaiveDateTime>> for PostgresCSVSourceParser<'_> {
1027    type Error = PostgresSourceError;
1028
1029    #[throws(PostgresSourceError)]
1030    fn produce(&mut self) -> Option<NaiveDateTime> {
1031        let (ridx, cidx) = self.next_loc()?;
1032        match &self.rowbuf[ridx][cidx][..] {
1033            "" => None,
1034            "infinity" => Some(NaiveDateTime::MAX),
1035            "-infinity" => Some(NaiveDateTime::MIN),
1036            v => Some(
1037                NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
1038                    ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into()))
1039                })?,
1040            ),
1041        }
1042    }
1043}
1044
1045impl Produce<'_, NaiveTime> for PostgresCSVSourceParser<'_> {
1046    type Error = PostgresSourceError;
1047
1048    #[throws(PostgresSourceError)]
1049    fn produce(&mut self) -> NaiveTime {
1050        let (ridx, cidx) = self.next_loc()?;
1051        NaiveTime::parse_from_str(&self.rowbuf[ridx][cidx], "%H:%M:%S%.f").map_err(|_| {
1052            ConnectorXError::cannot_produce::<NaiveTime>(Some(self.rowbuf[ridx][cidx].into()))
1053        })?
1054    }
1055}
1056
1057impl Produce<'_, Option<NaiveTime>> for PostgresCSVSourceParser<'_> {
1058    type Error = PostgresSourceError;
1059
1060    #[throws(PostgresSourceError)]
1061    fn produce(&mut self) -> Option<NaiveTime> {
1062        let (ridx, cidx) = self.next_loc()?;
1063        match &self.rowbuf[ridx][cidx][..] {
1064            "" => None,
1065            v => Some(
1066                NaiveTime::parse_from_str(v, "%H:%M:%S%.f")
1067                    .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(v.into())))?,
1068            ),
1069        }
1070    }
1071}
1072
1073impl<'r> Produce<'r, &'r str> for PostgresCSVSourceParser<'_> {
1074    type Error = PostgresSourceError;
1075
1076    #[throws(PostgresSourceError)]
1077    fn produce(&'r mut self) -> &'r str {
1078        let (ridx, cidx) = self.next_loc()?;
1079        &self.rowbuf[ridx][cidx]
1080    }
1081}
1082
1083impl<'r> Produce<'r, Option<&'r str>> for PostgresCSVSourceParser<'_> {
1084    type Error = PostgresSourceError;
1085
1086    #[throws(PostgresSourceError)]
1087    fn produce(&'r mut self) -> Option<&'r str> {
1088        let (ridx, cidx) = self.next_loc()?;
1089        match &self.rowbuf[ridx][cidx][..] {
1090            "" => None,
1091            v => Some(v),
1092        }
1093    }
1094}
1095
1096impl<'r> Produce<'r, Vec<u8>> for PostgresCSVSourceParser<'_> {
1097    type Error = PostgresSourceError;
1098
1099    #[throws(PostgresSourceError)]
1100    fn produce(&'r mut self) -> Vec<u8> {
1101        let (ridx, cidx) = self.next_loc()?;
1102        decode(&self.rowbuf[ridx][cidx][2..])? // escape \x in the beginning
1103    }
1104}
1105
1106impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresCSVSourceParser<'_> {
1107    type Error = PostgresSourceError;
1108
1109    #[throws(PostgresSourceError)]
1110    fn produce(&'r mut self) -> Option<Vec<u8>> {
1111        let (ridx, cidx) = self.next_loc()?;
1112        match &self.rowbuf[ridx][cidx] {
1113            // escape \x in the beginning, empty if None
1114            "" => None,
1115            v => Some(decode(&v[2..])?),
1116        }
1117    }
1118}
1119
1120impl<'r> Produce<'r, Value> for PostgresCSVSourceParser<'_> {
1121    type Error = PostgresSourceError;
1122
1123    #[throws(PostgresSourceError)]
1124    fn produce(&'r mut self) -> Value {
1125        let (ridx, cidx) = self.next_loc()?;
1126        let v = &self.rowbuf[ridx][cidx];
1127        from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1128    }
1129}
1130
1131impl<'r> Produce<'r, Option<Value>> for PostgresCSVSourceParser<'_> {
1132    type Error = PostgresSourceError;
1133
1134    #[throws(PostgresSourceError)]
1135    fn produce(&'r mut self) -> Option<Value> {
1136        let (ridx, cidx) = self.next_loc()?;
1137
1138        match &self.rowbuf[ridx][cidx][..] {
1139            "" => None,
1140            v => {
1141                from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1142            }
1143        }
1144    }
1145}
1146
1147pub struct PostgresRawSourceParser<'a> {
1148    iter: RowIter<'a>,
1149    rowbuf: Vec<Row>,
1150    ncols: usize,
1151    current_col: usize,
1152    current_row: usize,
1153    is_finished: bool,
1154}
1155
1156impl<'a> PostgresRawSourceParser<'a> {
1157    pub fn new(iter: RowIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
1158        Self {
1159            iter,
1160            rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
1161            ncols: schema.len(),
1162            current_row: 0,
1163            current_col: 0,
1164            is_finished: false,
1165        }
1166    }
1167
1168    #[throws(PostgresSourceError)]
1169    fn next_loc(&mut self) -> (usize, usize) {
1170        let ret = (self.current_row, self.current_col);
1171        self.current_row += (self.current_col + 1) / self.ncols;
1172        self.current_col = (self.current_col + 1) % self.ncols;
1173        ret
1174    }
1175}
1176
1177impl<'a> PartitionParser<'a> for PostgresRawSourceParser<'a> {
1178    type TypeSystem = PostgresTypeSystem;
1179    type Error = PostgresSourceError;
1180
1181    #[throws(PostgresSourceError)]
1182    fn fetch_next(&mut self) -> (usize, bool) {
1183        assert!(self.current_col == 0);
1184        let remaining_rows = self.rowbuf.len() - self.current_row;
1185        if remaining_rows > 0 {
1186            return (remaining_rows, self.is_finished);
1187        } else if self.is_finished {
1188            return (0, self.is_finished);
1189        }
1190
1191        if !self.rowbuf.is_empty() {
1192            self.rowbuf.drain(..);
1193        }
1194        for _ in 0..DB_BUFFER_SIZE {
1195            if let Some(row) = self.iter.next()? {
1196                self.rowbuf.push(row);
1197            } else {
1198                self.is_finished = true;
1199                break;
1200            }
1201        }
1202        self.current_row = 0;
1203        self.current_col = 0;
1204        (self.rowbuf.len(), self.is_finished)
1205    }
1206}
1207
1208macro_rules! impl_produce {
1209    ($($t: ty,)+) => {
1210        $(
1211            impl<'r, 'a> Produce<'r, $t> for PostgresRawSourceParser<'a> {
1212                type Error = PostgresSourceError;
1213
1214                #[throws(PostgresSourceError)]
1215                fn produce(&'r mut self) -> $t {
1216                    let (ridx, cidx) = self.next_loc()?;
1217                    let row = &self.rowbuf[ridx];
1218                    let val = row.try_get(cidx)?;
1219                    val
1220                }
1221            }
1222
1223            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresRawSourceParser<'a> {
1224                type Error = PostgresSourceError;
1225
1226                #[throws(PostgresSourceError)]
1227                fn produce(&'r mut self) -> Option<$t> {
1228                    let (ridx, cidx) = self.next_loc()?;
1229                    let row = &self.rowbuf[ridx];
1230                    let val = row.try_get(cidx)?;
1231                    val
1232                }
1233            }
1234        )+
1235    };
1236}
1237
1238impl_produce!(
1239    i8,
1240    i16,
1241    i32,
1242    i64,
1243    u32,
1244    f32,
1245    f64,
1246    Decimal,
1247    bool,
1248    &'r str,
1249    Vec<u8>,
1250    NaiveTime,
1251    Uuid,
1252    Value,
1253    IpInet,
1254    Vector,
1255    HalfVector,
1256    Bit,
1257    SparseVector,
1258    HashMap<String, Option<String>>,
1259    Vec<Option<bool>>,
1260    Vec<Option<String>>,
1261    Vec<Option<i16>>,
1262    Vec<Option<i32>>,
1263    Vec<Option<i64>>,
1264    Vec<Option<f32>>,
1265    Vec<Option<f64>>,
1266    Vec<Option<Decimal>>,
1267);
1268
1269impl<'r> Produce<'r, DateTime<Utc>> for PostgresRawSourceParser<'_> {
1270    type Error = PostgresSourceError;
1271
1272    #[throws(PostgresSourceError)]
1273    fn produce(&'r mut self) -> DateTime<Utc> {
1274        let (ridx, cidx) = self.next_loc()?;
1275        let row = &self.rowbuf[ridx];
1276        let val: postgres::types::Timestamp<DateTime<Utc>> = row.try_get(cidx)?;
1277        match val {
1278            postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
1279            postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
1280            postgres::types::Timestamp::Value(t) => t,
1281        }
1282    }
1283}
1284
1285impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresRawSourceParser<'_> {
1286    type Error = PostgresSourceError;
1287
1288    #[throws(PostgresSourceError)]
1289    fn produce(&'r mut self) -> Option<DateTime<Utc>> {
1290        let (ridx, cidx) = self.next_loc()?;
1291        let row = &self.rowbuf[ridx];
1292        let val = row.try_get(cidx)?;
1293        match val {
1294            Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
1295            Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
1296            Some(postgres::types::Timestamp::Value(t)) => t,
1297            None => None,
1298        }
1299    }
1300}
1301
1302impl<'r> Produce<'r, NaiveDateTime> for PostgresRawSourceParser<'_> {
1303    type Error = PostgresSourceError;
1304
1305    #[throws(PostgresSourceError)]
1306    fn produce(&'r mut self) -> NaiveDateTime {
1307        let (ridx, cidx) = self.next_loc()?;
1308        let row = &self.rowbuf[ridx];
1309        let val: postgres::types::Timestamp<NaiveDateTime> = row.try_get(cidx)?;
1310        match val {
1311            postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
1312            postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
1313            postgres::types::Timestamp::Value(t) => t,
1314        }
1315    }
1316}
1317
1318impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresRawSourceParser<'_> {
1319    type Error = PostgresSourceError;
1320
1321    #[throws(PostgresSourceError)]
1322    fn produce(&'r mut self) -> Option<NaiveDateTime> {
1323        let (ridx, cidx) = self.next_loc()?;
1324        let row = &self.rowbuf[ridx];
1325        let val = row.try_get(cidx)?;
1326        match val {
1327            Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
1328            Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
1329            Some(postgres::types::Timestamp::Value(t)) => t,
1330            None => None,
1331        }
1332    }
1333}
1334
1335impl<'r> Produce<'r, NaiveDate> for PostgresRawSourceParser<'_> {
1336    type Error = PostgresSourceError;
1337
1338    #[throws(PostgresSourceError)]
1339    fn produce(&'r mut self) -> NaiveDate {
1340        let (ridx, cidx) = self.next_loc()?;
1341        let row = &self.rowbuf[ridx];
1342        let val: postgres::types::Date<NaiveDate> = row.try_get(cidx)?;
1343        match val {
1344            postgres::types::Date::PosInfinity => NaiveDate::MAX,
1345            postgres::types::Date::NegInfinity => NaiveDate::MIN,
1346            postgres::types::Date::Value(t) => t,
1347        }
1348    }
1349}
1350
1351impl<'r> Produce<'r, Option<NaiveDate>> for PostgresRawSourceParser<'_> {
1352    type Error = PostgresSourceError;
1353
1354    #[throws(PostgresSourceError)]
1355    fn produce(&'r mut self) -> Option<NaiveDate> {
1356        let (ridx, cidx) = self.next_loc()?;
1357        let row = &self.rowbuf[ridx];
1358        let val = row.try_get(cidx)?;
1359        match val {
1360            Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
1361            Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
1362            Some(postgres::types::Date::Value(t)) => t,
1363            None => None,
1364        }
1365    }
1366}
1367
1368impl<C> SourcePartition for PostgresSourcePartition<SimpleProtocol, C>
1369where
1370    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
1371    C::TlsConnect: Send,
1372    C::Stream: Send,
1373    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
1374{
1375    type TypeSystem = PostgresTypeSystem;
1376    type Parser<'a> = PostgresSimpleSourceParser;
1377    type Error = PostgresSourceError;
1378
1379    #[throws(PostgresSourceError)]
1380    fn result_rows(&mut self) {
1381        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
1382    }
1383
1384    #[throws(PostgresSourceError)]
1385    fn parser(&mut self) -> Self::Parser<'_> {
1386        let rows = self.conn.simple_query(self.query.as_str())?; // unless reading the data, it seems like issue the query is fast
1387        PostgresSimpleSourceParser::new(rows, &self.schema)
1388    }
1389
1390    fn nrows(&self) -> usize {
1391        self.nrows
1392    }
1393
1394    fn ncols(&self) -> usize {
1395        self.ncols
1396    }
1397}
1398
1399pub struct PostgresSimpleSourceParser {
1400    rows: Vec<SimpleQueryMessage>,
1401    ncols: usize,
1402    current_col: usize,
1403    current_row: usize,
1404}
1405impl PostgresSimpleSourceParser {
1406    pub fn new(rows: Vec<SimpleQueryMessage>, schema: &[PostgresTypeSystem]) -> Self {
1407        Self {
1408            rows,
1409            ncols: schema.len(),
1410            current_row: 0,
1411            current_col: 0,
1412        }
1413    }
1414
1415    #[throws(PostgresSourceError)]
1416    fn next_loc(&mut self) -> (usize, usize) {
1417        let ret = (self.current_row, self.current_col);
1418        self.current_row += (self.current_col + 1) / self.ncols;
1419        self.current_col = (self.current_col + 1) % self.ncols;
1420        ret
1421    }
1422}
1423
1424impl PartitionParser<'_> for PostgresSimpleSourceParser {
1425    type TypeSystem = PostgresTypeSystem;
1426    type Error = PostgresSourceError;
1427
1428    #[throws(PostgresSourceError)]
1429    fn fetch_next(&mut self) -> (usize, bool) {
1430        self.current_row = 0;
1431        self.current_col = 0;
1432        if !self.rows.is_empty() {
1433            if let SimpleQueryMessage::RowDescription(_) = &self.rows[0] {
1434                self.current_row = 1;
1435            }
1436        }
1437
1438        (self.rows.len() - 1 - self.current_row, true) // last message is command complete
1439    }
1440}
1441
1442macro_rules! impl_simple_produce {
1443    ($($t: ty,)+) => {
1444        $(
1445            impl<'r> Produce<'r, $t> for PostgresSimpleSourceParser {
1446                type Error = PostgresSourceError;
1447
1448                #[throws(PostgresSourceError)]
1449                fn produce(&'r mut self) -> $t {
1450                    let (ridx, cidx) = self.next_loc()?;
1451                    let val = match &self.rows[ridx] {
1452                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1453                            Some(s) => s
1454                                .parse()
1455                                .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1456                            None => throw!(anyhow!(
1457                                "Cannot parse NULL in NOT NULL column."
1458                            )),
1459                        },
1460                        SimpleQueryMessage::CommandComplete(c) => {
1461                            panic!("get command: {}", c);
1462                        }
1463                        _ => {
1464                            panic!("what?");
1465                        }
1466                    };
1467                    val
1468                }
1469            }
1470
1471            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser {
1472                type Error = PostgresSourceError;
1473
1474                #[throws(PostgresSourceError)]
1475                fn produce(&'r mut self) -> Option<$t> {
1476                    let (ridx, cidx) = self.next_loc()?;
1477                    let val = match &self.rows[ridx] {
1478                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1479                            Some(s) => Some(
1480                                s.parse()
1481                                    .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1482                            ),
1483                            None => None,
1484                        },
1485                        SimpleQueryMessage::CommandComplete(c) => {
1486                            panic!("get command: {}", c);
1487                        }
1488                        _ => {
1489                            panic!("what?");
1490                        }
1491                    };
1492                    val
1493                }
1494            }
1495        )+
1496    };
1497}
1498
1499impl_simple_produce!(i8, i16, i32, i64, u32, f32, f64, Uuid, IpInet,);
1500
1501impl<'r> Produce<'r, bool> for PostgresSimpleSourceParser {
1502    type Error = PostgresSourceError;
1503
1504    #[throws(PostgresSourceError)]
1505    fn produce(&'r mut self) -> bool {
1506        let (ridx, cidx) = self.next_loc()?;
1507        let val = match &self.rows[ridx] {
1508            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1509                Some(s) => match s {
1510                    "t" => true,
1511                    "f" => false,
1512                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1513                },
1514                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1515            },
1516            SimpleQueryMessage::CommandComplete(c) => {
1517                panic!("get command: {}", c);
1518            }
1519            _ => {
1520                panic!("what?");
1521            }
1522        };
1523        val
1524    }
1525}
1526
1527impl<'r> Produce<'r, Option<bool>> for PostgresSimpleSourceParser {
1528    type Error = PostgresSourceError;
1529
1530    #[throws(PostgresSourceError)]
1531    fn produce(&'r mut self) -> Option<bool> {
1532        let (ridx, cidx) = self.next_loc()?;
1533        let val = match &self.rows[ridx] {
1534            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1535                Some(s) => match s {
1536                    "t" => Some(true),
1537                    "f" => Some(false),
1538                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1539                },
1540                None => None,
1541            },
1542            SimpleQueryMessage::CommandComplete(c) => {
1543                panic!("get command: {}", c);
1544            }
1545            _ => {
1546                panic!("what?");
1547            }
1548        };
1549        val
1550    }
1551}
1552
1553impl<'r> Produce<'r, Decimal> for PostgresSimpleSourceParser {
1554    type Error = PostgresSourceError;
1555
1556    #[throws(PostgresSourceError)]
1557    fn produce(&'r mut self) -> Decimal {
1558        let (ridx, cidx) = self.next_loc()?;
1559        let val = match &self.rows[ridx] {
1560            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1561                Some("Infinity") => Decimal::MAX,
1562                Some("-Infinity") => Decimal::MIN,
1563                Some(s) => s
1564                    .parse()
1565                    .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1566                None => throw!(anyhow!("Cannot parse NULL in NOT NULL column.")),
1567            },
1568            SimpleQueryMessage::CommandComplete(c) => {
1569                panic!("get command: {}", c);
1570            }
1571            _ => {
1572                panic!("what?");
1573            }
1574        };
1575        val
1576    }
1577}
1578
1579impl<'r> Produce<'r, Option<Decimal>> for PostgresSimpleSourceParser {
1580    type Error = PostgresSourceError;
1581
1582    #[throws(PostgresSourceError)]
1583    fn produce(&'r mut self) -> Option<Decimal> {
1584        let (ridx, cidx) = self.next_loc()?;
1585        let val = match &self.rows[ridx] {
1586            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1587                Some("Infinity") => Some(Decimal::MAX),
1588                Some("-Infinity") => Some(Decimal::MIN),
1589                Some(s) => Some(
1590                    s.parse()
1591                        .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1592                ),
1593                None => None,
1594            },
1595            SimpleQueryMessage::CommandComplete(c) => {
1596                panic!("get command: {}", c);
1597            }
1598            _ => {
1599                panic!("what?");
1600            }
1601        };
1602        val
1603    }
1604}
1605
1606impl<'r> Produce<'r, &'r str> for PostgresSimpleSourceParser {
1607    type Error = PostgresSourceError;
1608
1609    #[throws(PostgresSourceError)]
1610    fn produce(&'r mut self) -> &'r str {
1611        let (ridx, cidx) = self.next_loc()?;
1612        let val = match &self.rows[ridx] {
1613            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1614                Some(s) => s,
1615                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1616            },
1617            SimpleQueryMessage::CommandComplete(c) => {
1618                panic!("get command: {}", c);
1619            }
1620            _ => {
1621                panic!("what?");
1622            }
1623        };
1624        val
1625    }
1626}
1627
1628impl<'r> Produce<'r, Option<&'r str>> for PostgresSimpleSourceParser {
1629    type Error = PostgresSourceError;
1630
1631    #[throws(PostgresSourceError)]
1632    fn produce(&'r mut self) -> Option<&'r str> {
1633        let (ridx, cidx) = self.next_loc()?;
1634        let val = match &self.rows[ridx] {
1635            SimpleQueryMessage::Row(row) => row.try_get(cidx)?,
1636            SimpleQueryMessage::CommandComplete(c) => {
1637                panic!("get command: {}", c);
1638            }
1639            _ => {
1640                panic!("what?");
1641            }
1642        };
1643        val
1644    }
1645}
1646
1647impl<'r> Produce<'r, Vec<u8>> for PostgresSimpleSourceParser {
1648    type Error = PostgresSourceError;
1649
1650    #[throws(PostgresSourceError)]
1651    fn produce(&'r mut self) -> Vec<u8> {
1652        let (ridx, cidx) = self.next_loc()?;
1653        let val = match &self.rows[ridx] {
1654            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1655                Some(s) => {
1656                    let mut res = s.chars();
1657                    res.next();
1658                    res.next();
1659                    decode(
1660                        res.enumerate()
1661                            .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1662                            .chars()
1663                            .map(|c| c as u8)
1664                            .collect::<Vec<u8>>(),
1665                    )?
1666                }
1667                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1668            },
1669            SimpleQueryMessage::CommandComplete(c) => {
1670                panic!("get command: {}", c);
1671            }
1672            _ => {
1673                panic!("what?");
1674            }
1675        };
1676        val
1677    }
1678}
1679
1680impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresSimpleSourceParser {
1681    type Error = PostgresSourceError;
1682
1683    #[throws(PostgresSourceError)]
1684    fn produce(&'r mut self) -> Option<Vec<u8>> {
1685        let (ridx, cidx) = self.next_loc()?;
1686        let val = match &self.rows[ridx] {
1687            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1688                Some(s) => {
1689                    let mut res = s.chars();
1690                    res.next();
1691                    res.next();
1692                    Some(decode(
1693                        res.enumerate()
1694                            .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1695                            .chars()
1696                            .map(|c| c as u8)
1697                            .collect::<Vec<u8>>(),
1698                    )?)
1699                }
1700                None => None,
1701            },
1702            SimpleQueryMessage::CommandComplete(c) => {
1703                panic!("get command: {}", c);
1704            }
1705            _ => {
1706                panic!("what?");
1707            }
1708        };
1709        val
1710    }
1711}
1712
1713fn rem_first_and_last(value: &str) -> &str {
1714    let mut chars = value.chars();
1715    chars.next();
1716    chars.next_back();
1717    chars.as_str()
1718}
1719
1720macro_rules! impl_simple_vec_produce {
1721    ($($t: ty,)+) => {
1722        $(
1723            impl<'r> Produce<'r, Vec<Option<$t>>> for PostgresSimpleSourceParser {
1724                type Error = PostgresSourceError;
1725
1726                #[throws(PostgresSourceError)]
1727                fn produce(&'r mut self) -> Vec<Option<$t>> {
1728                    let (ridx, cidx) = self.next_loc()?;
1729                    let val = match &self.rows[ridx] {
1730                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1731                            Some(s) => match s{
1732                                "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1733                                "{}" => vec![],
1734                                _ => rem_first_and_last(s).split(",").map(|v| {
1735                                    if v == "NULL" {
1736                                        Ok(None)
1737                                    } else {
1738                                        match v.parse() {
1739                                            Ok(v) => Ok(Some(v)),
1740                                            Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1741                                        }
1742                                    }
1743                                }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?
1744                            },
1745                            None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1746                        },
1747                        SimpleQueryMessage::CommandComplete(c) => {
1748                            panic!("get command: {}", c);
1749                        }
1750                        _ => {
1751                            panic!("what?");
1752                        }
1753                    };
1754                    val
1755                }
1756            }
1757
1758            impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresSimpleSourceParser {
1759                type Error = PostgresSourceError;
1760
1761                #[throws(PostgresSourceError)]
1762                fn produce(&'r mut self) -> Option<Vec<Option<$t>>> {
1763                    let (ridx, cidx) = self.next_loc()?;
1764                    let val = match &self.rows[ridx] {
1765
1766                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1767                            Some(s) => match s{
1768                                "" => None,
1769                                "{}" => Some(vec![]),
1770                                _ => Some(rem_first_and_last(s).split(",").map(|v| {
1771                                    if v == "NULL" {
1772                                        Ok(None)
1773                                    } else {
1774                                        match v.parse() {
1775                                            Ok(v) => Ok(Some(v)),
1776                                            Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1777                                        }
1778                                    }
1779                                }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?)
1780                            },
1781                            None => None,
1782                        },
1783
1784                        SimpleQueryMessage::CommandComplete(c) => {
1785                            panic!("get command: {}", c);
1786                        }
1787                        _ => {
1788                            panic!("what?");
1789                        }
1790                    };
1791                    val
1792                }
1793            }
1794        )+
1795    };
1796}
1797impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,);
1798
1799impl<'r> Produce<'r, Vec<Option<bool>>> for PostgresSimpleSourceParser {
1800    type Error = PostgresSourceError;
1801
1802    #[throws(PostgresSourceError)]
1803    fn produce(&'r mut self) -> Vec<Option<bool>> {
1804        let (ridx, cidx) = self.next_loc()?;
1805        let val = match &self.rows[ridx] {
1806            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1807                Some(s) => match s {
1808                    "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1809                    "{}" => vec![],
1810                    _ => rem_first_and_last(s)
1811                        .split(',')
1812                        .map(|token| match token {
1813                            "NULL" => Ok(None),
1814                            "t" => Ok(Some(true)),
1815                            "f" => Ok(Some(false)),
1816                            _ => {
1817                                throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(s.into())))
1818                            }
1819                        })
1820                        .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1821                },
1822                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1823            },
1824            SimpleQueryMessage::CommandComplete(c) => {
1825                panic!("get command: {}", c);
1826            }
1827            _ => {
1828                panic!("what?");
1829            }
1830        };
1831        val
1832    }
1833}
1834
1835impl<'r> Produce<'r, Option<Vec<Option<bool>>>> for PostgresSimpleSourceParser {
1836    type Error = PostgresSourceError;
1837
1838    #[throws(PostgresSourceError)]
1839    fn produce(&'r mut self) -> Option<Vec<Option<bool>>> {
1840        let (ridx, cidx) = self.next_loc()?;
1841        let val = match &self.rows[ridx] {
1842            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1843                Some(s) => match s {
1844                    "" => None,
1845                    "{}" => Some(vec![]),
1846                    _ => Some(
1847                        rem_first_and_last(s)
1848                            .split(',')
1849                            .map(|token| match token {
1850                                "NULL" => Ok(None),
1851                                "t" => Ok(Some(true)),
1852                                "f" => Ok(Some(false)),
1853                                _ => {
1854                                    throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(
1855                                        s.into()
1856                                    )))
1857                                }
1858                            })
1859                            .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1860                    ),
1861                },
1862                None => None,
1863            },
1864            SimpleQueryMessage::CommandComplete(c) => {
1865                panic!("get command: {}", c);
1866            }
1867            _ => {
1868                panic!("what?");
1869            }
1870        };
1871        val
1872    }
1873}
1874
1875impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser {
1876    type Error = PostgresSourceError;
1877
1878    #[throws(PostgresSourceError)]
1879    fn produce(&'r mut self) -> NaiveDate {
1880        let (ridx, cidx) = self.next_loc()?;
1881        let val = match &self.rows[ridx] {
1882            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1883                Some(s) => match s {
1884                    "infinity" => NaiveDate::MAX,
1885                    "-infinity" => NaiveDate::MIN,
1886                    s => NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1887                        ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
1888                    })?,
1889                },
1890                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1891            },
1892            SimpleQueryMessage::CommandComplete(c) => {
1893                panic!("get command: {}", c);
1894            }
1895            _ => {
1896                panic!("what?");
1897            }
1898        };
1899        val
1900    }
1901}
1902
1903impl<'r> Produce<'r, Option<NaiveDate>> for PostgresSimpleSourceParser {
1904    type Error = PostgresSourceError;
1905
1906    #[throws(PostgresSourceError)]
1907    fn produce(&'r mut self) -> Option<NaiveDate> {
1908        let (ridx, cidx) = self.next_loc()?;
1909        let val = match &self.rows[ridx] {
1910            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1911                Some(s) => match s {
1912                    "infinity" => Some(NaiveDate::MAX),
1913                    "-infinity" => Some(NaiveDate::MIN),
1914                    s => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1915                        ConnectorXError::cannot_produce::<Option<NaiveDate>>(Some(s.into()))
1916                    })?),
1917                },
1918                None => None,
1919            },
1920            SimpleQueryMessage::CommandComplete(c) => {
1921                panic!("get command: {}", c);
1922            }
1923            _ => {
1924                panic!("what?");
1925            }
1926        };
1927        val
1928    }
1929}
1930
1931impl<'r> Produce<'r, NaiveTime> for PostgresSimpleSourceParser {
1932    type Error = PostgresSourceError;
1933
1934    #[throws(PostgresSourceError)]
1935    fn produce(&'r mut self) -> NaiveTime {
1936        let (ridx, cidx) = self.next_loc()?;
1937        let val = match &self.rows[ridx] {
1938            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1939                Some(s) => NaiveTime::parse_from_str(s, "%H:%M:%S%.f")
1940                    .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?,
1941                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1942            },
1943            SimpleQueryMessage::CommandComplete(c) => {
1944                panic!("get command: {}", c);
1945            }
1946            _ => {
1947                panic!("what?");
1948            }
1949        };
1950        val
1951    }
1952}
1953
1954impl<'r> Produce<'r, Option<NaiveTime>> for PostgresSimpleSourceParser {
1955    type Error = PostgresSourceError;
1956
1957    #[throws(PostgresSourceError)]
1958    fn produce(&'r mut self) -> Option<NaiveTime> {
1959        let (ridx, cidx) = self.next_loc()?;
1960        let val = match &self.rows[ridx] {
1961            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1962                Some(s) => Some(NaiveTime::parse_from_str(s, "%H:%M:%S%.f").map_err(|_| {
1963                    ConnectorXError::cannot_produce::<Option<NaiveTime>>(Some(s.into()))
1964                })?),
1965                None => None,
1966            },
1967            SimpleQueryMessage::CommandComplete(c) => {
1968                panic!("get command: {}", c);
1969            }
1970            _ => {
1971                panic!("what?");
1972            }
1973        };
1974        val
1975    }
1976}
1977
1978impl<'r> Produce<'r, NaiveDateTime> for PostgresSimpleSourceParser {
1979    type Error = PostgresSourceError;
1980
1981    #[throws(PostgresSourceError)]
1982    fn produce(&'r mut self) -> NaiveDateTime {
1983        let (ridx, cidx) = self.next_loc()?;
1984        let val =
1985            match &self.rows[ridx] {
1986                SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1987                    Some(s) => match s {
1988                        "infinity" => NaiveDateTime::MAX,
1989                        "-infinity" => NaiveDateTime::MIN,
1990                        s => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(
1991                            |_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())),
1992                        )?,
1993                    },
1994                    None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1995                },
1996                SimpleQueryMessage::CommandComplete(c) => {
1997                    panic!("get command: {}", c);
1998                }
1999                _ => {
2000                    panic!("what?");
2001                }
2002            };
2003        val
2004    }
2005}
2006
2007impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresSimpleSourceParser {
2008    type Error = PostgresSourceError;
2009
2010    #[throws(PostgresSourceError)]
2011    fn produce(&'r mut self) -> Option<NaiveDateTime> {
2012        let (ridx, cidx) = self.next_loc()?;
2013        let val = match &self.rows[ridx] {
2014            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2015                Some(s) => match s {
2016                    "infinity" => Some(NaiveDateTime::MAX),
2017                    "-infinity" => Some(NaiveDateTime::MIN),
2018                    s => Some(
2019                        NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
2020                            ConnectorXError::cannot_produce::<Option<NaiveDateTime>>(Some(s.into()))
2021                        })?,
2022                    ),
2023                },
2024                None => None,
2025            },
2026            SimpleQueryMessage::CommandComplete(c) => {
2027                panic!("get command: {}", c);
2028            }
2029            _ => {
2030                panic!("what?");
2031            }
2032        };
2033        val
2034    }
2035}
2036
2037impl<'r> Produce<'r, DateTime<Utc>> for PostgresSimpleSourceParser {
2038    type Error = PostgresSourceError;
2039
2040    #[throws(PostgresSourceError)]
2041    fn produce(&'r mut self) -> DateTime<Utc> {
2042        let (ridx, cidx) = self.next_loc()?;
2043        let val = match &self.rows[ridx] {
2044            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2045                Some("infinity") => DateTime::<Utc>::MAX_UTC,
2046                Some("-infinity") => DateTime::<Utc>::MIN_UTC,
2047                Some(s) => {
2048                    let time_string = format!("{}:00", s).to_owned();
2049                    let slice: &str = &time_string[..];
2050                    let time: DateTime<FixedOffset> =
2051                        DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2052
2053                    time.with_timezone(&Utc)
2054                }
2055                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
2056            },
2057            SimpleQueryMessage::CommandComplete(c) => {
2058                panic!("get command: {}", c);
2059            }
2060            _ => {
2061                panic!("what?");
2062            }
2063        };
2064        val
2065    }
2066}
2067
2068impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresSimpleSourceParser {
2069    type Error = PostgresSourceError;
2070
2071    #[throws(PostgresSourceError)]
2072    fn produce(&'r mut self) -> Option<DateTime<Utc>> {
2073        let (ridx, cidx) = self.next_loc()?;
2074        let val = match &self.rows[ridx] {
2075            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2076                Some("infinity") => Some(DateTime::<Utc>::MAX_UTC),
2077                Some("-infinity") => Some(DateTime::<Utc>::MIN_UTC),
2078                Some(s) => {
2079                    let time_string = format!("{}:00", s).to_owned();
2080                    let slice: &str = &time_string[..];
2081                    let time: DateTime<FixedOffset> =
2082                        DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2083
2084                    Some(time.with_timezone(&Utc))
2085                }
2086                None => None,
2087            },
2088            SimpleQueryMessage::CommandComplete(c) => {
2089                panic!("get command: {}", c);
2090            }
2091            _ => {
2092                panic!("what?");
2093            }
2094        };
2095        val
2096    }
2097}