connectorx/destinations/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
//! This module defines three traits, [`Destination`], [`DestinationPartition`], and [`Consume`],
//! which together describe a data destination.
//! It also contains destination implementations for various dataframe formats.
#[cfg(feature = "dst_arrow")]
pub mod arrow;
#[cfg(feature = "dst_arrow")]
pub mod arrowstream;
#[cfg(feature = "dst_arrow2")]
pub mod arrow2;
use crate::data_order::DataOrder;
use crate::errors::ConnectorXError;
use crate::typesystem::{TypeAssoc, TypeSystem};
/// A data sink that owns the buffer query output is written into.
///
/// Each `Destination` is tied to a [`TypeSystem`] and hands out
/// [`DestinationPartition`]s through [`Destination::partition`]. Partitions are `Send`,
/// which lets several threads fill the destination's buffer at the same time.
pub trait Destination: Sized {
    /// Error type of the destination, shared with its partitions.
    /// It must be constructible from [`ConnectorXError`] and be `Send`.
    type Error: From<ConnectorXError> + Send;

    /// Type system describing the column types this destination stores.
    type TypeSystem: TypeSystem;

    /// Per-partition writer. It borrows the destination for `'a` (hence `Self: 'a`) and
    /// shares the destination's type system and error type.
    type Partition<'a>: DestinationPartition<'a, TypeSystem = Self::TypeSystem, Error = Self::Error>
    where
        Self: 'a;

    /// The [`DataOrder`]s associated with this destination.
    /// NOTE(review): presumably the layouts `allocate` accepts; confirm against implementors.
    const DATA_ORDERS: &'static [DataOrder];

    /// Returns `true` if the destination needs the total row count in advance
    /// so that it can pre-allocate its buffer.
    fn needs_count(&self) -> bool;

    /// Allocates storage for `nrow` rows, with columns named by `names` and typed by
    /// `schema`, laid out according to `data_order`.
    ///
    /// NOTE(review): `names` and `schema` are presumably parallel (one entry per column).
    /// The signature does not enforce this.
    fn allocate<S>(
        &mut self,
        nrow: usize,
        names: &[S],
        schema: &[Self::TypeSystem],
        data_order: DataOrder,
    ) -> Result<(), Self::Error>
    where
        S: AsRef<str>;

    /// Splits the destination into writable partitions that borrow `self`.
    ///
    /// NOTE(review): the parameter is a single `counts: usize`. Confirm with implementors
    /// whether it is the number of partitions or a per-partition row count.
    fn partition(&mut self, counts: usize) -> Result<Vec<Self::Partition<'_>>, Self::Error>;

    /// Column types (schema) of this destination.
    fn schema(&self) -> &[Self::TypeSystem];
}
/// A writer over its own region of a parent [`Destination`]'s buffer.
///
/// The lifetime `'a` is the lifetime of the parent destination. A partition
/// usually cannot outlive the destination it was created from. Partitions are `Send`
/// so they can be moved to worker threads.
pub trait DestinationPartition<'a>: Send {
    /// Error type; must be constructible from [`ConnectorXError`] and be `Send`.
    type Error: From<ConnectorXError> + Send;

    /// Type system of the values this partition accepts.
    type TypeSystem: TypeSystem;

    /// Writes `value` into this partition by forwarding it to [`Consume::consume`].
    ///
    /// This default method does no checking of its own. Validating `T` against the
    /// schema (e.g. producing `ConnectorXError::TypeCheckFailed`) is left to the
    /// `Consume<T>` implementation. NOTE(review): confirm implementors perform that check.
    fn write<T>(&mut self, value: T) -> Result<(), <Self as DestinationPartition<'a>>::Error>
    where
        T: TypeAssoc<Self::TypeSystem>,
        Self: Consume<T, Error = <Self as DestinationPartition<'a>>::Error>,
    {
        <Self as Consume<T>>::consume(self, value)
    }

    /// Number of columns this partition writes.
    fn ncols(&self) -> usize;

    /// Acquires `n` rows in the final destination (the method name keeps its historical
    /// spelling).
    ///
    /// NOTE(review): the returned `usize` is presumably the index of the first acquired
    /// row; confirm against implementors.
    fn aquire_row(&mut self, n: usize) -> Result<usize, Self::Error>;

    /// Performs final clean-up after all values have been written.
    fn finalize(&mut self) -> Result<(), Self::Error>;
}
/// Something that can absorb values of type `T` into its own buffer.
///
/// [`DestinationPartition::write`] forwards to this trait.
pub trait Consume<T> {
    /// Error returned when a value cannot be consumed.
    /// It must be constructible from [`ConnectorXError`] and be `Send`.
    type Error: From<ConnectorXError> + Send;

    /// Adds `value` to the implementor's buffer.
    ///
    /// # Errors
    /// Returns the implementation's error type if the value is rejected.
    fn consume(&mut self, value: T) -> Result<(), <Self as Consume<T>>::Error>;
}