connectorx/destinations/arrowstream/
arrow_assoc.rs

1use super::errors::{ArrowDestinationError, Result};
2use crate::constants::SECONDS_IN_DAY;
3use arrow::array::{
4    ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Float32Builder, Float64Builder,
5    Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, Time64NanosecondBuilder,
6    TimestampNanosecondBuilder, UInt32Builder, UInt64Builder,
7};
8use arrow::datatypes::Field;
9use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
10use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
11use fehler::throws;
12
13/// Associate arrow builder with native type
14pub trait ArrowAssoc {
15    type Builder: ArrayBuilder + Send;
16
17    fn builder(nrows: usize) -> Self::Builder;
18    fn append(builder: &mut Self::Builder, value: Self) -> Result<()>;
19    fn field(header: &str) -> Field;
20}
21
/// Implements [`ArrowAssoc`] for a primitive type and for its `Option` form.
///
/// Arguments, in order: the native Rust type, the Arrow data type expression
/// used for the field, and the Arrow builder type. The bare type produces a
/// non-nullable field, the `Option` wrapper a nullable one.
macro_rules! impl_arrow_assoc {
    ($native:ty, $arrow_type:expr, $builder_ty:ty) => {
        impl ArrowAssoc for $native {
            type Builder = $builder_ty;

            fn builder(nrows: usize) -> Self::Builder {
                <$builder_ty>::with_capacity(nrows)
            }

            #[throws(ArrowDestinationError)]
            fn append(builder: &mut Self::Builder, value: Self) {
                builder.append_value(value);
            }

            fn field(header: &str) -> Field {
                let nullable = false;
                Field::new(header, $arrow_type, nullable)
            }
        }

        impl ArrowAssoc for Option<$native> {
            type Builder = $builder_ty;

            fn builder(nrows: usize) -> Self::Builder {
                <$builder_ty>::with_capacity(nrows)
            }

            #[throws(ArrowDestinationError)]
            fn append(builder: &mut Self::Builder, value: Self) {
                // `None` becomes a null slot, `Some` a regular value.
                match value {
                    Some(inner) => builder.append_value(inner),
                    None => builder.append_null(),
                }
            }

            fn field(header: &str) -> Field {
                let nullable = true;
                Field::new(header, $arrow_type, nullable)
            }
        }
    };
}
59
60impl_arrow_assoc!(u32, ArrowDataType::UInt32, UInt32Builder);
61impl_arrow_assoc!(u64, ArrowDataType::UInt64, UInt64Builder);
62impl_arrow_assoc!(i32, ArrowDataType::Int32, Int32Builder);
63impl_arrow_assoc!(i64, ArrowDataType::Int64, Int64Builder);
64impl_arrow_assoc!(f32, ArrowDataType::Float32, Float32Builder);
65impl_arrow_assoc!(f64, ArrowDataType::Float64, Float64Builder);
66impl_arrow_assoc!(bool, ArrowDataType::Boolean, BooleanBuilder);
67
68impl ArrowAssoc for &str {
69    type Builder = StringBuilder;
70
71    fn builder(nrows: usize) -> Self::Builder {
72        StringBuilder::with_capacity(1024, nrows)
73    }
74
75    #[throws(ArrowDestinationError)]
76    fn append(builder: &mut Self::Builder, value: Self) {
77        builder.append_value(value);
78    }
79
80    fn field(header: &str) -> Field {
81        Field::new(header, ArrowDataType::Utf8, false)
82    }
83}
84
85impl ArrowAssoc for Option<&str> {
86    type Builder = StringBuilder;
87
88    fn builder(nrows: usize) -> Self::Builder {
89        StringBuilder::with_capacity(1024, nrows)
90    }
91
92    #[throws(ArrowDestinationError)]
93    fn append(builder: &mut Self::Builder, value: Self) {
94        match value {
95            Some(s) => builder.append_value(s),
96            None => builder.append_null(),
97        }
98    }
99
100    fn field(header: &str) -> Field {
101        Field::new(header, ArrowDataType::Utf8, true)
102    }
103}
104
105impl ArrowAssoc for String {
106    type Builder = StringBuilder;
107
108    fn builder(nrows: usize) -> Self::Builder {
109        StringBuilder::with_capacity(1024, nrows)
110    }
111
112    #[throws(ArrowDestinationError)]
113    fn append(builder: &mut Self::Builder, value: String) {
114        builder.append_value(value.as_str());
115    }
116
117    fn field(header: &str) -> Field {
118        Field::new(header, ArrowDataType::Utf8, false)
119    }
120}
121
122impl ArrowAssoc for Option<String> {
123    type Builder = StringBuilder;
124
125    fn builder(nrows: usize) -> Self::Builder {
126        StringBuilder::with_capacity(1024, nrows)
127    }
128
129    #[throws(ArrowDestinationError)]
130    fn append(builder: &mut Self::Builder, value: Self) {
131        match value {
132            Some(s) => builder.append_value(s.as_str()),
133            None => builder.append_null(),
134        }
135    }
136
137    fn field(header: &str) -> Field {
138        Field::new(header, ArrowDataType::Utf8, true)
139    }
140}
141
142impl ArrowAssoc for DateTime<Utc> {
143    type Builder = TimestampNanosecondBuilder;
144
145    fn builder(nrows: usize) -> Self::Builder {
146        TimestampNanosecondBuilder::with_capacity(nrows)
147    }
148
149    #[throws(ArrowDestinationError)]
150    fn append(builder: &mut Self::Builder, value: DateTime<Utc>) {
151        builder.append_value(
152            value
153                .timestamp_nanos_opt()
154                .unwrap_or_else(|| panic!("out of range DateTime")),
155        )
156    }
157
158    fn field(header: &str) -> Field {
159        Field::new(
160            header,
161            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
162            false,
163        )
164    }
165}
166
167impl ArrowAssoc for Option<DateTime<Utc>> {
168    type Builder = TimestampNanosecondBuilder;
169
170    fn builder(nrows: usize) -> Self::Builder {
171        TimestampNanosecondBuilder::with_capacity(nrows)
172    }
173
174    #[throws(ArrowDestinationError)]
175    fn append(builder: &mut Self::Builder, value: Option<DateTime<Utc>>) {
176        builder.append_option(value.map(|x| {
177            x.timestamp_nanos_opt()
178                .unwrap_or_else(|| panic!("out of range DateTime"))
179        }))
180    }
181
182    fn field(header: &str) -> Field {
183        Field::new(
184            header,
185            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
186            true,
187        )
188    }
189}
190
191fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
192    match nd.and_hms_opt(0, 0, 0) {
193        Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32,
194        None => panic!("and_hms_opt got None from {:?}", nd),
195    }
196}
197
198fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 {
199    nd.and_utc().timestamp_millis()
200}
201
202impl ArrowAssoc for Option<NaiveDate> {
203    type Builder = Date32Builder;
204
205    fn builder(nrows: usize) -> Self::Builder {
206        Date32Builder::with_capacity(nrows)
207    }
208
209    fn append(builder: &mut Self::Builder, value: Option<NaiveDate>) -> Result<()> {
210        builder.append_option(value.map(naive_date_to_arrow));
211        Ok(())
212    }
213
214    fn field(header: &str) -> Field {
215        Field::new(header, ArrowDataType::Date32, true)
216    }
217}
218
219impl ArrowAssoc for NaiveDate {
220    type Builder = Date32Builder;
221
222    fn builder(nrows: usize) -> Self::Builder {
223        Date32Builder::with_capacity(nrows)
224    }
225
226    fn append(builder: &mut Self::Builder, value: NaiveDate) -> Result<()> {
227        builder.append_value(naive_date_to_arrow(value));
228        Ok(())
229    }
230
231    fn field(header: &str) -> Field {
232        Field::new(header, ArrowDataType::Date32, false)
233    }
234}
235
236impl ArrowAssoc for Option<NaiveDateTime> {
237    type Builder = Date64Builder;
238
239    fn builder(nrows: usize) -> Self::Builder {
240        Date64Builder::with_capacity(nrows)
241    }
242
243    fn append(builder: &mut Self::Builder, value: Option<NaiveDateTime>) -> Result<()> {
244        builder.append_option(value.map(naive_datetime_to_arrow));
245        Ok(())
246    }
247
248    fn field(header: &str) -> Field {
249        Field::new(header, ArrowDataType::Date64, true)
250    }
251}
252
253impl ArrowAssoc for NaiveDateTime {
254    type Builder = Date64Builder;
255
256    fn builder(nrows: usize) -> Self::Builder {
257        Date64Builder::with_capacity(nrows)
258    }
259
260    fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> {
261        builder.append_value(naive_datetime_to_arrow(value));
262        Ok(())
263    }
264
265    fn field(header: &str) -> Field {
266        Field::new(header, ArrowDataType::Date64, false)
267    }
268}
269
270impl ArrowAssoc for Option<NaiveTime> {
271    type Builder = Time64NanosecondBuilder;
272
273    fn builder(nrows: usize) -> Self::Builder {
274        Time64NanosecondBuilder::with_capacity(nrows)
275    }
276
277    fn append(builder: &mut Self::Builder, value: Option<NaiveTime>) -> Result<()> {
278        builder.append_option(
279            value.map(|t| {
280                t.num_seconds_from_midnight() as i64 * 1_000_000_000 + t.nanosecond() as i64
281            }),
282        );
283        Ok(())
284    }
285
286    fn field(header: &str) -> Field {
287        Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), true)
288    }
289}
290
291impl ArrowAssoc for NaiveTime {
292    type Builder = Time64NanosecondBuilder;
293
294    fn builder(nrows: usize) -> Self::Builder {
295        Time64NanosecondBuilder::with_capacity(nrows)
296    }
297
298    fn append(builder: &mut Self::Builder, value: NaiveTime) -> Result<()> {
299        builder.append_value(
300            value.num_seconds_from_midnight() as i64 * 1_000_000_000 + value.nanosecond() as i64,
301        );
302        Ok(())
303    }
304
305    fn field(header: &str) -> Field {
306        Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), false)
307    }
308}
309
310impl ArrowAssoc for Option<Vec<u8>> {
311    type Builder = LargeBinaryBuilder;
312
313    fn builder(nrows: usize) -> Self::Builder {
314        LargeBinaryBuilder::with_capacity(1024, nrows)
315    }
316
317    fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
318        match value {
319            Some(v) => builder.append_value(v),
320            None => builder.append_null(),
321        };
322        Ok(())
323    }
324
325    fn field(header: &str) -> Field {
326        Field::new(header, ArrowDataType::LargeBinary, true)
327    }
328}
329
330impl ArrowAssoc for Vec<u8> {
331    type Builder = LargeBinaryBuilder;
332
333    fn builder(nrows: usize) -> Self::Builder {
334        LargeBinaryBuilder::with_capacity(1024, nrows)
335    }
336
337    fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
338        builder.append_value(value);
339        Ok(())
340    }
341
342    fn field(header: &str) -> Field {
343        Field::new(header, ArrowDataType::LargeBinary, false)
344    }
345}