connectorx/destinations/arrow/arrow_assoc.rs

1use super::{
2    errors::{ArrowDestinationError, Result},
3    typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
4};
5use crate::constants::SECONDS_IN_DAY;
6use arrow::array::{
7    ArrayBuilder, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder, Int16Builder,
8    Int32Builder, Int64Builder, LargeBinaryBuilder, LargeListBuilder, StringBuilder,
9    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
10    TimestampNanosecondBuilder, UInt16Builder, UInt32Builder, UInt64Builder,
11};
12use arrow::datatypes::Field;
13use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
15use fehler::throws;
16
17/// Associate arrow builder with native type
18pub trait ArrowAssoc {
19    type Builder: ArrayBuilder + Send;
20
21    fn builder(nrows: usize) -> Self::Builder;
22    fn append(builder: &mut Self::Builder, value: Self) -> Result<()>;
23    fn field(header: &str) -> Field;
24}
25
/// Generates `ArrowAssoc` for a fixed-width native type and for its `Option`.
///
/// * `$native`  — the Rust value type pushed into the builder
/// * `$arrow`   — the Arrow `DataType` reported by `field`
/// * `$builder` — the Arrow builder type that stores the values
///
/// The bare type yields a non-nullable field and uses `append_value`;
/// the `Option` form yields a nullable field and uses `append_option`.
macro_rules! impl_arrow_assoc {
    ($native:ty, $arrow:expr, $builder:ty) => {
        impl ArrowAssoc for $native {
            type Builder = $builder;

            fn field(header: &str) -> Field {
                let nullable = false;
                Field::new(header, $arrow, nullable)
            }

            fn builder(nrows: usize) -> Self::Builder {
                <$builder>::with_capacity(nrows)
            }

            #[throws(ArrowDestinationError)]
            fn append(builder: &mut Self::Builder, value: Self) {
                builder.append_value(value);
            }
        }

        impl ArrowAssoc for Option<$native> {
            type Builder = $builder;

            fn field(header: &str) -> Field {
                let nullable = true;
                Field::new(header, $arrow, nullable)
            }

            fn builder(nrows: usize) -> Self::Builder {
                <$builder>::with_capacity(nrows)
            }

            // `None` is recorded as a null slot.
            #[throws(ArrowDestinationError)]
            fn append(builder: &mut Self::Builder, value: Self) {
                builder.append_option(value);
            }
        }
    };
}
63
64impl_arrow_assoc!(u16, ArrowDataType::UInt16, UInt16Builder);
65impl_arrow_assoc!(u32, ArrowDataType::UInt32, UInt32Builder);
66impl_arrow_assoc!(u64, ArrowDataType::UInt64, UInt64Builder);
67impl_arrow_assoc!(i16, ArrowDataType::Int16, Int16Builder);
68impl_arrow_assoc!(i32, ArrowDataType::Int32, Int32Builder);
69impl_arrow_assoc!(i64, ArrowDataType::Int64, Int64Builder);
70impl_arrow_assoc!(f32, ArrowDataType::Float32, Float32Builder);
71impl_arrow_assoc!(f64, ArrowDataType::Float64, Float64Builder);
72impl_arrow_assoc!(bool, ArrowDataType::Boolean, BooleanBuilder);
73
74impl ArrowAssoc for &str {
75    type Builder = StringBuilder;
76
77    fn builder(nrows: usize) -> Self::Builder {
78        StringBuilder::with_capacity(1024, nrows)
79    }
80
81    #[throws(ArrowDestinationError)]
82    fn append(builder: &mut Self::Builder, value: Self) {
83        builder.append_value(value);
84    }
85
86    fn field(header: &str) -> Field {
87        Field::new(header, ArrowDataType::Utf8, false)
88    }
89}
90
91impl ArrowAssoc for Option<&str> {
92    type Builder = StringBuilder;
93
94    fn builder(nrows: usize) -> Self::Builder {
95        StringBuilder::with_capacity(1024, nrows)
96    }
97
98    #[throws(ArrowDestinationError)]
99    fn append(builder: &mut Self::Builder, value: Self) {
100        match value {
101            Some(s) => builder.append_value(s),
102            None => builder.append_null(),
103        }
104    }
105
106    fn field(header: &str) -> Field {
107        Field::new(header, ArrowDataType::Utf8, true)
108    }
109}
110
111impl ArrowAssoc for String {
112    type Builder = StringBuilder;
113
114    fn builder(nrows: usize) -> Self::Builder {
115        StringBuilder::with_capacity(1024, nrows)
116    }
117
118    #[throws(ArrowDestinationError)]
119    fn append(builder: &mut Self::Builder, value: String) {
120        builder.append_value(value.as_str());
121    }
122
123    fn field(header: &str) -> Field {
124        Field::new(header, ArrowDataType::Utf8, false)
125    }
126}
127
128impl ArrowAssoc for Option<String> {
129    type Builder = StringBuilder;
130
131    fn builder(nrows: usize) -> Self::Builder {
132        StringBuilder::with_capacity(1024, nrows)
133    }
134
135    #[throws(ArrowDestinationError)]
136    fn append(builder: &mut Self::Builder, value: Self) {
137        match value {
138            Some(s) => builder.append_value(s.as_str()),
139            None => builder.append_null(),
140        }
141    }
142
143    fn field(header: &str) -> Field {
144        Field::new(header, ArrowDataType::Utf8, true)
145    }
146}
147
148impl ArrowAssoc for DateTime<Utc> {
149    type Builder = TimestampNanosecondBuilder;
150
151    fn builder(nrows: usize) -> Self::Builder {
152        TimestampNanosecondBuilder::with_capacity(nrows)
153    }
154
155    #[throws(ArrowDestinationError)]
156    fn append(builder: &mut Self::Builder, value: DateTime<Utc>) {
157        builder.append_value(
158            value
159                .timestamp_nanos_opt()
160                .unwrap_or_else(|| panic!("out of range DateTime!")),
161        )
162    }
163
164    fn field(header: &str) -> Field {
165        Field::new(
166            header,
167            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
168            false,
169        )
170    }
171}
172
173impl ArrowAssoc for Option<DateTime<Utc>> {
174    type Builder = TimestampNanosecondBuilder;
175
176    fn builder(nrows: usize) -> Self::Builder {
177        TimestampNanosecondBuilder::with_capacity(nrows)
178    }
179
180    #[throws(ArrowDestinationError)]
181    fn append(builder: &mut Self::Builder, value: Option<DateTime<Utc>>) {
182        builder.append_option(value.map(|x| {
183            x.timestamp_nanos_opt()
184                .unwrap_or_else(|| panic!("out of range DateTime!"))
185        }))
186    }
187
188    fn field(header: &str) -> Field {
189        Field::new(
190            header,
191            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
192            true,
193        )
194    }
195}
196
197impl ArrowAssoc for DateTimeWrapperMicro {
198    type Builder = TimestampMicrosecondBuilder;
199
200    fn builder(nrows: usize) -> Self::Builder {
201        TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00")
202    }
203
204    #[throws(ArrowDestinationError)]
205    fn append(builder: &mut Self::Builder, value: DateTimeWrapperMicro) {
206        builder.append_value(value.0.timestamp_micros());
207    }
208
209    fn field(header: &str) -> Field {
210        Field::new(
211            header,
212            ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
213            false,
214        )
215    }
216}
217
218impl ArrowAssoc for Option<DateTimeWrapperMicro> {
219    type Builder = TimestampMicrosecondBuilder;
220
221    fn builder(nrows: usize) -> Self::Builder {
222        TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00")
223    }
224
225    #[throws(ArrowDestinationError)]
226    fn append(builder: &mut Self::Builder, value: Option<DateTimeWrapperMicro>) {
227        builder.append_option(value.map(|x| x.0.timestamp_micros()));
228    }
229
230    fn field(header: &str) -> Field {
231        Field::new(
232            header,
233            ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
234            true,
235        )
236    }
237}
238
239fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
240    match nd.and_hms_opt(0, 0, 0) {
241        Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32,
242        None => panic!("and_hms_opt got None from {:?}", nd),
243    }
244}
245
246fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 {
247    nd.and_utc()
248        .timestamp_nanos_opt()
249        .unwrap_or_else(|| panic!("out of range DateTime"))
250}
251
252impl ArrowAssoc for Option<NaiveDate> {
253    type Builder = Date32Builder;
254
255    fn builder(nrows: usize) -> Self::Builder {
256        Date32Builder::with_capacity(nrows)
257    }
258
259    fn append(builder: &mut Self::Builder, value: Option<NaiveDate>) -> Result<()> {
260        builder.append_option(value.map(naive_date_to_arrow));
261        Ok(())
262    }
263
264    fn field(header: &str) -> Field {
265        Field::new(header, ArrowDataType::Date32, true)
266    }
267}
268
269impl ArrowAssoc for NaiveDate {
270    type Builder = Date32Builder;
271
272    fn builder(nrows: usize) -> Self::Builder {
273        Date32Builder::with_capacity(nrows)
274    }
275
276    fn append(builder: &mut Self::Builder, value: NaiveDate) -> Result<()> {
277        builder.append_value(naive_date_to_arrow(value));
278        Ok(())
279    }
280
281    fn field(header: &str) -> Field {
282        Field::new(header, ArrowDataType::Date32, false)
283    }
284}
285
286impl ArrowAssoc for Option<NaiveDateTime> {
287    type Builder = TimestampNanosecondBuilder;
288
289    fn builder(nrows: usize) -> Self::Builder {
290        TimestampNanosecondBuilder::with_capacity(nrows)
291    }
292
293    fn append(builder: &mut Self::Builder, value: Option<NaiveDateTime>) -> Result<()> {
294        builder.append_option(value.map(naive_datetime_to_arrow));
295        Ok(())
296    }
297
298    fn field(header: &str) -> Field {
299        Field::new(
300            header,
301            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
302            true,
303        )
304    }
305}
306
307impl ArrowAssoc for NaiveDateTime {
308    type Builder = TimestampNanosecondBuilder;
309
310    fn builder(nrows: usize) -> Self::Builder {
311        TimestampNanosecondBuilder::with_capacity(nrows)
312    }
313
314    fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> {
315        builder.append_value(naive_datetime_to_arrow(value));
316        Ok(())
317    }
318
319    fn field(header: &str) -> Field {
320        Field::new(
321            header,
322            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
323            false,
324        )
325    }
326}
327
328impl ArrowAssoc for Option<NaiveDateTimeWrapperMicro> {
329    type Builder = TimestampMicrosecondBuilder;
330
331    fn builder(nrows: usize) -> Self::Builder {
332        TimestampMicrosecondBuilder::with_capacity(nrows)
333    }
334
335    fn append(builder: &mut Self::Builder, value: Option<NaiveDateTimeWrapperMicro>) -> Result<()> {
336        builder.append_option(match value {
337            Some(v) => Some(v.0.and_utc().timestamp_micros()),
338            None => None,
339        });
340        Ok(())
341    }
342
343    fn field(header: &str) -> Field {
344        Field::new(
345            header,
346            ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
347            true,
348        )
349    }
350}
351
352impl ArrowAssoc for NaiveDateTimeWrapperMicro {
353    type Builder = TimestampMicrosecondBuilder;
354
355    fn builder(nrows: usize) -> Self::Builder {
356        TimestampMicrosecondBuilder::with_capacity(nrows)
357    }
358
359    fn append(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) -> Result<()> {
360        builder.append_value(value.0.and_utc().timestamp_micros());
361        Ok(())
362    }
363
364    fn field(header: &str) -> Field {
365        Field::new(
366            header,
367            ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
368            false,
369        )
370    }
371}
372
373impl ArrowAssoc for Option<NaiveTime> {
374    type Builder = Time64NanosecondBuilder;
375
376    fn builder(nrows: usize) -> Self::Builder {
377        Time64NanosecondBuilder::with_capacity(nrows)
378    }
379
380    fn append(builder: &mut Self::Builder, value: Option<NaiveTime>) -> Result<()> {
381        builder.append_option(
382            value.map(|t| {
383                t.num_seconds_from_midnight() as i64 * 1_000_000_000 + t.nanosecond() as i64
384            }),
385        );
386        Ok(())
387    }
388
389    fn field(header: &str) -> Field {
390        Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), true)
391    }
392}
393
394impl ArrowAssoc for NaiveTime {
395    type Builder = Time64NanosecondBuilder;
396
397    fn builder(nrows: usize) -> Self::Builder {
398        Time64NanosecondBuilder::with_capacity(nrows)
399    }
400
401    fn append(builder: &mut Self::Builder, value: NaiveTime) -> Result<()> {
402        builder.append_value(
403            value.num_seconds_from_midnight() as i64 * 1_000_000_000 + value.nanosecond() as i64,
404        );
405        Ok(())
406    }
407
408    fn field(header: &str) -> Field {
409        Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), false)
410    }
411}
412
413impl ArrowAssoc for Option<NaiveTimeWrapperMicro> {
414    type Builder = Time64MicrosecondBuilder;
415
416    fn builder(nrows: usize) -> Self::Builder {
417        Time64MicrosecondBuilder::with_capacity(nrows)
418    }
419
420    fn append(builder: &mut Self::Builder, value: Option<NaiveTimeWrapperMicro>) -> Result<()> {
421        builder.append_option(value.map(|t| {
422            t.0.num_seconds_from_midnight() as i64 * 1_000_000 + (t.0.nanosecond() as i64) / 1000
423        }));
424        Ok(())
425    }
426
427    fn field(header: &str) -> Field {
428        Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true)
429    }
430}
431
432impl ArrowAssoc for NaiveTimeWrapperMicro {
433    type Builder = Time64MicrosecondBuilder;
434
435    fn builder(nrows: usize) -> Self::Builder {
436        Time64MicrosecondBuilder::with_capacity(nrows)
437    }
438
439    fn append(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) -> Result<()> {
440        builder.append_value(
441            value.0.num_seconds_from_midnight() as i64 * 1_000_000
442                + (value.0.nanosecond() as i64) / 1000,
443        );
444        Ok(())
445    }
446
447    fn field(header: &str) -> Field {
448        Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false)
449    }
450}
451
452impl ArrowAssoc for Option<Vec<u8>> {
453    type Builder = LargeBinaryBuilder;
454
455    fn builder(nrows: usize) -> Self::Builder {
456        LargeBinaryBuilder::with_capacity(1024, nrows)
457    }
458
459    fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
460        match value {
461            Some(v) => builder.append_value(v),
462            None => builder.append_null(),
463        };
464        Ok(())
465    }
466
467    fn field(header: &str) -> Field {
468        Field::new(header, ArrowDataType::LargeBinary, true)
469    }
470}
471
472impl ArrowAssoc for Vec<u8> {
473    type Builder = LargeBinaryBuilder;
474
475    fn builder(nrows: usize) -> Self::Builder {
476        LargeBinaryBuilder::with_capacity(1024, nrows)
477    }
478
479    fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
480        builder.append_value(value);
481        Ok(())
482    }
483
484    fn field(header: &str) -> Field {
485        Field::new(header, ArrowDataType::LargeBinary, false)
486    }
487}
488
/// Generates `ArrowAssoc` for a list type and for its `Option`.
///
/// Each value becomes one entry of a `LargeList` column.
///
/// * `$list`  — the Rust list type (e.g. `Vec<Option<i32>>`)
/// * `$item`  — the Arrow `DataType` of the child items (always nullable)
/// * `$child` — the builder type used for the child values
///
/// The bare list yields a non-nullable column; the `Option` form a nullable one.
macro_rules! impl_arrow_array_assoc {
    ($list:ty, $item:expr, $child:ident) => {
        impl ArrowAssoc for $list {
            type Builder = LargeListBuilder<$child>;

            fn field(header: &str) -> Field {
                let item_field = Field::new_list_field($item, true);
                let dtype = ArrowDataType::LargeList(std::sync::Arc::new(item_field));
                Field::new(header, dtype, false)
            }

            fn builder(nrows: usize) -> Self::Builder {
                let values = $child::new();
                LargeListBuilder::with_capacity(values, nrows)
            }

            #[throws(ArrowDestinationError)]
            fn append(builder: &mut Self::Builder, value: Self) {
                builder.append_value(value);
            }
        }

        impl ArrowAssoc for Option<$list> {
            type Builder = LargeListBuilder<$child>;

            fn field(header: &str) -> Field {
                let item_field = Field::new_list_field($item, true);
                let dtype = ArrowDataType::LargeList(std::sync::Arc::new(item_field));
                Field::new(header, dtype, true)
            }

            fn builder(nrows: usize) -> Self::Builder {
                let values = $child::new();
                LargeListBuilder::with_capacity(values, nrows)
            }

            // A `None` list is recorded as a null slot.
            #[throws(ArrowDestinationError)]
            fn append(builder: &mut Self::Builder, value: Self) {
                builder.append_option(value);
            }
        }
    };
}
534
535impl_arrow_array_assoc!(Vec<Option<bool>>, ArrowDataType::Boolean, BooleanBuilder);
536impl_arrow_array_assoc!(Vec<Option<String>>, ArrowDataType::Utf8, StringBuilder);
537impl_arrow_array_assoc!(Vec<Option<i16>>, ArrowDataType::Int16, Int16Builder);
538impl_arrow_array_assoc!(Vec<Option<i32>>, ArrowDataType::Int32, Int32Builder);
539impl_arrow_array_assoc!(Vec<Option<i64>>, ArrowDataType::Int64, Int64Builder);
540impl_arrow_array_assoc!(Vec<Option<u16>>, ArrowDataType::UInt16, UInt16Builder);
541impl_arrow_array_assoc!(Vec<Option<u32>>, ArrowDataType::UInt32, UInt32Builder);
542impl_arrow_array_assoc!(Vec<Option<u64>>, ArrowDataType::UInt64, UInt64Builder);
543impl_arrow_array_assoc!(Vec<Option<f32>>, ArrowDataType::Float32, Float32Builder);
544impl_arrow_array_assoc!(Vec<Option<f64>>, ArrowDataType::Float64, Float64Builder);