connectorx/destinations/arrow/
mod.rs

//! Destination implementation for Arrow and Polars.
2
3mod arrow_assoc;
4mod errors;
5mod funcs;
6pub mod typesystem;
7
8pub use self::errors::{ArrowDestinationError, Result};
9pub use self::typesystem::ArrowTypeSystem;
10use super::{Consume, Destination, DestinationPartition};
11use crate::constants::RECORD_BATCH_SIZE;
12use crate::data_order::DataOrder;
13use crate::typesystem::{Realize, TypeAssoc, TypeSystem};
14use anyhow::anyhow;
15use arrow::{datatypes::Schema, record_batch::RecordBatch};
16use arrow_assoc::ArrowAssoc;
17use fehler::{throw, throws};
18use funcs::{FFinishBuilder, FNewBuilder, FNewField};
19use itertools::Itertools;
20use std::{
21    any::Any,
22    sync::{Arc, Mutex},
23};
24
25#[cfg(feature = "dst_polars")]
26use {
27    arrow::ffi::to_ffi,
28    polars::prelude::{concat, DataFrame, IntoLazy, PlSmallStr, Series, UnionArgs},
29    polars_arrow::ffi::{import_array_from_c, import_field_from_c},
30    std::iter::FromIterator,
31    std::mem::transmute,
32};
33
// Type-erased Arrow array builder; downcast to the concrete
// `ArrowAssoc::Builder` type before appending values.
type Builder = Box<dyn Any + Send>;
// One builder per output column.
type Builders = Vec<Builder>;
36
/// Destination that accumulates query results as Arrow [`RecordBatch`]es
/// (convertible to a Polars `DataFrame` when the `dst_polars` feature is on).
pub struct ArrowDestination {
    /// Logical type of each output column.
    schema: Vec<ArrowTypeSystem>,
    /// Column names, parallel to `schema`.
    names: Vec<String>,
    /// Finished record batches; shared with every partition writer.
    data: Arc<Mutex<Vec<RecordBatch>>>,
    /// Arrow schema materialized from `schema` + `names` in `allocate`.
    arrow_schema: Arc<Schema>,
    /// Maximum number of rows per record batch.
    batch_size: usize,
}
44
45impl Default for ArrowDestination {
46    fn default() -> Self {
47        ArrowDestination {
48            schema: vec![],
49            names: vec![],
50            data: Arc::new(Mutex::new(vec![])),
51            arrow_schema: Arc::new(Schema::empty()),
52            batch_size: RECORD_BATCH_SIZE,
53        }
54    }
55}
56
57impl ArrowDestination {
58    pub fn new() -> Self {
59        Self::default()
60    }
61
62    pub fn new_with_batch_size(batch_size: usize) -> Self {
63        ArrowDestination {
64            schema: vec![],
65            names: vec![],
66            data: Arc::new(Mutex::new(vec![])),
67            arrow_schema: Arc::new(Schema::empty()),
68            batch_size,
69        }
70    }
71}
72
73impl Destination for ArrowDestination {
74    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::ColumnMajor, DataOrder::RowMajor];
75    type TypeSystem = ArrowTypeSystem;
76    type Partition<'a> = ArrowPartitionWriter;
77    type Error = ArrowDestinationError;
78
79    fn needs_count(&self) -> bool {
80        false
81    }
82
83    #[throws(ArrowDestinationError)]
84    fn allocate<S: AsRef<str>>(
85        &mut self,
86        _nrow: usize,
87        names: &[S],
88        schema: &[ArrowTypeSystem],
89        data_order: DataOrder,
90    ) {
91        // todo: support colmajor
92        if !matches!(data_order, DataOrder::RowMajor) {
93            throw!(crate::errors::ConnectorXError::UnsupportedDataOrder(
94                data_order
95            ))
96        }
97
98        // parse the metadata
99        self.schema = schema.to_vec();
100        self.names = names.iter().map(|n| n.as_ref().to_string()).collect();
101        let fields = self
102            .schema
103            .iter()
104            .zip_eq(&self.names)
105            .map(|(&dt, h)| Ok(Realize::<FNewField>::realize(dt)?(h.as_str())))
106            .collect::<Result<Vec<_>>>()?;
107        self.arrow_schema = Arc::new(Schema::new(fields));
108    }
109
110    #[throws(ArrowDestinationError)]
111    fn partition(&mut self, counts: usize) -> Vec<Self::Partition<'_>> {
112        let mut partitions = vec![];
113        for _ in 0..counts {
114            partitions.push(ArrowPartitionWriter::new(
115                self.schema.clone(),
116                Arc::clone(&self.data),
117                Arc::clone(&self.arrow_schema),
118                self.batch_size,
119            )?);
120        }
121        partitions
122    }
123
124    fn schema(&self) -> &[ArrowTypeSystem] {
125        self.schema.as_slice()
126    }
127}
128
impl ArrowDestination {
    /// Consume the destination and return every collected [`RecordBatch`].
    ///
    /// Errors if any `ArrowPartitionWriter` still holds a clone of the shared
    /// data `Arc` ("Partitions are not freed") or if the mutex was poisoned.
    #[throws(ArrowDestinationError)]
    pub fn arrow(self) -> Vec<RecordBatch> {
        let lock = Arc::try_unwrap(self.data).map_err(|_| anyhow!("Partitions are not freed"))?;
        lock.into_inner()
            .map_err(|e| anyhow!("mutex poisoned {}", e))?
    }

    /// Consume the destination and assemble a Polars [`DataFrame`] by moving
    /// each Arrow column across the Arrow C FFI boundary into polars-arrow,
    /// then lazily concatenating the per-batch frames.
    #[cfg(feature = "dst_polars")]
    #[throws(ArrowDestinationError)]
    pub fn polars(self) -> DataFrame {
        // Convert to arrow first
        let rbs = self.arrow()?;

        // Ready LazyFrame vector for the chunks
        let mut lf_vec = vec![];

        for chunk in rbs.into_iter() {
            // Column vector
            let mut columns = Vec::with_capacity(chunk.num_columns());

            // Arrow stores data by columns, therefore need to be Zero-copied by column
            for (i, col) in chunk.columns().iter().enumerate() {
                // Convert to ArrayData (arrow-rs)
                let array = col.to_data();

                // Convert to ffi with arrow-rs
                // NOTE(review): this `unwrap` panics if the FFI export fails;
                // consider propagating the error instead.
                let (out_array, out_schema) = to_ffi(&array).unwrap();

                // Import field from ffi with polars
                // SAFETY(presumed): both arrow-rs and polars-arrow define these
                // structs per the Arrow C data interface, so the transmute
                // reinterprets between identical layouts — confirm on upgrades.
                let field = unsafe {
                    import_field_from_c(transmute::<
                        &arrow::ffi::FFI_ArrowSchema,
                        &polars_arrow::ffi::ArrowSchema,
                    >(&out_schema))
                }?;

                // Import data from ffi with polars
                // SAFETY(presumed): same layout-compatibility assumption as
                // above; `out_array` ownership is handed over to polars-arrow.
                let data = unsafe {
                    import_array_from_c(
                        transmute::<arrow::ffi::FFI_ArrowArray, polars_arrow::ffi::ArrowArray>(
                            out_array,
                        ),
                        field.dtype().clone(),
                    )
                }?;

                // Create Polars series from arrow column
                columns.push(Series::from_arrow(
                    PlSmallStr::from(chunk.schema().field(i).name()),
                    data,
                )?);
            }

            // Create DataFrame from the columns
            lf_vec.push(DataFrame::from_iter(columns).lazy());
        }

        // Concat the chunks
        let union_args = UnionArgs::default();
        concat(lf_vec, union_args)?.collect()?
    }

    /// Pop one finished batch from the shared buffer (LIFO order), or `None`
    /// when the buffer is currently empty.
    #[throws(ArrowDestinationError)]
    pub fn record_batch(&mut self) -> Option<RecordBatch> {
        let mut guard = self
            .data
            .lock()
            .map_err(|e| anyhow!("mutex poisoned {}", e))?;
        (*guard).pop()
    }

    /// An empty `RecordBatch` carrying this destination's schema.
    pub fn empty_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.arrow_schema.clone())
    }

    /// The Arrow schema built during `allocate`.
    pub fn arrow_schema(&self) -> Arc<Schema> {
        self.arrow_schema.clone()
    }

    /// Column names, in schema order.
    pub fn names(&self) -> &[String] {
        self.names.as_slice()
    }
}
213
/// Per-partition writer that buffers values into Arrow builders and flushes
/// a `RecordBatch` into the shared destination buffer every `batch_size` rows.
pub struct ArrowPartitionWriter {
    /// Logical type of each column (shared with the destination).
    schema: Vec<ArrowTypeSystem>,
    /// Active column builders; `None` between a flush and the next allocate.
    builders: Option<Builders>,
    /// Rows completed in the current batch.
    current_row: usize,
    /// Next column to receive a value (row-major consumption).
    current_col: usize,
    /// Shared buffer of finished batches, owned jointly with the destination.
    data: Arc<Mutex<Vec<RecordBatch>>>,
    /// Arrow schema used to assemble each `RecordBatch`.
    arrow_schema: Arc<Schema>,
    /// Row threshold that triggers a flush.
    batch_size: usize,
}
223
224// unsafe impl Sync for ArrowPartitionWriter {}
225
226impl ArrowPartitionWriter {
227    #[throws(ArrowDestinationError)]
228    fn new(
229        schema: Vec<ArrowTypeSystem>,
230        data: Arc<Mutex<Vec<RecordBatch>>>,
231        arrow_schema: Arc<Schema>,
232        batch_size: usize,
233    ) -> Self {
234        let mut pw = ArrowPartitionWriter {
235            schema,
236            builders: None,
237            current_row: 0,
238            current_col: 0,
239            data,
240            arrow_schema,
241            batch_size,
242        };
243        pw.allocate()?;
244        pw
245    }
246
247    #[throws(ArrowDestinationError)]
248    fn allocate(&mut self) {
249        let builders = self
250            .schema
251            .iter()
252            .map(|dt| Ok(Realize::<FNewBuilder>::realize(*dt)?(self.batch_size)))
253            .collect::<Result<Vec<_>>>()?;
254        self.builders.replace(builders);
255    }
256
257    #[throws(ArrowDestinationError)]
258    fn flush(&mut self) {
259        let builders = self
260            .builders
261            .take()
262            .unwrap_or_else(|| panic!("arrow builder is none when flush!"));
263        let columns = builders
264            .into_iter()
265            .zip(self.schema.iter())
266            .map(|(builder, &dt)| Realize::<FFinishBuilder>::realize(dt)?(builder))
267            .collect::<std::result::Result<Vec<_>, crate::errors::ConnectorXError>>()?;
268        let rb = RecordBatch::try_new(Arc::clone(&self.arrow_schema), columns)?;
269        {
270            let mut guard = self
271                .data
272                .lock()
273                .map_err(|e| anyhow!("mutex poisoned {}", e))?;
274            let inner_data = &mut *guard;
275            inner_data.push(rb);
276        }
277
278        self.current_row = 0;
279        self.current_col = 0;
280    }
281}
282
283impl<'a> DestinationPartition<'a> for ArrowPartitionWriter {
284    type TypeSystem = ArrowTypeSystem;
285    type Error = ArrowDestinationError;
286
287    #[throws(ArrowDestinationError)]
288    fn finalize(&mut self) {
289        if self.builders.is_some() {
290            self.flush()?;
291        }
292    }
293
294    #[throws(ArrowDestinationError)]
295    fn aquire_row(&mut self, _n: usize) -> usize {
296        self.current_row
297    }
298
299    fn ncols(&self) -> usize {
300        self.schema.len()
301    }
302}
303
304impl<'a, T> Consume<T> for ArrowPartitionWriter
305where
306    T: TypeAssoc<<Self as DestinationPartition<'a>>::TypeSystem> + ArrowAssoc + 'static,
307{
308    type Error = ArrowDestinationError;
309
310    #[throws(ArrowDestinationError)]
311    fn consume(&mut self, value: T) {
312        let col = self.current_col;
313        self.current_col = (self.current_col + 1) % self.ncols();
314        self.schema[col].check::<T>()?;
315
316        loop {
317            match &mut self.builders {
318                Some(builders) => {
319                    <T as ArrowAssoc>::append(
320                        builders[col]
321                            .downcast_mut::<T::Builder>()
322                            .ok_or_else(|| anyhow!("cannot cast arrow builder for append"))?,
323                        value,
324                    )?;
325                    break;
326                }
327                None => self.allocate()?, // allocate if builders are not initialized
328            }
329        }
330
331        // flush if exceed batch_size
332        if self.current_col == 0 {
333            self.current_row += 1;
334            if self.current_row >= self.batch_size {
335                self.flush()?;
336                self.allocate()?;
337            }
338        }
339    }
340}