// connectorx/destinations/arrow/mod.rs
mod arrow_assoc;
mod errors;
mod funcs;
pub mod typesystem;

pub use self::errors::{ArrowDestinationError, Result};
pub use self::typesystem::ArrowTypeSystem;
use super::{Consume, Destination, DestinationPartition};
use crate::constants::RECORD_BATCH_SIZE;
use crate::data_order::DataOrder;
use crate::typesystem::{Realize, TypeAssoc, TypeSystem};
use anyhow::anyhow;
use arrow::{datatypes::Schema, record_batch::RecordBatch};
use arrow_assoc::ArrowAssoc;
use fehler::{throw, throws};
use funcs::{FFinishBuilder, FNewBuilder, FNewField};
use itertools::Itertools;
use std::{
    any::Any,
    sync::{Arc, Mutex},
};

#[cfg(feature = "dst_polars")]
use {
    arrow::ffi::to_ffi,
    polars::prelude::{concat, DataFrame, IntoLazy, PlSmallStr, Series, UnionArgs},
    polars_arrow::ffi::{import_array_from_c, import_field_from_c},
    std::iter::FromIterator,
    std::mem::transmute,
};

type Builder = Box<dyn Any + Send>;
type Builders = Vec<Builder>;

/// Destination that buffers incoming rows into Arrow `RecordBatch`es.
pub struct ArrowDestination {
    /// Logical type of each column, in column order.
    schema: Vec<ArrowTypeSystem>,
    /// Column names, parallel to `schema`.
    names: Vec<String>,
    /// Batches produced by all partitions; shared with every `ArrowPartitionWriter`.
    data: Arc<Mutex<Vec<RecordBatch>>>,
    /// Arrow schema built from `schema` + `names` during `allocate`.
    arrow_schema: Arc<Schema>,
    /// Number of rows a partition buffers before flushing a `RecordBatch`.
    batch_size: usize,
}

45impl Default for ArrowDestination {
46 fn default() -> Self {
47 ArrowDestination {
48 schema: vec![],
49 names: vec![],
50 data: Arc::new(Mutex::new(vec![])),
51 arrow_schema: Arc::new(Schema::empty()),
52 batch_size: RECORD_BATCH_SIZE,
53 }
54 }
55}
56
57impl ArrowDestination {
58 pub fn new() -> Self {
59 Self::default()
60 }
61
62 pub fn new_with_batch_size(batch_size: usize) -> Self {
63 ArrowDestination {
64 schema: vec![],
65 names: vec![],
66 data: Arc::new(Mutex::new(vec![])),
67 arrow_schema: Arc::new(Schema::empty()),
68 batch_size,
69 }
70 }
71}
72
73impl Destination for ArrowDestination {
74 const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::ColumnMajor, DataOrder::RowMajor];
75 type TypeSystem = ArrowTypeSystem;
76 type Partition<'a> = ArrowPartitionWriter;
77 type Error = ArrowDestinationError;
78
79 fn needs_count(&self) -> bool {
80 false
81 }
82
83 #[throws(ArrowDestinationError)]
84 fn allocate<S: AsRef<str>>(
85 &mut self,
86 _nrow: usize,
87 names: &[S],
88 schema: &[ArrowTypeSystem],
89 data_order: DataOrder,
90 ) {
91 if !matches!(data_order, DataOrder::RowMajor) {
93 throw!(crate::errors::ConnectorXError::UnsupportedDataOrder(
94 data_order
95 ))
96 }
97
98 self.schema = schema.to_vec();
100 self.names = names.iter().map(|n| n.as_ref().to_string()).collect();
101 let fields = self
102 .schema
103 .iter()
104 .zip_eq(&self.names)
105 .map(|(&dt, h)| Ok(Realize::<FNewField>::realize(dt)?(h.as_str())))
106 .collect::<Result<Vec<_>>>()?;
107 self.arrow_schema = Arc::new(Schema::new(fields));
108 }
109
110 #[throws(ArrowDestinationError)]
111 fn partition(&mut self, counts: usize) -> Vec<Self::Partition<'_>> {
112 let mut partitions = vec![];
113 for _ in 0..counts {
114 partitions.push(ArrowPartitionWriter::new(
115 self.schema.clone(),
116 Arc::clone(&self.data),
117 Arc::clone(&self.arrow_schema),
118 self.batch_size,
119 )?);
120 }
121 partitions
122 }
123
124 fn schema(&self) -> &[ArrowTypeSystem] {
125 self.schema.as_slice()
126 }
127}
128
impl ArrowDestination {
    /// Consume the destination and return all accumulated `RecordBatch`es.
    ///
    /// Errors if any partition writer still holds a clone of the shared data
    /// `Arc` ("Partitions are not freed") or if the mutex was poisoned.
    #[throws(ArrowDestinationError)]
    pub fn arrow(self) -> Vec<RecordBatch> {
        let lock = Arc::try_unwrap(self.data).map_err(|_| anyhow!("Partitions are not freed"))?;
        lock.into_inner()
            .map_err(|e| anyhow!("mutex poisoned {}", e))?
    }

    /// Convert all accumulated batches into one polars `DataFrame` by
    /// round-tripping each column through the Arrow C data interface.
    #[cfg(feature = "dst_polars")]
    #[throws(ArrowDestinationError)]
    pub fn polars(self) -> DataFrame {
        let rbs = self.arrow()?;

        // One lazy frame per record batch; vertically concatenated at the end.
        let mut lf_vec = vec![];

        for chunk in rbs.into_iter() {
            let mut columns = Vec::with_capacity(chunk.num_columns());

            for (i, col) in chunk.columns().iter().enumerate() {
                let array = col.to_data();

                // Export the arrow-rs array via the Arrow C data interface.
                // NOTE(review): `unwrap` panics if the FFI export fails —
                // consider propagating the error instead.
                let (out_array, out_schema) = to_ffi(&array).unwrap();

                // The transmutes reinterpret arrow-rs FFI structs as the
                // polars_arrow equivalents; presumably both mirror the same
                // #[repr(C)] Arrow C-ABI layout — TODO confirm this stays true
                // across crate upgrades.
                let field = unsafe {
                    import_field_from_c(transmute::<
                        &arrow::ffi::FFI_ArrowSchema,
                        &polars_arrow::ffi::ArrowSchema,
                    >(&out_schema))
                }?;

                let data = unsafe {
                    import_array_from_c(
                        transmute::<arrow::ffi::FFI_ArrowArray, polars_arrow::ffi::ArrowArray>(
                            out_array,
                        ),
                        field.dtype().clone(),
                    )
                }?;

                // Rewrap the imported column as a polars Series under its
                // original column name.
                columns.push(Series::from_arrow(
                    PlSmallStr::from(chunk.schema().field(i).name()),
                    data,
                )?);
            }

            lf_vec.push(DataFrame::from_iter(columns).lazy());
        }

        // Concatenate the per-batch frames and materialize the result.
        let union_args = UnionArgs::default();
        concat(lf_vec, union_args)?.collect()?
    }

    /// Pop one `RecordBatch` off the shared buffer (LIFO order), if any remain.
    #[throws(ArrowDestinationError)]
    pub fn record_batch(&mut self) -> Option<RecordBatch> {
        let mut guard = self
            .data
            .lock()
            .map_err(|e| anyhow!("mutex poisoned {}", e))?;
        (*guard).pop()
    }

    /// An empty `RecordBatch` carrying this destination's Arrow schema.
    pub fn empty_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.arrow_schema.clone())
    }

    /// The Arrow schema built during `allocate`.
    pub fn arrow_schema(&self) -> Arc<Schema> {
        self.arrow_schema.clone()
    }

    /// Column names, parallel to `schema()`.
    pub fn names(&self) -> &[String] {
        self.names.as_slice()
    }
}

/// Per-partition writer that appends values column-by-column into Arrow
/// builders and flushes completed batches into the shared buffer.
pub struct ArrowPartitionWriter {
    /// Column types, cloned from the parent destination.
    schema: Vec<ArrowTypeSystem>,
    /// One builder per column; `None` between a flush and the next allocate.
    builders: Option<Builders>,
    /// Rows written into the current (unflushed) batch.
    current_row: usize,
    /// Next column to receive a value; wraps to 0 after the last column.
    current_col: usize,
    /// Shared batch buffer, owned jointly with `ArrowDestination`.
    data: Arc<Mutex<Vec<RecordBatch>>>,
    /// Arrow schema used when assembling each `RecordBatch`.
    arrow_schema: Arc<Schema>,
    /// Row count at which the current batch is flushed.
    batch_size: usize,
}

224impl ArrowPartitionWriter {
227 #[throws(ArrowDestinationError)]
228 fn new(
229 schema: Vec<ArrowTypeSystem>,
230 data: Arc<Mutex<Vec<RecordBatch>>>,
231 arrow_schema: Arc<Schema>,
232 batch_size: usize,
233 ) -> Self {
234 let mut pw = ArrowPartitionWriter {
235 schema,
236 builders: None,
237 current_row: 0,
238 current_col: 0,
239 data,
240 arrow_schema,
241 batch_size,
242 };
243 pw.allocate()?;
244 pw
245 }
246
247 #[throws(ArrowDestinationError)]
248 fn allocate(&mut self) {
249 let builders = self
250 .schema
251 .iter()
252 .map(|dt| Ok(Realize::<FNewBuilder>::realize(*dt)?(self.batch_size)))
253 .collect::<Result<Vec<_>>>()?;
254 self.builders.replace(builders);
255 }
256
257 #[throws(ArrowDestinationError)]
258 fn flush(&mut self) {
259 let builders = self
260 .builders
261 .take()
262 .unwrap_or_else(|| panic!("arrow builder is none when flush!"));
263 let columns = builders
264 .into_iter()
265 .zip(self.schema.iter())
266 .map(|(builder, &dt)| Realize::<FFinishBuilder>::realize(dt)?(builder))
267 .collect::<std::result::Result<Vec<_>, crate::errors::ConnectorXError>>()?;
268 let rb = RecordBatch::try_new(Arc::clone(&self.arrow_schema), columns)?;
269 {
270 let mut guard = self
271 .data
272 .lock()
273 .map_err(|e| anyhow!("mutex poisoned {}", e))?;
274 let inner_data = &mut *guard;
275 inner_data.push(rb);
276 }
277
278 self.current_row = 0;
279 self.current_col = 0;
280 }
281}
282
impl<'a> DestinationPartition<'a> for ArrowPartitionWriter {
    type TypeSystem = ArrowTypeSystem;
    type Error = ArrowDestinationError;

    /// Flush any rows still buffered in the builders when the partition ends.
    #[throws(ArrowDestinationError)]
    fn finalize(&mut self) {
        if self.builders.is_some() {
            self.flush()?;
        }
    }

    /// Return the current row cursor. ("aquire" [sic] — the spelling comes
    /// from the `DestinationPartition` trait and cannot be changed here.)
    #[throws(ArrowDestinationError)]
    fn aquire_row(&mut self, _n: usize) -> usize {
        self.current_row
    }

    /// Number of columns in this partition's schema.
    fn ncols(&self) -> usize {
        self.schema.len()
    }
}

304impl<'a, T> Consume<T> for ArrowPartitionWriter
305where
306 T: TypeAssoc<<Self as DestinationPartition<'a>>::TypeSystem> + ArrowAssoc + 'static,
307{
308 type Error = ArrowDestinationError;
309
310 #[throws(ArrowDestinationError)]
311 fn consume(&mut self, value: T) {
312 let col = self.current_col;
313 self.current_col = (self.current_col + 1) % self.ncols();
314 self.schema[col].check::<T>()?;
315
316 loop {
317 match &mut self.builders {
318 Some(builders) => {
319 <T as ArrowAssoc>::append(
320 builders[col]
321 .downcast_mut::<T::Builder>()
322 .ok_or_else(|| anyhow!("cannot cast arrow builder for append"))?,
323 value,
324 )?;
325 break;
326 }
327 None => self.allocate()?, }
329 }
330
331 if self.current_col == 0 {
333 self.current_row += 1;
334 if self.current_row >= self.batch_size {
335 self.flush()?;
336 self.allocate()?;
337 }
338 }
339 }
340}