connectorx/
typesystem.rs

//! This module defines the traits that are required to define a typesystem.
//!
//! A typesystem is an enum that describes what types can be produced by a source and accepted by a destination.
//! A typesystem also needs to implement [`TypeAssoc`] to associate the enum variants to the physical representation
//! of the types in the typesystem.
6
7use crate::destinations::{Consume, Destination, DestinationPartition};
8use crate::errors::{ConnectorXError, Result as CXResult};
9use crate::sources::{PartitionParser, Produce, Source, SourcePartition};
10
11#[doc(hidden)]
12/// `TypeSystem` describes all the types a source or destination support
13/// using enum variants.
14/// The variant can be used to type check with a static type `T` through the `check` method.
15pub trait TypeSystem: Copy + Clone + Send + Sync {
16    /// Check whether T is the same type as defined by self.
17    fn check<T: TypeAssoc<Self>>(self) -> CXResult<()> {
18        T::check(self)
19    }
20}
21
22#[doc(hidden)]
23/// Associate a static type to a TypeSystem
24pub trait TypeAssoc<TS: TypeSystem> {
25    fn check(ts: TS) -> CXResult<()>;
26}
27
28#[doc(hidden)]
29/// Realize means that a TypeSystem can realize a parameterized func F, based on its current variants.
30pub trait Realize<F>
31where
32    F: ParameterizedFunc,
33{
34    /// realize a parameterized function with the type that self currently is.
35    fn realize(self) -> CXResult<F::Function>;
36}
37
38#[doc(hidden)]
39/// A ParameterizedFunc refers to a function that is parameterized on a type T,
40/// where type T will be dynaically determined by the variant of a TypeSystem.
41/// An example is the `transmit<S,W,T>` function. When piping values from a source
42/// to the destination, its type `T` is determined by the schema at the runtime.
43pub trait ParameterizedFunc {
44    type Function;
45    fn realize<T>() -> Self::Function
46    where
47        Self: ParameterizedOn<T>,
48    {
49        Self::parameterize()
50    }
51}
52
53#[doc(hidden)]
54/// `ParameterizedOn` indicates a parameterized function `Self`
55/// is parameterized on type `T`
56pub trait ParameterizedOn<T>: ParameterizedFunc {
57    fn parameterize() -> Self::Function;
58}
59
/// A rule describing how a value of type `T` is converted into a value of
/// type `U`.
///
/// The implementing type only serves as the carrier of the rule: `convert`
/// takes no `self`, so it is invoked as
/// `<Carrier as TypeConversion<T, U>>::convert(value)`.
pub trait TypeConversion<T, U> {
    /// Consumes `val` and returns the converted value.
    fn convert(val: T) -> U;
}
64
65/// Transport asks the source to produce a value, do type conversion and then write
66/// the value to a destination. Do not manually implement this trait for types.
67/// Use [`impl_transport!`] to create a struct that implements this trait instead.
68pub trait Transport {
69    type TSS: TypeSystem;
70    type TSD: TypeSystem;
71    type S: Source;
72    type D: Destination;
73    type Error: From<ConnectorXError>
74        + From<<Self::S as Source>::Error>
75        + From<<Self::D as Destination>::Error>
76        + Send
77        + std::fmt::Debug;
78
79    /// convert_typesystem convert the source type system TSS to the destination
80    /// type system TSD.
81    fn convert_typesystem(ts: Self::TSS) -> CXResult<Self::TSD>;
82
83    /// convert_type convert the type T1 associated with the source type system
84    /// TSS to a type T2 which is associated with the destination type system TSD.
85    fn convert_type<T1, T2>(val: T1) -> T2
86    where
87        Self: TypeConversion<T1, T2>,
88    {
89        <Self as TypeConversion<T1, T2>>::convert(val)
90    }
91
92    /// `process` will ask source to produce a value with type T1, based on TSS, and then do
93    /// type conversion using `convert_type` to get value with type T2, which is associated to
94    /// TSD. Finally, it will write the value with type T2 to the destination.
95    fn process<'s, 'd, 'r>(
96        ts1: Self::TSS,
97        ts2: Self::TSD,
98        src: &'r mut <<Self::S as Source>::Partition as SourcePartition>::Parser<'s>,
99        dst: &'r mut <Self::D as Destination>::Partition<'d>,
100    ) -> Result<(), Self::Error>
101    where
102        Self: 'd;
103
104    #[allow(clippy::type_complexity)]
105    fn processor<'s, 'd>(
106        ts1: Self::TSS,
107        ts2: Self::TSD,
108    ) -> CXResult<
109        fn(
110            src: &mut <<Self::S as Source>::Partition as SourcePartition>::Parser<'s>,
111            dst: &mut <Self::D as Destination>::Partition<'d>,
112        ) -> Result<(), Self::Error>,
113    >
114    where
115        Self: 'd;
116}
117
118#[doc(hidden)]
119pub fn process<'s, 'd, 'r, T1, T2, TP, S, D, ES, ED, ET>(
120    src: &'r mut <<S as Source>::Partition as SourcePartition>::Parser<'s>,
121    dst: &'r mut <D as Destination>::Partition<'d>,
122) -> Result<(), ET>
123where
124    T1: TypeAssoc<<S as Source>::TypeSystem>,
125    S: Source<Error = ES>,
126    <S as Source>::Partition: SourcePartition<Error = ES>,
127
128    <<S as Source>::Partition as SourcePartition>::Parser<'s>: Produce<'r, T1, Error = ES>,
129    ES: From<ConnectorXError> + Send,
130
131    T2: TypeAssoc<<D as Destination>::TypeSystem>,
132    D: Destination<Error = ED>,
133    <D as Destination>::Partition<'d>: Consume<T2, Error = ED>,
134    ED: From<ConnectorXError> + Send,
135
136    TP: TypeConversion<T1, T2>,
137    ET: From<ES> + From<ED>,
138{
139    let val: T1 = PartitionParser::parse(src)?;
140    let val: T2 = <TP as TypeConversion<T1, _>>::convert(val);
141    DestinationPartition::write(dst, val)?;
142    Ok(())
143}