1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
//! This module defines traits that required to define a typesystem.
//!
//! A typesystem is an enum that describes what types can be produced by a source and accepted by a destination.
//! A typesystem also needs to implement [`TypeAssoc`] to associate the enum variants to the physical representation
//! of the types in the typesystem.
use crate::destinations::{Consume, Destination, DestinationPartition};
use crate::errors::{ConnectorXError, Result as CXResult};
use crate::sources::{PartitionParser, Produce, Source, SourcePartition};
#[doc(hidden)]
/// `TypeSystem` describes all the types a source or destination support
/// using enum variants.
/// The variant can be used to type check with a static type `T` through the `check` method.
pub trait TypeSystem: Copy + Clone + Send + Sync {
/// Check whether T is the same type as defined by self.
fn check<T: TypeAssoc<Self>>(self) -> CXResult<()> {
T::check(self)
}
}
#[doc(hidden)]
/// Associate a static type to a TypeSystem
pub trait TypeAssoc<TS: TypeSystem> {
fn check(ts: TS) -> CXResult<()>;
}
#[doc(hidden)]
/// Realize means that a TypeSystem can realize a parameterized func F, based on its current variants.
pub trait Realize<F>
where
F: ParameterizedFunc,
{
/// realize a parameterized function with the type that self currently is.
fn realize(self) -> CXResult<F::Function>;
}
#[doc(hidden)]
/// A ParameterizedFunc refers to a function that is parameterized on a type T,
/// where type T will be dynaically determined by the variant of a TypeSystem.
/// An example is the `transmit<S,W,T>` function. When piping values from a source
/// to the destination, its type `T` is determined by the schema at the runtime.
pub trait ParameterizedFunc {
type Function;
fn realize<T>() -> Self::Function
where
Self: ParameterizedOn<T>,
{
Self::parameterize()
}
}
#[doc(hidden)]
/// `ParameterizedOn` indicates a parameterized function `Self`
/// is parameterized on type `T`
pub trait ParameterizedOn<T>: ParameterizedFunc {
fn parameterize() -> Self::Function;
}
/// Defines a rule to convert a type `T` to a type `U`.
pub trait TypeConversion<T, U> {
fn convert(val: T) -> U;
}
/// Transport asks the source to produce a value, do type conversion and then write
/// the value to a destination. Do not manually implement this trait for types.
/// Use [`impl_transport!`] to create a struct that implements this trait instead.
pub trait Transport {
type TSS: TypeSystem;
type TSD: TypeSystem;
type S: Source;
type D: Destination;
type Error: From<ConnectorXError>
+ From<<Self::S as Source>::Error>
+ From<<Self::D as Destination>::Error>
+ Send
+ std::fmt::Debug;
/// convert_typesystem convert the source type system TSS to the destination
/// type system TSD.
fn convert_typesystem(ts: Self::TSS) -> CXResult<Self::TSD>;
/// convert_type convert the type T1 associated with the source type system
/// TSS to a type T2 which is associated with the destination type system TSD.
fn convert_type<T1, T2>(val: T1) -> T2
where
Self: TypeConversion<T1, T2>,
{
<Self as TypeConversion<T1, T2>>::convert(val)
}
/// `process` will ask source to produce a value with type T1, based on TSS, and then do
/// type conversion using `convert_type` to get value with type T2, which is associated to
/// TSD. Finally, it will write the value with type T2 to the destination.
fn process<'s, 'd, 'r>(
ts1: Self::TSS,
ts2: Self::TSD,
src: &'r mut <<Self::S as Source>::Partition as SourcePartition>::Parser<'s>,
dst: &'r mut <Self::D as Destination>::Partition<'d>,
) -> Result<(), Self::Error>
where
Self: 'd;
#[allow(clippy::type_complexity)]
fn processor<'s, 'd>(
ts1: Self::TSS,
ts2: Self::TSD,
) -> CXResult<
fn(
src: &mut <<Self::S as Source>::Partition as SourcePartition>::Parser<'s>,
dst: &mut <Self::D as Destination>::Partition<'d>,
) -> Result<(), Self::Error>,
>
where
Self: 'd;
}
#[doc(hidden)]
pub fn process<'s, 'd, 'r, T1, T2, TP, S, D, ES, ED, ET>(
src: &'r mut <<S as Source>::Partition as SourcePartition>::Parser<'s>,
dst: &'r mut <D as Destination>::Partition<'d>,
) -> Result<(), ET>
where
T1: TypeAssoc<<S as Source>::TypeSystem>,
S: Source<Error = ES>,
<S as Source>::Partition: SourcePartition<Error = ES>,
<<S as Source>::Partition as SourcePartition>::Parser<'s>: Produce<'r, T1, Error = ES>,
ES: From<ConnectorXError> + Send,
T2: TypeAssoc<<D as Destination>::TypeSystem>,
D: Destination<Error = ED>,
<D as Destination>::Partition<'d>: Consume<T2, Error = ED>,
ED: From<ConnectorXError> + Send,
TP: TypeConversion<T1, T2>,
ET: From<ES> + From<ED>,
{
let val: T1 = PartitionParser::parse(src)?;
let val: T2 = <TP as TypeConversion<T1, _>>::convert(val);
DestinationPartition::write(dst, val)?;
Ok(())
}