1mod connection;
4mod errors;
5mod typesystem;
6
7pub use self::errors::PostgresSourceError;
8pub use cidr_02::IpInet;
9pub use connection::rewrite_tls_args;
10pub use pgvector::{Bit, HalfVector, SparseVector, Vector};
11pub use typesystem::{PostgresTypePairs, PostgresTypeSystem};
12
13use crate::constants::DB_BUFFER_SIZE;
14use crate::{
15 data_order::DataOrder,
16 errors::ConnectorXError,
17 sources::{PartitionParser, Produce, Source, SourcePartition},
18 sql::{count_query, CXQuery},
19};
20use anyhow::anyhow;
21use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc};
22use csv::{ReaderBuilder, StringRecord, StringRecordsIntoIter};
23use fehler::{throw, throws};
24use hex::decode;
25use postgres::{
26 binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow},
27 fallible_iterator::FallibleIterator,
28 tls::{MakeTlsConnect, TlsConnect},
29 Config, CopyOutReader, Row, RowIter, SimpleQueryMessage, Socket,
30};
31use r2d2::{Pool, PooledConnection};
32use r2d2_postgres::PostgresConnectionManager;
33use rust_decimal::Decimal;
34use serde_json::{from_str, Value};
35use sqlparser::dialect::PostgreSqlDialect;
36use std::collections::HashMap;
37use std::convert::TryFrom;
38use std::marker::PhantomData;
39use uuid::Uuid;
40
41pub enum BinaryProtocol {}
43
44pub enum CSVProtocol {}
46
47pub enum CursorProtocol {}
49
50pub enum SimpleProtocol {}
52
53type PgManager<C> = PostgresConnectionManager<C>;
54type PgConn<C> = PooledConnection<PgManager<C>>;
55
56macro_rules! impl_produce_unimplemented {
57 ($(($protocol: ty, $t: ty, $msg: expr),)+) => {
58 $(
59 impl<'r> Produce<'r, $t> for $protocol {
60 type Error = PostgresSourceError;
61
62 #[throws(PostgresSourceError)]
63 fn produce(&'r mut self) -> $t {
64 unimplemented!($msg);
65 }
66 }
67
68 impl<'r> Produce<'r, Option<$t>> for $protocol {
69 type Error = PostgresSourceError;
70
71 #[throws(PostgresSourceError)]
72 fn produce(&'r mut self) -> Option<$t> {
73 unimplemented!($msg);
74 }
75 }
76 )+
77 };
78}
79
80impl_produce_unimplemented!(
81 (PostgresCSVSourceParser<'_>, HashMap<String, Option<String>>, "Please use `cursor` protocol for hstore type"),
82 (PostgresCSVSourceParser<'_>, Vector, "Please use `binary` protocol for vector type"),
83 (PostgresCSVSourceParser<'_>, HalfVector, "Please use `binary` protocol for halfvector type"),
84 (PostgresCSVSourceParser<'_>, Bit, "Please use `binary` protocol for bit type"),
85 (PostgresCSVSourceParser<'_>, SparseVector, "Please use `binary` protocol for sparsevector type"),
86
87
88 (PostgresSimpleSourceParser,HashMap<String, Option<String>>, "unimplemented"),
89 (PostgresSimpleSourceParser,Value, "unimplemented"),
90 (PostgresSimpleSourceParser, Vector, "Please use `binary` protocol for vector type"),
91 (PostgresSimpleSourceParser, HalfVector, "Please use `binary` protocol for halfvector type"),
92 (PostgresSimpleSourceParser, Bit, "Please use `binary` protocol for bit type"),
93 (PostgresSimpleSourceParser, SparseVector, "Please use `binary` protocol for sparsevector type"),
94
95);
96
97fn convert_row<'b, R: TryFrom<usize> + postgres::types::FromSql<'b> + Clone>(row: &'b Row) -> R {
99 let nrows: Option<R> = row.get(0);
100 nrows.expect("Could not parse int result from count_query")
101}
102
103#[throws(PostgresSourceError)]
104fn get_total_rows<C>(conn: &mut PgConn<C>, query: &CXQuery<String>) -> usize
105where
106 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
107 C::TlsConnect: Send,
108 C::Stream: Send,
109 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
110{
111 let dialect = PostgreSqlDialect {};
112
113 let row = conn.query_one(count_query(query, &dialect)?.as_str(), &[])?;
114 let col_type = PostgresTypeSystem::from(row.columns()[0].type_());
115 match col_type {
116 PostgresTypeSystem::Int2(_) => convert_row::<i16>(&row) as usize,
117 PostgresTypeSystem::Int4(_) => convert_row::<i32>(&row) as usize,
118 PostgresTypeSystem::Int8(_) => convert_row::<i64>(&row) as usize,
119 _ => throw!(anyhow!(
120 "The result of the count query was not an int, aborting."
121 )),
122 }
123}
124
125pub struct PostgresSource<P, C>
126where
127 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
128 C::TlsConnect: Send,
129 C::Stream: Send,
130 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
131{
132 pool: Pool<PgManager<C>>,
133 origin_query: Option<String>,
134 queries: Vec<CXQuery<String>>,
135 names: Vec<String>,
136 schema: Vec<PostgresTypeSystem>,
137 pg_schema: Vec<postgres::types::Type>,
138 pre_execution_queries: Option<Vec<String>>,
139 _protocol: PhantomData<P>,
140}
141
142impl<P, C> PostgresSource<P, C>
143where
144 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
145 C::TlsConnect: Send,
146 C::Stream: Send,
147 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
148{
149 #[throws(PostgresSourceError)]
150 pub fn new(config: Config, tls: C, nconn: usize) -> Self {
151 let manager = PostgresConnectionManager::new(config, tls);
152 let pool = Pool::builder().max_size(nconn as u32).build(manager)?;
153
154 Self {
155 pool,
156 origin_query: None,
157 queries: vec![],
158 names: vec![],
159 schema: vec![],
160 pg_schema: vec![],
161 pre_execution_queries: None,
162 _protocol: PhantomData,
163 }
164 }
165}
166
167impl<P, C> Source for PostgresSource<P, C>
168where
169 PostgresSourcePartition<P, C>:
170 SourcePartition<TypeSystem = PostgresTypeSystem, Error = PostgresSourceError>,
171 P: Send,
172 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
173 C::TlsConnect: Send,
174 C::Stream: Send,
175 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
176{
177 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
178 type Partition = PostgresSourcePartition<P, C>;
179 type TypeSystem = PostgresTypeSystem;
180 type Error = PostgresSourceError;
181
182 #[throws(PostgresSourceError)]
183 fn set_data_order(&mut self, data_order: DataOrder) {
184 if !matches!(data_order, DataOrder::RowMajor) {
185 throw!(ConnectorXError::UnsupportedDataOrder(data_order));
186 }
187 }
188
189 fn set_queries<Q: ToString>(&mut self, queries: &[CXQuery<Q>]) {
190 self.queries = queries.iter().map(|q| q.map(Q::to_string)).collect();
191 }
192
193 fn set_origin_query(&mut self, query: Option<String>) {
194 self.origin_query = query;
195 }
196
197 fn set_pre_execution_queries(&mut self, pre_execution_queries: Option<&[String]>) {
198 self.pre_execution_queries = pre_execution_queries.map(|s| s.to_vec());
199 }
200
201 #[throws(PostgresSourceError)]
202 fn fetch_metadata(&mut self) {
203 assert!(!self.queries.is_empty());
204
205 let mut conn = self.pool.get()?;
206 let first_query = &self.queries[0];
207
208 let stmt = conn.prepare(first_query.as_str())?;
209
210 let (names, pg_types): (Vec<String>, Vec<postgres::types::Type>) = stmt
211 .columns()
212 .iter()
213 .map(|col| (col.name().to_string(), col.type_().clone()))
214 .unzip();
215
216 self.names = names;
217 self.schema = pg_types.iter().map(PostgresTypeSystem::from).collect();
218 self.pg_schema = self
219 .schema
220 .iter()
221 .zip(pg_types.iter())
222 .map(|(t1, t2)| PostgresTypePairs(t2, t1).into())
223 .collect();
224 }
225
226 #[throws(PostgresSourceError)]
227 fn result_rows(&mut self) -> Option<usize> {
228 match &self.origin_query {
229 Some(q) => {
230 let cxq = CXQuery::Naked(q.clone());
231 let mut conn = self.pool.get()?;
232 let nrows = get_total_rows(&mut conn, &cxq)?;
233 Some(nrows)
234 }
235 None => None,
236 }
237 }
238
239 fn names(&self) -> Vec<String> {
240 self.names.clone()
241 }
242
243 fn schema(&self) -> Vec<Self::TypeSystem> {
244 self.schema.clone()
245 }
246
247 #[throws(PostgresSourceError)]
248 fn partition(self) -> Vec<Self::Partition> {
249 let mut ret = vec![];
250 for query in self.queries {
251 let mut conn = self.pool.get()?;
252
253 if let Some(pre_queries) = &self.pre_execution_queries {
254 for pre_query in pre_queries {
255 conn.query(pre_query, &[])?;
256 }
257 }
258
259 ret.push(PostgresSourcePartition::<P, C>::new(
260 conn,
261 &query,
262 &self.schema,
263 &self.pg_schema,
264 ));
265 }
266 ret
267 }
268}
269
270pub struct PostgresSourcePartition<P, C>
271where
272 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
273 C::TlsConnect: Send,
274 C::Stream: Send,
275 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
276{
277 conn: PgConn<C>,
278 query: CXQuery<String>,
279 schema: Vec<PostgresTypeSystem>,
280 pg_schema: Vec<postgres::types::Type>,
281 nrows: usize,
282 ncols: usize,
283 _protocol: PhantomData<P>,
284}
285
286impl<P, C> PostgresSourcePartition<P, C>
287where
288 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
289 C::TlsConnect: Send,
290 C::Stream: Send,
291 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
292{
293 pub fn new(
294 conn: PgConn<C>,
295 query: &CXQuery<String>,
296 schema: &[PostgresTypeSystem],
297 pg_schema: &[postgres::types::Type],
298 ) -> Self {
299 Self {
300 conn,
301 query: query.clone(),
302 schema: schema.to_vec(),
303 pg_schema: pg_schema.to_vec(),
304 nrows: 0,
305 ncols: schema.len(),
306 _protocol: PhantomData,
307 }
308 }
309}
310
311impl<C> SourcePartition for PostgresSourcePartition<BinaryProtocol, C>
312where
313 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
314 C::TlsConnect: Send,
315 C::Stream: Send,
316 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
317{
318 type TypeSystem = PostgresTypeSystem;
319 type Parser<'a> = PostgresBinarySourcePartitionParser<'a>;
320 type Error = PostgresSourceError;
321
322 #[throws(PostgresSourceError)]
323 fn result_rows(&mut self) -> () {
324 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
325 }
326
327 #[throws(PostgresSourceError)]
328 fn parser(&mut self) -> Self::Parser<'_> {
329 let query = format!("COPY ({}) TO STDOUT WITH BINARY", self.query);
330 let reader = self.conn.copy_out(&*query)?; let iter = BinaryCopyOutIter::new(reader, &self.pg_schema);
332
333 PostgresBinarySourcePartitionParser::new(iter, &self.schema)
334 }
335
336 fn nrows(&self) -> usize {
337 self.nrows
338 }
339
340 fn ncols(&self) -> usize {
341 self.ncols
342 }
343}
344
345impl<C> SourcePartition for PostgresSourcePartition<CSVProtocol, C>
346where
347 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
348 C::TlsConnect: Send,
349 C::Stream: Send,
350 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
351{
352 type TypeSystem = PostgresTypeSystem;
353 type Parser<'a> = PostgresCSVSourceParser<'a>;
354 type Error = PostgresSourceError;
355
356 #[throws(PostgresSourceError)]
357 fn result_rows(&mut self) {
358 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
359 }
360
361 #[throws(PostgresSourceError)]
362 fn parser(&mut self) -> Self::Parser<'_> {
363 let query = format!("COPY ({}) TO STDOUT WITH CSV", self.query);
364 let reader = self.conn.copy_out(&*query)?; let iter = ReaderBuilder::new()
366 .has_headers(false)
367 .from_reader(reader)
368 .into_records();
369
370 PostgresCSVSourceParser::new(iter, &self.schema)
371 }
372
373 fn nrows(&self) -> usize {
374 self.nrows
375 }
376
377 fn ncols(&self) -> usize {
378 self.ncols
379 }
380}
381
382impl<C> SourcePartition for PostgresSourcePartition<CursorProtocol, C>
383where
384 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
385 C::TlsConnect: Send,
386 C::Stream: Send,
387 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
388{
389 type TypeSystem = PostgresTypeSystem;
390 type Parser<'a> = PostgresRawSourceParser<'a>;
391 type Error = PostgresSourceError;
392
393 #[throws(PostgresSourceError)]
394 fn result_rows(&mut self) {
395 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
396 }
397
398 #[throws(PostgresSourceError)]
399 fn parser(&mut self) -> Self::Parser<'_> {
400 let iter = self
401 .conn
402 .query_raw::<_, bool, _>(self.query.as_str(), vec![])?; PostgresRawSourceParser::new(iter, &self.schema)
404 }
405
406 fn nrows(&self) -> usize {
407 self.nrows
408 }
409
410 fn ncols(&self) -> usize {
411 self.ncols
412 }
413}
414pub struct PostgresBinarySourcePartitionParser<'a> {
415 iter: BinaryCopyOutIter<'a>,
416 rowbuf: Vec<BinaryCopyOutRow>,
417 ncols: usize,
418 current_col: usize,
419 current_row: usize,
420 is_finished: bool,
421}
422
423impl<'a> PostgresBinarySourcePartitionParser<'a> {
424 pub fn new(iter: BinaryCopyOutIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
425 Self {
426 iter,
427 rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
428 ncols: schema.len(),
429 current_row: 0,
430 current_col: 0,
431 is_finished: false,
432 }
433 }
434
435 #[throws(PostgresSourceError)]
436 fn next_loc(&mut self) -> (usize, usize) {
437 let ret = (self.current_row, self.current_col);
438 self.current_row += (self.current_col + 1) / self.ncols;
439 self.current_col = (self.current_col + 1) % self.ncols;
440 ret
441 }
442}
443
444impl<'a> PartitionParser<'a> for PostgresBinarySourcePartitionParser<'a> {
445 type TypeSystem = PostgresTypeSystem;
446 type Error = PostgresSourceError;
447
448 #[throws(PostgresSourceError)]
449 fn fetch_next(&mut self) -> (usize, bool) {
450 assert!(self.current_col == 0);
451 let remaining_rows = self.rowbuf.len() - self.current_row;
452 if remaining_rows > 0 {
453 return (remaining_rows, self.is_finished);
454 } else if self.is_finished {
455 return (0, self.is_finished);
456 }
457
458 if !self.rowbuf.is_empty() {
460 self.rowbuf.drain(..);
461 }
462 for _ in 0..DB_BUFFER_SIZE {
463 match self.iter.next()? {
464 Some(row) => {
465 self.rowbuf.push(row);
466 }
467 None => {
468 self.is_finished = true;
469 break;
470 }
471 }
472 }
473
474 self.current_row = 0;
476 self.current_col = 0;
477
478 (self.rowbuf.len(), self.is_finished)
479 }
480}
481
482macro_rules! impl_produce {
483 ($($t: ty,)+) => {
484 $(
485 impl<'r, 'a> Produce<'r, $t> for PostgresBinarySourcePartitionParser<'a> {
486 type Error = PostgresSourceError;
487
488 #[throws(PostgresSourceError)]
489 fn produce(&'r mut self) -> $t {
490 let (ridx, cidx) = self.next_loc()?;
491 let row = &self.rowbuf[ridx];
492 let val = row.try_get(cidx)?;
493 val
494 }
495 }
496
497 impl<'r, 'a> Produce<'r, Option<$t>> for PostgresBinarySourcePartitionParser<'a> {
498 type Error = PostgresSourceError;
499
500 #[throws(PostgresSourceError)]
501 fn produce(&'r mut self) -> Option<$t> {
502 let (ridx, cidx) = self.next_loc()?;
503 let row = &self.rowbuf[ridx];
504 let val = row.try_get(cidx)?;
505 val
506 }
507 }
508 )+
509 };
510}
511
512impl_produce!(
513 i8,
514 i16,
515 i32,
516 i64,
517 f32,
518 f64,
519 Decimal,
520 bool,
521 &'r str,
522 Vec<u8>,
523 NaiveTime,
524 Uuid,
525 Value,
526 IpInet,
527 Vector,
528 HalfVector,
529 Bit,
530 SparseVector,
531 Vec<Option<bool>>,
532 Vec<Option<i16>>,
533 Vec<Option<i32>>,
534 Vec<Option<i64>>,
535 Vec<Option<Decimal>>,
536 Vec<Option<f32>>,
537 Vec<Option<f64>>,
538 Vec<Option<String>>,
539);
540
541impl<'r> Produce<'r, NaiveDateTime> for PostgresBinarySourcePartitionParser<'_> {
542 type Error = PostgresSourceError;
543
544 #[throws(PostgresSourceError)]
545 fn produce(&'r mut self) -> NaiveDateTime {
546 let (ridx, cidx) = self.next_loc()?;
547 let row = &self.rowbuf[ridx];
548 let val = row.try_get(cidx)?;
549 match val {
550 postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
551 postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
552 postgres::types::Timestamp::Value(t) => t,
553 }
554 }
555}
556
557impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresBinarySourcePartitionParser<'_> {
558 type Error = PostgresSourceError;
559
560 #[throws(PostgresSourceError)]
561 fn produce(&'r mut self) -> Option<NaiveDateTime> {
562 let (ridx, cidx) = self.next_loc()?;
563 let row = &self.rowbuf[ridx];
564 let val = row.try_get(cidx)?;
565 match val {
566 Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
567 Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
568 Some(postgres::types::Timestamp::Value(t)) => t,
569 None => None,
570 }
571 }
572}
573
574impl<'r> Produce<'r, DateTime<Utc>> for PostgresBinarySourcePartitionParser<'_> {
575 type Error = PostgresSourceError;
576
577 #[throws(PostgresSourceError)]
578 fn produce(&'r mut self) -> DateTime<Utc> {
579 let (ridx, cidx) = self.next_loc()?;
580 let row = &self.rowbuf[ridx];
581 let val = row.try_get(cidx)?;
582 match val {
583 postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
584 postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
585 postgres::types::Timestamp::Value(t) => t,
586 }
587 }
588}
589
590impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresBinarySourcePartitionParser<'_> {
591 type Error = PostgresSourceError;
592
593 #[throws(PostgresSourceError)]
594 fn produce(&'r mut self) -> Option<DateTime<Utc>> {
595 let (ridx, cidx) = self.next_loc()?;
596 let row = &self.rowbuf[ridx];
597 let val = row.try_get(cidx)?;
598 match val {
599 Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
600 Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
601 Some(postgres::types::Timestamp::Value(t)) => t,
602 None => None,
603 }
604 }
605}
606
607impl<'r> Produce<'r, NaiveDate> for PostgresBinarySourcePartitionParser<'_> {
608 type Error = PostgresSourceError;
609
610 #[throws(PostgresSourceError)]
611 fn produce(&'r mut self) -> NaiveDate {
612 let (ridx, cidx) = self.next_loc()?;
613 let row = &self.rowbuf[ridx];
614 let val = row.try_get(cidx)?;
615 match val {
616 postgres::types::Date::PosInfinity => NaiveDate::MAX,
617 postgres::types::Date::NegInfinity => NaiveDate::MIN,
618 postgres::types::Date::Value(t) => t,
619 }
620 }
621}
622
623impl<'r> Produce<'r, Option<NaiveDate>> for PostgresBinarySourcePartitionParser<'_> {
624 type Error = PostgresSourceError;
625
626 #[throws(PostgresSourceError)]
627 fn produce(&'r mut self) -> Option<NaiveDate> {
628 let (ridx, cidx) = self.next_loc()?;
629 let row = &self.rowbuf[ridx];
630 let val = row.try_get(cidx)?;
631 match val {
632 Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
633 Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
634 Some(postgres::types::Date::Value(t)) => t,
635 None => None,
636 }
637 }
638}
639
640impl Produce<'_, HashMap<String, Option<String>>> for PostgresBinarySourcePartitionParser<'_> {
641 type Error = PostgresSourceError;
642 #[throws(PostgresSourceError)]
643 fn produce(&mut self) -> HashMap<String, Option<String>> {
644 unimplemented!("Please use `cursor` protocol for hstore type");
645 }
646}
647
648impl Produce<'_, Option<HashMap<String, Option<String>>>>
649 for PostgresBinarySourcePartitionParser<'_>
650{
651 type Error = PostgresSourceError;
652 #[throws(PostgresSourceError)]
653 fn produce(&mut self) -> Option<HashMap<String, Option<String>>> {
654 unimplemented!("Please use `cursor` protocol for hstore type");
655 }
656}
657
658pub struct PostgresCSVSourceParser<'a> {
659 iter: StringRecordsIntoIter<CopyOutReader<'a>>,
660 rowbuf: Vec<StringRecord>,
661 ncols: usize,
662 current_col: usize,
663 current_row: usize,
664 is_finished: bool,
665}
666
667impl<'a> PostgresCSVSourceParser<'a> {
668 pub fn new(
669 iter: StringRecordsIntoIter<CopyOutReader<'a>>,
670 schema: &[PostgresTypeSystem],
671 ) -> Self {
672 Self {
673 iter,
674 rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
675 ncols: schema.len(),
676 current_row: 0,
677 current_col: 0,
678 is_finished: false,
679 }
680 }
681
682 #[throws(PostgresSourceError)]
683 fn next_loc(&mut self) -> (usize, usize) {
684 let ret = (self.current_row, self.current_col);
685 self.current_row += (self.current_col + 1) / self.ncols;
686 self.current_col = (self.current_col + 1) % self.ncols;
687 ret
688 }
689}
690
691impl<'a> PartitionParser<'a> for PostgresCSVSourceParser<'a> {
692 type Error = PostgresSourceError;
693 type TypeSystem = PostgresTypeSystem;
694
695 #[throws(PostgresSourceError)]
696 fn fetch_next(&mut self) -> (usize, bool) {
697 assert!(self.current_col == 0);
698 let remaining_rows = self.rowbuf.len() - self.current_row;
699 if remaining_rows > 0 {
700 return (remaining_rows, self.is_finished);
701 } else if self.is_finished {
702 return (0, self.is_finished);
703 }
704
705 if !self.rowbuf.is_empty() {
706 self.rowbuf.drain(..);
707 }
708 for _ in 0..DB_BUFFER_SIZE {
709 if let Some(row) = self.iter.next() {
710 self.rowbuf.push(row?);
711 } else {
712 self.is_finished = true;
713 break;
714 }
715 }
716 self.current_row = 0;
717 self.current_col = 0;
718 (self.rowbuf.len(), self.is_finished)
719 }
720}
721
722macro_rules! impl_csv_produce {
723 ($($t: ty,)+) => {
724 $(
725 impl<'r, 'a> Produce<'r, $t> for PostgresCSVSourceParser<'a> {
726 type Error = PostgresSourceError;
727
728 #[throws(PostgresSourceError)]
729 fn produce(&'r mut self) -> $t {
730 let (ridx, cidx) = self.next_loc()?;
731 self.rowbuf[ridx][cidx].parse().map_err(|_| {
732 ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
733 })?
734 }
735 }
736
737 impl<'r, 'a> Produce<'r, Option<$t>> for PostgresCSVSourceParser<'a> {
738 type Error = PostgresSourceError;
739
740 #[throws(PostgresSourceError)]
741 fn produce(&'r mut self) -> Option<$t> {
742 let (ridx, cidx) = self.next_loc()?;
743 match &self.rowbuf[ridx][cidx][..] {
744 "" => None,
745 v => Some(v.parse().map_err(|_| {
746 ConnectorXError::cannot_produce::<$t>(Some(self.rowbuf[ridx][cidx].into()))
747 })?),
748 }
749 }
750 }
751 )+
752 };
753}
754
755impl_csv_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);
756
757macro_rules! impl_csv_vec_produce {
758 ($($t: ty,)+) => {
759 $(
760 impl<'r, 'a> Produce<'r, Vec<Option<$t>>> for PostgresCSVSourceParser<'a> {
761 type Error = PostgresSourceError;
762
763 #[throws(PostgresSourceError)]
764 fn produce(&mut self) -> Vec<Option<$t>> {
765 let (ridx, cidx) = self.next_loc()?;
766 let s = &self.rowbuf[ridx][cidx][..];
767 match s {
768 "{}" => vec![],
769 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
770 s => s[1..s.len() - 1]
771 .split(",")
772 .map(|v| {
773 if v == "NULL" {
774 Ok(None)
775 } else {
776 match v.parse() {
777 Ok(v) => Ok(Some(v)),
778 Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
779 }
780 }
781 })
782 .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
783 }
784 }
785 }
786
787 impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresCSVSourceParser<'a> {
788 type Error = PostgresSourceError;
789
790 #[throws(PostgresSourceError)]
791 fn produce(&mut self) -> Option<Vec<Option<$t>>> {
792 let (ridx, cidx) = self.next_loc()?;
793 let s = &self.rowbuf[ridx][cidx][..];
794 match s {
795 "" => None,
796 "{}" => Some(vec![]),
797 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<$t>(Some(s.into()))),
798 s => Some(
799 s[1..s.len() - 1]
800 .split(",")
801 .map(|v| {
802 if v == "NULL" {
803 Ok(None)
804 } else {
805 match v.parse() {
806 Ok(v) => Ok(Some(v)),
807 Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))
808 }
809 }
810 })
811 .collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?,
812 ),
813 }
814 }
815 }
816 )+
817 };
818}
819
820impl_csv_vec_produce!(i8, i16, i32, i64, f32, f64, Decimal, String,);
821
822impl Produce<'_, bool> for PostgresCSVSourceParser<'_> {
823 type Error = PostgresSourceError;
824
825 #[throws(PostgresSourceError)]
826 fn produce(&mut self) -> bool {
827 let (ridx, cidx) = self.next_loc()?;
828 let ret = match &self.rowbuf[ridx][cidx][..] {
829 "t" => true,
830 "f" => false,
831 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
832 self.rowbuf[ridx][cidx].into()
833 ))),
834 };
835 ret
836 }
837}
838
839impl Produce<'_, Option<bool>> for PostgresCSVSourceParser<'_> {
840 type Error = PostgresSourceError;
841
842 #[throws(PostgresSourceError)]
843 fn produce(&mut self) -> Option<bool> {
844 let (ridx, cidx) = self.next_loc()?;
845 let ret = match &self.rowbuf[ridx][cidx][..] {
846 "" => None,
847 "t" => Some(true),
848 "f" => Some(false),
849 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(
850 self.rowbuf[ridx][cidx].into()
851 ))),
852 };
853 ret
854 }
855}
856
857impl Produce<'_, Vec<Option<bool>>> for PostgresCSVSourceParser<'_> {
858 type Error = PostgresSourceError;
859
860 #[throws(PostgresSourceError)]
861 fn produce(&mut self) -> Vec<Option<bool>> {
862 let (ridx, cidx) = self.next_loc()?;
863 let s = &self.rowbuf[ridx][cidx][..];
864 match s {
865 "{}" => vec![],
866 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
867 s => s[1..s.len() - 1]
868 .split(',')
869 .map(|v| match v {
870 "NULL" => Ok(None),
871 "t" => Ok(Some(true)),
872 "f" => Ok(Some(false)),
873 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
874 })
875 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
876 }
877 }
878}
879
880impl Produce<'_, Option<Vec<Option<bool>>>> for PostgresCSVSourceParser<'_> {
881 type Error = PostgresSourceError;
882
883 #[throws(PostgresSourceError)]
884 fn produce(&mut self) -> Option<Vec<Option<bool>>> {
885 let (ridx, cidx) = self.next_loc()?;
886 let s = &self.rowbuf[ridx][cidx][..];
887 match s {
888 "" => None,
889 "{}" => Some(vec![]),
890 _ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
891 s => Some(
892 s[1..s.len() - 1]
893 .split(',')
894 .map(|v| match v {
895 "NULL" => Ok(None),
896 "t" => Ok(Some(true)),
897 "f" => Ok(Some(false)),
898 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
899 })
900 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
901 ),
902 }
903 }
904}
905
906impl<'r> Produce<'r, Decimal> for PostgresCSVSourceParser<'_> {
907 type Error = PostgresSourceError;
908
909 #[throws(PostgresSourceError)]
910 fn produce(&'r mut self) -> Decimal {
911 let (ridx, cidx) = self.next_loc()?;
912 match &self.rowbuf[ridx][cidx][..] {
913 "Infinity" => Decimal::MAX,
914 "-Infinity" => Decimal::MIN,
915 v => v
916 .parse()
917 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
918 }
919 }
920}
921
922impl<'r> Produce<'r, Option<Decimal>> for PostgresCSVSourceParser<'_> {
923 type Error = PostgresSourceError;
924
925 #[throws(PostgresSourceError)]
926 fn produce(&'r mut self) -> Option<Decimal> {
927 let (ridx, cidx) = self.next_loc()?;
928 match &self.rowbuf[ridx][cidx][..] {
929 "" => None,
930 "Infinity" => Some(Decimal::MAX),
931 "-Infinity" => Some(Decimal::MIN),
932 v => Some(
933 v.parse()
934 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(v.into())))?,
935 ),
936 }
937 }
938}
939
940impl Produce<'_, DateTime<Utc>> for PostgresCSVSourceParser<'_> {
941 type Error = PostgresSourceError;
942
943 #[throws(PostgresSourceError)]
944 fn produce(&mut self) -> DateTime<Utc> {
945 let (ridx, cidx) = self.next_loc()?;
946 match &self.rowbuf[ridx][cidx][..] {
947 "infinity" => DateTime::<Utc>::MAX_UTC,
948 "-infinity" => DateTime::<Utc>::MIN_UTC,
949 v => format!("{}:00", v)
951 .parse()
952 .map_err(|_| ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into())))?,
953 }
954 }
955}
956
957impl Produce<'_, Option<DateTime<Utc>>> for PostgresCSVSourceParser<'_> {
958 type Error = PostgresSourceError;
959
960 #[throws(PostgresSourceError)]
961 fn produce(&mut self) -> Option<DateTime<Utc>> {
962 let (ridx, cidx) = self.next_loc()?;
963 match &self.rowbuf[ridx][cidx][..] {
964 "" => None,
965 "infinity" => Some(DateTime::<Utc>::MAX_UTC),
966 "-infinity" => Some(DateTime::<Utc>::MIN_UTC),
967 v => {
968 Some(format!("{}:00", v).parse().map_err(|_| {
970 ConnectorXError::cannot_produce::<DateTime<Utc>>(Some(v.into()))
971 })?)
972 }
973 }
974 }
975}
976
977impl Produce<'_, NaiveDate> for PostgresCSVSourceParser<'_> {
978 type Error = PostgresSourceError;
979
980 #[throws(PostgresSourceError)]
981 fn produce(&mut self) -> NaiveDate {
982 let (ridx, cidx) = self.next_loc()?;
983 match &self.rowbuf[ridx][cidx][..] {
984 "infinity" => NaiveDate::MAX,
985 "-infinity" => NaiveDate::MIN,
986 v => NaiveDate::parse_from_str(v, "%Y-%m-%d")
987 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
988 }
989 }
990}
991
992impl Produce<'_, Option<NaiveDate>> for PostgresCSVSourceParser<'_> {
993 type Error = PostgresSourceError;
994
995 #[throws(PostgresSourceError)]
996 fn produce(&mut self) -> Option<NaiveDate> {
997 let (ridx, cidx) = self.next_loc()?;
998 match &self.rowbuf[ridx][cidx][..] {
999 "" => None,
1000 "infinity" => Some(NaiveDate::MAX),
1001 "-infinity" => Some(NaiveDate::MIN),
1002 v => Some(
1003 NaiveDate::parse_from_str(v, "%Y-%m-%d")
1004 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDate>(Some(v.into())))?,
1005 ),
1006 }
1007 }
1008}
1009
1010impl Produce<'_, NaiveDateTime> for PostgresCSVSourceParser<'_> {
1011 type Error = PostgresSourceError;
1012
1013 #[throws(PostgresSourceError)]
1014 fn produce(&mut self) -> NaiveDateTime {
1015 let (ridx, cidx) = self.next_loc()?;
1016 match &self.rowbuf[ridx][cidx] {
1017 "infinity" => NaiveDateTime::MAX,
1018 "-infinity" => NaiveDateTime::MIN,
1019 v => NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f")
1020 .map_err(|_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into())))?,
1021 }
1022 }
1023}
1024
1025impl Produce<'_, Option<NaiveDateTime>> for PostgresCSVSourceParser<'_> {
1026 type Error = PostgresSourceError;
1027
1028 #[throws(PostgresSourceError)]
1029 fn produce(&mut self) -> Option<NaiveDateTime> {
1030 let (ridx, cidx) = self.next_loc()?;
1031 match &self.rowbuf[ridx][cidx][..] {
1032 "" => None,
1033 "infinity" => Some(NaiveDateTime::MAX),
1034 "-infinity" => Some(NaiveDateTime::MIN),
1035 v => Some(
1036 NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
1037 ConnectorXError::cannot_produce::<NaiveDateTime>(Some(v.into()))
1038 })?,
1039 ),
1040 }
1041 }
1042}
1043
1044impl Produce<'_, NaiveTime> for PostgresCSVSourceParser<'_> {
1045 type Error = PostgresSourceError;
1046
1047 #[throws(PostgresSourceError)]
1048 fn produce(&mut self) -> NaiveTime {
1049 let (ridx, cidx) = self.next_loc()?;
1050 NaiveTime::parse_from_str(&self.rowbuf[ridx][cidx], "%H:%M:%S%.f").map_err(|_| {
1051 ConnectorXError::cannot_produce::<NaiveTime>(Some(self.rowbuf[ridx][cidx].into()))
1052 })?
1053 }
1054}
1055
1056impl Produce<'_, Option<NaiveTime>> for PostgresCSVSourceParser<'_> {
1057 type Error = PostgresSourceError;
1058
1059 #[throws(PostgresSourceError)]
1060 fn produce(&mut self) -> Option<NaiveTime> {
1061 let (ridx, cidx) = self.next_loc()?;
1062 match &self.rowbuf[ridx][cidx][..] {
1063 "" => None,
1064 v => Some(
1065 NaiveTime::parse_from_str(v, "%H:%M:%S%.f")
1066 .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(v.into())))?,
1067 ),
1068 }
1069 }
1070}
1071
1072impl<'r> Produce<'r, &'r str> for PostgresCSVSourceParser<'_> {
1073 type Error = PostgresSourceError;
1074
1075 #[throws(PostgresSourceError)]
1076 fn produce(&'r mut self) -> &'r str {
1077 let (ridx, cidx) = self.next_loc()?;
1078 &self.rowbuf[ridx][cidx]
1079 }
1080}
1081
1082impl<'r> Produce<'r, Option<&'r str>> for PostgresCSVSourceParser<'_> {
1083 type Error = PostgresSourceError;
1084
1085 #[throws(PostgresSourceError)]
1086 fn produce(&'r mut self) -> Option<&'r str> {
1087 let (ridx, cidx) = self.next_loc()?;
1088 match &self.rowbuf[ridx][cidx][..] {
1089 "" => None,
1090 v => Some(v),
1091 }
1092 }
1093}
1094
1095impl<'r> Produce<'r, Vec<u8>> for PostgresCSVSourceParser<'_> {
1096 type Error = PostgresSourceError;
1097
1098 #[throws(PostgresSourceError)]
1099 fn produce(&'r mut self) -> Vec<u8> {
1100 let (ridx, cidx) = self.next_loc()?;
1101 decode(&self.rowbuf[ridx][cidx][2..])? }
1103}
1104
1105impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresCSVSourceParser<'_> {
1106 type Error = PostgresSourceError;
1107
1108 #[throws(PostgresSourceError)]
1109 fn produce(&'r mut self) -> Option<Vec<u8>> {
1110 let (ridx, cidx) = self.next_loc()?;
1111 match &self.rowbuf[ridx][cidx] {
1112 "" => None,
1114 v => Some(decode(&v[2..])?),
1115 }
1116 }
1117}
1118
1119impl<'r> Produce<'r, Value> for PostgresCSVSourceParser<'_> {
1120 type Error = PostgresSourceError;
1121
1122 #[throws(PostgresSourceError)]
1123 fn produce(&'r mut self) -> Value {
1124 let (ridx, cidx) = self.next_loc()?;
1125 let v = &self.rowbuf[ridx][cidx];
1126 from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1127 }
1128}
1129
1130impl<'r> Produce<'r, Option<Value>> for PostgresCSVSourceParser<'_> {
1131 type Error = PostgresSourceError;
1132
1133 #[throws(PostgresSourceError)]
1134 fn produce(&'r mut self) -> Option<Value> {
1135 let (ridx, cidx) = self.next_loc()?;
1136
1137 match &self.rowbuf[ridx][cidx][..] {
1138 "" => None,
1139 v => {
1140 from_str(v).map_err(|_| ConnectorXError::cannot_produce::<Value>(Some(v.into())))?
1141 }
1142 }
1143 }
1144}
1145
1146pub struct PostgresRawSourceParser<'a> {
1147 iter: RowIter<'a>,
1148 rowbuf: Vec<Row>,
1149 ncols: usize,
1150 current_col: usize,
1151 current_row: usize,
1152 is_finished: bool,
1153}
1154
1155impl<'a> PostgresRawSourceParser<'a> {
1156 pub fn new(iter: RowIter<'a>, schema: &[PostgresTypeSystem]) -> Self {
1157 Self {
1158 iter,
1159 rowbuf: Vec::with_capacity(DB_BUFFER_SIZE),
1160 ncols: schema.len(),
1161 current_row: 0,
1162 current_col: 0,
1163 is_finished: false,
1164 }
1165 }
1166
1167 #[throws(PostgresSourceError)]
1168 fn next_loc(&mut self) -> (usize, usize) {
1169 let ret = (self.current_row, self.current_col);
1170 self.current_row += (self.current_col + 1) / self.ncols;
1171 self.current_col = (self.current_col + 1) % self.ncols;
1172 ret
1173 }
1174}
1175
1176impl<'a> PartitionParser<'a> for PostgresRawSourceParser<'a> {
1177 type TypeSystem = PostgresTypeSystem;
1178 type Error = PostgresSourceError;
1179
1180 #[throws(PostgresSourceError)]
1181 fn fetch_next(&mut self) -> (usize, bool) {
1182 assert!(self.current_col == 0);
1183 let remaining_rows = self.rowbuf.len() - self.current_row;
1184 if remaining_rows > 0 {
1185 return (remaining_rows, self.is_finished);
1186 } else if self.is_finished {
1187 return (0, self.is_finished);
1188 }
1189
1190 if !self.rowbuf.is_empty() {
1191 self.rowbuf.drain(..);
1192 }
1193 for _ in 0..DB_BUFFER_SIZE {
1194 if let Some(row) = self.iter.next()? {
1195 self.rowbuf.push(row);
1196 } else {
1197 self.is_finished = true;
1198 break;
1199 }
1200 }
1201 self.current_row = 0;
1202 self.current_col = 0;
1203 (self.rowbuf.len(), self.is_finished)
1204 }
1205}
1206
1207macro_rules! impl_produce {
1208 ($($t: ty,)+) => {
1209 $(
1210 impl<'r, 'a> Produce<'r, $t> for PostgresRawSourceParser<'a> {
1211 type Error = PostgresSourceError;
1212
1213 #[throws(PostgresSourceError)]
1214 fn produce(&'r mut self) -> $t {
1215 let (ridx, cidx) = self.next_loc()?;
1216 let row = &self.rowbuf[ridx];
1217 let val = row.try_get(cidx)?;
1218 val
1219 }
1220 }
1221
1222 impl<'r, 'a> Produce<'r, Option<$t>> for PostgresRawSourceParser<'a> {
1223 type Error = PostgresSourceError;
1224
1225 #[throws(PostgresSourceError)]
1226 fn produce(&'r mut self) -> Option<$t> {
1227 let (ridx, cidx) = self.next_loc()?;
1228 let row = &self.rowbuf[ridx];
1229 let val = row.try_get(cidx)?;
1230 val
1231 }
1232 }
1233 )+
1234 };
1235}
1236
1237impl_produce!(
1238 i8,
1239 i16,
1240 i32,
1241 i64,
1242 f32,
1243 f64,
1244 Decimal,
1245 bool,
1246 &'r str,
1247 Vec<u8>,
1248 NaiveTime,
1249 Uuid,
1250 Value,
1251 IpInet,
1252 Vector,
1253 HalfVector,
1254 Bit,
1255 SparseVector,
1256 HashMap<String, Option<String>>,
1257 Vec<Option<bool>>,
1258 Vec<Option<String>>,
1259 Vec<Option<i16>>,
1260 Vec<Option<i32>>,
1261 Vec<Option<i64>>,
1262 Vec<Option<f32>>,
1263 Vec<Option<f64>>,
1264 Vec<Option<Decimal>>,
1265);
1266
1267impl<'r> Produce<'r, DateTime<Utc>> for PostgresRawSourceParser<'_> {
1268 type Error = PostgresSourceError;
1269
1270 #[throws(PostgresSourceError)]
1271 fn produce(&'r mut self) -> DateTime<Utc> {
1272 let (ridx, cidx) = self.next_loc()?;
1273 let row = &self.rowbuf[ridx];
1274 let val: postgres::types::Timestamp<DateTime<Utc>> = row.try_get(cidx)?;
1275 match val {
1276 postgres::types::Timestamp::PosInfinity => DateTime::<Utc>::MAX_UTC,
1277 postgres::types::Timestamp::NegInfinity => DateTime::<Utc>::MIN_UTC,
1278 postgres::types::Timestamp::Value(t) => t,
1279 }
1280 }
1281}
1282
1283impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresRawSourceParser<'_> {
1284 type Error = PostgresSourceError;
1285
1286 #[throws(PostgresSourceError)]
1287 fn produce(&'r mut self) -> Option<DateTime<Utc>> {
1288 let (ridx, cidx) = self.next_loc()?;
1289 let row = &self.rowbuf[ridx];
1290 let val = row.try_get(cidx)?;
1291 match val {
1292 Some(postgres::types::Timestamp::PosInfinity) => Some(DateTime::<Utc>::MAX_UTC),
1293 Some(postgres::types::Timestamp::NegInfinity) => Some(DateTime::<Utc>::MIN_UTC),
1294 Some(postgres::types::Timestamp::Value(t)) => t,
1295 None => None,
1296 }
1297 }
1298}
1299
1300impl<'r> Produce<'r, NaiveDateTime> for PostgresRawSourceParser<'_> {
1301 type Error = PostgresSourceError;
1302
1303 #[throws(PostgresSourceError)]
1304 fn produce(&'r mut self) -> NaiveDateTime {
1305 let (ridx, cidx) = self.next_loc()?;
1306 let row = &self.rowbuf[ridx];
1307 let val: postgres::types::Timestamp<NaiveDateTime> = row.try_get(cidx)?;
1308 match val {
1309 postgres::types::Timestamp::PosInfinity => NaiveDateTime::MAX,
1310 postgres::types::Timestamp::NegInfinity => NaiveDateTime::MIN,
1311 postgres::types::Timestamp::Value(t) => t,
1312 }
1313 }
1314}
1315
1316impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresRawSourceParser<'_> {
1317 type Error = PostgresSourceError;
1318
1319 #[throws(PostgresSourceError)]
1320 fn produce(&'r mut self) -> Option<NaiveDateTime> {
1321 let (ridx, cidx) = self.next_loc()?;
1322 let row = &self.rowbuf[ridx];
1323 let val = row.try_get(cidx)?;
1324 match val {
1325 Some(postgres::types::Timestamp::PosInfinity) => Some(NaiveDateTime::MAX),
1326 Some(postgres::types::Timestamp::NegInfinity) => Some(NaiveDateTime::MIN),
1327 Some(postgres::types::Timestamp::Value(t)) => t,
1328 None => None,
1329 }
1330 }
1331}
1332
1333impl<'r> Produce<'r, NaiveDate> for PostgresRawSourceParser<'_> {
1334 type Error = PostgresSourceError;
1335
1336 #[throws(PostgresSourceError)]
1337 fn produce(&'r mut self) -> NaiveDate {
1338 let (ridx, cidx) = self.next_loc()?;
1339 let row = &self.rowbuf[ridx];
1340 let val: postgres::types::Date<NaiveDate> = row.try_get(cidx)?;
1341 match val {
1342 postgres::types::Date::PosInfinity => NaiveDate::MAX,
1343 postgres::types::Date::NegInfinity => NaiveDate::MIN,
1344 postgres::types::Date::Value(t) => t,
1345 }
1346 }
1347}
1348
1349impl<'r> Produce<'r, Option<NaiveDate>> for PostgresRawSourceParser<'_> {
1350 type Error = PostgresSourceError;
1351
1352 #[throws(PostgresSourceError)]
1353 fn produce(&'r mut self) -> Option<NaiveDate> {
1354 let (ridx, cidx) = self.next_loc()?;
1355 let row = &self.rowbuf[ridx];
1356 let val = row.try_get(cidx)?;
1357 match val {
1358 Some(postgres::types::Date::PosInfinity) => Some(NaiveDate::MAX),
1359 Some(postgres::types::Date::NegInfinity) => Some(NaiveDate::MIN),
1360 Some(postgres::types::Date::Value(t)) => t,
1361 None => None,
1362 }
1363 }
1364}
1365
1366impl<C> SourcePartition for PostgresSourcePartition<SimpleProtocol, C>
1367where
1368 C: MakeTlsConnect<Socket> + Clone + 'static + Sync + Send,
1369 C::TlsConnect: Send,
1370 C::Stream: Send,
1371 <C::TlsConnect as TlsConnect<Socket>>::Future: Send,
1372{
1373 type TypeSystem = PostgresTypeSystem;
1374 type Parser<'a> = PostgresSimpleSourceParser;
1375 type Error = PostgresSourceError;
1376
1377 #[throws(PostgresSourceError)]
1378 fn result_rows(&mut self) {
1379 self.nrows = get_total_rows(&mut self.conn, &self.query)?;
1380 }
1381
1382 #[throws(PostgresSourceError)]
1383 fn parser(&mut self) -> Self::Parser<'_> {
1384 let rows = self.conn.simple_query(self.query.as_str())?; PostgresSimpleSourceParser::new(rows, &self.schema)
1386 }
1387
1388 fn nrows(&self) -> usize {
1389 self.nrows
1390 }
1391
1392 fn ncols(&self) -> usize {
1393 self.ncols
1394 }
1395}
1396
1397pub struct PostgresSimpleSourceParser {
1398 rows: Vec<SimpleQueryMessage>,
1399 ncols: usize,
1400 current_col: usize,
1401 current_row: usize,
1402}
1403impl PostgresSimpleSourceParser {
1404 pub fn new(rows: Vec<SimpleQueryMessage>, schema: &[PostgresTypeSystem]) -> Self {
1405 Self {
1406 rows,
1407 ncols: schema.len(),
1408 current_row: 0,
1409 current_col: 0,
1410 }
1411 }
1412
1413 #[throws(PostgresSourceError)]
1414 fn next_loc(&mut self) -> (usize, usize) {
1415 let ret = (self.current_row, self.current_col);
1416 self.current_row += (self.current_col + 1) / self.ncols;
1417 self.current_col = (self.current_col + 1) % self.ncols;
1418 ret
1419 }
1420}
1421
1422impl PartitionParser<'_> for PostgresSimpleSourceParser {
1423 type TypeSystem = PostgresTypeSystem;
1424 type Error = PostgresSourceError;
1425
1426 #[throws(PostgresSourceError)]
1427 fn fetch_next(&mut self) -> (usize, bool) {
1428 self.current_row = 0;
1429 self.current_col = 0;
1430 if !self.rows.is_empty() {
1431 if let SimpleQueryMessage::RowDescription(_) = &self.rows[0] {
1432 self.current_row = 1;
1433 }
1434 }
1435
1436 (self.rows.len() - 1 - self.current_row, true) }
1438}
1439
1440macro_rules! impl_simple_produce {
1441 ($($t: ty,)+) => {
1442 $(
1443 impl<'r> Produce<'r, $t> for PostgresSimpleSourceParser {
1444 type Error = PostgresSourceError;
1445
1446 #[throws(PostgresSourceError)]
1447 fn produce(&'r mut self) -> $t {
1448 let (ridx, cidx) = self.next_loc()?;
1449 let val = match &self.rows[ridx] {
1450 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1451 Some(s) => s
1452 .parse()
1453 .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1454 None => throw!(anyhow!(
1455 "Cannot parse NULL in NOT NULL column."
1456 )),
1457 },
1458 SimpleQueryMessage::CommandComplete(c) => {
1459 panic!("get command: {}", c);
1460 }
1461 _ => {
1462 panic!("what?");
1463 }
1464 };
1465 val
1466 }
1467 }
1468
1469 impl<'r, 'a> Produce<'r, Option<$t>> for PostgresSimpleSourceParser {
1470 type Error = PostgresSourceError;
1471
1472 #[throws(PostgresSourceError)]
1473 fn produce(&'r mut self) -> Option<$t> {
1474 let (ridx, cidx) = self.next_loc()?;
1475 let val = match &self.rows[ridx] {
1476 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1477 Some(s) => Some(
1478 s.parse()
1479 .map_err(|_| ConnectorXError::cannot_produce::<$t>(Some(s.into())))?,
1480 ),
1481 None => None,
1482 },
1483 SimpleQueryMessage::CommandComplete(c) => {
1484 panic!("get command: {}", c);
1485 }
1486 _ => {
1487 panic!("what?");
1488 }
1489 };
1490 val
1491 }
1492 }
1493 )+
1494 };
1495}
1496
1497impl_simple_produce!(i8, i16, i32, i64, f32, f64, Uuid, IpInet,);
1498
1499impl<'r> Produce<'r, bool> for PostgresSimpleSourceParser {
1500 type Error = PostgresSourceError;
1501
1502 #[throws(PostgresSourceError)]
1503 fn produce(&'r mut self) -> bool {
1504 let (ridx, cidx) = self.next_loc()?;
1505 let val = match &self.rows[ridx] {
1506 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1507 Some(s) => match s {
1508 "t" => true,
1509 "f" => false,
1510 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1511 },
1512 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1513 },
1514 SimpleQueryMessage::CommandComplete(c) => {
1515 panic!("get command: {}", c);
1516 }
1517 _ => {
1518 panic!("what?");
1519 }
1520 };
1521 val
1522 }
1523}
1524
1525impl<'r> Produce<'r, Option<bool>> for PostgresSimpleSourceParser {
1526 type Error = PostgresSourceError;
1527
1528 #[throws(PostgresSourceError)]
1529 fn produce(&'r mut self) -> Option<bool> {
1530 let (ridx, cidx) = self.next_loc()?;
1531 let val = match &self.rows[ridx] {
1532 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1533 Some(s) => match s {
1534 "t" => Some(true),
1535 "f" => Some(false),
1536 _ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
1537 },
1538 None => None,
1539 },
1540 SimpleQueryMessage::CommandComplete(c) => {
1541 panic!("get command: {}", c);
1542 }
1543 _ => {
1544 panic!("what?");
1545 }
1546 };
1547 val
1548 }
1549}
1550
1551impl<'r> Produce<'r, Decimal> for PostgresSimpleSourceParser {
1552 type Error = PostgresSourceError;
1553
1554 #[throws(PostgresSourceError)]
1555 fn produce(&'r mut self) -> Decimal {
1556 let (ridx, cidx) = self.next_loc()?;
1557 let val = match &self.rows[ridx] {
1558 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1559 Some("Infinity") => Decimal::MAX,
1560 Some("-Infinity") => Decimal::MIN,
1561 Some(s) => s
1562 .parse()
1563 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1564 None => throw!(anyhow!("Cannot parse NULL in NOT NULL column.")),
1565 },
1566 SimpleQueryMessage::CommandComplete(c) => {
1567 panic!("get command: {}", c);
1568 }
1569 _ => {
1570 panic!("what?");
1571 }
1572 };
1573 val
1574 }
1575}
1576
1577impl<'r> Produce<'r, Option<Decimal>> for PostgresSimpleSourceParser {
1578 type Error = PostgresSourceError;
1579
1580 #[throws(PostgresSourceError)]
1581 fn produce(&'r mut self) -> Option<Decimal> {
1582 let (ridx, cidx) = self.next_loc()?;
1583 let val = match &self.rows[ridx] {
1584 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1585 Some("Infinity") => Some(Decimal::MAX),
1586 Some("-Infinity") => Some(Decimal::MIN),
1587 Some(s) => Some(
1588 s.parse()
1589 .map_err(|_| ConnectorXError::cannot_produce::<Decimal>(Some(s.into())))?,
1590 ),
1591 None => None,
1592 },
1593 SimpleQueryMessage::CommandComplete(c) => {
1594 panic!("get command: {}", c);
1595 }
1596 _ => {
1597 panic!("what?");
1598 }
1599 };
1600 val
1601 }
1602}
1603
1604impl<'r> Produce<'r, &'r str> for PostgresSimpleSourceParser {
1605 type Error = PostgresSourceError;
1606
1607 #[throws(PostgresSourceError)]
1608 fn produce(&'r mut self) -> &'r str {
1609 let (ridx, cidx) = self.next_loc()?;
1610 let val = match &self.rows[ridx] {
1611 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1612 Some(s) => s,
1613 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1614 },
1615 SimpleQueryMessage::CommandComplete(c) => {
1616 panic!("get command: {}", c);
1617 }
1618 _ => {
1619 panic!("what?");
1620 }
1621 };
1622 val
1623 }
1624}
1625
1626impl<'r> Produce<'r, Option<&'r str>> for PostgresSimpleSourceParser {
1627 type Error = PostgresSourceError;
1628
1629 #[throws(PostgresSourceError)]
1630 fn produce(&'r mut self) -> Option<&'r str> {
1631 let (ridx, cidx) = self.next_loc()?;
1632 let val = match &self.rows[ridx] {
1633 SimpleQueryMessage::Row(row) => row.try_get(cidx)?,
1634 SimpleQueryMessage::CommandComplete(c) => {
1635 panic!("get command: {}", c);
1636 }
1637 _ => {
1638 panic!("what?");
1639 }
1640 };
1641 val
1642 }
1643}
1644
1645impl<'r> Produce<'r, Vec<u8>> for PostgresSimpleSourceParser {
1646 type Error = PostgresSourceError;
1647
1648 #[throws(PostgresSourceError)]
1649 fn produce(&'r mut self) -> Vec<u8> {
1650 let (ridx, cidx) = self.next_loc()?;
1651 let val = match &self.rows[ridx] {
1652 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1653 Some(s) => {
1654 let mut res = s.chars();
1655 res.next();
1656 res.next();
1657 decode(
1658 res.enumerate()
1659 .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1660 .chars()
1661 .map(|c| c as u8)
1662 .collect::<Vec<u8>>(),
1663 )?
1664 }
1665 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1666 },
1667 SimpleQueryMessage::CommandComplete(c) => {
1668 panic!("get command: {}", c);
1669 }
1670 _ => {
1671 panic!("what?");
1672 }
1673 };
1674 val
1675 }
1676}
1677
1678impl<'r> Produce<'r, Option<Vec<u8>>> for PostgresSimpleSourceParser {
1679 type Error = PostgresSourceError;
1680
1681 #[throws(PostgresSourceError)]
1682 fn produce(&'r mut self) -> Option<Vec<u8>> {
1683 let (ridx, cidx) = self.next_loc()?;
1684 let val = match &self.rows[ridx] {
1685 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1686 Some(s) => {
1687 let mut res = s.chars();
1688 res.next();
1689 res.next();
1690 Some(decode(
1691 res.enumerate()
1692 .fold(String::new(), |acc, (_i, c)| format!("{}{}", acc, c))
1693 .chars()
1694 .map(|c| c as u8)
1695 .collect::<Vec<u8>>(),
1696 )?)
1697 }
1698 None => None,
1699 },
1700 SimpleQueryMessage::CommandComplete(c) => {
1701 panic!("get command: {}", c);
1702 }
1703 _ => {
1704 panic!("what?");
1705 }
1706 };
1707 val
1708 }
1709}
1710
1711fn rem_first_and_last(value: &str) -> &str {
1712 let mut chars = value.chars();
1713 chars.next();
1714 chars.next_back();
1715 chars.as_str()
1716}
1717
1718macro_rules! impl_simple_vec_produce {
1719 ($($t: ty,)+) => {
1720 $(
1721 impl<'r> Produce<'r, Vec<Option<$t>>> for PostgresSimpleSourceParser {
1722 type Error = PostgresSourceError;
1723
1724 #[throws(PostgresSourceError)]
1725 fn produce(&'r mut self) -> Vec<Option<$t>> {
1726 let (ridx, cidx) = self.next_loc()?;
1727 let val = match &self.rows[ridx] {
1728 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1729 Some(s) => match s{
1730 "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1731 "{}" => vec![],
1732 _ => rem_first_and_last(s).split(",").map(|v| {
1733 if v == "NULL" {
1734 Ok(None)
1735 } else {
1736 match v.parse() {
1737 Ok(v) => Ok(Some(v)),
1738 Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1739 }
1740 }
1741 }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?
1742 },
1743 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1744 },
1745 SimpleQueryMessage::CommandComplete(c) => {
1746 panic!("get command: {}", c);
1747 }
1748 _ => {
1749 panic!("what?");
1750 }
1751 };
1752 val
1753 }
1754 }
1755
1756 impl<'r, 'a> Produce<'r, Option<Vec<Option<$t>>>> for PostgresSimpleSourceParser {
1757 type Error = PostgresSourceError;
1758
1759 #[throws(PostgresSourceError)]
1760 fn produce(&'r mut self) -> Option<Vec<Option<$t>>> {
1761 let (ridx, cidx) = self.next_loc()?;
1762 let val = match &self.rows[ridx] {
1763
1764 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1765 Some(s) => match s{
1766 "" => None,
1767 "{}" => Some(vec![]),
1768 _ => Some(rem_first_and_last(s).split(",").map(|v| {
1769 if v == "NULL" {
1770 Ok(None)
1771 } else {
1772 match v.parse() {
1773 Ok(v) => Ok(Some(v)),
1774 Err(e) => Err(e).map_err(|_| ConnectorXError::cannot_produce::<Vec<$t>>(Some(s.into())))
1775 }
1776 }
1777 }).collect::<Result<Vec<Option<$t>>, ConnectorXError>>()?)
1778 },
1779 None => None,
1780 },
1781
1782 SimpleQueryMessage::CommandComplete(c) => {
1783 panic!("get command: {}", c);
1784 }
1785 _ => {
1786 panic!("what?");
1787 }
1788 };
1789 val
1790 }
1791 }
1792 )+
1793 };
1794}
1795impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,);
1796
1797impl<'r> Produce<'r, Vec<Option<bool>>> for PostgresSimpleSourceParser {
1798 type Error = PostgresSourceError;
1799
1800 #[throws(PostgresSourceError)]
1801 fn produce(&'r mut self) -> Vec<Option<bool>> {
1802 let (ridx, cidx) = self.next_loc()?;
1803 let val = match &self.rows[ridx] {
1804 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1805 Some(s) => match s {
1806 "" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1807 "{}" => vec![],
1808 _ => rem_first_and_last(s)
1809 .split(',')
1810 .map(|token| match token {
1811 "NULL" => Ok(None),
1812 "t" => Ok(Some(true)),
1813 "f" => Ok(Some(false)),
1814 _ => {
1815 throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(s.into())))
1816 }
1817 })
1818 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1819 },
1820 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1821 },
1822 SimpleQueryMessage::CommandComplete(c) => {
1823 panic!("get command: {}", c);
1824 }
1825 _ => {
1826 panic!("what?");
1827 }
1828 };
1829 val
1830 }
1831}
1832
1833impl<'r> Produce<'r, Option<Vec<Option<bool>>>> for PostgresSimpleSourceParser {
1834 type Error = PostgresSourceError;
1835
1836 #[throws(PostgresSourceError)]
1837 fn produce(&'r mut self) -> Option<Vec<Option<bool>>> {
1838 let (ridx, cidx) = self.next_loc()?;
1839 let val = match &self.rows[ridx] {
1840 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1841 Some(s) => match s {
1842 "" => None,
1843 "{}" => Some(vec![]),
1844 _ => Some(
1845 rem_first_and_last(s)
1846 .split(',')
1847 .map(|token| match token {
1848 "NULL" => Ok(None),
1849 "t" => Ok(Some(true)),
1850 "f" => Ok(Some(false)),
1851 _ => {
1852 throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(
1853 s.into()
1854 )))
1855 }
1856 })
1857 .collect::<Result<Vec<Option<bool>>, ConnectorXError>>()?,
1858 ),
1859 },
1860 None => None,
1861 },
1862 SimpleQueryMessage::CommandComplete(c) => {
1863 panic!("get command: {}", c);
1864 }
1865 _ => {
1866 panic!("what?");
1867 }
1868 };
1869 val
1870 }
1871}
1872
1873impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser {
1874 type Error = PostgresSourceError;
1875
1876 #[throws(PostgresSourceError)]
1877 fn produce(&'r mut self) -> NaiveDate {
1878 let (ridx, cidx) = self.next_loc()?;
1879 let val = match &self.rows[ridx] {
1880 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1881 Some(s) => match s {
1882 "infinity" => NaiveDate::MAX,
1883 "-infinity" => NaiveDate::MIN,
1884 s => NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1885 ConnectorXError::cannot_produce::<NaiveDate>(Some(s.into()))
1886 })?,
1887 },
1888 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1889 },
1890 SimpleQueryMessage::CommandComplete(c) => {
1891 panic!("get command: {}", c);
1892 }
1893 _ => {
1894 panic!("what?");
1895 }
1896 };
1897 val
1898 }
1899}
1900
1901impl<'r> Produce<'r, Option<NaiveDate>> for PostgresSimpleSourceParser {
1902 type Error = PostgresSourceError;
1903
1904 #[throws(PostgresSourceError)]
1905 fn produce(&'r mut self) -> Option<NaiveDate> {
1906 let (ridx, cidx) = self.next_loc()?;
1907 let val = match &self.rows[ridx] {
1908 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1909 Some(s) => match s {
1910 "infinity" => Some(NaiveDate::MAX),
1911 "-infinity" => Some(NaiveDate::MIN),
1912 s => Some(NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| {
1913 ConnectorXError::cannot_produce::<Option<NaiveDate>>(Some(s.into()))
1914 })?),
1915 },
1916 None => None,
1917 },
1918 SimpleQueryMessage::CommandComplete(c) => {
1919 panic!("get command: {}", c);
1920 }
1921 _ => {
1922 panic!("what?");
1923 }
1924 };
1925 val
1926 }
1927}
1928
1929impl<'r> Produce<'r, NaiveTime> for PostgresSimpleSourceParser {
1930 type Error = PostgresSourceError;
1931
1932 #[throws(PostgresSourceError)]
1933 fn produce(&'r mut self) -> NaiveTime {
1934 let (ridx, cidx) = self.next_loc()?;
1935 let val = match &self.rows[ridx] {
1936 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1937 Some(s) => NaiveTime::parse_from_str(s, "%H:%M:%S%.f")
1938 .map_err(|_| ConnectorXError::cannot_produce::<NaiveTime>(Some(s.into())))?,
1939 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1940 },
1941 SimpleQueryMessage::CommandComplete(c) => {
1942 panic!("get command: {}", c);
1943 }
1944 _ => {
1945 panic!("what?");
1946 }
1947 };
1948 val
1949 }
1950}
1951
1952impl<'r> Produce<'r, Option<NaiveTime>> for PostgresSimpleSourceParser {
1953 type Error = PostgresSourceError;
1954
1955 #[throws(PostgresSourceError)]
1956 fn produce(&'r mut self) -> Option<NaiveTime> {
1957 let (ridx, cidx) = self.next_loc()?;
1958 let val = match &self.rows[ridx] {
1959 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1960 Some(s) => Some(NaiveTime::parse_from_str(s, "%H:%M:%S%.f").map_err(|_| {
1961 ConnectorXError::cannot_produce::<Option<NaiveTime>>(Some(s.into()))
1962 })?),
1963 None => None,
1964 },
1965 SimpleQueryMessage::CommandComplete(c) => {
1966 panic!("get command: {}", c);
1967 }
1968 _ => {
1969 panic!("what?");
1970 }
1971 };
1972 val
1973 }
1974}
1975
1976impl<'r> Produce<'r, NaiveDateTime> for PostgresSimpleSourceParser {
1977 type Error = PostgresSourceError;
1978
1979 #[throws(PostgresSourceError)]
1980 fn produce(&'r mut self) -> NaiveDateTime {
1981 let (ridx, cidx) = self.next_loc()?;
1982 let val =
1983 match &self.rows[ridx] {
1984 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1985 Some(s) => match s {
1986 "infinity" => NaiveDateTime::MAX,
1987 "-infinity" => NaiveDateTime::MIN,
1988 s => NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(
1989 |_| ConnectorXError::cannot_produce::<NaiveDateTime>(Some(s.into())),
1990 )?,
1991 },
1992 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1993 },
1994 SimpleQueryMessage::CommandComplete(c) => {
1995 panic!("get command: {}", c);
1996 }
1997 _ => {
1998 panic!("what?");
1999 }
2000 };
2001 val
2002 }
2003}
2004
2005impl<'r> Produce<'r, Option<NaiveDateTime>> for PostgresSimpleSourceParser {
2006 type Error = PostgresSourceError;
2007
2008 #[throws(PostgresSourceError)]
2009 fn produce(&'r mut self) -> Option<NaiveDateTime> {
2010 let (ridx, cidx) = self.next_loc()?;
2011 let val = match &self.rows[ridx] {
2012 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2013 Some(s) => match s {
2014 "infinity" => Some(NaiveDateTime::MAX),
2015 "-infinity" => Some(NaiveDateTime::MIN),
2016 s => Some(
2017 NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").map_err(|_| {
2018 ConnectorXError::cannot_produce::<Option<NaiveDateTime>>(Some(s.into()))
2019 })?,
2020 ),
2021 },
2022 None => None,
2023 },
2024 SimpleQueryMessage::CommandComplete(c) => {
2025 panic!("get command: {}", c);
2026 }
2027 _ => {
2028 panic!("what?");
2029 }
2030 };
2031 val
2032 }
2033}
2034
2035impl<'r> Produce<'r, DateTime<Utc>> for PostgresSimpleSourceParser {
2036 type Error = PostgresSourceError;
2037
2038 #[throws(PostgresSourceError)]
2039 fn produce(&'r mut self) -> DateTime<Utc> {
2040 let (ridx, cidx) = self.next_loc()?;
2041 let val = match &self.rows[ridx] {
2042 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2043 Some("infinity") => DateTime::<Utc>::MAX_UTC,
2044 Some("-infinity") => DateTime::<Utc>::MIN_UTC,
2045 Some(s) => {
2046 let time_string = format!("{}:00", s).to_owned();
2047 let slice: &str = &time_string[..];
2048 let time: DateTime<FixedOffset> =
2049 DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2050
2051 time.with_timezone(&Utc)
2052 }
2053 None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
2054 },
2055 SimpleQueryMessage::CommandComplete(c) => {
2056 panic!("get command: {}", c);
2057 }
2058 _ => {
2059 panic!("what?");
2060 }
2061 };
2062 val
2063 }
2064}
2065
2066impl<'r> Produce<'r, Option<DateTime<Utc>>> for PostgresSimpleSourceParser {
2067 type Error = PostgresSourceError;
2068
2069 #[throws(PostgresSourceError)]
2070 fn produce(&'r mut self) -> Option<DateTime<Utc>> {
2071 let (ridx, cidx) = self.next_loc()?;
2072 let val = match &self.rows[ridx] {
2073 SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
2074 Some("infinity") => Some(DateTime::<Utc>::MAX_UTC),
2075 Some("-infinity") => Some(DateTime::<Utc>::MIN_UTC),
2076 Some(s) => {
2077 let time_string = format!("{}:00", s).to_owned();
2078 let slice: &str = &time_string[..];
2079 let time: DateTime<FixedOffset> =
2080 DateTime::parse_from_str(slice, "%Y-%m-%d %H:%M:%S%.f%:z").unwrap();
2081
2082 Some(time.with_timezone(&Utc))
2083 }
2084 None => None,
2085 },
2086 SimpleQueryMessage::CommandComplete(c) => {
2087 panic!("get command: {}", c);
2088 }
2089 _ => {
2090 panic!("what?");
2091 }
2092 };
2093 val
2094 }
2095}