connectorx/destinations/arrowstream/
arrow_assoc.rs
1use super::errors::{ArrowDestinationError, Result};
2use crate::constants::SECONDS_IN_DAY;
3use arrow::array::{
4 ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Float32Builder, Float64Builder,
5 Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, Time64NanosecondBuilder,
6 TimestampNanosecondBuilder, UInt32Builder, UInt64Builder,
7};
8use arrow::datatypes::Field;
9use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
10use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
11use fehler::throws;
12
13pub trait ArrowAssoc {
15 type Builder: ArrayBuilder + Send;
16
17 fn builder(nrows: usize) -> Self::Builder;
18 fn append(builder: &mut Self::Builder, value: Self) -> Result<()>;
19 fn field(header: &str) -> Field;
20}
21
// Implements `ArrowAssoc` for a primitive type and for its `Option` wrapper.
//
// Arguments, in order:
// - `$native`: the Rust value type,
// - `$arrow_ty`: the Arrow data type expression reported by `field`,
// - `$builder_ty`: the Arrow builder type used to store the values.
//
// The plain impl reports a non-nullable field; the `Option` impl reports a
// nullable field and appends through `append_option`.
macro_rules! impl_arrow_assoc {
    ($native:ty, $arrow_ty:expr, $builder_ty:ty) => {
        impl ArrowAssoc for $native {
            type Builder = $builder_ty;

            fn builder(nrows: usize) -> Self::Builder {
                <$builder_ty>::with_capacity(nrows)
            }

            #[throws(ArrowDestinationError)]
            fn append(builder: &mut Self::Builder, value: Self) {
                builder.append_value(value);
            }

            fn field(header: &str) -> Field {
                let nullable = false;
                Field::new(header, $arrow_ty, nullable)
            }
        }

        impl ArrowAssoc for Option<$native> {
            type Builder = $builder_ty;

            fn builder(nrows: usize) -> Self::Builder {
                <$builder_ty>::with_capacity(nrows)
            }

            #[throws(ArrowDestinationError)]
            fn append(builder: &mut Self::Builder, value: Self) {
                builder.append_option(value);
            }

            fn field(header: &str) -> Field {
                let nullable = true;
                Field::new(header, $arrow_ty, nullable)
            }
        }
    };
}
59
60impl_arrow_assoc!(u32, ArrowDataType::UInt32, UInt32Builder);
61impl_arrow_assoc!(u64, ArrowDataType::UInt64, UInt64Builder);
62impl_arrow_assoc!(i32, ArrowDataType::Int32, Int32Builder);
63impl_arrow_assoc!(i64, ArrowDataType::Int64, Int64Builder);
64impl_arrow_assoc!(f32, ArrowDataType::Float32, Float32Builder);
65impl_arrow_assoc!(f64, ArrowDataType::Float64, Float64Builder);
66impl_arrow_assoc!(bool, ArrowDataType::Boolean, BooleanBuilder);
67
68impl ArrowAssoc for &str {
69 type Builder = StringBuilder;
70
71 fn builder(nrows: usize) -> Self::Builder {
72 StringBuilder::with_capacity(1024, nrows)
73 }
74
75 #[throws(ArrowDestinationError)]
76 fn append(builder: &mut Self::Builder, value: Self) {
77 builder.append_value(value);
78 }
79
80 fn field(header: &str) -> Field {
81 Field::new(header, ArrowDataType::Utf8, false)
82 }
83}
84
85impl ArrowAssoc for Option<&str> {
86 type Builder = StringBuilder;
87
88 fn builder(nrows: usize) -> Self::Builder {
89 StringBuilder::with_capacity(1024, nrows)
90 }
91
92 #[throws(ArrowDestinationError)]
93 fn append(builder: &mut Self::Builder, value: Self) {
94 match value {
95 Some(s) => builder.append_value(s),
96 None => builder.append_null(),
97 }
98 }
99
100 fn field(header: &str) -> Field {
101 Field::new(header, ArrowDataType::Utf8, true)
102 }
103}
104
105impl ArrowAssoc for String {
106 type Builder = StringBuilder;
107
108 fn builder(nrows: usize) -> Self::Builder {
109 StringBuilder::with_capacity(1024, nrows)
110 }
111
112 #[throws(ArrowDestinationError)]
113 fn append(builder: &mut Self::Builder, value: String) {
114 builder.append_value(value.as_str());
115 }
116
117 fn field(header: &str) -> Field {
118 Field::new(header, ArrowDataType::Utf8, false)
119 }
120}
121
122impl ArrowAssoc for Option<String> {
123 type Builder = StringBuilder;
124
125 fn builder(nrows: usize) -> Self::Builder {
126 StringBuilder::with_capacity(1024, nrows)
127 }
128
129 #[throws(ArrowDestinationError)]
130 fn append(builder: &mut Self::Builder, value: Self) {
131 match value {
132 Some(s) => builder.append_value(s.as_str()),
133 None => builder.append_null(),
134 }
135 }
136
137 fn field(header: &str) -> Field {
138 Field::new(header, ArrowDataType::Utf8, true)
139 }
140}
141
142impl ArrowAssoc for DateTime<Utc> {
143 type Builder = TimestampNanosecondBuilder;
144
145 fn builder(nrows: usize) -> Self::Builder {
146 TimestampNanosecondBuilder::with_capacity(nrows)
147 }
148
149 #[throws(ArrowDestinationError)]
150 fn append(builder: &mut Self::Builder, value: DateTime<Utc>) {
151 builder.append_value(
152 value
153 .timestamp_nanos_opt()
154 .unwrap_or_else(|| panic!("out of range DateTime")),
155 )
156 }
157
158 fn field(header: &str) -> Field {
159 Field::new(
160 header,
161 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
162 false,
163 )
164 }
165}
166
167impl ArrowAssoc for Option<DateTime<Utc>> {
168 type Builder = TimestampNanosecondBuilder;
169
170 fn builder(nrows: usize) -> Self::Builder {
171 TimestampNanosecondBuilder::with_capacity(nrows)
172 }
173
174 #[throws(ArrowDestinationError)]
175 fn append(builder: &mut Self::Builder, value: Option<DateTime<Utc>>) {
176 builder.append_option(value.map(|x| {
177 x.timestamp_nanos_opt()
178 .unwrap_or_else(|| panic!("out of range DateTime"))
179 }))
180 }
181
182 fn field(header: &str) -> Field {
183 Field::new(
184 header,
185 ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
186 true,
187 )
188 }
189}
190
191fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
192 match nd.and_hms_opt(0, 0, 0) {
193 Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32,
194 None => panic!("and_hms_opt got None from {:?}", nd),
195 }
196}
197
198fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 {
199 nd.and_utc().timestamp_millis()
200}
201
202impl ArrowAssoc for Option<NaiveDate> {
203 type Builder = Date32Builder;
204
205 fn builder(nrows: usize) -> Self::Builder {
206 Date32Builder::with_capacity(nrows)
207 }
208
209 fn append(builder: &mut Self::Builder, value: Option<NaiveDate>) -> Result<()> {
210 builder.append_option(value.map(naive_date_to_arrow));
211 Ok(())
212 }
213
214 fn field(header: &str) -> Field {
215 Field::new(header, ArrowDataType::Date32, true)
216 }
217}
218
219impl ArrowAssoc for NaiveDate {
220 type Builder = Date32Builder;
221
222 fn builder(nrows: usize) -> Self::Builder {
223 Date32Builder::with_capacity(nrows)
224 }
225
226 fn append(builder: &mut Self::Builder, value: NaiveDate) -> Result<()> {
227 builder.append_value(naive_date_to_arrow(value));
228 Ok(())
229 }
230
231 fn field(header: &str) -> Field {
232 Field::new(header, ArrowDataType::Date32, false)
233 }
234}
235
236impl ArrowAssoc for Option<NaiveDateTime> {
237 type Builder = Date64Builder;
238
239 fn builder(nrows: usize) -> Self::Builder {
240 Date64Builder::with_capacity(nrows)
241 }
242
243 fn append(builder: &mut Self::Builder, value: Option<NaiveDateTime>) -> Result<()> {
244 builder.append_option(value.map(naive_datetime_to_arrow));
245 Ok(())
246 }
247
248 fn field(header: &str) -> Field {
249 Field::new(header, ArrowDataType::Date64, true)
250 }
251}
252
253impl ArrowAssoc for NaiveDateTime {
254 type Builder = Date64Builder;
255
256 fn builder(nrows: usize) -> Self::Builder {
257 Date64Builder::with_capacity(nrows)
258 }
259
260 fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> {
261 builder.append_value(naive_datetime_to_arrow(value));
262 Ok(())
263 }
264
265 fn field(header: &str) -> Field {
266 Field::new(header, ArrowDataType::Date64, false)
267 }
268}
269
270impl ArrowAssoc for Option<NaiveTime> {
271 type Builder = Time64NanosecondBuilder;
272
273 fn builder(nrows: usize) -> Self::Builder {
274 Time64NanosecondBuilder::with_capacity(nrows)
275 }
276
277 fn append(builder: &mut Self::Builder, value: Option<NaiveTime>) -> Result<()> {
278 builder.append_option(
279 value.map(|t| {
280 t.num_seconds_from_midnight() as i64 * 1_000_000_000 + t.nanosecond() as i64
281 }),
282 );
283 Ok(())
284 }
285
286 fn field(header: &str) -> Field {
287 Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), true)
288 }
289}
290
291impl ArrowAssoc for NaiveTime {
292 type Builder = Time64NanosecondBuilder;
293
294 fn builder(nrows: usize) -> Self::Builder {
295 Time64NanosecondBuilder::with_capacity(nrows)
296 }
297
298 fn append(builder: &mut Self::Builder, value: NaiveTime) -> Result<()> {
299 builder.append_value(
300 value.num_seconds_from_midnight() as i64 * 1_000_000_000 + value.nanosecond() as i64,
301 );
302 Ok(())
303 }
304
305 fn field(header: &str) -> Field {
306 Field::new(header, ArrowDataType::Time64(TimeUnit::Nanosecond), false)
307 }
308}
309
310impl ArrowAssoc for Option<Vec<u8>> {
311 type Builder = LargeBinaryBuilder;
312
313 fn builder(nrows: usize) -> Self::Builder {
314 LargeBinaryBuilder::with_capacity(1024, nrows)
315 }
316
317 fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
318 match value {
319 Some(v) => builder.append_value(v),
320 None => builder.append_null(),
321 };
322 Ok(())
323 }
324
325 fn field(header: &str) -> Field {
326 Field::new(header, ArrowDataType::LargeBinary, true)
327 }
328}
329
330impl ArrowAssoc for Vec<u8> {
331 type Builder = LargeBinaryBuilder;
332
333 fn builder(nrows: usize) -> Self::Builder {
334 LargeBinaryBuilder::with_capacity(1024, nrows)
335 }
336
337 fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
338 builder.append_value(value);
339 Ok(())
340 }
341
342 fn field(header: &str) -> Field {
343 Field::new(header, ArrowDataType::LargeBinary, false)
344 }
345}