1mod connection;
4mod errors;
5mod typesystem;
6
7pub use self::errors::PostgresSourceError;
8pub use cidr_02::IpInet;
9pub use connection::rewrite_tls_args;
10pub use pgvector::{Bit, HalfVector, SparseVector, Vector};
11pub use typesystem::{PostgresTypePairs, PostgresTypeSystem};
12
13use crate::constants::DB_BUFFER_SIZE;
14use crate::{
15 data_order::DataOrder,
16 errors::ConnectorXError,
17 sources::{PartitionParser, Produce, Source, SourcePartition},
18 sql::{count_query, CXQuery},
19};
20use anyhow::anyhow;
21use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc};
22use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter};
23use fehler::{throw, throws};
24use hex::decode;
25use postgres::{
26 binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow},
27 fallible_iterator::FallibleIterator,
28 tls::{MakeTlsConnect, TlsConnect},
29 Config, CopyOutReader, Row, RowIter, SimpleQueryMessage, Socket,
30};
31use r2d2::{Pool, PooledConnection};
32use r2d2_postgres::PostgresConnectionManager;
33use rust_decimal::Decimal;
34use serde_json::{from_str, Value};
35use sqlparser::dialect::PostgreSqlDialect;
36use std::collections::HashMap;
37use std::convert::TryFrom;
38use std::marker::PhantomData;
39use uuid::Uuid;
40
/// Uninhabited marker type selecting the binary `COPY` protocol.
///
/// Used as the `P` parameter of [`PostgresSourcePartition`]; the matching
/// `SourcePartition` impl issues `COPY (...) TO STDOUT WITH BINARY`.
pub enum BinaryProtocol {}
43
/// Uninhabited marker type selecting the CSV `COPY` protocol.
///
/// Used as the `P` parameter of [`PostgresSourcePartition`]; the matching
/// `SourcePartition` impl issues `COPY (...) TO STDOUT WITH CSV`.
pub enum CSVProtocol {}
46
/// Uninhabited marker type selecting the cursor protocol.
///
/// Used as the `P` parameter of [`PostgresSourcePartition`]; the matching
/// `SourcePartition` impl runs the query through `query_raw` and iterates rows.
pub enum CursorProtocol {}
49
/// Uninhabited marker type for the "simple" protocol.
///
/// NOTE(review): presumably paired with `PostgresSimpleSourceParser` (simple
/// query messages); its `SourcePartition` impl is not visible in this part of
/// the file — confirm.
pub enum SimpleProtocol {}
52
/// Shorthand for the r2d2 connection manager parameterised by the TLS connector `C`.
type PgManager<C> = PostgresConnectionManager<C>;
/// Shorthand for a connection checked out of an r2d2 pool of [`PgManager`]s.
type PgConn<C> = PooledConnection<PgManager<C>>;
55
/// Generates panicking `Produce` stubs for value types a parser cannot decode.
///
/// Each `(parser, value type, message)` triple expands to two impls — one for
/// `Option<value type>` and one for the bare value type — whose `produce`
/// bodies call `unimplemented!` with the given message.
macro_rules! impl_produce_unimplemented {
    ($(($parser: ty, $value: ty, $reason: expr),)+) => {
        $(
            impl<'r> Produce<'r, Option<$value>> for $parser {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> Option<$value> {
                    unimplemented!($reason);
                }
            }

            impl<'r> Produce<'r, $value> for $parser {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> $value {
                    unimplemented!($reason);
                }
            }
        )+
    };
}
79
// Panicking `Produce` stubs: value types that the CSV and simple parsers
// do not decode. The message tells the user which protocol to use instead.
impl_produce_unimplemented!(
    // CSV parser: hstore and the pgvector types are not supported.
    (PostgresCSVSourceParser<'_>, HashMap<String, Option<String>>, "Please use `cursor` protocol for hstore type"),
    (PostgresCSVSourceParser<'_>, Vector, "Please use `binary` protocol for vector type"),
    (PostgresCSVSourceParser<'_>, HalfVector, "Please use `binary` protocol for halfvector type"),
    (PostgresCSVSourceParser<'_>, Bit, "Please use `binary` protocol for bit type"),
    (PostgresCSVSourceParser<'_>, SparseVector, "Please use `binary` protocol for sparsevector type"),


    // Simple parser: hstore, JSON values and the pgvector types are not supported.
    (PostgresSimpleSourceParser,HashMap<String, Option<String>>, "unimplemented"),
    (PostgresSimpleSourceParser,Value, "unimplemented"),
    (PostgresSimpleSourceParser, Vector, "Please use `binary` protocol for vector type"),
    (PostgresSimpleSourceParser, HalfVector, "Please use `binary` protocol for halfvector type"),
    (PostgresSimpleSourceParser, Bit, "Please use `binary` protocol for bit type"),
    (PostgresSimpleSourceParser, SparseVector, "Please use `binary` protocol for sparsevector type"),

);
96
97fn convert_row<'b, R: TryFrom<usize> + postgres::types::FromSql<'b> + Clone>(row: &'b Row) -> R {
99 let nrows: Option<R> = row.get(0);
100 nrows.expect("Could not parse int result from count_query")
101}
102
103#[throws(PostgresSourceError)]
104fn get_total_rows<C>(conn: &mut PgConn<C>, query: &CXQuery<String>) -> usize
105where
106 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
107 C::TlsConnect: Send,
108 C::Stream: Send,
109 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
110{
111 let dialect = PostgreSqlDialect {};
112
113 let row = conn.query_one(count_query(query, &dialect)?.as_str(), &[])?;
114 let col_type = PostgresTypeSystem::from(row.columns()[0].type_());
115 match col_type {
116 PostgresTypeSystem::Int2(_) => convert_row::<i16>(&row) as usize,
117 PostgresTypeSystem::Int4(_) => convert_row::<i32>(&row) as usize,
118 PostgresTypeSystem::Int8(_) => convert_row::<i64>(&row) as usize,
119 _ => throw!(anyhow!(
120 "The result of the count query was not an int, aborting."
121 )),
122 }
123}
124
/// A Postgres data source, generic over the wire protocol marker `P` and the
/// TLS connector `C`.
pub struct PostgresSource<P, C>
where
    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
    C::TlsConnect: Send,
    C::Stream: Send,
    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Connection pool that every partition draws its connection from.
    pool: Pool<PgManager<C>>,
    /// The unpartitioned query, if set; used to count the total number of rows.
    origin_query: Option<String>,
    /// One query per partition.
    queries: Vec<CXQuery<String>>,
    /// Column names, filled in by `fetch_metadata`.
    names: Vec<String>,
    /// Column types in the connector's type system, filled in by `fetch_metadata`.
    schema: Vec<PostgresTypeSystem>,
    /// Column types as Postgres types, filled in by `fetch_metadata`.
    pg_schema: Vec<postgres::types::Type>,
    /// Statements run on each partition's connection before it is handed out.
    pre_execution_queries: Option<Vec<String>>,
    /// Ties the source to its protocol marker without storing a value of it.
    _protocol: PhantomData<P>,
}
141
142impl<P, C> PostgresSource<P, C>
143where
144 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
145 C::TlsConnect: Send,
146 C::Stream: Send,
147 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
148{
149 #[throws(PostgresSourceError)]
150 pub fn new(config: Config, tls: C, nconn: usize) -> Self {
151 let manager = PostgresConnectionManager::new(config, tls);
152 let pool = Pool::builder().max_size(nconn as u32).build(manager)?;
153
154 Self {
155 pool,
156 origin_query: None,
157 queries: vec![],
158 names: vec![],
159 schema: vec![],
160 pg_schema: vec![],
161 pre_execution_queries: None,
162 _protocol: PhantomData,
163 }
164 }
165}
166
167impl<P, C> Source for PostgresSource<P, C>
168where
169 PostgresSourcePartition<P, C>:
170 SourcePartition<TypeSystem = PostgresTypeSystem, Error = PostgresSourceError>,
171 P: Send,
172 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
173 C::TlsConnect: Send,
174 C::Stream: Send,
175 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
176{
177 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
178 type Partition = PostgresSourcePartition<P, C>;
179 type TypeSystem = PostgresTypeSystem;
180 type Error = PostgresSourceError;
181
182 #[throws(PostgresSourceError)]
183 fn set_data_order(&mut self, data_order: DataOrder) {
184 if !matches!(data_order, DataOrder::RowMajor) {
185 throw!(ConnectorXError::UnsupportedDataOrder(data_order));
186 }
187 }
188
189 fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
190 self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
191 }
192
193 fn set_origin_query(&mut self, query: Option<String>) {
194 self.origin_query = query;
195 }
196
197 fn set_pre_execution_queries(&mut self, pre_execution_queries: Option<&[String]>) {
198 self.pre_execution_queries = pre_execution_queries.map(|s| s.to_vec());
199 }
200
201 #[throws(PostgresSourceError)]
202 fn fetch_metadata(&mut self) {
203 assert!(!self.queries.is_empty());
204
205 let mut conn = self.pool.get()?;
206 let first_query = &self.queries[0];
207
208 let stmt = conn.prepare(first_query.as_str())?;
209
210 let (names, pg_types): (Vec<String>, Vec<postgres::types::Type>) = stmt
211 .columns()
212 .iter()
213 .map(|col| (col.name().to_string(), col.type_().clone()))
214 .unzip();
215
216 self.names = names;
217 self.schema = pg_types.iter().map(PostgresTypeSystem::from).collect();
218 self.pg_schema = self
219 .schema
220 .iter()
221 .zip(pg_types.iter())
222 .map(|(t1, t2)| PostgresTypePairs(t2, t1).into())
223 .collect();
224 }
225
226 #[throws(PostgresSourceError)]
227 fn result_rows(&mut self) -> Option<usize> {
228 match &self.origin_query {
229 Some(q) => {
230 let cxq = CXQuery::Naked(q.clone());
231 let mut conn = self.pool.get()?;
232 let nrows = get_total_rows(&mut conn, &cxq)?;
233 Some(nrows)
234 }
235 None => None,
236 }
237 }
238
239 fn names(&self) -> Vec<String> {
240 self.names.clone()
241 }
242
243 fn schema(&self) -> Vec<Self::TypeSystem> {
244 self.schema.clone()
245 }
246
247 #[throws(PostgresSourceError)]
248 fn partition(self) -> Vec<Self::Partition> {
249 let mut ret = vec![];
250 for query in self.queries {
251 let mut conn = self.pool.get()?;
252
253 if let Some(pre_queries) = &self.pre_execution_queries {
254 for pre_query in pre_queries {
255 conn.query(pre_query, &[])?;
256 }
257 }
258
259 ret.push(PostgresSourcePartition::<P, C>::new(
260 conn,
261 &query,
262 &self.schema,
263 &self.pg_schema,
264 ));
265 }
266 ret
267 }
268}
269
/// One partition of a Postgres source: a single query bound to its own
/// pooled connection, generic over the protocol marker `P`.
pub struct PostgresSourcePartition<P, C>
where
    C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
    C::TlsConnect: Send,
    C::Stream: Send,
    <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Connection this partition runs its queries on.
    conn: PgConn<C>,
    /// The query producing this partition's rows.
    query: CXQuery<String>,
    /// Column types in the connector's type system.
    schema: Vec<PostgresTypeSystem>,
    /// Column types as Postgres types.
    pg_schema: Vec<postgres::types::Type>,
    /// Row count; 0 until `result_rows` has been called.
    nrows: usize,
    /// Column count, taken from the schema length.
    ncols: usize,
    /// Ties the partition to its protocol marker without storing a value of it.
    _protocol: PhantomData<P>,
}
285
286impl<P, C> PostgresSourcePartition<P, C>
287where
288 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
289 C::TlsConnect: Send,
290 C::Stream: Send,
291 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
292{
293 pub fn new(
294 conn: PgConn<C>,
295 query: &CXQuery<String>,
296 schema: &[PostgresTypeSystem],
297 pg_schema: &[postgres::types::Type],
298 ) -> Self {
299 Self {
300 conn,
301 query: query.clone(),
302 schema: schema.to_vec(),
303 pg_schema: pg_schema.to_vec(),
304 nrows: 0,
305 ncols: schema.len(),
306 _protocol: PhantomData,
307 }
308 }
309}
310
311impl<C> SourcePartition for PostgresSourcePartition<BinaryProtocol, C>
312where
313 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
314 C::TlsConnect: Send,
315 C::Stream: Send,
316 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
317{
318 type TypeSystem = PostgresTypeSystem;
319 type Parser<'a> = PostgresBinarySourcePartitionParser<'a>;
320 type Error = PostgresSourceError;
321
322 #[throws(PostgresSourceError)]
323 fn result_rows(&mut self) -> () {
324 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
325 }
326
327 #[throws(PostgresSourceError)]
328 fn parser(&mut self) -> Self::Parser<'_> {
329 let query = format!("COPY ({}) TO STDOUT WITH BINARY", self.query);
330 let reader = self.conn.copy_out(&*query)?; let iter = BinaryCopyOutIter::new(reader, &self.pg_schema);
332
333 PostgresBinarySourcePartitionParser::new(iter, &self.schema)
334 }
335
336 fn nrows(&self) -> usize {
337 self.nrows
338 }
339
340 fn ncols(&self) -> usize {
341 self.ncols
342 }
343}
344
345impl<C> SourcePartition for PostgresSourcePartition<CSVProtocol, C>
346where
347 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
348 C::TlsConnect: Send,
349 C::Stream: Send,
350 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
351{
352 type TypeSystem = PostgresTypeSystem;
353 type Parser<'a> = PostgresCSVSourceParser<'a>;
354 type Error = PostgresSourceError;
355
356 #[throws(PostgresSourceError)]
357 fn result_rows(&mut self) {
358 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
359 }
360
361 #[throws(PostgresSourceError)]
362 fn parser(&mut self) -> Self::Parser<'_> {
363 let query = format!("COPY ({}) TO STDOUT WITH CSV", self.query);
364 let reader = self.conn.copy_out(&*query)?; let iter = ReaderBuilder::new()
366 .has_headers(false)
367 .from_reader(reader)
368 .into_records();
369
370 PostgresCSVSourceParser::new(iter, &self.schema)
371 }
372
373 fn nrows(&self) -> usize {
374 self.nrows
375 }
376
377 fn ncols(&self) -> usize {
378 self.ncols
379 }
380}
381
382impl<C> SourcePartition for PostgresSourcePartition<CursorProtocol, C>
383where
384 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
385 C::TlsConnect: Send,
386 C::Stream: Send,
387 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
388{
389 type TypeSystem = PostgresTypeSystem;
390 type Parser<'a> = PostgresRawSourceParser<'a>;
391 type Error = PostgresSourceError;
392
393 #[throws(PostgresSourceError)]
394 fn result_rows(&mut self) {
395 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
396 }
397
398 #[throws(PostgresSourceError)]
399 fn parser(&mut self) -> Self::Parser<'_> {
400 let iter = self
401 .conn
402 .query_raw::<_, bool, _>(self.query.as_str(), vec![])?; PostgresRawSourceParser::new(iter, &self.schema)
404 }
405
406 fn nrows(&self) -> usize {
407 self.nrows
408 }
409
410 fn ncols(&self) -> usize {
411 self.ncols
412 }
413}
/// Parser over a binary `COPY` stream that buffers rows in batches and hands
/// out values cell by cell.
pub struct PostgresBinarySourcePartitionParser<'a> {
    /// Underlying binary `COPY` row iterator.
    iter: BinaryCopyOutIter<'a>,
    /// Current batch of buffered rows (refilled by `fetch_next`).
    rowbuf: Vec<BinaryCopyOutRow>,
    /// Number of columns per row.
    ncols: usize,
    /// Column index of the next cell to produce.
    current_col: usize,
    /// Index into `rowbuf` of the next cell to produce.
    current_row: usize,
    /// Set once the underlying iterator has been exhausted.
    is_finished: bool,
}
422
423impl<'a> PostgresBinarySourcePartitionParser<'a> {
424 pub fn new(iter: BinaryCopyOutIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
425 Self {
426 iter,
427 rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
428 ncols: schema.len(),
429 current_row: 0,
430 current_col: 0,
431 is_finished: false,
432 }
433 }
434
435 #[throws(PostgresSourceError)]
436 fn next_loc(&mut self) -> (usize, usize) {
437 let ret = (self.current_row, self.current_col);
438 self.current_row += (self.current_col + 1) / self.ncols;
439 self.current_col = (self.current_col + 1) % self.ncols;
440 ret
441 }
442}
443
444impl<'a> PartitionParser<'a> for PostgresBinarySourcePartitionParser<'a> {
445 type TypeSystem = PostgresTypeSystem;
446 type Error = PostgresSourceError;
447
448 #[throws(PostgresSourceError)]
449 fn fetch_next(&mut self) -> (usize, bool) {
450 assert!(self.current_col == 0);
451 let remaining_rows = self.rowbuf.len() - self.current_row;
452 if remaining_rows > 0 {
453 return (remaining_rows, self.is_finished);
454 } else if self.is_finished {
455 return (0, self.is_finished);
456 }
457
458 if !self.rowbuf.is_empty() {
460 self.rowbuf.drain(..);
461 }
462 for _ in 0..DB_BUFFER_SIZE {
463 match self.iter.next()? {
464 Some(row) => {
465 self.rowbuf.push(row);
466 }
467 None => {
468 self.is_finished = true;
469 break;
470 }
471 }
472 }
473
474 self.current_row = 0;
476 self.current_col = 0;
477
478 (self.rowbuf.len(), self.is_finished)
479 }
480}
481
/// Generates `Produce<T>` and `Produce<Option<T>>` for the binary parser for
/// every listed type, decoding the next cell with `BinaryCopyOutRow::try_get`.
macro_rules! impl_produce {
    ($($ty: ty,)+) => {
        $(
            impl<'r, 'a> Produce<'r, $ty> for PostgresBinarySourcePartitionParser<'a> {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> $ty {
                    let (ridx, cidx) = self.next_loc()?;
                    self.rowbuf[ridx].try_get(cidx)?
                }
            }

            impl<'r, 'a> Produce<'r, Option<$ty>> for PostgresBinarySourcePartitionParser<'a> {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> Option<$ty> {
                    let (ridx, cidx) = self.next_loc()?;
                    self.rowbuf[ridx].try_get(cidx)?
                }
            }
        )+
    };
}
511
// Types the binary parser decodes directly via `try_get`, each in both
// plain and `Option` form.
impl_produce!(
    // Scalars.
    i8,
    i16,
    i32,
    i64,
    u32,
    f32,
    f64,
    Decimal,
    bool,
    &'r str,
    Vec<u8>,
    NaiveTime,
    Uuid,
    Value,
    IpInet,
    // pgvector types.
    Vector,
    HalfVector,
    Bit,
    SparseVector,
    // Arrays with nullable elements.
    Vec<Option<bool>>,
    Vec<Option<i16>>,
    Vec<Option<i32>>,
    Vec<Option<i64>>,
    Vec<Option<Decimal>>,
    Vec<Option<f32>>,
    Vec<Option<f64>>,
    Vec<Option<String>>,
);
541
542impl<'r> Produce<'r, NaiveDateTime> for PostgresBinarySourcePartitionParser<'_> {
543 type Error = PostgresSourceError;
544
545 #[throws(PostgresSourceError)]
546 fn produce(&'r mut self) -> NaiveDateTime {
547 let (ridx, cidx) = self.next_loc()?;
548 let row = &self.rowbuf[ridx];
549 let val = row.try_get(cidx)?;
550 match val {
551 postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
552 postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
553 postgres::types::Timestamp::Value(t) => t,
554 }
555 }
556}
557
558impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresBinarySourcePartitionParser<'_> {
559 type Error = PostgresSourceError;
560
561 #[throws(PostgresSourceError)]
562 fn produce(&'r mut self) -> Option<NaiveDateTime> {
563 let (ridx, cidx) = self.next_loc()?;
564 let row = &self.rowbuf[ridx];
565 let val = row.try_get(cidx)?;
566 match val {
567 Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
568 Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
569 Some(postgres::types::Timestamp::Value(t)) => t,
570 None => None,
571 }
572 }
573}
574
575impl<'r> Produce<'r, DateTime<Utc>> for PostgresBinarySourcePartitionParser<'_> {
576 type Error = PostgresSourceError;
577
578 #[throws(PostgresSourceError)]
579 fn produce(&'r mut self) -> DateTime<Utc> {
580 let (ridx, cidx) = self.next_loc()?;
581 let row = &self.rowbuf[ridx];
582 let val = row.try_get(cidx)?;
583 match val {
584 postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
585 postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
586 postgres::types::Timestamp::Value(t) => t,
587 }
588 }
589}
590
591impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresBinarySourcePartitionParser<'_> {
592 type Error = PostgresSourceError;
593
594 #[throws(PostgresSourceError)]
595 fn produce(&'r mut self) -> Option<DateTime<Utc>> {
596 let (ridx, cidx) = self.next_loc()?;
597 let row = &self.rowbuf[ridx];
598 let val = row.try_get(cidx)?;
599 match val {
600 Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
601 Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
602 Some(postgres::types::Timestamp::Value(t)) => t,
603 None => None,
604 }
605 }
606}
607
608impl<'r> Produce<'r, NaiveDate> for PostgresBinarySourcePartitionParser<'_> {
609 type Error = PostgresSourceError;
610
611 #[throws(PostgresSourceError)]
612 fn produce(&'r mut self) -> NaiveDate {
613 let (ridx, cidx) = self.next_loc()?;
614 let row = &self.rowbuf[ridx];
615 let val = row.try_get(cidx)?;
616 match val {
617 postgres::types::Date::PosInfinity => NaiveDate::MAX,
618 postgres::types::Date::NegInfinity => NaiveDate::MIN,
619 postgres::types::Date::Value(t) => t,
620 }
621 }
622}
623
624impl<'r> Produce<'r, Option<NaiveDate>> for PostgresBinarySourcePartitionParser<'_> {
625 type Error = PostgresSourceError;
626
627 #[throws(PostgresSourceError)]
628 fn produce(&'r mut self) -> Option<NaiveDate> {
629 let (ridx, cidx) = self.next_loc()?;
630 let row = &self.rowbuf[ridx];
631 let val = row.try_get(cidx)?;
632 match val {
633 Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
634 Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
635 Some(postgres::types::Date::Value(t)) => t,
636 None => None,
637 }
638 }
639}
640
/// hstore is not supported by the binary parser.
///
/// # Panics
///
/// `produce` always panics via `unimplemented!`, pointing the user at the
/// `cursor` protocol.
impl Produce<'_, HashMap<String, Option<String>>> for PostgresBinarySourcePartitionParser<'_> {
    type Error = PostgresSourceError;
    #[throws(PostgresSourceError)]
    fn produce(&mut self) -> HashMap<String, Option<String>> {
        unimplemented!("Please use `cursor` protocol for hstore type");
    }
}
648
/// Nullable hstore is not supported by the binary parser.
///
/// # Panics
///
/// `produce` always panics via `unimplemented!`, pointing the user at the
/// `cursor` protocol.
impl Produce<'_, Option<HashMap<String, Option<String>>>>
    for PostgresBinarySourcePartitionParser<'_>
{
    type Error = PostgresSourceError;
    #[throws(PostgresSourceError)]
    fn produce(&mut self) -> Option<HashMap<String, Option<String>>> {
        unimplemented!("Please use `cursor` protocol for hstore type");
    }
}
658
/// Parser over a CSV `COPY` stream that buffers records in batches and hands
/// out values cell by cell, parsing each from its text form.
pub struct PostgresCSVSourceParser<'a> {
    /// Underlying CSV record iterator reading from the `COPY` output.
    iter: StringRecordsIntoIter<CopyOutReader<'a>>,
    /// Current batch of buffered records (refilled by `fetch_next`).
    rowbuf: Vec<StringRecord>,
    /// Number of columns per row.
    ncols: usize,
    /// Column index of the next cell to produce.
    current_col: usize,
    /// Index into `rowbuf` of the next cell to produce.
    current_row: usize,
    /// Set once the underlying iterator has been exhausted.
    is_finished: bool,
}
667
668impl<'a> PostgresCSVSourceParser<'a> {
669 pub fn new(
670 iter: StringRecordsIntoIter<CopyOutReader<'a>>,
671 schema: &[PostgresTypeSystem],
672 ) -> Self {
673 Self {
674 iter,
675 rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
676 ncols: schema.len(),
677 current_row: 0,
678 current_col: 0,
679 is_finished: false,
680 }
681 }
682
683 #[throws(PostgresSourceError)]
684 fn next_loc(&mut self) -> (usize, usize) {
685 let ret = (self.current_row, self.current_col);
686 self.current_row += (self.current_col + 1) / self.ncols;
687 self.current_col = (self.current_col + 1) % self.ncols;
688 ret
689 }
690}
691
692impl<'a> PartitionParser<'a> for PostgresCSVSourceParser<'a> {
693 type Error = PostgresSourceError;
694 type TypeSystem = PostgresTypeSystem;
695
696 #[throws(PostgresSourceError)]
697 fn fetch_next(&mut self) -> (usize, bool) {
698 assert!(self.current_col == 0);
699 let remaining_rows = self.rowbuf.len() - self.current_row;
700 if remaining_rows > 0 {
701 return (remaining_rows, self.is_finished);
702 } else if self.is_finished {
703 return (0, self.is_finished);
704 }
705
706 if !self.rowbuf.is_empty() {
707 self.rowbuf.drain(..);
708 }
709 for _ in 0..DB_BUFFER_SIZE {
710 if let Some(row) = self.iter.next() {
711 self.rowbuf.push(row?);
712 } else {
713 self.is_finished = true;
714 break;
715 }
716 }
717 self.current_row = 0;
718 self.current_col = 0;
719 (self.rowbuf.len(), self.is_finished)
720 }
721}
722
/// Generates `Produce<T>` and `Produce<Option<T>>` for the CSV parser for
/// every listed type, parsing the cell text with `str::parse`.
///
/// In the `Option` form an empty cell is treated as `None`. A parse failure
/// becomes `ConnectorXError::cannot_produce` carrying the cell text.
macro_rules! impl_csv_produce {
    ($($ty: ty,)+) => {
        $(
            impl<'r, 'a> Produce<'r, $ty> for PostgresCSVSourceParser<'a> {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> $ty {
                    let (ridx, cidx) = self.next_loc()?;
                    let raw = &self.rowbuf[ridx][cidx];
                    raw.parse()
                        .map_err(|_| ConnectorXError::cannot_produce::<$ty>(Some(raw.into())))?
                }
            }

            impl<'r, 'a> Produce<'r, Option<$ty>> for PostgresCSVSourceParser<'a> {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&'r mut self) -> Option<$ty> {
                    let (ridx, cidx) = self.next_loc()?;
                    let raw = &self.rowbuf[ridx][cidx];
                    if raw.is_empty() {
                        None
                    } else {
                        Some(raw.parse().map_err(|_| {
                            ConnectorXError::cannot_produce::<$ty>(Some(raw.into()))
                        })?)
                    }
                }
            }
        )+
    };
}
755
// Types the CSV parser decodes with a plain `str::parse`.
impl_csv_produce!(i8, i16, i32, i64, u32, f32, f64, Uuid, IpInet,);
757
/// Generates `Produce<Vec<Option<T>>>` and `Produce<Option<Vec<Option<T>>>>`
/// for the CSV parser, parsing Postgres array text such as `{1,NULL,3}`.
///
/// `{}` yields an empty vector; the element `NULL` yields `None`; in the
/// outer-`Option` form an empty cell yields `None`. Elements are separated by
/// splitting on `,` only.
// NOTE(review): splitting on "," does not account for quoted elements that
// contain commas (relevant for `String`) — confirm whether that can occur.
macro_rules! impl_csv_vec_produce {
    ($($ty: ty,)+) => {
        $(
            impl<'r, 'a> Produce<'r, Vec<Option<$ty>>> for PostgresCSVSourceParser<'a> {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&mut self) -> Vec<Option<$ty>> {
                    let (ridx, cidx) = self.next_loc()?;
                    let raw = &self.rowbuf[ridx][cidx];
                    if raw == "{}" {
                        Vec::new()
                    } else if raw.len() < 3 {
                        throw!(ConnectorXError::cannot_produce::<$ty>(Some(raw.into())))
                    } else {
                        // Drop the surrounding braces before splitting.
                        let elements: Result<Vec<Option<$ty>>, ConnectorXError> = raw
                            [1..raw.len() - 1]
                            .split(",")
                            .map(|item| match item {
                                "NULL" => Ok(None),
                                text => text.parse().map(Some).map_err(|_| {
                                    ConnectorXError::cannot_produce::<$ty>(Some(raw.into()))
                                }),
                            })
                            .collect();
                        elements?
                    }
                }
            }

            impl<'r, 'a> Produce<'r, Option<Vec<Option<$ty>>>> for PostgresCSVSourceParser<'a> {
                type Error = PostgresSourceError;

                #[throws(PostgresSourceError)]
                fn produce(&mut self) -> Option<Vec<Option<$ty>>> {
                    let (ridx, cidx) = self.next_loc()?;
                    let raw = &self.rowbuf[ridx][cidx];
                    if raw.is_empty() {
                        None
                    } else if raw == "{}" {
                        Some(Vec::new())
                    } else if raw.len() < 3 {
                        throw!(ConnectorXError::cannot_produce::<$ty>(Some(raw.into())))
                    } else {
                        // Drop the surrounding braces before splitting.
                        let elements: Result<Vec<Option<$ty>>, ConnectorXError> = raw
                            [1..raw.len() - 1]
                            .split(",")
                            .map(|item| match item {
                                "NULL" => Ok(None),
                                text => text.parse().map(Some).map_err(|_| {
                                    ConnectorXError::cannot_produce::<$ty>(Some(raw.into()))
                                }),
                            })
                            .collect();
                        Some(elements?)
                    }
                }
            }
        )+
    };
}
820
// Array element types the CSV parser decodes by splitting and `str::parse`.
impl_csv_vec_produce!(i8, i16, i32, i64, f32, f64, Decimal, String,);
822
823impl Produce<'_, bool> for PostgresCSVSourceParser<'_> {
824 type Error = PostgresSourceError;
825
826 #[throws(PostgresSourceError)]
827 fn produce(&mut self) -> bool {
828 let (ridx, cidx) = self.next_loc()?;
829 let ret = match &self.rowbuf[ridx][cidx][..] {
830 "t" => true,
831 "f" => false,
832 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
833 self.rowbuf[ridx][cidx].into()
834 ))),
835 };
836 ret
837 }
838}
839
840impl Produce<'_, Option<bool>> for PostgresCSVSourceParser<'_> {
841 type Error = PostgresSourceError;
842
843 #[throws(PostgresSourceError)]
844 fn produce(&mut self) -> Option<bool> {
845 let (ridx, cidx) = self.next_loc()?;
846 let ret = match &self.rowbuf[ridx][cidx][..] {
847 "" => None,
848 "t" => Some(true),
849 "f" => Some(false),
850 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
851 self.rowbuf[ridx][cidx].into()
852 ))),
853 };
854 ret
855 }
856}
857
858impl Produce<'_, Vec<Option<bool>>> for PostgresCSVSourceParser<'_> {
859 type Error = PostgresSourceError;
860
861 #[throws(PostgresSourceError)]
862 fn produce(&mut self) -> Vec<Option<bool>> {
863 let (ridx, cidx) = self.next_loc()?;
864 let s = &self.rowbuf[ridx][cidx][..];
865 match s {
866 "{}" => vec![],
867 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
868 s => s[1..s.len() - 1]
869 .split(',')
870 .map(|v| match v {
871 "NULL" => Ok(None),
872 "t" => Ok(Some(true)),
873 "f" => Ok(Some(false)),
874 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
875 })
876 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
877 }
878 }
879}
880
881impl Produce<'_, Option<Vec<Option<bool>>>> for PostgresCSVSourceParser<'_> {
882 type Error = PostgresSourceError;
883
884 #[throws(PostgresSourceError)]
885 fn produce(&mut self) -> Option<Vec<Option<bool>>> {
886 let (ridx, cidx) = self.next_loc()?;
887 let s = &self.rowbuf[ridx][cidx][..];
888 match s {
889 "" => None,
890 "{}" => Some(vec![]),
891 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
892 s => Some(
893 s[1..s.len() - 1]
894 .split(',')
895 .map(|v| match v {
896 "NULL" => Ok(None),
897 "t" => Ok(Some(true)),
898 "f" => Ok(Some(false)),
899 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
900 })
901 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
902 ),
903 }
904 }
905}
906
907impl<'r> Produce<'r, Decimal> for PostgresCSVSourceParser<'_> {
908 type Error = PostgresSourceError;
909
910 #[throws(PostgresSourceError)]
911 fn produce(&'r mut self) -> Decimal {
912 let (ridx, cidx) = self.next_loc()?;
913 match &self.rowbuf[ridx][cidx][..] {
914 "Infinity" => Decimal::MAX,
915 "-Infinity" => Decimal::MIN,
916 v => v
917 .parse()
918 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
919 }
920 }
921}
922
923impl<'r> Produce<'r, Option<Decimal>> for PostgresCSVSourceParser<'_> {
924 type Error = PostgresSourceError;
925
926 #[throws(PostgresSourceError)]
927 fn produce(&'r mut self) -> Option<Decimal> {
928 let (ridx, cidx) = self.next_loc()?;
929 match &self.rowbuf[ridx][cidx][..] {
930 "" => None,
931 "Infinity" => Some(Decimal::MAX),
932 "-Infinity" => Some(Decimal::MIN),
933 v => Some(
934 v.parse()
935 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
936 ),
937 }
938 }
939}
940
941impl Produce<'_, DateTime<Utc>> for PostgresCSVSourceParser<'_> {
942 type Error = PostgresSourceError;
943
944 #[throws(PostgresSourceError)]
945 fn produce(&mut self) -> DateTime<Utc> {
946 let (ridx, cidx) = self.next_loc()?;
947 match &self.rowbuf[ridx][cidx][..] {
948 "infinity" => DateTime::<Utc>::MAX_UTC,
949 "-infinity" => DateTime::<Utc>::MIN_UTC,
950 v => format!("{}:00", v)
952 .parse()
953 .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?,
954 }
955 }
956}
957
958impl Produce<'_, Option<DateTime<Utc>>> for PostgresCSVSourceParser<'_> {
959 type Error = PostgresSourceError;
960
961 #[throws(PostgresSourceError)]
962 fn produce(&mut self) -> Option<DateTime<Utc>> {
963 let (ridx, cidx) = self.next_loc()?;
964 match &self.rowbuf[ridx][cidx][..] {
965 "" => None,
966 "infinity" => Some(DateTime::<Utc>::MAX_UTC),
967 "-infinity" => Some(DateTime::<Utc>::MIN_UTC),
968 v => {
969 Some(format!("{}:00", v).parse().map_err(|_| {
971 ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into()))
972 })?)
973 }
974 }
975 }
976}
977
978impl Produce<'_, NaiveDate> for PostgresCSVSourceParser<'_> {
979 type Error = PostgresSourceError;
980
981 #[throws(PostgresSourceError)]
982 fn produce(&mut self) -> NaiveDate {
983 let (ridx, cidx) = self.next_loc()?;
984 match &self.rowbuf[ridx][cidx][..] {
985 "infinity" => NaiveDate::MAX,
986 "-infinity" => NaiveDate::MIN,
987 v => NaiveDate::parse_from_str(v, "%Y-%m-%d")
988 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
989 }
990 }
991}
992
993impl Produce<'_, Option<NaiveDate>> for PostgresCSVSourceParser<'_> {
994 type Error = PostgresSourceError;
995
996 #[throws(PostgresSourceError)]
997 fn produce(&mut self) -> Option<NaiveDate> {
998 let (ridx, cidx) = self.next_loc()?;
999 match &self.rowbuf[ridx][cidx][..] {
1000 "" => None,
1001 "infinity" => Some(NaiveDate::MAX),
1002 "-infinity" => Some(NaiveDate::MIN),
1003 v => Some(
1004 NaiveDate::parse_from_str(v, "%Y-%m-%d")
1005 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
1006 ),
1007 }
1008 }
1009}
1010
1011impl Produce<'_, NaiveDateTime> for PostgresCSVSourceParser<'_> {
1012 type Error = PostgresSourceError;
1013
1014 #[throws(PostgresSourceError)]
1015 fn produce(&mut self) -> NaiveDateTime {
1016 let (ridx, cidx) = self.next_loc()?;
1017 match &self.rowbuf[ridx][cidx] {
1018 "infinity" => NaiveDateTime::MAX,
1019 "-infinity" => NaiveDateTime::MIN,
1020 v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f")
1021 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into())))?,
1022 }
1023 }
1024}
1025
1026impl Produce<'_, Option<NaiveDateTime>> for PostgresCSVSourceParser<'_> {
1027 type Error = PostgresSourceError;
1028
1029 #[throws(PostgresSourceError)]
1030 fn produce(&mut self) -> Option<NaiveDateTime> {
1031 let (ridx, cidx) = self.next_loc()?;
1032 match &self.rowbuf[ridx][cidx][..] {
1033 "" => None,
1034 "infinity" => Some(NaiveDateTime::MAX),
1035 "-infinity" => Some(NaiveDateTime::MIN),
1036 v => Some(
1037 NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
1038 ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into()))
1039 })?,
1040 ),
1041 }
1042 }
1043}
1044
1045impl Produce<'_, NaiveTime> for PostgresCSVSourceParser<'_> {
1046 type Error = PostgresSourceError;
1047
1048 #[throws(PostgresSourceError)]
1049 fn produce(&mut self) -> NaiveTime {
1050 let (ridx, cidx) = self.next_loc()?;
1051 NaiveTime::parse_from_str(&self.rowbuf[ridx][cidx], "%H:%M:%S%.f").map_err(|_| {
1052 ConnectorXError::cannot_produce::<NaiveTime>(Some(self.rowbuf[ridx][cidx].into()))
1053 })?
1054 }
1055}
1056
1057impl Produce<'_, Option<NaiveTime>> for PostgresCSVSourceParser<'_> {
1058 type Error = PostgresSourceError;
1059
1060 #[throws(PostgresSourceError)]
1061 fn produce(&mut self) -> Option<NaiveTime> {
1062 let (ridx, cidx) = self.next_loc()?;
1063 match &self.rowbuf[ridx][cidx][..] {
1064 "" => None,
1065 v => Some(
1066 NaiveTime::parse_from_str(v, "%H:%M:%S%.f")
1067 .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(v.into())))?,
1068 ),
1069 }
1070 }
1071}
1072
1073impl<'r> Produce<'r, &'r str> for PostgresCSVSourceParser<'_> {
1074 type Error = PostgresSourceError;
1075
1076 #[throws(PostgresSourceError)]
1077 fn produce(&'r mut self) -> &'r str {
1078 let (ridx, cidx) = self.next_loc()?;
1079 &self.rowbuf[ridx][cidx]
1080 }
1081}
1082
1083impl<'r> Produce<'r, Option<&'r str>> for PostgresCSVSourceParser<'_> {
1084 type Error = PostgresSourceError;
1085
1086 #[throws(PostgresSourceError)]
1087 fn produce(&'r mut self) -> Option<&'r str> {
1088 let (ridx, cidx) = self.next_loc()?;
1089 match &self.rowbuf[ridx][cidx][..] {
1090 "" => None,
1091 v => Some(v),
1092 }
1093 }
1094}
1095
1096impl<'r> Produce<'r, Vec<u8>> for PostgresCSVSourceParser<'_> {
1097 type Error = PostgresSourceError;
1098
1099 #[throws(PostgresSourceError)]
1100 fn produce(&'r mut self) -> Vec<u8> {
1101 let (ridx, cidx) = self.next_loc()?;
1102 decode(&self.rowbuf[ridx][cidx][2..])? }
1104}
1105
1106impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresCSVSourceParser<'_> {
1107 type Error = PostgresSourceError;
1108
1109 #[throws(PostgresSourceError)]
1110 fn produce(&'r mut self) -> Option<Vec<u8>> {
1111 let (ridx, cidx) = self.next_loc()?;
1112 match &self.rowbuf[ridx][cidx] {
1113 "" => None,
1115 v => Some(decode(&v[2..])?),
1116 }
1117 }
1118}
1119
1120impl<'r> Produce<'r, Value> for PostgresCSVSourceParser<'_> {
1121 type Error = PostgresSourceError;
1122
1123 #[throws(PostgresSourceError)]
1124 fn produce(&'r mut self) -> Value {
1125 let (ridx, cidx) = self.next_loc()?;
1126 let v = &self.rowbuf[ridx][cidx];
1127 from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1128 }
1129}
1130
1131impl<'r> Produce<'r, Option<Value>> for PostgresCSVSourceParser<'_> {
1132 type Error = PostgresSourceError;
1133
1134 #[throws(PostgresSourceError)]
1135 fn produce(&'r mut self) -> Option<Value> {
1136 let (ridx, cidx) = self.next_loc()?;
1137
1138 match &self.rowbuf[ridx][cidx][..] {
1139 "" => None,
1140 v => {
1141 from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1142 }
1143 }
1144 }
1145}
1146
1147pub struct PostgresRawSourceParser<'a> {
1148 iter: RowIter<'a>,
1149 rowbuf: Vec<Row>,
1150 ncols: usize,
1151 current_col: usize,
1152 current_row: usize,
1153 is_finished: bool,
1154}
1155
1156impl<'a> PostgresRawSourceParser<'a> {
1157 pub fn new(iter: RowIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
1158 Self {
1159 iter,
1160 rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
1161 ncols: schema.len(),
1162 current_row: 0,
1163 current_col: 0,
1164 is_finished: false,
1165 }
1166 }
1167
1168 #[throws(PostgresSourceError)]
1169 fn next_loc(&mut self) -> (usize, usize) {
1170 let ret = (self.current_row, self.current_col);
1171 self.current_row += (self.current_col + 1) / self.ncols;
1172 self.current_col = (self.current_col + 1) % self.ncols;
1173 ret
1174 }
1175}
1176
1177impl<'a> PartitionParser<'a> for PostgresRawSourceParser<'a> {
1178 type TypeSystem = PostgresTypeSystem;
1179 type Error = PostgresSourceError;
1180
1181 #[throws(PostgresSourceError)]
1182 fn fetch_next(&mut self) -> (usize, bool) {
1183 assert!(self.current_col == 0);
1184 let remaining_rows = self.rowbuf.len() - self.current_row;
1185 if remaining_rows > 0 {
1186 return (remaining_rows, self.is_finished);
1187 } else if self.is_finished {
1188 return (0, self.is_finished);
1189 }
1190
1191 if !self.rowbuf.is_empty() {
1192 self.rowbuf.drain(..);
1193 }
1194 for _ in 0..DB_BUFFER_SIZE {
1195 if let Some(row) = self.iter.next()? {
1196 self.rowbuf.push(row);
1197 } else {
1198 self.is_finished = true;
1199 break;
1200 }
1201 }
1202 self.current_row = 0;
1203 self.current_col = 0;
1204 (self.rowbuf.len(), self.is_finished)
1205 }
1206}
1207
1208macro_rules! impl_produce {
1209 ($($t: ty,)+) => {
1210 $(
1211 impl<'r, 'a> Produce<'r, $t> for PostgresRawSourceParser<'a> {
1212 type Error = PostgresSourceError;
1213
1214 #[throws(PostgresSourceError)]
1215 fn produce(&'r mut self) -> $t {
1216 let (ridx, cidx) = self.next_loc()?;
1217 let row = &self.rowbuf[ridx];
1218 let val = row.try_get(cidx)?;
1219 val
1220 }
1221 }
1222
1223 impl<'r, 'a> Produce<'r, Option<$t>> for PostgresRawSourceParser<'a> {
1224 type Error = PostgresSourceError;
1225
1226 #[throws(PostgresSourceError)]
1227 fn produce(&'r mut self) -> Option<$t> {
1228 let (ridx, cidx) = self.next_loc()?;
1229 let row = &self.rowbuf[ridx];
1230 let val = row.try_get(cidx)?;
1231 val
1232 }
1233 }
1234 )+
1235 };
1236}
1237
1238impl_produce!(
1239 i8,
1240 i16,
1241 i32,
1242 i64,
1243 u32,
1244 f32,
1245 f64,
1246 Decimal,
1247 bool,
1248 &'r str,
1249 Vec<u8>,
1250 NaiveTime,
1251 Uuid,
1252 Value,
1253 IpInet,
1254 Vector,
1255 HalfVector,
1256 Bit,
1257 SparseVector,
1258 HashMap<String, Option<String>>,
1259 Vec<Option<bool>>,
1260 Vec<Option<String>>,
1261 Vec<Option<i16>>,
1262 Vec<Option<i32>>,
1263 Vec<Option<i64>>,
1264 Vec<Option<f32>>,
1265 Vec<Option<f64>>,
1266 Vec<Option<Decimal>>,
1267);
1268
1269impl<'r> Produce<'r, DateTime<Utc>> for PostgresRawSourceParser<'_> {
1270 type Error = PostgresSourceError;
1271
1272 #[throws(PostgresSourceError)]
1273 fn produce(&'r mut self) -> DateTime<Utc> {
1274 let (ridx, cidx) = self.next_loc()?;
1275 let row = &self.rowbuf[ridx];
1276 let val: postgres::types::Timestamp<DateTime<Utc>> = row.try_get(cidx)?;
1277 match val {
1278 postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
1279 postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
1280 postgres::types::Timestamp::Value(t) => t,
1281 }
1282 }
1283}
1284
1285impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresRawSourceParser<'_> {
1286 type Error = PostgresSourceError;
1287
1288 #[throws(PostgresSourceError)]
1289 fn produce(&'r mut self) -> Option<DateTime<Utc>> {
1290 let (ridx, cidx) = self.next_loc()?;
1291 let row = &self.rowbuf[ridx];
1292 let val = row.try_get(cidx)?;
1293 match val {
1294 Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
1295 Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
1296 Some(postgres::types::Timestamp::Value(t)) => t,
1297 None => None,
1298 }
1299 }
1300}
1301
1302impl<'r> Produce<'r, NaiveDateTime> for PostgresRawSourceParser<'_> {
1303 type Error = PostgresSourceError;
1304
1305 #[throws(PostgresSourceError)]
1306 fn produce(&'r mut self) -> NaiveDateTime {
1307 let (ridx, cidx) = self.next_loc()?;
1308 let row = &self.rowbuf[ridx];
1309 let val: postgres::types::Timestamp<NaiveDateTime> = row.try_get(cidx)?;
1310 match val {
1311 postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
1312 postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
1313 postgres::types::Timestamp::Value(t) => t,
1314 }
1315 }
1316}
1317
1318impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresRawSourceParser<'_> {
1319 type Error = PostgresSourceError;
1320
1321 #[throws(PostgresSourceError)]
1322 fn produce(&'r mut self) -> Option<NaiveDateTime> {
1323 let (ridx, cidx) = self.next_loc()?;
1324 let row = &self.rowbuf[ridx];
1325 let val = row.try_get(cidx)?;
1326 match val {
1327 Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
1328 Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
1329 Some(postgres::types::Timestamp::Value(t)) => t,
1330 None => None,
1331 }
1332 }
1333}
1334
1335impl<'r> Produce<'r, NaiveDate> for PostgresRawSourceParser<'_> {
1336 type Error = PostgresSourceError;
1337
1338 #[throws(PostgresSourceError)]
1339 fn produce(&'r mut self) -> NaiveDate {
1340 let (ridx, cidx) = self.next_loc()?;
1341 let row = &self.rowbuf[ridx];
1342 let val: postgres::types::Date<NaiveDate> = row.try_get(cidx)?;
1343 match val {
1344 postgres::types::Date::PosInfinity => NaiveDate::MAX,
1345 postgres::types::Date::NegInfinity => NaiveDate::MIN,
1346 postgres::types::Date::Value(t) => t,
1347 }
1348 }
1349}
1350
1351impl<'r> Produce<'r, Option<NaiveDate>> for PostgresRawSourceParser<'_> {
1352 type Error = PostgresSourceError;
1353
1354 #[throws(PostgresSourceError)]
1355 fn produce(&'r mut self) -> Option<NaiveDate> {
1356 let (ridx, cidx) = self.next_loc()?;
1357 let row = &self.rowbuf[ridx];
1358 let val = row.try_get(cidx)?;
1359 match val {
1360 Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
1361 Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
1362 Some(postgres::types::Date::Value(t)) => t,
1363 None => None,
1364 }
1365 }
1366}
1367
1368impl<C> SourcePartition for PostgresSourcePartition<SimpleProtocol, C>
1369where
1370 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
1371 C::TlsConnect: Send,
1372 C::Stream: Send,
1373 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
1374{
1375 type TypeSystem = PostgresTypeSystem;
1376 type Parser<'a> = PostgresSimpleSourceParser;
1377 type Error = PostgresSourceError;
1378
1379 #[throws(PostgresSourceError)]
1380 fn result_rows(&mut self) {
1381 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
1382 }
1383
1384 #[throws(PostgresSourceError)]
1385 fn parser(&mut self) -> Self::Parser<'_> {
1386 let rows = self.conn.simple_query(self.query.as_str())?; PostgresSimpleSourceParser::new(rows, &self.schema)
1388 }
1389
1390 fn nrows(&self) -> usize {
1391 self.nrows
1392 }
1393
1394 fn ncols(&self) -> usize {
1395 self.ncols
1396 }
1397}
1398
1399pub struct PostgresSimpleSourceParser {
1400 rows: Vec<SimpleQueryMessage>,
1401 ncols: usize,
1402 current_col: usize,
1403 current_row: usize,
1404}
1405impl PostgresSimpleSourceParser {
1406 pub fn new(rows: Vec<SimpleQueryMessage>, schema: &[PostgresTypeSystem]) -> Self {
1407 Self {
1408 rows,
1409 ncols: schema.len(),
1410 current_row: 0,
1411 current_col: 0,
1412 }
1413 }
1414
1415 #[throws(PostgresSourceError)]
1416 fn next_loc(&mut self) -> (usize, usize) {
1417 let ret = (self.current_row, self.current_col);
1418 self.current_row += (self.current_col + 1) / self.ncols;
1419 self.current_col = (self.current_col + 1) % self.ncols;
1420 ret
1421 }
1422}
1423
1424impl PartitionParser<'_> for PostgresSimpleSourceParser {
1425 type TypeSystem = PostgresTypeSystem;
1426 type Error = PostgresSourceError;
1427
1428 #[throws(PostgresSourceError)]
1429 fn fetch_next(&mut self) -> (usize, bool) {
1430 self.current_row = 0;
1431 self.current_col = 0;
1432 if !self.rows.is_empty() {
1433 if let SimpleQueryMessage::RowDescription(_) = &self.rows[0] {
1434 self.current_row = 1;
1435 }
1436 }
1437
1438 (self.rows.len() - 1 - self.current_row, true) }
1440}
1441
1442macro_rules! impl_simple_produce {
1443 ($($t: ty,)+) => {
1444 $(
1445 impl<'r> Produce<'r, $t> for PostgresSimpleSourceParser {
1446 type Error = PostgresSourceError;
1447
1448 #[throws(PostgresSourceError)]
1449 fn produce(&'r mut self) -> $t {
1450 let (ridx, cidx) = self.next_loc()?;
1451 let val = match &self.rows[ridx] {
1452 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1453 Some(s) => s
1454 .parse()
1455 .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1456 None => throw!(anyhow!(
1457 "Cannot parse NULL in NOT NULL column."
1458 )),
1459 },
1460 SimpleQueryMessage::CommandComplete(c) => {
1461 panic!("get command: {}", c);
1462 }
1463 _ => {
1464 panic!("what?");
1465 }
1466 };
1467 val
1468 }
1469 }
1470
1471 impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser {
1472 type Error = PostgresSourceError;
1473
1474 #[throws(PostgresSourceError)]
1475 fn produce(&'r mut self) -> Option<$t> {
1476 let (ridx, cidx) = self.next_loc()?;
1477 let val = match &self.rows[ridx] {
1478 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1479 Some(s) => Some(
1480 s.parse()
1481 .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1482 ),
1483 None => None,
1484 },
1485 SimpleQueryMessage::CommandComplete(c) => {
1486 panic!("get command: {}", c);
1487 }
1488 _ => {
1489 panic!("what?");
1490 }
1491 };
1492 val
1493 }
1494 }
1495 )+
1496 };
1497}
1498
1499impl_simple_produce!(i8, i16, i32, i64, u32, f32, f64, Uuid, IpInet,);
1500
1501impl<'r> Produce<'r, bool> for PostgresSimpleSourceParser {
1502 type Error = PostgresSourceError;
1503
1504 #[throws(PostgresSourceError)]
1505 fn produce(&'r mut self) -> bool {
1506 let (ridx, cidx) = self.next_loc()?;
1507 let val = match &self.rows[ridx] {
1508 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1509 Some(s) => match s {
1510 "t" => true,
1511 "f" => false,
1512 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1513 },
1514 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1515 },
1516 SimpleQueryMessage::CommandComplete(c) => {
1517 panic!("get command: {}", c);
1518 }
1519 _ => {
1520 panic!("what?");
1521 }
1522 };
1523 val
1524 }
1525}
1526
1527impl<'r> Produce<'r, Option<bool>> for PostgresSimpleSourceParser {
1528 type Error = PostgresSourceError;
1529
1530 #[throws(PostgresSourceError)]
1531 fn produce(&'r mut self) -> Option<bool> {
1532 let (ridx, cidx) = self.next_loc()?;
1533 let val = match &self.rows[ridx] {
1534 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1535 Some(s) => match s {
1536 "t" => Some(true),
1537 "f" => Some(false),
1538 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1539 },
1540 None => None,
1541 },
1542 SimpleQueryMessage::CommandComplete(c) => {
1543 panic!("get command: {}", c);
1544 }
1545 _ => {
1546 panic!("what?");
1547 }
1548 };
1549 val
1550 }
1551}
1552
1553impl<'r> Produce<'r, Decimal> for PostgresSimpleSourceParser {
1554 type Error = PostgresSourceError;
1555
1556 #[throws(PostgresSourceError)]
1557 fn produce(&'r mut self) -> Decimal {
1558 let (ridx, cidx) = self.next_loc()?;
1559 let val = match &self.rows[ridx] {
1560 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1561 Some("Infinity") => Decimal::MAX,
1562 Some("-Infinity") => Decimal::MIN,
1563 Some(s) => s
1564 .parse()
1565 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1566 None => throw!(anyhow!("Cannot parse NULL in NOT NULL column.")),
1567 },
1568 SimpleQueryMessage::CommandComplete(c) => {
1569 panic!("get command: {}", c);
1570 }
1571 _ => {
1572 panic!("what?");
1573 }
1574 };
1575 val
1576 }
1577}
1578
1579impl<'r> Produce<'r, Option<Decimal>> for PostgresSimpleSourceParser {
1580 type Error = PostgresSourceError;
1581
1582 #[throws(PostgresSourceError)]
1583 fn produce(&'r mut self) -> Option<Decimal> {
1584 let (ridx, cidx) = self.next_loc()?;
1585 let val = match &self.rows[ridx] {
1586 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1587 Some("Infinity") => Some(Decimal::MAX),
1588 Some("-Infinity") => Some(Decimal::MIN),
1589 Some(s) => Some(
1590 s.parse()
1591 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1592 ),
1593 None => None,
1594 },
1595 SimpleQueryMessage::CommandComplete(c) => {
1596 panic!("get command: {}", c);
1597 }
1598 _ => {
1599 panic!("what?");
1600 }
1601 };
1602 val
1603 }
1604}
1605
1606impl<'r> Produce<'r, &'r str> for PostgresSimpleSourceParser {
1607 type Error = PostgresSourceError;
1608
1609 #[throws(PostgresSourceError)]
1610 fn produce(&'r mut self) -> &'r str {
1611 let (ridx, cidx) = self.next_loc()?;
1612 let val = match &self.rows[ridx] {
1613 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1614 Some(s) => s,
1615 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1616 },
1617 SimpleQueryMessage::CommandComplete(c) => {
1618 panic!("get command: {}", c);
1619 }
1620 _ => {
1621 panic!("what?");
1622 }
1623 };
1624 val
1625 }
1626}
1627
1628impl<'r> Produce<'r, Option<&'r str>> for PostgresSimpleSourceParser {
1629 type Error = PostgresSourceError;
1630
1631 #[throws(PostgresSourceError)]
1632 fn produce(&'r mut self) -> Option<&'r str> {
1633 let (ridx, cidx) = self.next_loc()?;
1634 let val = match &self.rows[ridx] {
1635 SimpleQueryMessage::Row(row) => row.try_get(cidx)?,
1636 SimpleQueryMessage::CommandComplete(c) => {
1637 panic!("get command: {}", c);
1638 }
1639 _ => {
1640 panic!("what?");
1641 }
1642 };
1643 val
1644 }
1645}
1646
1647impl<'r> Produce<'r, Vec<u8>> for PostgresSimpleSourceParser {
1648 type Error = PostgresSourceError;
1649
1650 #[throws(PostgresSourceError)]
1651 fn produce(&'r mut self) -> Vec<u8> {
1652 let (ridx, cidx) = self.next_loc()?;
1653 let val = match &self.rows[ridx] {
1654 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1655 Some(s) => {
1656 let mut res = s.chars();
1657 res.next();
1658 res.next();
1659 decode(
1660 res.enumerate()
1661 .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1662 .chars()
1663 .map(|c| c as u8)
1664 .collect::<Vec<u8>>(),
1665 )?
1666 }
1667 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1668 },
1669 SimpleQueryMessage::CommandComplete(c) => {
1670 panic!("get command: {}", c);
1671 }
1672 _ => {
1673 panic!("what?");
1674 }
1675 };
1676 val
1677 }
1678}
1679
1680impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresSimpleSourceParser {
1681 type Error = PostgresSourceError;
1682
1683 #[throws(PostgresSourceError)]
1684 fn produce(&'r mut self) -> Option<Vec<u8>> {
1685 let (ridx, cidx) = self.next_loc()?;
1686 let val = match &self.rows[ridx] {
1687 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1688 Some(s) => {
1689 let mut res = s.chars();
1690 res.next();
1691 res.next();
1692 Some(decode(
1693 res.enumerate()
1694 .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1695 .chars()
1696 .map(|c| c as u8)
1697 .collect::<Vec<u8>>(),
1698 )?)
1699 }
1700 None => None,
1701 },
1702 SimpleQueryMessage::CommandComplete(c) => {
1703 panic!("get command: {}", c);
1704 }
1705 _ => {
1706 panic!("what?");
1707 }
1708 };
1709 val
1710 }
1711}
1712
/// Returns `value` without its first and last characters.
///
/// Works on whole characters rather than bytes; inputs of two characters or
/// fewer yield an empty string.
fn rem_first_and_last(value: &str) -> &str {
    // Drop the leading character, if there is one.
    let tail = match value.chars().next() {
        Some(first) => &value[first.len_utf8()..],
        None => value,
    };
    // Drop the trailing character of what is left, if there is one.
    match tail.chars().next_back() {
        Some(last) => &tail[..tail.len() - last.len_utf8()],
        None => tail,
    }
}
1719
1720macro_rules! impl_simple_vec_produce {
1721 ($($t: ty,)+) => {
1722 $(
1723 impl<'r> Produce<'r, Vec<Option<$t>>> for PostgresSimpleSourceParser {
1724 type Error = PostgresSourceError;
1725
1726 #[throws(PostgresSourceError)]
1727 fn produce(&'r mut self) -> Vec<Option<$t>> {
1728 let (ridx, cidx) = self.next_loc()?;
1729 let val = match &self.rows[ridx] {
1730 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1731 Some(s) => match s{
1732 "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1733 "{}" => vec![],
1734 _ => rem_first_and_last(s).split(",").map(|v| {
1735 if v == "NULL" {
1736 Ok(None)
1737 } else {
1738 match v.parse() {
1739 Ok(v) => Ok(Some(v)),
1740 Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1741 }
1742 }
1743 }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?
1744 },
1745 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1746 },
1747 SimpleQueryMessage::CommandComplete(c) => {
1748 panic!("get command: {}", c);
1749 }
1750 _ => {
1751 panic!("what?");
1752 }
1753 };
1754 val
1755 }
1756 }
1757
1758 impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresSimpleSourceParser {
1759 type Error = PostgresSourceError;
1760
1761 #[throws(PostgresSourceError)]
1762 fn produce(&'r mut self) -> Option<Vec<Option<$t>>> {
1763 let (ridx, cidx) = self.next_loc()?;
1764 let val = match &self.rows[ridx] {
1765
1766 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1767 Some(s) => match s{
1768 "" => None,
1769 "{}" => Some(vec![]),
1770 _ => Some(rem_first_and_last(s).split(",").map(|v| {
1771 if v == "NULL" {
1772 Ok(None)
1773 } else {
1774 match v.parse() {
1775 Ok(v) => Ok(Some(v)),
1776 Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1777 }
1778 }
1779 }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?)
1780 },
1781 None => None,
1782 },
1783
1784 SimpleQueryMessage::CommandComplete(c) => {
1785 panic!("get command: {}", c);
1786 }
1787 _ => {
1788 panic!("what?");
1789 }
1790 };
1791 val
1792 }
1793 }
1794 )+
1795 };
1796}
1797impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,);
1798
1799impl<'r> Produce<'r, Vec<Option<bool>>> for PostgresSimpleSourceParser {
1800 type Error = PostgresSourceError;
1801
1802 #[throws(PostgresSourceError)]
1803 fn produce(&'r mut self) -> Vec<Option<bool>> {
1804 let (ridx, cidx) = self.next_loc()?;
1805 let val = match &self.rows[ridx] {
1806 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1807 Some(s) => match s {
1808 "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1809 "{}" => vec![],
1810 _ => rem_first_and_last(s)
1811 .split(',')
1812 .map(|token| match token {
1813 "NULL" => Ok(None),
1814 "t" => Ok(Some(true)),
1815 "f" => Ok(Some(false)),
1816 _ => {
1817 throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(s.into())))
1818 }
1819 })
1820 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1821 },
1822 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1823 },
1824 SimpleQueryMessage::CommandComplete(c) => {
1825 panic!("get command: {}", c);
1826 }
1827 _ => {
1828 panic!("what?");
1829 }
1830 };
1831 val
1832 }
1833}
1834
1835impl<'r> Produce<'r, Option<Vec<Option<bool>>>> for PostgresSimpleSourceParser {
1836 type Error = PostgresSourceError;
1837
1838 #[throws(PostgresSourceError)]
1839 fn produce(&'r mut self) -> Option<Vec<Option<bool>>> {
1840 let (ridx, cidx) = self.next_loc()?;
1841 let val = match &self.rows[ridx] {
1842 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1843 Some(s) => match s {
1844 "" => None,
1845 "{}" => Some(vec![]),
1846 _ => Some(
1847 rem_first_and_last(s)
1848 .split(',')
1849 .map(|token| match token {
1850 "NULL" => Ok(None),
1851 "t" => Ok(Some(true)),
1852 "f" => Ok(Some(false)),
1853 _ => {
1854 throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(
1855 s.into()
1856 )))
1857 }
1858 })
1859 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1860 ),
1861 },
1862 None => None,
1863 },
1864 SimpleQueryMessage::CommandComplete(c) => {
1865 panic!("get command: {}", c);
1866 }
1867 _ => {
1868 panic!("what?");
1869 }
1870 };
1871 val
1872 }
1873}
1874
1875impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser {
1876 type Error = PostgresSourceError;
1877
1878 #[throws(PostgresSourceError)]
1879 fn produce(&'r mut self) -> NaiveDate {
1880 let (ridx, cidx) = self.next_loc()?;
1881 let val = match &self.rows[ridx] {
1882 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1883 Some(s) => match s {
1884 "infinity" => NaiveDate::MAX,
1885 "-infinity" => NaiveDate::MIN,
1886 s => NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1887 ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
1888 })?,
1889 },
1890 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1891 },
1892 SimpleQueryMessage::CommandComplete(c) => {
1893 panic!("get command: {}", c);
1894 }
1895 _ => {
1896 panic!("what?");
1897 }
1898 };
1899 val
1900 }
1901}
1902
1903impl<'r> Produce<'r, Option<NaiveDate>> for PostgresSimpleSourceParser {
1904 type Error = PostgresSourceError;
1905
1906 #[throws(PostgresSourceError)]
1907 fn produce(&'r mut self) -> Option<NaiveDate> {
1908 let (ridx, cidx) = self.next_loc()?;
1909 let val = match &self.rows[ridx] {
1910 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1911 Some(s) => match s {
1912 "infinity" => Some(NaiveDate::MAX),
1913 "-infinity" => Some(NaiveDate::MIN),
1914 s => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1915 ConnectorXError::cannot_produce::<Option<NaiveDate>>(Some(s.into()))
1916 })?),
1917 },
1918 None => None,
1919 },
1920 SimpleQueryMessage::CommandComplete(c) => {
1921 panic!("get command: {}", c);
1922 }
1923 _ => {
1924 panic!("what?");
1925 }
1926 };
1927 val
1928 }
1929}
1930
1931impl<'r> Produce<'r, NaiveTime> for PostgresSimpleSourceParser {
1932 type Error = PostgresSourceError;
1933
1934 #[throws(PostgresSourceError)]
1935 fn produce(&'r mut self) -> NaiveTime {
1936 let (ridx, cidx) = self.next_loc()?;
1937 let val = match &self.rows[ridx] {
1938 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1939 Some(s) => NaiveTime::parse_from_str(s, "%H:%M:%S%.f")
1940 .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?,
1941 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1942 },
1943 SimpleQueryMessage::CommandComplete(c) => {
1944 panic!("get command: {}", c);
1945 }
1946 _ => {
1947 panic!("what?");
1948 }
1949 };
1950 val
1951 }
1952}
1953
1954impl<'r> Produce<'r, Option<NaiveTime>> for PostgresSimpleSourceParser {
1955 type Error = PostgresSourceError;
1956
1957 #[throws(PostgresSourceError)]
1958 fn produce(&'r mut self) -> Option<NaiveTime> {
1959 let (ridx, cidx) = self.next_loc()?;
1960 let val = match &self.rows[ridx] {
1961 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1962 Some(s) => Some(NaiveTime::parse_from_str(s, "%H:%M:%S%.f").map_err(|_| {
1963 ConnectorXError::cannot_produce::<Option<NaiveTime>>(Some(s.into()))
1964 })?),
1965 None => None,
1966 },
1967 SimpleQueryMessage::CommandComplete(c) => {
1968 panic!("get command: {}", c);
1969 }
1970 _ => {
1971 panic!("what?");
1972 }
1973 };
1974 val
1975 }
1976}
1977
1978impl<'r> Produce<'r, NaiveDateTime> for PostgresSimpleSourceParser {
1979 type Error = PostgresSourceError;
1980
1981 #[throws(PostgresSourceError)]
1982 fn produce(&'r mut self) -> NaiveDateTime {
1983 let (ridx, cidx) = self.next_loc()?;
1984 let val =
1985 match &self.rows[ridx] {
1986 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1987 Some(s) => match s {
1988 "infinity" => NaiveDateTime::MAX,
1989 "-infinity" => NaiveDateTime::MIN,
1990 s => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(
1991 |_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())),
1992 )?,
1993 },
1994 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1995 },
1996 SimpleQueryMessage::CommandComplete(c) => {
1997 panic!("get command: {}", c);
1998 }
1999 _ => {
2000 panic!("what?");
2001 }
2002 };
2003 val
2004 }
2005}
2006
2007impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresSimpleSourceParser {
2008 type Error = PostgresSourceError;
2009
2010 #[throws(PostgresSourceError)]
2011 fn produce(&'r mut self) -> Option<NaiveDateTime> {
2012 let (ridx, cidx) = self.next_loc()?;
2013 let val = match &self.rows[ridx] {
2014 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2015 Some(s) => match s {
2016 "infinity" => Some(NaiveDateTime::MAX),
2017 "-infinity" => Some(NaiveDateTime::MIN),
2018 s => Some(
2019 NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
2020 ConnectorXError::cannot_produce::<Option<NaiveDateTime>>(Some(s.into()))
2021 })?,
2022 ),
2023 },
2024 None => None,
2025 },
2026 SimpleQueryMessage::CommandComplete(c) => {
2027 panic!("get command: {}", c);
2028 }
2029 _ => {
2030 panic!("what?");
2031 }
2032 };
2033 val
2034 }
2035}
2036
2037impl<'r> Produce<'r, DateTime<Utc>> for PostgresSimpleSourceParser {
2038 type Error = PostgresSourceError;
2039
2040 #[throws(PostgresSourceError)]
2041 fn produce(&'r mut self) -> DateTime<Utc> {
2042 let (ridx, cidx) = self.next_loc()?;
2043 let val = match &self.rows[ridx] {
2044 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2045 Some("infinity") => DateTime::<Utc>::MAX_UTC,
2046 Some("-infinity") => DateTime::<Utc>::MIN_UTC,
2047 Some(s) => {
2048 let time_string = format!("{}:00", s).to_owned();
2049 let slice: &str = &time_string[..];
2050 let time: DateTime<FixedOffset> =
2051 DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2052
2053 time.with_timezone(&Utc)
2054 }
2055 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
2056 },
2057 SimpleQueryMessage::CommandComplete(c) => {
2058 panic!("get command: {}", c);
2059 }
2060 _ => {
2061 panic!("what?");
2062 }
2063 };
2064 val
2065 }
2066}
2067
2068impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresSimpleSourceParser {
2069 type Error = PostgresSourceError;
2070
2071 #[throws(PostgresSourceError)]
2072 fn produce(&'r mut self) -> Option<DateTime<Utc>> {
2073 let (ridx, cidx) = self.next_loc()?;
2074 let val = match &self.rows[ridx] {
2075 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2076 Some("infinity") => Some(DateTime::<Utc>::MAX_UTC),
2077 Some("-infinity") => Some(DateTime::<Utc>::MIN_UTC),
2078 Some(s) => {
2079 let time_string = format!("{}:00", s).to_owned();
2080 let slice: &str = &time_string[..];
2081 let time: DateTime<FixedOffset> =
2082 DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2083
2084 Some(time.with_timezone(&Utc))
2085 }
2086 None => None,
2087 },
2088 SimpleQueryMessage::CommandComplete(c) => {
2089 panic!("get command: {}", c);
2090 }
2091 _ => {
2092 panic!("what?");
2093 }
2094 };
2095 val
2096 }
2097}