connectorx/sources/postgres/
mod.rs

1//! Source implementation for Postgres database, including the TLS support (client only).
2
3mod connection;
4mod errors;
5mod typesystem;
6
7pub use self::errors::PostgresSourceError;
8pub use cidr_02::IpInet;
9pub use connection::rewrite_tls_args;
10pub use pgvector::{Bit, HalfVector, SparseVector, Vector};
11pub use typesystem::{PostgresTypePairs, PostgresTypeSystem};
12
13use crate::constants::DB_BUFFER_SIZE;
14use crate::{
15    data_order::DataOrder,
16    errors::ConnectorXError,
17    sources::{PartitionParser, Produce, Source, SourcePartition},
18    sql::{count_query, CXQuery},
19};
20use anyhow::anyhow;
21use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc};
22use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter};
23use fehler::{throw, throws};
24use hex::decode;
25use postgres::{
26    binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow},
27    fallible_iterator::FallibleIterator,
28    tls::{MakeTlsConnect, TlsConnect},
29    Config, CopyOutReader, Row, RowIter, SimpleQueryMessage, Socket,
30};
31use r2d2::{Pool, PooledConnection};
32use r2d2_postgres::PostgresConnectionManager;
33use rust_decimal::Decimal;
34use serde_json::{from_str, Value};
35use sqlparser::dialect::PostgreSqlDialect;
36use std::collections::HashMap;
37use std::convert::TryFrom;
38use std::marker::PhantomData;
39use uuid::Uuid;
40
/// Marker type selecting the binary bulk-load protocol.
///
/// Partitions tagged with this protocol read rows through
/// `COPY (...) TO STDOUT WITH BINARY`.
pub enum BinaryProtocol {}

/// Marker type selecting the CSV bulk-load protocol.
///
/// Partitions tagged with this protocol read rows through
/// `COPY (...) TO STDOUT WITH CSV`.
pub enum CSVProtocol {}

/// Marker type selecting the cursor protocol.
///
/// Partitions tagged with this protocol pull rows with `query_raw`.
pub enum CursorProtocol {}

/// Marker type selecting the simple-query protocol.
pub enum SimpleProtocol {}
52
53type PgManager<C> = PostgresConnectionManager<C>;
54type PgConn<C> = PooledConnection<PgManager<C>>;
55
56macro_rules! impl_produce_unimplemented {
57    ($(($protocol: ty, $t: ty, $msg: expr),)+) => {
58        $(
59            impl<'r> Produce<'r, $t> for $protocol {
60                type Error = PostgresSourceError;
61
62                #[throws(PostgresSourceError)]
63                fn produce(&'r mut self) -> $t {
64                   unimplemented!($msg);
65                }
66            }
67
68            impl<'r> Produce<'r, Option<$t>> for $protocol {
69                type Error = PostgresSourceError;
70
71                #[throws(PostgresSourceError)]
72                fn produce(&'r mut self) -> Option<$t> {
73                   unimplemented!($msg);
74                }
75            }
76        )+
77    };
78}
79
80impl_produce_unimplemented!(
81    (PostgresCSVSourceParser<'_>, HashMap<String, Option<String>>, "Please use `cursor` protocol for hstore type"),
82    (PostgresCSVSourceParser<'_>, Vector, "Please use `binary` protocol for vector type"),
83    (PostgresCSVSourceParser<'_>, HalfVector, "Please use `binary` protocol for halfvector type"),
84    (PostgresCSVSourceParser<'_>, Bit, "Please use `binary` protocol for bit type"),
85    (PostgresCSVSourceParser<'_>, SparseVector, "Please use `binary` protocol for sparsevector type"),
86
87
88    (PostgresSimpleSourceParser,HashMap<String, Option<String>>, "unimplemented"),
89    (PostgresSimpleSourceParser,Value, "unimplemented"),
90    (PostgresSimpleSourceParser, Vector, "Please use `binary` protocol for vector type"),
91    (PostgresSimpleSourceParser, HalfVector, "Please use `binary` protocol for halfvector type"),
92    (PostgresSimpleSourceParser, Bit, "Please use `binary` protocol for bit type"),
93    (PostgresSimpleSourceParser, SparseVector, "Please use `binary` protocol for sparsevector type"),
94
95);
96
97// take a row and unwrap the interior field from column 0
98fn convert_row<'b, R: TryFrom<usize> + postgres::types::FromSql<'b> + Clone>(row: &'b Row) -> R {
99    let nrows: Option<R> = row.get(0);
100    nrows.expect("Could not parse int result from count_query")
101}
102
103#[throws(PostgresSourceError)]
104fn get_total_rows<C>(conn: &mut PgConn<C>, query: &CXQuery<String>) -> usize
105where
106    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
107    C::TlsConnect: Send,
108    C::Stream: Send,
109    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
110{
111    let dialect = PostgreSqlDialect {};
112
113    let row = conn.query_one(count_query(query, &dialect)?.as_str(), &[])?;
114    let col_type = PostgresTypeSystem::from(row.columns()[0].type_());
115    match col_type {
116        PostgresTypeSystem::Int2(_) => convert_row::<i16>(&row) as usize,
117        PostgresTypeSystem::Int4(_) => convert_row::<i32>(&row) as usize,
118        PostgresTypeSystem::Int8(_) => convert_row::<i64>(&row) as usize,
119        _ => throw!(anyhow!(
120            "The result of the count query was not an int, aborting."
121        )),
122    }
123}
124
125pub struct PostgresSource<P, C>
126where
127    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
128    C::TlsConnect: Send,
129    C::Stream: Send,
130    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
131{
132    pool: Pool<PgManager<C>>,
133    origin_query: Option<String>,
134    queries: Vec<CXQuery<String>>,
135    names: Vec<String>,
136    schema: Vec<PostgresTypeSystem>,
137    pg_schema: Vec<postgres::types::Type>,
138    pre_execution_queries: Option<Vec<String>>,
139    _protocol: PhantomData<P>,
140}
141
142impl<P, C> PostgresSource<P, C>
143where
144    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
145    C::TlsConnect: Send,
146    C::Stream: Send,
147    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
148{
149    #[throws(PostgresSourceError)]
150    pub fn new(config: Config, tls: C, nconn: usize) -> Self {
151        let manager = PostgresConnectionManager::new(config, tls);
152        let pool = Pool::builder().max_size(nconn as u32).build(manager)?;
153
154        Self {
155            pool,
156            origin_query: None,
157            queries: vec![],
158            names: vec![],
159            schema: vec![],
160            pg_schema: vec![],
161            pre_execution_queries: None,
162            _protocol: PhantomData,
163        }
164    }
165}
166
167impl<P, C> Source for PostgresSource<P, C>
168where
169    PostgresSourcePartition<P, C>:
170        SourcePartition<TypeSystem = PostgresTypeSystem, Error = PostgresSourceError>,
171    P: Send,
172    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
173    C::TlsConnect: Send,
174    C::Stream: Send,
175    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
176{
177    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
178    type Partition = PostgresSourcePartition<P, C>;
179    type TypeSystem = PostgresTypeSystem;
180    type Error = PostgresSourceError;
181
182    #[throws(PostgresSourceError)]
183    fn set_data_order(&mut self, data_order: DataOrder) {
184        if !matches!(data_order, DataOrder::RowMajor) {
185            throw!(ConnectorXError::UnsupportedDataOrder(data_order));
186        }
187    }
188
189    fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
190        self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
191    }
192
193    fn set_origin_query(&mut self, query: Option<String>) {
194        self.origin_query = query;
195    }
196
197    fn set_pre_execution_queries(&mut self, pre_execution_queries: Option<&[String]>) {
198        self.pre_execution_queries = pre_execution_queries.map(|s| s.to_vec());
199    }
200
201    #[throws(PostgresSourceError)]
202    fn fetch_metadata(&mut self) {
203        assert!(!self.queries.is_empty());
204
205        let mut conn = self.pool.get()?;
206        let first_query = &self.queries[0];
207
208        let stmt = conn.prepare(first_query.as_str())?;
209
210        let (names, pg_types): (Vec<String>, Vec<postgres::types::Type>) = stmt
211            .columns()
212            .iter()
213            .map(|col| (col.name().to_string(), col.type_().clone()))
214            .unzip();
215
216        self.names = names;
217        self.schema = pg_types.iter().map(PostgresTypeSystem::from).collect();
218        self.pg_schema = self
219            .schema
220            .iter()
221            .zip(pg_types.iter())
222            .map(|(t1, t2)| PostgresTypePairs(t2, t1).into())
223            .collect();
224    }
225
226    #[throws(PostgresSourceError)]
227    fn result_rows(&mut self) -> Option<usize> {
228        match &self.origin_query {
229            Some(q) => {
230                let cxq = CXQuery::Naked(q.clone());
231                let mut conn = self.pool.get()?;
232                let nrows = get_total_rows(&mut conn, &cxq)?;
233                Some(nrows)
234            }
235            None => None,
236        }
237    }
238
239    fn names(&self) -> Vec<String> {
240        self.names.clone()
241    }
242
243    fn schema(&self) -> Vec<Self::TypeSystem> {
244        self.schema.clone()
245    }
246
247    #[throws(PostgresSourceError)]
248    fn partition(self) -> Vec<Self::Partition> {
249        let mut ret = vec![];
250        for query in self.queries {
251            let mut conn = self.pool.get()?;
252
253            if let Some(pre_queries) = &self.pre_execution_queries {
254                for pre_query in pre_queries {
255                    conn.query(pre_query, &[])?;
256                }
257            }
258
259            ret.push(PostgresSourcePartition::<P, C>::new(
260                conn,
261                &query,
262                &self.schema,
263                &self.pg_schema,
264            ));
265        }
266        ret
267    }
268}
269
270pub struct PostgresSourcePartition<P, C>
271where
272    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
273    C::TlsConnect: Send,
274    C::Stream: Send,
275    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
276{
277    conn: PgConn<C>,
278    query: CXQuery<String>,
279    schema: Vec<PostgresTypeSystem>,
280    pg_schema: Vec<postgres::types::Type>,
281    nrows: usize,
282    ncols: usize,
283    _protocol: PhantomData<P>,
284}
285
286impl<P, C> PostgresSourcePartition<P, C>
287where
288    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
289    C::TlsConnect: Send,
290    C::Stream: Send,
291    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
292{
293    pub fn new(
294        conn: PgConn<C>,
295        query: &CXQuery<String>,
296        schema: &[PostgresTypeSystem],
297        pg_schema: &[postgres::types::Type],
298    ) -> Self {
299        Self {
300            conn,
301            query: query.clone(),
302            schema: schema.to_vec(),
303            pg_schema: pg_schema.to_vec(),
304            nrows: 0,
305            ncols: schema.len(),
306            _protocol: PhantomData,
307        }
308    }
309}
310
311impl<C> SourcePartition for PostgresSourcePartition<BinaryProtocol, C>
312where
313    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
314    C::TlsConnect: Send,
315    C::Stream: Send,
316    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
317{
318    type TypeSystem = PostgresTypeSystem;
319    type Parser<'a> = PostgresBinarySourcePartitionParser<'a>;
320    type Error = PostgresSourceError;
321
322    #[throws(PostgresSourceError)]
323    fn result_rows(&mut self) -> () {
324        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
325    }
326
327    #[throws(PostgresSourceError)]
328    fn parser(&mut self) -> Self::Parser<'_> {
329        let query = format!("COPY ({}) TO STDOUT WITH BINARY", self.query);
330        let reader = self.conn.copy_out(&*query)?; // unless reading the data, it seems like issue the query is fast
331        let iter = BinaryCopyOutIter::new(reader, &self.pg_schema);
332
333        PostgresBinarySourcePartitionParser::new(iter, &self.schema)
334    }
335
336    fn nrows(&self) -> usize {
337        self.nrows
338    }
339
340    fn ncols(&self) -> usize {
341        self.ncols
342    }
343}
344
345impl<C> SourcePartition for PostgresSourcePartition<CSVProtocol, C>
346where
347    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
348    C::TlsConnect: Send,
349    C::Stream: Send,
350    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
351{
352    type TypeSystem = PostgresTypeSystem;
353    type Parser<'a> = PostgresCSVSourceParser<'a>;
354    type Error = PostgresSourceError;
355
356    #[throws(PostgresSourceError)]
357    fn result_rows(&mut self) {
358        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
359    }
360
361    #[throws(PostgresSourceError)]
362    fn parser(&mut self) -> Self::Parser<'_> {
363        let query = format!("COPY ({}) TO STDOUT WITH CSV", self.query);
364        let reader = self.conn.copy_out(&*query)?; // unless reading the data, it seems like issue the query is fast
365        let iter = ReaderBuilder::new()
366            .has_headers(false)
367            .from_reader(reader)
368            .into_records();
369
370        PostgresCSVSourceParser::new(iter, &self.schema)
371    }
372
373    fn nrows(&self) -> usize {
374        self.nrows
375    }
376
377    fn ncols(&self) -> usize {
378        self.ncols
379    }
380}
381
382impl<C> SourcePartition for PostgresSourcePartition<CursorProtocol, C>
383where
384    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
385    C::TlsConnect: Send,
386    C::Stream: Send,
387    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
388{
389    type TypeSystem = PostgresTypeSystem;
390    type Parser<'a> = PostgresRawSourceParser<'a>;
391    type Error = PostgresSourceError;
392
393    #[throws(PostgresSourceError)]
394    fn result_rows(&mut self) {
395        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
396    }
397
398    #[throws(PostgresSourceError)]
399    fn parser(&mut self) -> Self::Parser<'_> {
400        let iter = self
401            .conn
402            .query_raw::<_, bool, _>(self.query.as_str(), vec![])?; // unless reading the data, it seems like issue the query is fast
403        PostgresRawSourceParser::new(iter, &self.schema)
404    }
405
406    fn nrows(&self) -> usize {
407        self.nrows
408    }
409
410    fn ncols(&self) -> usize {
411        self.ncols
412    }
413}
414pub struct PostgresBinarySourcePartitionParser<'a> {
415    iter: BinaryCopyOutIter<'a>,
416    rowbuf: Vec<BinaryCopyOutRow>,
417    ncols: usize,
418    current_col: usize,
419    current_row: usize,
420    is_finished: bool,
421}
422
423impl<'a> PostgresBinarySourcePartitionParser<'a> {
424    pub fn new(iter: BinaryCopyOutIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
425        Self {
426            iter,
427            rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
428            ncols: schema.len(),
429            current_row: 0,
430            current_col: 0,
431            is_finished: false,
432        }
433    }
434
435    #[throws(PostgresSourceError)]
436    fn next_loc(&mut self) -> (usize, usize) {
437        let ret = (self.current_row, self.current_col);
438        self.current_row += (self.current_col + 1) / self.ncols;
439        self.current_col = (self.current_col + 1) % self.ncols;
440        ret
441    }
442}
443
444impl<'a> PartitionParser<'a> for PostgresBinarySourcePartitionParser<'a> {
445    type TypeSystem = PostgresTypeSystem;
446    type Error = PostgresSourceError;
447
448    #[throws(PostgresSourceError)]
449    fn fetch_next(&mut self) -> (usize, bool) {
450        assert!(self.current_col == 0);
451        let remaining_rows = self.rowbuf.len() - self.current_row;
452        if remaining_rows > 0 {
453            return (remaining_rows, self.is_finished);
454        } else if self.is_finished {
455            return (0, self.is_finished);
456        }
457
458        // clear the buffer
459        if !self.rowbuf.is_empty() {
460            self.rowbuf.drain(..);
461        }
462        for _ in 0..DB_BUFFER_SIZE {
463            match self.iter.next()? {
464                Some(row) => {
465                    self.rowbuf.push(row);
466                }
467                None => {
468                    self.is_finished = true;
469                    break;
470                }
471            }
472        }
473
474        // reset current cursor positions
475        self.current_row = 0;
476        self.current_col = 0;
477
478        (self.rowbuf.len(), self.is_finished)
479    }
480}
481
482macro_rules! impl_produce {
483    ($($t: ty,)+) => {
484        $(
485            impl<'r, 'a> Produce<'r, $t> for PostgresBinarySourcePartitionParser<'a> {
486                type Error = PostgresSourceError;
487
488                #[throws(PostgresSourceError)]
489                fn produce(&'r mut self) -> $t {
490                    let (ridx, cidx) = self.next_loc()?;
491                    let row = &self.rowbuf[ridx];
492                    let val = row.try_get(cidx)?;
493                    val
494                }
495            }
496
497            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresBinarySourcePartitionParser<'a> {
498                type Error = PostgresSourceError;
499
500                #[throws(PostgresSourceError)]
501                fn produce(&'r mut self) -> Option<$t> {
502                    let (ridx, cidx) = self.next_loc()?;
503                    let row = &self.rowbuf[ridx];
504                    let val = row.try_get(cidx)?;
505                    val
506                }
507            }
508        )+
509    };
510}
511
512impl_produce!(
513    i8,
514    i16,
515    i32,
516    i64,
517    f32,
518    f64,
519    Decimal,
520    bool,
521    &'r str,
522    Vec<u8>,
523    NaiveTime,
524    Uuid,
525    Value,
526    IpInet,
527    Vector,
528    HalfVector,
529    Bit,
530    SparseVector,
531    Vec<Option<bool>>,
532    Vec<Option<i16>>,
533    Vec<Option<i32>>,
534    Vec<Option<i64>>,
535    Vec<Option<Decimal>>,
536    Vec<Option<f32>>,
537    Vec<Option<f64>>,
538    Vec<Option<String>>,
539);
540
541impl<'r> Produce<'r, NaiveDateTime> for PostgresBinarySourcePartitionParser<'_> {
542    type Error = PostgresSourceError;
543
544    #[throws(PostgresSourceError)]
545    fn produce(&'r mut self) -> NaiveDateTime {
546        let (ridx, cidx) = self.next_loc()?;
547        let row = &self.rowbuf[ridx];
548        let val = row.try_get(cidx)?;
549        match val {
550            postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
551            postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
552            postgres::types::Timestamp::Value(t) => t,
553        }
554    }
555}
556
557impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresBinarySourcePartitionParser<'_> {
558    type Error = PostgresSourceError;
559
560    #[throws(PostgresSourceError)]
561    fn produce(&'r mut self) -> Option<NaiveDateTime> {
562        let (ridx, cidx) = self.next_loc()?;
563        let row = &self.rowbuf[ridx];
564        let val = row.try_get(cidx)?;
565        match val {
566            Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
567            Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
568            Some(postgres::types::Timestamp::Value(t)) => t,
569            None => None,
570        }
571    }
572}
573
574impl<'r> Produce<'r, DateTime<Utc>> for PostgresBinarySourcePartitionParser<'_> {
575    type Error = PostgresSourceError;
576
577    #[throws(PostgresSourceError)]
578    fn produce(&'r mut self) -> DateTime<Utc> {
579        let (ridx, cidx) = self.next_loc()?;
580        let row = &self.rowbuf[ridx];
581        let val = row.try_get(cidx)?;
582        match val {
583            postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
584            postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
585            postgres::types::Timestamp::Value(t) => t,
586        }
587    }
588}
589
590impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresBinarySourcePartitionParser<'_> {
591    type Error = PostgresSourceError;
592
593    #[throws(PostgresSourceError)]
594    fn produce(&'r mut self) -> Option<DateTime<Utc>> {
595        let (ridx, cidx) = self.next_loc()?;
596        let row = &self.rowbuf[ridx];
597        let val = row.try_get(cidx)?;
598        match val {
599            Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
600            Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
601            Some(postgres::types::Timestamp::Value(t)) => t,
602            None => None,
603        }
604    }
605}
606
607impl<'r> Produce<'r, NaiveDate> for PostgresBinarySourcePartitionParser<'_> {
608    type Error = PostgresSourceError;
609
610    #[throws(PostgresSourceError)]
611    fn produce(&'r mut self) -> NaiveDate {
612        let (ridx, cidx) = self.next_loc()?;
613        let row = &self.rowbuf[ridx];
614        let val = row.try_get(cidx)?;
615        match val {
616            postgres::types::Date::PosInfinity => NaiveDate::MAX,
617            postgres::types::Date::NegInfinity => NaiveDate::MIN,
618            postgres::types::Date::Value(t) => t,
619        }
620    }
621}
622
623impl<'r> Produce<'r, Option<NaiveDate>> for PostgresBinarySourcePartitionParser<'_> {
624    type Error = PostgresSourceError;
625
626    #[throws(PostgresSourceError)]
627    fn produce(&'r mut self) -> Option<NaiveDate> {
628        let (ridx, cidx) = self.next_loc()?;
629        let row = &self.rowbuf[ridx];
630        let val = row.try_get(cidx)?;
631        match val {
632            Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
633            Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
634            Some(postgres::types::Date::Value(t)) => t,
635            None => None,
636        }
637    }
638}
639
640impl Produce<'_, HashMap<String, Option<String>>> for PostgresBinarySourcePartitionParser<'_> {
641    type Error = PostgresSourceError;
642    #[throws(PostgresSourceError)]
643    fn produce(&mut self) -> HashMap<String, Option<String>> {
644        unimplemented!("Please use `cursor` protocol for hstore type");
645    }
646}
647
648impl Produce<'_, Option<HashMap<String, Option<String>>>>
649    for PostgresBinarySourcePartitionParser<'_>
650{
651    type Error = PostgresSourceError;
652    #[throws(PostgresSourceError)]
653    fn produce(&mut self) -> Option<HashMap<String, Option<String>>> {
654        unimplemented!("Please use `cursor` protocol for hstore type");
655    }
656}
657
658pub struct PostgresCSVSourceParser<'a> {
659    iter: StringRecordsIntoIter<CopyOutReader<'a>>,
660    rowbuf: Vec<StringRecord>,
661    ncols: usize,
662    current_col: usize,
663    current_row: usize,
664    is_finished: bool,
665}
666
667impl<'a> PostgresCSVSourceParser<'a> {
668    pub fn new(
669        iter: StringRecordsIntoIter<CopyOutReader<'a>>,
670        schema: &[PostgresTypeSystem],
671    ) -> Self {
672        Self {
673            iter,
674            rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
675            ncols: schema.len(),
676            current_row: 0,
677            current_col: 0,
678            is_finished: false,
679        }
680    }
681
682    #[throws(PostgresSourceError)]
683    fn next_loc(&mut self) -> (usize, usize) {
684        let ret = (self.current_row, self.current_col);
685        self.current_row += (self.current_col + 1) / self.ncols;
686        self.current_col = (self.current_col + 1) % self.ncols;
687        ret
688    }
689}
690
691impl<'a> PartitionParser<'a> for PostgresCSVSourceParser<'a> {
692    type Error = PostgresSourceError;
693    type TypeSystem = PostgresTypeSystem;
694
695    #[throws(PostgresSourceError)]
696    fn fetch_next(&mut self) -> (usize, bool) {
697        assert!(self.current_col == 0);
698        let remaining_rows = self.rowbuf.len() - self.current_row;
699        if remaining_rows > 0 {
700            return (remaining_rows, self.is_finished);
701        } else if self.is_finished {
702            return (0, self.is_finished);
703        }
704
705        if !self.rowbuf.is_empty() {
706            self.rowbuf.drain(..);
707        }
708        for _ in 0..DB_BUFFER_SIZE {
709            if let Some(row) = self.iter.next() {
710                self.rowbuf.push(row?);
711            } else {
712                self.is_finished = true;
713                break;
714            }
715        }
716        self.current_row = 0;
717        self.current_col = 0;
718        (self.rowbuf.len(), self.is_finished)
719    }
720}
721
722macro_rules! impl_csv_produce {
723    ($($t: ty,)+) => {
724        $(
725            impl<'r, 'a> Produce<'r, $t> for PostgresCSVSourceParser<'a> {
726                type Error = PostgresSourceError;
727
728                #[throws(PostgresSourceError)]
729                fn produce(&'r mut self) -> $t {
730                    let (ridx, cidx) = self.next_loc()?;
731                    self.rowbuf[ridx][cidx].parse().map_err(|_| {
732                        ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
733                    })?
734                }
735            }
736
737            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresCSVSourceParser<'a> {
738                type Error = PostgresSourceError;
739
740                #[throws(PostgresSourceError)]
741                fn produce(&'r mut self) -> Option<$t> {
742                    let (ridx, cidx) = self.next_loc()?;
743                    match &self.rowbuf[ridx][cidx][..] {
744                        "" => None,
745                        v => Some(v.parse().map_err(|_| {
746                            ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
747                        })?),
748                    }
749                }
750            }
751        )+
752    };
753}
754
755impl_csv_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);
756
757macro_rules! impl_csv_vec_produce {
758    ($($t: ty,)+) => {
759        $(
760            impl<'r, 'a> Produce<'r, Vec<Option<$t>>> for PostgresCSVSourceParser<'a> {
761                type Error = PostgresSourceError;
762
763                #[throws(PostgresSourceError)]
764                fn produce(&mut self) -> Vec<Option<$t>> {
765                    let (ridx, cidx) = self.next_loc()?;
766                    let s = &self.rowbuf[ridx][cidx][..];
767                    match s {
768                        "{}" => vec![],
769                        _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
770                        s => s[1..s.len() - 1]
771                            .split(",")
772                            .map(|v| {
773                                if v == "NULL" {
774                                    Ok(None)
775                                } else {
776                                    match v.parse() {
777                                        Ok(v) => Ok(Some(v)),
778                                        Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
779                                    }
780                                }
781                            })
782                            .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
783                    }
784                }
785            }
786
787            impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresCSVSourceParser<'a> {
788                type Error = PostgresSourceError;
789
790                #[throws(PostgresSourceError)]
791                fn produce(&mut self) -> Option<Vec<Option<$t>>> {
792                    let (ridx, cidx) = self.next_loc()?;
793                    let s = &self.rowbuf[ridx][cidx][..];
794                    match s {
795                        "" => None,
796                        "{}" => Some(vec![]),
797                        _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
798                        s => Some(
799                            s[1..s.len() - 1]
800                                .split(",")
801                                .map(|v| {
802                                    if v == "NULL" {
803                                        Ok(None)
804                                    } else {
805                                        match v.parse() {
806                                            Ok(v) => Ok(Some(v)),
807                                            Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
808                                        }
809                                    }
810                                })
811                                .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
812                        ),
813                    }
814                }
815            }
816        )+
817    };
818}
819
820impl_csv_vec_produce!(i8, i16, i32, i64, f32, f64, Decimal, String,);
821
822impl Produce<'_, bool> for PostgresCSVSourceParser<'_> {
823    type Error = PostgresSourceError;
824
825    #[throws(PostgresSourceError)]
826    fn produce(&mut self) -> bool {
827        let (ridx, cidx) = self.next_loc()?;
828        let ret = match &self.rowbuf[ridx][cidx][..] {
829            "t" => true,
830            "f" => false,
831            _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
832                self.rowbuf[ridx][cidx].into()
833            ))),
834        };
835        ret
836    }
837}
838
839impl Produce<'_, Option<bool>> for PostgresCSVSourceParser<'_> {
840    type Error = PostgresSourceError;
841
842    #[throws(PostgresSourceError)]
843    fn produce(&mut self) -> Option<bool> {
844        let (ridx, cidx) = self.next_loc()?;
845        let ret = match &self.rowbuf[ridx][cidx][..] {
846            "" => None,
847            "t" => Some(true),
848            "f" => Some(false),
849            _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
850                self.rowbuf[ridx][cidx].into()
851            ))),
852        };
853        ret
854    }
855}
856
857impl Produce<'_, Vec<Option<bool>>> for PostgresCSVSourceParser<'_> {
858    type Error = PostgresSourceError;
859
860    #[throws(PostgresSourceError)]
861    fn produce(&mut self) -> Vec<Option<bool>> {
862        let (ridx, cidx) = self.next_loc()?;
863        let s = &self.rowbuf[ridx][cidx][..];
864        match s {
865            "{}" => vec![],
866            _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
867            s => s[1..s.len() - 1]
868                .split(',')
869                .map(|v| match v {
870                    "NULL" => Ok(None),
871                    "t" => Ok(Some(true)),
872                    "f" => Ok(Some(false)),
873                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
874                })
875                .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
876        }
877    }
878}
879
880impl Produce<'_, Option<Vec<Option<bool>>>> for PostgresCSVSourceParser<'_> {
881    type Error = PostgresSourceError;
882
883    #[throws(PostgresSourceError)]
884    fn produce(&mut self) -> Option<Vec<Option<bool>>> {
885        let (ridx, cidx) = self.next_loc()?;
886        let s = &self.rowbuf[ridx][cidx][..];
887        match s {
888            "" => None,
889            "{}" => Some(vec![]),
890            _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
891            s => Some(
892                s[1..s.len() - 1]
893                    .split(',')
894                    .map(|v| match v {
895                        "NULL" => Ok(None),
896                        "t" => Ok(Some(true)),
897                        "f" => Ok(Some(false)),
898                        _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
899                    })
900                    .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
901            ),
902        }
903    }
904}
905
906impl<'r> Produce<'r, Decimal> for PostgresCSVSourceParser<'_> {
907    type Error = PostgresSourceError;
908
909    #[throws(PostgresSourceError)]
910    fn produce(&'r mut self) -> Decimal {
911        let (ridx, cidx) = self.next_loc()?;
912        match &self.rowbuf[ridx][cidx][..] {
913            "Infinity" => Decimal::MAX,
914            "-Infinity" => Decimal::MIN,
915            v => v
916                .parse()
917                .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
918        }
919    }
920}
921
922impl<'r> Produce<'r, Option<Decimal>> for PostgresCSVSourceParser<'_> {
923    type Error = PostgresSourceError;
924
925    #[throws(PostgresSourceError)]
926    fn produce(&'r mut self) -> Option<Decimal> {
927        let (ridx, cidx) = self.next_loc()?;
928        match &self.rowbuf[ridx][cidx][..] {
929            "" => None,
930            "Infinity" => Some(Decimal::MAX),
931            "-Infinity" => Some(Decimal::MIN),
932            v => Some(
933                v.parse()
934                    .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
935            ),
936        }
937    }
938}
939
940impl Produce<'_, DateTime<Utc>> for PostgresCSVSourceParser<'_> {
941    type Error = PostgresSourceError;
942
943    #[throws(PostgresSourceError)]
944    fn produce(&mut self) -> DateTime<Utc> {
945        let (ridx, cidx) = self.next_loc()?;
946        match &self.rowbuf[ridx][cidx][..] {
947            "infinity" => DateTime::<Utc>::MAX_UTC,
948            "-infinity" => DateTime::<Utc>::MIN_UTC,
949            // postgres csv return example: 1970-01-01 00:00:01+00
950            v => format!("{}:00", v)
951                .parse()
952                .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?,
953        }
954    }
955}
956
957impl Produce<'_, Option<DateTime<Utc>>> for PostgresCSVSourceParser<'_> {
958    type Error = PostgresSourceError;
959
960    #[throws(PostgresSourceError)]
961    fn produce(&mut self) -> Option<DateTime<Utc>> {
962        let (ridx, cidx) = self.next_loc()?;
963        match &self.rowbuf[ridx][cidx][..] {
964            "" => None,
965            "infinity" => Some(DateTime::<Utc>::MAX_UTC),
966            "-infinity" => Some(DateTime::<Utc>::MIN_UTC),
967            v => {
968                // postgres csv return example: 1970-01-01 00:00:01+00
969                Some(format!("{}:00", v).parse().map_err(|_| {
970                    ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into()))
971                })?)
972            }
973        }
974    }
975}
976
977impl Produce<'_, NaiveDate> for PostgresCSVSourceParser<'_> {
978    type Error = PostgresSourceError;
979
980    #[throws(PostgresSourceError)]
981    fn produce(&mut self) -> NaiveDate {
982        let (ridx, cidx) = self.next_loc()?;
983        match &self.rowbuf[ridx][cidx][..] {
984            "infinity" => NaiveDate::MAX,
985            "-infinity" => NaiveDate::MIN,
986            v => NaiveDate::parse_from_str(v, "%Y-%m-%d")
987                .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
988        }
989    }
990}
991
992impl Produce<'_, Option<NaiveDate>> for PostgresCSVSourceParser<'_> {
993    type Error = PostgresSourceError;
994
995    #[throws(PostgresSourceError)]
996    fn produce(&mut self) -> Option<NaiveDate> {
997        let (ridx, cidx) = self.next_loc()?;
998        match &self.rowbuf[ridx][cidx][..] {
999            "" => None,
1000            "infinity" => Some(NaiveDate::MAX),
1001            "-infinity" => Some(NaiveDate::MIN),
1002            v => Some(
1003                NaiveDate::parse_from_str(v, "%Y-%m-%d")
1004                    .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
1005            ),
1006        }
1007    }
1008}
1009
1010impl Produce<'_, NaiveDateTime> for PostgresCSVSourceParser<'_> {
1011    type Error = PostgresSourceError;
1012
1013    #[throws(PostgresSourceError)]
1014    fn produce(&mut self) -> NaiveDateTime {
1015        let (ridx, cidx) = self.next_loc()?;
1016        match &self.rowbuf[ridx][cidx] {
1017            "infinity" => NaiveDateTime::MAX,
1018            "-infinity" => NaiveDateTime::MIN,
1019            v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f")
1020                .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into())))?,
1021        }
1022    }
1023}
1024
1025impl Produce<'_, Option<NaiveDateTime>> for PostgresCSVSourceParser<'_> {
1026    type Error = PostgresSourceError;
1027
1028    #[throws(PostgresSourceError)]
1029    fn produce(&mut self) -> Option<NaiveDateTime> {
1030        let (ridx, cidx) = self.next_loc()?;
1031        match &self.rowbuf[ridx][cidx][..] {
1032            "" => None,
1033            "infinity" => Some(NaiveDateTime::MAX),
1034            "-infinity" => Some(NaiveDateTime::MIN),
1035            v => Some(
1036                NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
1037                    ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into()))
1038                })?,
1039            ),
1040        }
1041    }
1042}
1043
1044impl Produce<'_, NaiveTime> for PostgresCSVSourceParser<'_> {
1045    type Error = PostgresSourceError;
1046
1047    #[throws(PostgresSourceError)]
1048    fn produce(&mut self) -> NaiveTime {
1049        let (ridx, cidx) = self.next_loc()?;
1050        NaiveTime::parse_from_str(&self.rowbuf[ridx][cidx], "%H:%M:%S%.f").map_err(|_| {
1051            ConnectorXError::cannot_produce::<NaiveTime>(Some(self.rowbuf[ridx][cidx].into()))
1052        })?
1053    }
1054}
1055
1056impl Produce<'_, Option<NaiveTime>> for PostgresCSVSourceParser<'_> {
1057    type Error = PostgresSourceError;
1058
1059    #[throws(PostgresSourceError)]
1060    fn produce(&mut self) -> Option<NaiveTime> {
1061        let (ridx, cidx) = self.next_loc()?;
1062        match &self.rowbuf[ridx][cidx][..] {
1063            "" => None,
1064            v => Some(
1065                NaiveTime::parse_from_str(v, "%H:%M:%S%.f")
1066                    .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(v.into())))?,
1067            ),
1068        }
1069    }
1070}
1071
1072impl<'r> Produce<'r, &'r str> for PostgresCSVSourceParser<'_> {
1073    type Error = PostgresSourceError;
1074
1075    #[throws(PostgresSourceError)]
1076    fn produce(&'r mut self) -> &'r str {
1077        let (ridx, cidx) = self.next_loc()?;
1078        &self.rowbuf[ridx][cidx]
1079    }
1080}
1081
1082impl<'r> Produce<'r, Option<&'r str>> for PostgresCSVSourceParser<'_> {
1083    type Error = PostgresSourceError;
1084
1085    #[throws(PostgresSourceError)]
1086    fn produce(&'r mut self) -> Option<&'r str> {
1087        let (ridx, cidx) = self.next_loc()?;
1088        match &self.rowbuf[ridx][cidx][..] {
1089            "" => None,
1090            v => Some(v),
1091        }
1092    }
1093}
1094
1095impl<'r> Produce<'r, Vec<u8>> for PostgresCSVSourceParser<'_> {
1096    type Error = PostgresSourceError;
1097
1098    #[throws(PostgresSourceError)]
1099    fn produce(&'r mut self) -> Vec<u8> {
1100        let (ridx, cidx) = self.next_loc()?;
1101        decode(&self.rowbuf[ridx][cidx][2..])? // escape \x in the beginning
1102    }
1103}
1104
1105impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresCSVSourceParser<'_> {
1106    type Error = PostgresSourceError;
1107
1108    #[throws(PostgresSourceError)]
1109    fn produce(&'r mut self) -> Option<Vec<u8>> {
1110        let (ridx, cidx) = self.next_loc()?;
1111        match &self.rowbuf[ridx][cidx] {
1112            // escape \x in the beginning, empty if None
1113            "" => None,
1114            v => Some(decode(&v[2..])?),
1115        }
1116    }
1117}
1118
1119impl<'r> Produce<'r, Value> for PostgresCSVSourceParser<'_> {
1120    type Error = PostgresSourceError;
1121
1122    #[throws(PostgresSourceError)]
1123    fn produce(&'r mut self) -> Value {
1124        let (ridx, cidx) = self.next_loc()?;
1125        let v = &self.rowbuf[ridx][cidx];
1126        from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1127    }
1128}
1129
1130impl<'r> Produce<'r, Option<Value>> for PostgresCSVSourceParser<'_> {
1131    type Error = PostgresSourceError;
1132
1133    #[throws(PostgresSourceError)]
1134    fn produce(&'r mut self) -> Option<Value> {
1135        let (ridx, cidx) = self.next_loc()?;
1136
1137        match &self.rowbuf[ridx][cidx][..] {
1138            "" => None,
1139            v => {
1140                from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1141            }
1142        }
1143    }
1144}
1145
1146pub struct PostgresRawSourceParser<'a> {
1147    iter: RowIter<'a>,
1148    rowbuf: Vec<Row>,
1149    ncols: usize,
1150    current_col: usize,
1151    current_row: usize,
1152    is_finished: bool,
1153}
1154
1155impl<'a> PostgresRawSourceParser<'a> {
1156    pub fn new(iter: RowIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
1157        Self {
1158            iter,
1159            rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
1160            ncols: schema.len(),
1161            current_row: 0,
1162            current_col: 0,
1163            is_finished: false,
1164        }
1165    }
1166
1167    #[throws(PostgresSourceError)]
1168    fn next_loc(&mut self) -> (usize, usize) {
1169        let ret = (self.current_row, self.current_col);
1170        self.current_row += (self.current_col + 1) / self.ncols;
1171        self.current_col = (self.current_col + 1) % self.ncols;
1172        ret
1173    }
1174}
1175
1176impl<'a> PartitionParser<'a> for PostgresRawSourceParser<'a> {
1177    type TypeSystem = PostgresTypeSystem;
1178    type Error = PostgresSourceError;
1179
1180    #[throws(PostgresSourceError)]
1181    fn fetch_next(&mut self) -> (usize, bool) {
1182        assert!(self.current_col == 0);
1183        let remaining_rows = self.rowbuf.len() - self.current_row;
1184        if remaining_rows > 0 {
1185            return (remaining_rows, self.is_finished);
1186        } else if self.is_finished {
1187            return (0, self.is_finished);
1188        }
1189
1190        if !self.rowbuf.is_empty() {
1191            self.rowbuf.drain(..);
1192        }
1193        for _ in 0..DB_BUFFER_SIZE {
1194            if let Some(row) = self.iter.next()? {
1195                self.rowbuf.push(row);
1196            } else {
1197                self.is_finished = true;
1198                break;
1199            }
1200        }
1201        self.current_row = 0;
1202        self.current_col = 0;
1203        (self.rowbuf.len(), self.is_finished)
1204    }
1205}
1206
1207macro_rules! impl_produce {
1208    ($($t: ty,)+) => {
1209        $(
1210            impl<'r, 'a> Produce<'r, $t> for PostgresRawSourceParser<'a> {
1211                type Error = PostgresSourceError;
1212
1213                #[throws(PostgresSourceError)]
1214                fn produce(&'r mut self) -> $t {
1215                    let (ridx, cidx) = self.next_loc()?;
1216                    let row = &self.rowbuf[ridx];
1217                    let val = row.try_get(cidx)?;
1218                    val
1219                }
1220            }
1221
1222            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresRawSourceParser<'a> {
1223                type Error = PostgresSourceError;
1224
1225                #[throws(PostgresSourceError)]
1226                fn produce(&'r mut self) -> Option<$t> {
1227                    let (ridx, cidx) = self.next_loc()?;
1228                    let row = &self.rowbuf[ridx];
1229                    let val = row.try_get(cidx)?;
1230                    val
1231                }
1232            }
1233        )+
1234    };
1235}
1236
1237impl_produce!(
1238    i8,
1239    i16,
1240    i32,
1241    i64,
1242    f32,
1243    f64,
1244    Decimal,
1245    bool,
1246    &'r str,
1247    Vec<u8>,
1248    NaiveTime,
1249    Uuid,
1250    Value,
1251    IpInet,
1252    Vector,
1253    HalfVector,
1254    Bit,
1255    SparseVector,
1256    HashMap<String, Option<String>>,
1257    Vec<Option<bool>>,
1258    Vec<Option<String>>,
1259    Vec<Option<i16>>,
1260    Vec<Option<i32>>,
1261    Vec<Option<i64>>,
1262    Vec<Option<f32>>,
1263    Vec<Option<f64>>,
1264    Vec<Option<Decimal>>,
1265);
1266
1267impl<'r> Produce<'r, DateTime<Utc>> for PostgresRawSourceParser<'_> {
1268    type Error = PostgresSourceError;
1269
1270    #[throws(PostgresSourceError)]
1271    fn produce(&'r mut self) -> DateTime<Utc> {
1272        let (ridx, cidx) = self.next_loc()?;
1273        let row = &self.rowbuf[ridx];
1274        let val: postgres::types::Timestamp<DateTime<Utc>> = row.try_get(cidx)?;
1275        match val {
1276            postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
1277            postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
1278            postgres::types::Timestamp::Value(t) => t,
1279        }
1280    }
1281}
1282
1283impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresRawSourceParser<'_> {
1284    type Error = PostgresSourceError;
1285
1286    #[throws(PostgresSourceError)]
1287    fn produce(&'r mut self) -> Option<DateTime<Utc>> {
1288        let (ridx, cidx) = self.next_loc()?;
1289        let row = &self.rowbuf[ridx];
1290        let val = row.try_get(cidx)?;
1291        match val {
1292            Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
1293            Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
1294            Some(postgres::types::Timestamp::Value(t)) => t,
1295            None => None,
1296        }
1297    }
1298}
1299
1300impl<'r> Produce<'r, NaiveDateTime> for PostgresRawSourceParser<'_> {
1301    type Error = PostgresSourceError;
1302
1303    #[throws(PostgresSourceError)]
1304    fn produce(&'r mut self) -> NaiveDateTime {
1305        let (ridx, cidx) = self.next_loc()?;
1306        let row = &self.rowbuf[ridx];
1307        let val: postgres::types::Timestamp<NaiveDateTime> = row.try_get(cidx)?;
1308        match val {
1309            postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
1310            postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
1311            postgres::types::Timestamp::Value(t) => t,
1312        }
1313    }
1314}
1315
1316impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresRawSourceParser<'_> {
1317    type Error = PostgresSourceError;
1318
1319    #[throws(PostgresSourceError)]
1320    fn produce(&'r mut self) -> Option<NaiveDateTime> {
1321        let (ridx, cidx) = self.next_loc()?;
1322        let row = &self.rowbuf[ridx];
1323        let val = row.try_get(cidx)?;
1324        match val {
1325            Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
1326            Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
1327            Some(postgres::types::Timestamp::Value(t)) => t,
1328            None => None,
1329        }
1330    }
1331}
1332
1333impl<'r> Produce<'r, NaiveDate> for PostgresRawSourceParser<'_> {
1334    type Error = PostgresSourceError;
1335
1336    #[throws(PostgresSourceError)]
1337    fn produce(&'r mut self) -> NaiveDate {
1338        let (ridx, cidx) = self.next_loc()?;
1339        let row = &self.rowbuf[ridx];
1340        let val: postgres::types::Date<NaiveDate> = row.try_get(cidx)?;
1341        match val {
1342            postgres::types::Date::PosInfinity => NaiveDate::MAX,
1343            postgres::types::Date::NegInfinity => NaiveDate::MIN,
1344            postgres::types::Date::Value(t) => t,
1345        }
1346    }
1347}
1348
1349impl<'r> Produce<'r, Option<NaiveDate>> for PostgresRawSourceParser<'_> {
1350    type Error = PostgresSourceError;
1351
1352    #[throws(PostgresSourceError)]
1353    fn produce(&'r mut self) -> Option<NaiveDate> {
1354        let (ridx, cidx) = self.next_loc()?;
1355        let row = &self.rowbuf[ridx];
1356        let val = row.try_get(cidx)?;
1357        match val {
1358            Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
1359            Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
1360            Some(postgres::types::Date::Value(t)) => t,
1361            None => None,
1362        }
1363    }
1364}
1365
1366impl<C> SourcePartition for PostgresSourcePartition<SimpleProtocol, C>
1367where
1368    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
1369    C::TlsConnect: Send,
1370    C::Stream: Send,
1371    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
1372{
1373    type TypeSystem = PostgresTypeSystem;
1374    type Parser<'a> = PostgresSimpleSourceParser;
1375    type Error = PostgresSourceError;
1376
1377    #[throws(PostgresSourceError)]
1378    fn result_rows(&mut self) {
1379        self.nrows = get_total_rows(&mut self.conn, &self.query)?;
1380    }
1381
1382    #[throws(PostgresSourceError)]
1383    fn parser(&mut self) -> Self::Parser<'_> {
1384        let rows = self.conn.simple_query(self.query.as_str())?; // unless reading the data, it seems like issue the query is fast
1385        PostgresSimpleSourceParser::new(rows, &self.schema)
1386    }
1387
1388    fn nrows(&self) -> usize {
1389        self.nrows
1390    }
1391
1392    fn ncols(&self) -> usize {
1393        self.ncols
1394    }
1395}
1396
1397pub struct PostgresSimpleSourceParser {
1398    rows: Vec<SimpleQueryMessage>,
1399    ncols: usize,
1400    current_col: usize,
1401    current_row: usize,
1402}
1403impl PostgresSimpleSourceParser {
1404    pub fn new(rows: Vec<SimpleQueryMessage>, schema: &[PostgresTypeSystem]) -> Self {
1405        Self {
1406            rows,
1407            ncols: schema.len(),
1408            current_row: 0,
1409            current_col: 0,
1410        }
1411    }
1412
1413    #[throws(PostgresSourceError)]
1414    fn next_loc(&mut self) -> (usize, usize) {
1415        let ret = (self.current_row, self.current_col);
1416        self.current_row += (self.current_col + 1) / self.ncols;
1417        self.current_col = (self.current_col + 1) % self.ncols;
1418        ret
1419    }
1420}
1421
1422impl PartitionParser<'_> for PostgresSimpleSourceParser {
1423    type TypeSystem = PostgresTypeSystem;
1424    type Error = PostgresSourceError;
1425
1426    #[throws(PostgresSourceError)]
1427    fn fetch_next(&mut self) -> (usize, bool) {
1428        self.current_row = 0;
1429        self.current_col = 0;
1430        if !self.rows.is_empty() {
1431            if let SimpleQueryMessage::RowDescription(_) = &self.rows[0] {
1432                self.current_row = 1;
1433            }
1434        }
1435
1436        (self.rows.len() - 1 - self.current_row, true) // last message is command complete
1437    }
1438}
1439
1440macro_rules! impl_simple_produce {
1441    ($($t: ty,)+) => {
1442        $(
1443            impl<'r> Produce<'r, $t> for PostgresSimpleSourceParser {
1444                type Error = PostgresSourceError;
1445
1446                #[throws(PostgresSourceError)]
1447                fn produce(&'r mut self) -> $t {
1448                    let (ridx, cidx) = self.next_loc()?;
1449                    let val = match &self.rows[ridx] {
1450                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1451                            Some(s) => s
1452                                .parse()
1453                                .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1454                            None => throw!(anyhow!(
1455                                "Cannot parse NULL in NOT NULL column."
1456                            )),
1457                        },
1458                        SimpleQueryMessage::CommandComplete(c) => {
1459                            panic!("get command: {}", c);
1460                        }
1461                        _ => {
1462                            panic!("what?");
1463                        }
1464                    };
1465                    val
1466                }
1467            }
1468
1469            impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser {
1470                type Error = PostgresSourceError;
1471
1472                #[throws(PostgresSourceError)]
1473                fn produce(&'r mut self) -> Option<$t> {
1474                    let (ridx, cidx) = self.next_loc()?;
1475                    let val = match &self.rows[ridx] {
1476                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1477                            Some(s) => Some(
1478                                s.parse()
1479                                    .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1480                            ),
1481                            None => None,
1482                        },
1483                        SimpleQueryMessage::CommandComplete(c) => {
1484                            panic!("get command: {}", c);
1485                        }
1486                        _ => {
1487                            panic!("what?");
1488                        }
1489                    };
1490                    val
1491                }
1492            }
1493        )+
1494    };
1495}
1496
1497impl_simple_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);
1498
1499impl<'r> Produce<'r, bool> for PostgresSimpleSourceParser {
1500    type Error = PostgresSourceError;
1501
1502    #[throws(PostgresSourceError)]
1503    fn produce(&'r mut self) -> bool {
1504        let (ridx, cidx) = self.next_loc()?;
1505        let val = match &self.rows[ridx] {
1506            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1507                Some(s) => match s {
1508                    "t" => true,
1509                    "f" => false,
1510                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1511                },
1512                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1513            },
1514            SimpleQueryMessage::CommandComplete(c) => {
1515                panic!("get command: {}", c);
1516            }
1517            _ => {
1518                panic!("what?");
1519            }
1520        };
1521        val
1522    }
1523}
1524
1525impl<'r> Produce<'r, Option<bool>> for PostgresSimpleSourceParser {
1526    type Error = PostgresSourceError;
1527
1528    #[throws(PostgresSourceError)]
1529    fn produce(&'r mut self) -> Option<bool> {
1530        let (ridx, cidx) = self.next_loc()?;
1531        let val = match &self.rows[ridx] {
1532            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1533                Some(s) => match s {
1534                    "t" => Some(true),
1535                    "f" => Some(false),
1536                    _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1537                },
1538                None => None,
1539            },
1540            SimpleQueryMessage::CommandComplete(c) => {
1541                panic!("get command: {}", c);
1542            }
1543            _ => {
1544                panic!("what?");
1545            }
1546        };
1547        val
1548    }
1549}
1550
1551impl<'r> Produce<'r, Decimal> for PostgresSimpleSourceParser {
1552    type Error = PostgresSourceError;
1553
1554    #[throws(PostgresSourceError)]
1555    fn produce(&'r mut self) -> Decimal {
1556        let (ridx, cidx) = self.next_loc()?;
1557        let val = match &self.rows[ridx] {
1558            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1559                Some("Infinity") => Decimal::MAX,
1560                Some("-Infinity") => Decimal::MIN,
1561                Some(s) => s
1562                    .parse()
1563                    .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1564                None => throw!(anyhow!("Cannot parse NULL in NOT NULL column.")),
1565            },
1566            SimpleQueryMessage::CommandComplete(c) => {
1567                panic!("get command: {}", c);
1568            }
1569            _ => {
1570                panic!("what?");
1571            }
1572        };
1573        val
1574    }
1575}
1576
1577impl<'r> Produce<'r, Option<Decimal>> for PostgresSimpleSourceParser {
1578    type Error = PostgresSourceError;
1579
1580    #[throws(PostgresSourceError)]
1581    fn produce(&'r mut self) -> Option<Decimal> {
1582        let (ridx, cidx) = self.next_loc()?;
1583        let val = match &self.rows[ridx] {
1584            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1585                Some("Infinity") => Some(Decimal::MAX),
1586                Some("-Infinity") => Some(Decimal::MIN),
1587                Some(s) => Some(
1588                    s.parse()
1589                        .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1590                ),
1591                None => None,
1592            },
1593            SimpleQueryMessage::CommandComplete(c) => {
1594                panic!("get command: {}", c);
1595            }
1596            _ => {
1597                panic!("what?");
1598            }
1599        };
1600        val
1601    }
1602}
1603
1604impl<'r> Produce<'r, &'r str> for PostgresSimpleSourceParser {
1605    type Error = PostgresSourceError;
1606
1607    #[throws(PostgresSourceError)]
1608    fn produce(&'r mut self) -> &'r str {
1609        let (ridx, cidx) = self.next_loc()?;
1610        let val = match &self.rows[ridx] {
1611            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1612                Some(s) => s,
1613                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1614            },
1615            SimpleQueryMessage::CommandComplete(c) => {
1616                panic!("get command: {}", c);
1617            }
1618            _ => {
1619                panic!("what?");
1620            }
1621        };
1622        val
1623    }
1624}
1625
1626impl<'r> Produce<'r, Option<&'r str>> for PostgresSimpleSourceParser {
1627    type Error = PostgresSourceError;
1628
1629    #[throws(PostgresSourceError)]
1630    fn produce(&'r mut self) -> Option<&'r str> {
1631        let (ridx, cidx) = self.next_loc()?;
1632        let val = match &self.rows[ridx] {
1633            SimpleQueryMessage::Row(row) => row.try_get(cidx)?,
1634            SimpleQueryMessage::CommandComplete(c) => {
1635                panic!("get command: {}", c);
1636            }
1637            _ => {
1638                panic!("what?");
1639            }
1640        };
1641        val
1642    }
1643}
1644
1645impl<'r> Produce<'r, Vec<u8>> for PostgresSimpleSourceParser {
1646    type Error = PostgresSourceError;
1647
1648    #[throws(PostgresSourceError)]
1649    fn produce(&'r mut self) -> Vec<u8> {
1650        let (ridx, cidx) = self.next_loc()?;
1651        let val = match &self.rows[ridx] {
1652            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1653                Some(s) => {
1654                    let mut res = s.chars();
1655                    res.next();
1656                    res.next();
1657                    decode(
1658                        res.enumerate()
1659                            .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1660                            .chars()
1661                            .map(|c| c as u8)
1662                            .collect::<Vec<u8>>(),
1663                    )?
1664                }
1665                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1666            },
1667            SimpleQueryMessage::CommandComplete(c) => {
1668                panic!("get command: {}", c);
1669            }
1670            _ => {
1671                panic!("what?");
1672            }
1673        };
1674        val
1675    }
1676}
1677
1678impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresSimpleSourceParser {
1679    type Error = PostgresSourceError;
1680
1681    #[throws(PostgresSourceError)]
1682    fn produce(&'r mut self) -> Option<Vec<u8>> {
1683        let (ridx, cidx) = self.next_loc()?;
1684        let val = match &self.rows[ridx] {
1685            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1686                Some(s) => {
1687                    let mut res = s.chars();
1688                    res.next();
1689                    res.next();
1690                    Some(decode(
1691                        res.enumerate()
1692                            .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1693                            .chars()
1694                            .map(|c| c as u8)
1695                            .collect::<Vec<u8>>(),
1696                    )?)
1697                }
1698                None => None,
1699            },
1700            SimpleQueryMessage::CommandComplete(c) => {
1701                panic!("get command: {}", c);
1702            }
1703            _ => {
1704                panic!("what?");
1705            }
1706        };
1707        val
1708    }
1709}
1710
/// Returns `value` without its first and last characters.
///
/// Works on whole characters rather than bytes; inputs with fewer than two
/// characters yield an empty slice.
fn rem_first_and_last(value: &str) -> &str {
    // Cut the leading character, if there is one.
    let tail = match value.chars().next() {
        Some(first) => &value[first.len_utf8()..],
        None => value,
    };
    // Then cut the trailing character of what is left.
    match tail.chars().next_back() {
        Some(last) => &tail[..tail.len() - last.len_utf8()],
        None => tail,
    }
}
1717
1718macro_rules! impl_simple_vec_produce {
1719    ($($t: ty,)+) => {
1720        $(
1721            impl<'r> Produce<'r, Vec<Option<$t>>> for PostgresSimpleSourceParser {
1722                type Error = PostgresSourceError;
1723
1724                #[throws(PostgresSourceError)]
1725                fn produce(&'r mut self) -> Vec<Option<$t>> {
1726                    let (ridx, cidx) = self.next_loc()?;
1727                    let val = match &self.rows[ridx] {
1728                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1729                            Some(s) => match s{
1730                                "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1731                                "{}" => vec![],
1732                                _ => rem_first_and_last(s).split(",").map(|v| {
1733                                    if v == "NULL" {
1734                                        Ok(None)
1735                                    } else {
1736                                        match v.parse() {
1737                                            Ok(v) => Ok(Some(v)),
1738                                            Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1739                                        }
1740                                    }
1741                                }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?
1742                            },
1743                            None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1744                        },
1745                        SimpleQueryMessage::CommandComplete(c) => {
1746                            panic!("get command: {}", c);
1747                        }
1748                        _ => {
1749                            panic!("what?");
1750                        }
1751                    };
1752                    val
1753                }
1754            }
1755
1756            impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresSimpleSourceParser {
1757                type Error = PostgresSourceError;
1758
1759                #[throws(PostgresSourceError)]
1760                fn produce(&'r mut self) -> Option<Vec<Option<$t>>> {
1761                    let (ridx, cidx) = self.next_loc()?;
1762                    let val = match &self.rows[ridx] {
1763
1764                        SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1765                            Some(s) => match s{
1766                                "" => None,
1767                                "{}" => Some(vec![]),
1768                                _ => Some(rem_first_and_last(s).split(",").map(|v| {
1769                                    if v == "NULL" {
1770                                        Ok(None)
1771                                    } else {
1772                                        match v.parse() {
1773                                            Ok(v) => Ok(Some(v)),
1774                                            Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1775                                        }
1776                                    }
1777                                }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?)
1778                            },
1779                            None => None,
1780                        },
1781
1782                        SimpleQueryMessage::CommandComplete(c) => {
1783                            panic!("get command: {}", c);
1784                        }
1785                        _ => {
1786                            panic!("what?");
1787                        }
1788                    };
1789                    val
1790                }
1791            }
1792        )+
1793    };
1794}
1795impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,);
1796
1797impl<'r> Produce<'r, Vec<Option<bool>>> for PostgresSimpleSourceParser {
1798    type Error = PostgresSourceError;
1799
1800    #[throws(PostgresSourceError)]
1801    fn produce(&'r mut self) -> Vec<Option<bool>> {
1802        let (ridx, cidx) = self.next_loc()?;
1803        let val = match &self.rows[ridx] {
1804            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1805                Some(s) => match s {
1806                    "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1807                    "{}" => vec![],
1808                    _ => rem_first_and_last(s)
1809                        .split(',')
1810                        .map(|token| match token {
1811                            "NULL" => Ok(None),
1812                            "t" => Ok(Some(true)),
1813                            "f" => Ok(Some(false)),
1814                            _ => {
1815                                throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(s.into())))
1816                            }
1817                        })
1818                        .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1819                },
1820                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1821            },
1822            SimpleQueryMessage::CommandComplete(c) => {
1823                panic!("get command: {}", c);
1824            }
1825            _ => {
1826                panic!("what?");
1827            }
1828        };
1829        val
1830    }
1831}
1832
1833impl<'r> Produce<'r, Option<Vec<Option<bool>>>> for PostgresSimpleSourceParser {
1834    type Error = PostgresSourceError;
1835
1836    #[throws(PostgresSourceError)]
1837    fn produce(&'r mut self) -> Option<Vec<Option<bool>>> {
1838        let (ridx, cidx) = self.next_loc()?;
1839        let val = match &self.rows[ridx] {
1840            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1841                Some(s) => match s {
1842                    "" => None,
1843                    "{}" => Some(vec![]),
1844                    _ => Some(
1845                        rem_first_and_last(s)
1846                            .split(',')
1847                            .map(|token| match token {
1848                                "NULL" => Ok(None),
1849                                "t" => Ok(Some(true)),
1850                                "f" => Ok(Some(false)),
1851                                _ => {
1852                                    throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(
1853                                        s.into()
1854                                    )))
1855                                }
1856                            })
1857                            .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1858                    ),
1859                },
1860                None => None,
1861            },
1862            SimpleQueryMessage::CommandComplete(c) => {
1863                panic!("get command: {}", c);
1864            }
1865            _ => {
1866                panic!("what?");
1867            }
1868        };
1869        val
1870    }
1871}
1872
1873impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser {
1874    type Error = PostgresSourceError;
1875
1876    #[throws(PostgresSourceError)]
1877    fn produce(&'r mut self) -> NaiveDate {
1878        let (ridx, cidx) = self.next_loc()?;
1879        let val = match &self.rows[ridx] {
1880            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1881                Some(s) => match s {
1882                    "infinity" => NaiveDate::MAX,
1883                    "-infinity" => NaiveDate::MIN,
1884                    s => NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1885                        ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
1886                    })?,
1887                },
1888                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1889            },
1890            SimpleQueryMessage::CommandComplete(c) => {
1891                panic!("get command: {}", c);
1892            }
1893            _ => {
1894                panic!("what?");
1895            }
1896        };
1897        val
1898    }
1899}
1900
1901impl<'r> Produce<'r, Option<NaiveDate>> for PostgresSimpleSourceParser {
1902    type Error = PostgresSourceError;
1903
1904    #[throws(PostgresSourceError)]
1905    fn produce(&'r mut self) -> Option<NaiveDate> {
1906        let (ridx, cidx) = self.next_loc()?;
1907        let val = match &self.rows[ridx] {
1908            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1909                Some(s) => match s {
1910                    "infinity" => Some(NaiveDate::MAX),
1911                    "-infinity" => Some(NaiveDate::MIN),
1912                    s => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1913                        ConnectorXError::cannot_produce::<Option<NaiveDate>>(Some(s.into()))
1914                    })?),
1915                },
1916                None => None,
1917            },
1918            SimpleQueryMessage::CommandComplete(c) => {
1919                panic!("get command: {}", c);
1920            }
1921            _ => {
1922                panic!("what?");
1923            }
1924        };
1925        val
1926    }
1927}
1928
1929impl<'r> Produce<'r, NaiveTime> for PostgresSimpleSourceParser {
1930    type Error = PostgresSourceError;
1931
1932    #[throws(PostgresSourceError)]
1933    fn produce(&'r mut self) -> NaiveTime {
1934        let (ridx, cidx) = self.next_loc()?;
1935        let val = match &self.rows[ridx] {
1936            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1937                Some(s) => NaiveTime::parse_from_str(s, "%H:%M:%S%.f")
1938                    .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?,
1939                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1940            },
1941            SimpleQueryMessage::CommandComplete(c) => {
1942                panic!("get command: {}", c);
1943            }
1944            _ => {
1945                panic!("what?");
1946            }
1947        };
1948        val
1949    }
1950}
1951
1952impl<'r> Produce<'r, Option<NaiveTime>> for PostgresSimpleSourceParser {
1953    type Error = PostgresSourceError;
1954
1955    #[throws(PostgresSourceError)]
1956    fn produce(&'r mut self) -> Option<NaiveTime> {
1957        let (ridx, cidx) = self.next_loc()?;
1958        let val = match &self.rows[ridx] {
1959            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1960                Some(s) => Some(NaiveTime::parse_from_str(s, "%H:%M:%S%.f").map_err(|_| {
1961                    ConnectorXError::cannot_produce::<Option<NaiveTime>>(Some(s.into()))
1962                })?),
1963                None => None,
1964            },
1965            SimpleQueryMessage::CommandComplete(c) => {
1966                panic!("get command: {}", c);
1967            }
1968            _ => {
1969                panic!("what?");
1970            }
1971        };
1972        val
1973    }
1974}
1975
1976impl<'r> Produce<'r, NaiveDateTime> for PostgresSimpleSourceParser {
1977    type Error = PostgresSourceError;
1978
1979    #[throws(PostgresSourceError)]
1980    fn produce(&'r mut self) -> NaiveDateTime {
1981        let (ridx, cidx) = self.next_loc()?;
1982        let val =
1983            match &self.rows[ridx] {
1984                SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1985                    Some(s) => match s {
1986                        "infinity" => NaiveDateTime::MAX,
1987                        "-infinity" => NaiveDateTime::MIN,
1988                        s => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(
1989                            |_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())),
1990                        )?,
1991                    },
1992                    None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1993                },
1994                SimpleQueryMessage::CommandComplete(c) => {
1995                    panic!("get command: {}", c);
1996                }
1997                _ => {
1998                    panic!("what?");
1999                }
2000            };
2001        val
2002    }
2003}
2004
2005impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresSimpleSourceParser {
2006    type Error = PostgresSourceError;
2007
2008    #[throws(PostgresSourceError)]
2009    fn produce(&'r mut self) -> Option<NaiveDateTime> {
2010        let (ridx, cidx) = self.next_loc()?;
2011        let val = match &self.rows[ridx] {
2012            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2013                Some(s) => match s {
2014                    "infinity" => Some(NaiveDateTime::MAX),
2015                    "-infinity" => Some(NaiveDateTime::MIN),
2016                    s => Some(
2017                        NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
2018                            ConnectorXError::cannot_produce::<Option<NaiveDateTime>>(Some(s.into()))
2019                        })?,
2020                    ),
2021                },
2022                None => None,
2023            },
2024            SimpleQueryMessage::CommandComplete(c) => {
2025                panic!("get command: {}", c);
2026            }
2027            _ => {
2028                panic!("what?");
2029            }
2030        };
2031        val
2032    }
2033}
2034
2035impl<'r> Produce<'r, DateTime<Utc>> for PostgresSimpleSourceParser {
2036    type Error = PostgresSourceError;
2037
2038    #[throws(PostgresSourceError)]
2039    fn produce(&'r mut self) -> DateTime<Utc> {
2040        let (ridx, cidx) = self.next_loc()?;
2041        let val = match &self.rows[ridx] {
2042            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2043                Some("infinity") => DateTime::<Utc>::MAX_UTC,
2044                Some("-infinity") => DateTime::<Utc>::MIN_UTC,
2045                Some(s) => {
2046                    let time_string = format!("{}:00", s).to_owned();
2047                    let slice: &str = &time_string[..];
2048                    let time: DateTime<FixedOffset> =
2049                        DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2050
2051                    time.with_timezone(&Utc)
2052                }
2053                None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
2054            },
2055            SimpleQueryMessage::CommandComplete(c) => {
2056                panic!("get command: {}", c);
2057            }
2058            _ => {
2059                panic!("what?");
2060            }
2061        };
2062        val
2063    }
2064}
2065
2066impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresSimpleSourceParser {
2067    type Error = PostgresSourceError;
2068
2069    #[throws(PostgresSourceError)]
2070    fn produce(&'r mut self) -> Option<DateTime<Utc>> {
2071        let (ridx, cidx) = self.next_loc()?;
2072        let val = match &self.rows[ridx] {
2073            SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2074                Some("infinity") => Some(DateTime::<Utc>::MAX_UTC),
2075                Some("-infinity") => Some(DateTime::<Utc>::MIN_UTC),
2076                Some(s) => {
2077                    let time_string = format!("{}:00", s).to_owned();
2078                    let slice: &str = &time_string[..];
2079                    let time: DateTime<FixedOffset> =
2080                        DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2081
2082                    Some(time.with_timezone(&Utc))
2083                }
2084                None => None,
2085            },
2086            SimpleQueryMessage::CommandComplete(c) => {
2087                panic!("get command: {}", c);
2088            }
2089            _ => {
2090                panic!("what?");
2091            }
2092        };
2093        val
2094    }
2095}