1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
//! This module defines traits that required to define a typesystem.
//!
//! A typesystem is an enum that describes what types can be produced by a source and accepted by a destination.
//! A typesystem also needs to implement [`TypeAssoc`] to associate the enum variants to the physical representation
//! of the types in the typesystem.

use crate::destinations::{Consume, Destination, DestinationPartition};
use crate::errors::{ConnectorXError, Result as CXResult};
use crate::sources::{PartitionParser, Produce, Source, SourcePartition};

#[doc(hidden)]
/// `TypeSystem` describes all the types a source or destination support
/// using enum variants.
/// The variant can be used to type check with a static type `T` through the `check` method.
pub trait TypeSystem: Copy + Clone + Send + Sync {
    /// Check whether T is the same type as defined by self.
    fn check<T: TypeAssoc<Self>>(self) -> CXResult<()> {
        T::check(self)
    }
}

#[doc(hidden)]
/// Associate a static type to a TypeSystem
pub trait TypeAssoc<TS: TypeSystem> {
    fn check(ts: TS) -> CXResult<()>;
}

#[doc(hidden)]
/// Realize means that a TypeSystem can realize a parameterized func F, based on its current variants.
pub trait Realize<F>
where
    F: ParameterizedFunc,
{
    /// realize a parameterized function with the type that self currently is.
    fn realize(self) -> CXResult<F::Function>;
}

#[doc(hidden)]
/// A ParameterizedFunc refers to a function that is parameterized on a type T,
/// where type T will be dynaically determined by the variant of a TypeSystem.
/// An example is the `transmit<S,W,T>` function. When piping values from a source
/// to the destination, its type `T` is determined by the schema at the runtime.
pub trait ParameterizedFunc {
    type Function;
    fn realize<T>() -> Self::Function
    where
        Self: ParameterizedOn<T>,
    {
        Self::parameterize()
    }
}

#[doc(hidden)]
/// `ParameterizedOn` indicates a parameterized function `Self`
/// is parameterized on type `T`
pub trait ParameterizedOn<T>: ParameterizedFunc {
    fn parameterize() -> Self::Function;
}

/// Defines a rule to convert a type `T` to a type `U`.
pub trait TypeConversion<T, U> {
    fn convert(val: T) -> U;
}

/// Transport asks the source to produce a value, do type conversion and then write
/// the value to a destination. Do not manually implement this trait for types.
/// Use [`impl_transport!`] to create a struct that implements this trait instead.
pub trait Transport {
    type TSS: TypeSystem;
    type TSD: TypeSystem;
    type S: Source;
    type D: Destination;
    type Error: From<ConnectorXError>
        + From<<Self::S as Source>::Error>
        + From<<Self::D as Destination>::Error>
        + Send
        + std::fmt::Debug;

    /// convert_typesystem convert the source type system TSS to the destination
    /// type system TSD.
    fn convert_typesystem(ts: Self::TSS) -> CXResult<Self::TSD>;

    /// convert_type convert the type T1 associated with the source type system
    /// TSS to a type T2 which is associated with the destination type system TSD.
    fn convert_type<T1, T2>(val: T1) -> T2
    where
        Self: TypeConversion<T1, T2>,
    {
        <Self as TypeConversion<T1, T2>>::convert(val)
    }

    /// `process` will ask source to produce a value with type T1, based on TSS, and then do
    /// type conversion using `convert_type` to get value with type T2, which is associated to
    /// TSD. Finally, it will write the value with type T2 to the destination.
    fn process<'s, 'd, 'r>(
        ts1: Self::TSS,
        ts2: Self::TSD,
        src: &'r mut <<Self::S as Source>::Partition as SourcePartition>::Parser<'s>,
        dst: &'r mut <Self::D as Destination>::Partition<'d>,
    ) -> Result<(), Self::Error>
    where
        Self: 'd;

    #[allow(clippy::type_complexity)]
    fn processor<'s, 'd>(
        ts1: Self::TSS,
        ts2: Self::TSD,
    ) -> CXResult<
        fn(
            src: &mut <<Self::S as Source>::Partition as SourcePartition>::Parser<'s>,
            dst: &mut <Self::D as Destination>::Partition<'d>,
        ) -> Result<(), Self::Error>,
    >
    where
        Self: 'd;
}

#[doc(hidden)]
pub fn process<'s, 'd, 'r, T1, T2, TP, S, D, ES, ED, ET>(
    src: &'r mut <<S as Source>::Partition as SourcePartition>::Parser<'s>,
    dst: &'r mut <D as Destination>::Partition<'d>,
) -> Result<(), ET>
where
    T1: TypeAssoc<<S as Source>::TypeSystem>,
    S: Source<Error = ES>,
    <S as Source>::Partition: SourcePartition<Error = ES>,

    <<S as Source>::Partition as SourcePartition>::Parser<'s>: Produce<'r, T1, Error = ES>,
    ES: From<ConnectorXError> + Send,

    T2: TypeAssoc<<D as Destination>::TypeSystem>,
    D: Destination<Error = ED>,
    <D as Destination>::Partition<'d>: Consume<T2, Error = ED>,
    ED: From<ConnectorXError> + Send,

    TP: TypeConversion<T1, T2>,
    ET: From<ES> + From<ED>,
{
    let val: T1 = PartitionParser::parse(src)?;
    let val: T2 = <TP as TypeConversion<T1, _>>::convert(val);
    DestinationPartition::write(dst, val)?;
    Ok(())
}