1use super::{
2 errors::{ArrowDestinationError, Result},
3 typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
4};
5use crate::constants::SECONDS_IN_DAY;
6use arrow::array::{
7 ArrayBuilder, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder, Int16Builder,
8 Int32Builder, Int64Builder, LargeBinaryBuilder, LargeListBuilder, StringBuilder,
9 Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
10 TimestampNanosecondBuilder, UInt16Builder, UInt32Builder, UInt64Builder,
11};
12use arrow::datatypes::Field;
13use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
14use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
15use fehler::throws;
16
17pub trait ArrowAssoc {
19 type Builder: ArrayBuilder + Send;
20
21 fn builder(nrows: usize) -> Self::Builder;
22 fn append(builder: &mut Self::Builder, value: Self) -> Result<()>;
23 fn field(header: &str) -> Field;
24}
25
26macro_rules! impl_arrow_assoc {
27 ($T:ty, $AT:expr, $B:ty) => {
28 impl ArrowAssoc for $T {
29 type Builder = $B;
30
31 fn builder(nrows: usize) -> Self::Builder {
32 Self::Builder::with_capacity(nrows)
33 }
34
35 #[throws(ArrowDestinationError)]
36 fn append(builder: &mut Self::Builder, value: Self) {
37 builder.append_value(value);
38 }
39
40 fn field(header: &str) -> Field {
41 Field::new(header, $AT, false)
42 }
43 }
44
45 impl ArrowAssoc for Option<$T> {
46 type Builder = $B;
47
48 fn builder(nrows: usize) -> Self::Builder {
49 Self::Builder::with_capacity(nrows)
50 }
51
52 #[throws(ArrowDestinationError)]
53 fn append(builder: &mut Self::Builder, value: Self) {
54 builder.append_option(value);
55 }
56
57 fn field(header: &str) -> Field {
58 Field::new(header, $AT, true)
59 }
60 }
61 };
62}
63
64impl_arrow_assoc!(u16, ArrowDataType::UInt16, UInt16Builder);
65impl_arrow_assoc!(u32, ArrowDataType::UInt32, UInt32Builder);
66impl_arrow_assoc!(u64, ArrowDataType::UInt64, UInt64Builder);
67impl_arrow_assoc!(i16, ArrowDataType::Int16, Int16Builder);
68impl_arrow_assoc!(i32, ArrowDataType::Int32, Int32Builder);
69impl_arrow_assoc!(i64, ArrowDataType::Int64, Int64Builder);
70impl_arrow_assoc!(f32, ArrowDataType::Float32, Float32Builder);
71impl_arrow_assoc!(f64, ArrowDataType::Float64, Float64Builder);
72impl_arrow_assoc!(bool, ArrowDataType::Boolean, BooleanBuilder);
73
74impl ArrowAssoc for &str {
75 type Builder = StringBuilder;
76
77 fn builder(nrows: usize) -> Self::Builder {
78 StringBuilder::with_capacity(1024, nrows)
79 }
80
81 #[throws(ArrowDestinationError)]
82 fn append(builder: &mut Self::Builder, value: Self) {
83 builder.append_value(value);
84 }
85
86 fn field(header: &str) -> Field {
87 Field::new(header, ArrowDataType::Utf8, false)
88 }
89}
90
91impl ArrowAssoc for Option<&str> {
92 type Builder = StringBuilder;
93
94 fn builder(nrows: usize) -> Self::Builder {
95 StringBuilder::with_capacity(1024, nrows)
96 }
97
98 #[throws(ArrowDestinationError)]
99 fn append(builder: &mut Self::Builder, value: Self) {
100 match value {
101 Some(s) => builder.append_value(s),
102 None => builder.append_null(),
103 }
104 }
105
106 fn field(header: &str) -> Field {
107 Field::new(header, ArrowDataType::Utf8, true)
108 }
109}
110
111impl ArrowAssoc for String {
112 type Builder = StringBuilder;
113
114 fn builder(nrows: usize) -> Self::Builder {
115 StringBuilder::with_capacity(1024, nrows)
116 }
117
118 #[throws(ArrowDestinationError)]
119 fn append(builder: &mut Self::Builder, value: String) {
120 builder.append_value(value.as_str());
121 }
122
123 fn field(header: &str) -> Field {
124 Field::new(header, ArrowDataType::Utf8, false)
125 }
126}
127
128impl ArrowAssoc for Option<String> {
129 type Builder = StringBuilder;
130
131 fn builder(nrows: usize) -> Self::Builder {
132 StringBuilder::with_capacity(1024, nrows)
133 }
134
135 #[throws(ArrowDestinationError)]
136 fn append(builder: &mut Self::Builder, value: Self) {
137 match value {
138 Some(s) => builder.append_value(s.as_str()),
139 None => builder.append_null(),
140 }
141 }
142
143 fn field(header: &str) -> Field {
144 Field::new(header, ArrowDataType::Utf8, true)
145 }
146}
147
148impl ArrowAssoc for DateTime<Utc> {
149 type Builder = TimestampNanosecondBuilder;
150
151 fn builder(nrows: usize) -> Self::Builder {
152 TimestampNanosecondBuilder::with_capacity(nrows)
153 }
154
155 #[throws(ArrowDestinationError)]
156 fn append(builder: &mut Self::Builder, value: DateTime<Utc>) {
157 builder.append_value(
158 value
159 .timestamp_nanos_opt()
160 .unwrap_or_else(|| panic!("out of range DateTime!")),
161 )
162 }
163
164 fn field(header: &str) -> Field {
165 Field::new(
166 header,
167 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
168 false,
169 )
170 }
171}
172
173impl ArrowAssoc for Option<DateTime<Utc>> {
174 type Builder = TimestampNanosecondBuilder;
175
176 fn builder(nrows: usize) -> Self::Builder {
177 TimestampNanosecondBuilder::with_capacity(nrows)
178 }
179
180 #[throws(ArrowDestinationError)]
181 fn append(builder: &mut Self::Builder, value: Option<DateTime<Utc>>) {
182 builder.append_option(value.map(|x| {
183 x.timestamp_nanos_opt()
184 .unwrap_or_else(|| panic!("out of range DateTime!"))
185 }))
186 }
187
188 fn field(header: &str) -> Field {
189 Field::new(
190 header,
191 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
192 true,
193 )
194 }
195}
196
197impl ArrowAssoc for DateTimeWrapperMicro {
198 type Builder = TimestampMicrosecondBuilder;
199
200 fn builder(nrows: usize) -> Self::Builder {
201 TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00")
202 }
203
204 #[throws(ArrowDestinationError)]
205 fn append(builder: &mut Self::Builder, value: DateTimeWrapperMicro) {
206 builder.append_value(value.0.timestamp_micros());
207 }
208
209 fn field(header: &str) -> Field {
210 Field::new(
211 header,
212 ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
213 false,
214 )
215 }
216}
217
218impl ArrowAssoc for Option<DateTimeWrapperMicro> {
219 type Builder = TimestampMicrosecondBuilder;
220
221 fn builder(nrows: usize) -> Self::Builder {
222 TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00")
223 }
224
225 #[throws(ArrowDestinationError)]
226 fn append(builder: &mut Self::Builder, value: Option<DateTimeWrapperMicro>) {
227 builder.append_option(value.map(|x| x.0.timestamp_micros()));
228 }
229
230 fn field(header: &str) -> Field {
231 Field::new(
232 header,
233 ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
234 true,
235 )
236 }
237}
238
239fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
240 match nd.and_hms_opt(0, 0, 0) {
241 Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32,
242 None => panic!("and_hms_opt got None from {:?}", nd),
243 }
244}
245
246fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 {
247 nd.and_utc()
248 .timestamp_nanos_opt()
249 .unwrap_or_else(|| panic!("out of range DateTime"))
250}
251
252impl ArrowAssoc for Option<NaiveDate> {
253 type Builder = Date32Builder;
254
255 fn builder(nrows: usize) -> Self::Builder {
256 Date32Builder::with_capacity(nrows)
257 }
258
259 fn append(builder: &mut Self::Builder, value: Option<NaiveDate>) -> Result<()> {
260 builder.append_option(value.map(naive_date_to_arrow));
261 Ok(())
262 }
263
264 fn field(header: &str) -> Field {
265 Field::new(header, ArrowDataType::Date32, true)
266 }
267}
268
269impl ArrowAssoc for NaiveDate {
270 type Builder = Date32Builder;
271
272 fn builder(nrows: usize) -> Self::Builder {
273 Date32Builder::with_capacity(nrows)
274 }
275
276 fn append(builder: &mut Self::Builder, value: NaiveDate) -> Result<()> {
277 builder.append_value(naive_date_to_arrow(value));
278 Ok(())
279 }
280
281 fn field(header: &str) -> Field {
282 Field::new(header, ArrowDataType::Date32, false)
283 }
284}
285
286impl ArrowAssoc for Option<NaiveDateTime> {
287 type Builder = TimestampNanosecondBuilder;
288
289 fn builder(nrows: usize) -> Self::Builder {
290 TimestampNanosecondBuilder::with_capacity(nrows)
291 }
292
293 fn append(builder: &mut Self::Builder, value: Option<NaiveDateTime>) -> Result<()> {
294 builder.append_option(value.map(naive_datetime_to_arrow));
295 Ok(())
296 }
297
298 fn field(header: &str) -> Field {
299 Field::new(
300 header,
301 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
302 true,
303 )
304 }
305}
306
307impl ArrowAssoc for NaiveDateTime {
308 type Builder = TimestampNanosecondBuilder;
309
310 fn builder(nrows: usize) -> Self::Builder {
311 TimestampNanosecondBuilder::with_capacity(nrows)
312 }
313
314 fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> {
315 builder.append_value(naive_datetime_to_arrow(value));
316 Ok(())
317 }
318
319 fn field(header: &str) -> Field {
320 Field::new(
321 header,
322 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
323 false,
324 )
325 }
326}
327
328impl ArrowAssoc for Option<NaiveDateTimeWrapperMicro> {
329 type Builder = TimestampMicrosecondBuilder;
330
331 fn builder(nrows: usize) -> Self::Builder {
332 TimestampMicrosecondBuilder::with_capacity(nrows)
333 }
334
335 fn append(builder: &mut Self::Builder, value: Option<NaiveDateTimeWrapperMicro>) -> Result<()> {
336 builder.append_option(match value {
337 Some(v) => Some(v.0.and_utc().timestamp_micros()),
338 None => None,
339 });
340 Ok(())
341 }
342
343 fn field(header: &str) -> Field {
344 Field::new(
345 header,
346 ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
347 true,
348 )
349 }
350}
351
352impl ArrowAssoc for NaiveDateTimeWrapperMicro {
353 type Builder = TimestampMicrosecondBuilder;
354
355 fn builder(nrows: usize) -> Self::Builder {
356 TimestampMicrosecondBuilder::with_capacity(nrows)
357 }
358
359 fn append(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) -> Result<()> {
360 builder.append_value(value.0.and_utc().timestamp_micros());
361 Ok(())
362 }
363
364 fn field(header: &str) -> Field {
365 Field::new(
366 header,
367 ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
368 false,
369 )
370 }
371}
372
373impl ArrowAssoc for Option<NaiveTime> {
374 type Builder = Time64NanosecondBuilder;
375
376 fn builder(nrows: usize) -> Self::Builder {
377 Time64NanosecondBuilder::with_capacity(nrows)
378 }
379
380 fn append(builder: &mut Self::Builder, value: Option<NaiveTime>) -> Result<()> {
381 builder.append_option(
382 value.map(|t| {
383 t.num_seconds_from_midnight() as i64 * 1_000_000_000 + t.nanosecond() as i64
384 }),
385 );
386 Ok(())
387 }
388
389 fn field(header: &str) -> Field {
390 Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), true)
391 }
392}
393
394impl ArrowAssoc for NaiveTime {
395 type Builder = Time64NanosecondBuilder;
396
397 fn builder(nrows: usize) -> Self::Builder {
398 Time64NanosecondBuilder::with_capacity(nrows)
399 }
400
401 fn append(builder: &mut Self::Builder, value: NaiveTime) -> Result<()> {
402 builder.append_value(
403 value.num_seconds_from_midnight() as i64 * 1_000_000_000 + value.nanosecond() as i64,
404 );
405 Ok(())
406 }
407
408 fn field(header: &str) -> Field {
409 Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), false)
410 }
411}
412
413impl ArrowAssoc for Option<NaiveTimeWrapperMicro> {
414 type Builder = Time64MicrosecondBuilder;
415
416 fn builder(nrows: usize) -> Self::Builder {
417 Time64MicrosecondBuilder::with_capacity(nrows)
418 }
419
420 fn append(builder: &mut Self::Builder, value: Option<NaiveTimeWrapperMicro>) -> Result<()> {
421 builder.append_option(value.map(|t| {
422 t.0.num_seconds_from_midnight() as i64 * 1_000_000 + (t.0.nanosecond() as i64) / 1000
423 }));
424 Ok(())
425 }
426
427 fn field(header: &str) -> Field {
428 Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true)
429 }
430}
431
432impl ArrowAssoc for NaiveTimeWrapperMicro {
433 type Builder = Time64MicrosecondBuilder;
434
435 fn builder(nrows: usize) -> Self::Builder {
436 Time64MicrosecondBuilder::with_capacity(nrows)
437 }
438
439 fn append(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) -> Result<()> {
440 builder.append_value(
441 value.0.num_seconds_from_midnight() as i64 * 1_000_000
442 + (value.0.nanosecond() as i64) / 1000,
443 );
444 Ok(())
445 }
446
447 fn field(header: &str) -> Field {
448 Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false)
449 }
450}
451
452impl ArrowAssoc for Option<Vec<u8>> {
453 type Builder = LargeBinaryBuilder;
454
455 fn builder(nrows: usize) -> Self::Builder {
456 LargeBinaryBuilder::with_capacity(1024, nrows)
457 }
458
459 fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
460 match value {
461 Some(v) => builder.append_value(v),
462 None => builder.append_null(),
463 };
464 Ok(())
465 }
466
467 fn field(header: &str) -> Field {
468 Field::new(header, ArrowDataType::LargeBinary, true)
469 }
470}
471
472impl ArrowAssoc for Vec<u8> {
473 type Builder = LargeBinaryBuilder;
474
475 fn builder(nrows: usize) -> Self::Builder {
476 LargeBinaryBuilder::with_capacity(1024, nrows)
477 }
478
479 fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
480 builder.append_value(value);
481 Ok(())
482 }
483
484 fn field(header: &str) -> Field {
485 Field::new(header, ArrowDataType::LargeBinary, false)
486 }
487}
488
489macro_rules! impl_arrow_array_assoc {
490 ($T:ty, $AT:expr, $B:ident) => {
491 impl ArrowAssoc for $T {
492 type Builder = LargeListBuilder<$B>;
493
494 fn builder(nrows: usize) -> Self::Builder {
495 LargeListBuilder::with_capacity($B::new(), nrows)
496 }
497
498 #[throws(ArrowDestinationError)]
499 fn append(builder: &mut Self::Builder, value: Self) {
500 builder.append_value(value);
501 }
502
503 fn field(header: &str) -> Field {
504 Field::new(
505 header,
506 ArrowDataType::LargeList(std::sync::Arc::new(Field::new_list_field($AT, true))),
507 false,
508 )
509 }
510 }
511
512 impl ArrowAssoc for Option<$T> {
513 type Builder = LargeListBuilder<$B>;
514
515 fn builder(nrows: usize) -> Self::Builder {
516 LargeListBuilder::with_capacity($B::new(), nrows)
517 }
518
519 #[throws(ArrowDestinationError)]
520 fn append(builder: &mut Self::Builder, value: Self) {
521 builder.append_option(value);
522 }
523
524 fn field(header: &str) -> Field {
525 Field::new(
526 header,
527 ArrowDataType::LargeList(std::sync::Arc::new(Field::new_list_field($AT, true))),
528 true,
529 )
530 }
531 }
532 };
533}
534
535impl_arrow_array_assoc!(Vec<Option<bool>>, ArrowDataType::Boolean, BooleanBuilder);
536impl_arrow_array_assoc!(Vec<Option<String>>, ArrowDataType::Utf8, StringBuilder);
537impl_arrow_array_assoc!(Vec<Option<i16>>, ArrowDataType::Int16, Int16Builder);
538impl_arrow_array_assoc!(Vec<Option<i32>>, ArrowDataType::Int32, Int32Builder);
539impl_arrow_array_assoc!(Vec<Option<i64>>, ArrowDataType::Int64, Int64Builder);
540impl_arrow_array_assoc!(Vec<Option<u16>>, ArrowDataType::UInt16, UInt16Builder);
541impl_arrow_array_assoc!(Vec<Option<u32>>, ArrowDataType::UInt32, UInt32Builder);
542impl_arrow_array_assoc!(Vec<Option<u64>>, ArrowDataType::UInt64, UInt64Builder);
543impl_arrow_array_assoc!(Vec<Option<f32>>, ArrowDataType::Float32, Float32Builder);
544impl_arrow_array_assoc!(Vec<Option<f64>>, ArrowDataType::Float64, Float64Builder);