connectorx/destinations/
mod.rs

1//! This module defines three traits [`Destination`], [`DestinationPartition`], and [`Consume`] to define a destination.
2//! This module also contains destination implementations for various dataframes.
3
4#[cfg(feature = "dst_arrow")]
5pub mod arrow;
6#[cfg(feature = "dst_arrow")]
7pub mod arrowstream;
8
9use crate::data_order::DataOrder;
10use crate::errors::ConnectorXError;
11use crate::typesystem::{TypeAssoc, TypeSystem};
12
/// A `Destination` is associated with a [`TypeSystem`] and a partition type that implements
/// [`DestinationPartition`].
/// The partitions allow multiple threads to write data into the buffer owned by the `Destination`.
pub trait Destination: Sized {
    // NOTE(review): presumably the data orders this destination can accept — confirm against
    // the implementors.
    /// Data orders associated with this destination.
    const DATA_ORDERS: &'static [DataOrder];
    /// Type system used to describe the columns of this destination
    /// (see [`Destination::allocate`] and [`Destination::schema`]).
    type TypeSystem: TypeSystem;
    /// Partition type returned by [`Destination::partition`]. It is parameterized on the
    /// lifetime `'a` (which the destination itself must outlive) and shares the destination's
    /// `TypeSystem` and `Error` types.
    type Partition<'a>: DestinationPartition<'a, TypeSystem = Self::TypeSystem, Error = Self::Error>
    where
        Self: 'a;
    /// Error type returned by the fallible methods of this trait. Any [`ConnectorXError`]
    /// can be converted into it.
    type Error: From<ConnectorXError> + Send;

    /// Specify whether the destination needs the total number of rows in advance
    /// in order to pre-allocate the buffer.
    fn needs_count(&self) -> bool;

    // NOTE(review): `names` and `schema` presumably have one entry per column, in the same
    // order — confirm against the callers.
    /// Construct the `Destination`.
    /// This allocates the memory based on the type of each column (`schema`)
    /// and the number of rows (`nrow`). `names` carries the column names and
    /// `data_order` the data order to use.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the destination cannot be set up.
    fn allocate<S: AsRef<str>>(
        &mut self,
        nrow: usize,
        names: &[S],
        schema: &[Self::TypeSystem],
        data_order: DataOrder,
    ) -> Result<(), Self::Error>;

    // NOTE(review): `counts` is a single `usize`, so it is presumably the number of partitions
    // to create rather than a per-partition row count — confirm against the implementors.
    /// Create a bunch of partition destinations that borrow from this destination.
    fn partition(&mut self, counts: usize) -> Result<Vec<Self::Partition<'_>>, Self::Error>;
    /// Return the schema of the destination.
    fn schema(&self) -> &[Self::TypeSystem];
}
43
44/// `PartitionDestination` writes values to its own region. `PartitionDestination` is parameterized
45/// on lifetime `'a`, which is the lifetime of the parent `Destination`. Usually,
46/// a `PartitionDestination` can never live longer than the parent.
47pub trait DestinationPartition<'a>: Send {
48    type TypeSystem: TypeSystem;
49    type Error: From<ConnectorXError> + Send;
50
51    /// Write a value of type T to the location (row, col). If T mismatch with the
52    /// schema, `ConnectorXError::TypeCheckFailed` will return.
53    fn write<T>(&mut self, value: T) -> Result<(), <Self as DestinationPartition<'a>>::Error>
54    where
55        T: TypeAssoc<Self::TypeSystem>,
56        Self: Consume<T, Error = <Self as DestinationPartition<'a>>::Error>,
57    {
58        self.consume(value)
59    }
60
61    /// Number of rows this `PartitionDestination` controls.
62    fn ncols(&self) -> usize;
63
64    /// Final clean ups
65    fn finalize(&mut self) -> Result<(), Self::Error>;
66
67    /// Aquire n rows in final destination
68    fn aquire_row(&mut self, n: usize) -> Result<usize, Self::Error>;
69}
70
/// A type that implements `Consume<T>` can consume a value of type `T` by adding it to its
/// own buffer.
pub trait Consume<T> {
    /// Error returned when a value cannot be consumed. Any `ConnectorXError` can be
    /// converted into it.
    type Error: From<ConnectorXError> + Send;
    /// Consume `value`, taking ownership of it.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the value cannot be consumed.
    fn consume(&mut self, value: T) -> Result<(), Self::Error>;
}