// connectorx/destinations/mod.rs

//! This module defines three traits [`Destination`], [`DestinationPartition`], and [`Consume`] to define a destination.
//! This module also contains destination implementations for various dataframes.

#[cfg(feature = "dst_arrow")]
pub mod arrow;
#[cfg(feature = "dst_arrow")]
pub mod arrowstream;

#[cfg(feature = "dst_arrow2")]
pub mod arrow2;

use crate::data_order::DataOrder;
use crate::errors::ConnectorXError;
use crate::typesystem::{TypeAssoc, TypeSystem};

/// A `Destination` is associated with a `TypeSystem` and a `DestinationPartition`
/// (exposed through the `Partition` associated type).
///
/// The partitions returned by `partition` borrow the destination mutably and allow
/// multiple threads to write data into the buffer owned by the `Destination`.
pub trait Destination: Sized {
    /// The `DataOrder`s declared by this destination.
    // NOTE(review): presumably the set of layouts this destination can accept,
    // consulted when an order is chosen together with the source; confirm against callers.
    const DATA_ORDERS: &'static [DataOrder];
    /// The type system describing the column types this destination stores.
    type TypeSystem: TypeSystem;
    /// The partition type handed out by `partition`. It shares this destination's
    /// `TypeSystem` and `Error`, and its lifetime `'a` cannot outlive the destination.
    type Partition<'a>: DestinationPartition<'a, TypeSystem = Self::TypeSystem, Error = Self::Error>
    where
        Self: 'a;
    /// Error type returned by this destination. It must be constructible from a
    /// `ConnectorXError` and be sendable across threads.
    type Error: From<ConnectorXError> + Send;

    /// Specify whether the destination needs the total number of rows in advance
    /// in order to pre-allocate the buffer.
    fn needs_count(&self) -> bool;

    /// Set up the `Destination` for writing.
    ///
    /// This allocates memory based on the type of each column and the number of rows.
    ///
    /// * `nrow` - the number of rows to allocate for.
    /// * `names` - one name per column (presumably parallel to `schema`; confirm).
    /// * `schema` - the type of each column.
    /// * `data_order` - the data order to use for this allocation.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the allocation cannot be performed.
    fn allocate<S: AsRef<str>>(
        &mut self,
        nrow: usize,
        names: &[S],
        schema: &[Self::TypeSystem],
        data_order: DataOrder,
    ) -> Result<(), Self::Error>;

    /// Create a batch of partition destinations that borrow from this destination.
    // NOTE(review): the previous wording was "with each write `count` number of rows",
    // but the parameter is a single `usize` named `counts`. Whether it is the number of
    // partitions to create or a per-partition row count cannot be told from this
    // file; confirm against the implementors.
    fn partition(&mut self, counts: usize) -> Result<Vec<Self::Partition<'_>>, Self::Error>;
    /// Return the schema of the destination, one `TypeSystem` value per column.
    fn schema(&self) -> &[Self::TypeSystem];
}

/// A `DestinationPartition` writes values to its own region. It is parameterized
/// on lifetime `'a`, which is the lifetime of the parent `Destination`. Usually,
/// a `DestinationPartition` can never live longer than the parent.
///
/// Implementors must be `Send`.
pub trait DestinationPartition<'a>: Send {
    /// The type system describing the values this partition accepts.
    type TypeSystem: TypeSystem;
    /// Error type returned by this partition. It must be constructible from a
    /// `ConnectorXError` and be sendable across threads.
    type Error: From<ConnectorXError> + Send;

    /// Write a value of type `T` into this partition.
    ///
    /// This is a thin wrapper that forwards to `Consume::<T>::consume`; no position
    /// is passed in, so where the value lands is decided by the `Consume`
    /// implementation (presumably the next cell in write order; confirm).
    ///
    /// # Errors
    ///
    /// Returns whatever error `consume` returns. According to the original
    /// documentation, a `T` that does not match the schema yields
    /// `ConnectorXError::TypeCheckFailed`.
    fn write<T>(&mut self, value: T) -> Result<(), <Self as DestinationPartition<'a>>::Error>
    where
        T: TypeAssoc<Self::TypeSystem>,
        Self: Consume<T, Error = <Self as DestinationPartition<'a>>::Error>,
    {
        // Delegate to the `Consume<T>` impl selected by the value's type.
        self.consume(value)
    }

    /// Number of columns this `DestinationPartition` controls.
    // NOTE(review): the previous comment said "rows", which contradicts the method name.
    fn ncols(&self) -> usize;

    /// Final clean-ups, to be run once writing to this partition is done.
    fn finalize(&mut self) -> Result<(), Self::Error>;

    /// Acquire `n` rows in the final destination.
    ///
    /// The method name keeps its historical spelling (`aquire`) because it is part
    /// of the public interface.
    // NOTE(review): the meaning of the returned `usize` is not visible here;
    // presumably the starting row offset of the acquired range; confirm.
    fn aquire_row(&mut self, n: usize) -> Result<usize, Self::Error>;
}

/// A type that implements `Consume<T>` can consume a value of type `T` by adding it
/// to its own buffer.
///
/// A single type may implement `Consume<T>` for several `T`, one per value type it
/// is able to store.
pub trait Consume<T> {
    /// Error type returned when a value cannot be consumed. It must be
    /// constructible from a `ConnectorXError` and be sendable across threads.
    type Error: From<ConnectorXError> + Send;
    /// Consume `value`, taking ownership of it.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the value cannot be stored.
    fn consume(&mut self, value: T) -> Result<(), Self::Error>;
}