1use crate::destinations::{Consume, Destination, DestinationPartition};
8use crate::errors::{ConnectorXError, Result as CXResult};
9use crate::sources::{PartitionParser, Produce, Source, SourcePartition};
10
11#[doc(hidden)]
12pub trait TypeSystem: Copy + Clone + Send + Sync {
16 fn check<T: TypeAssoc<Self>>(self) -> CXResult<()> {
18 T::check(self)
19 }
20}
21
22#[doc(hidden)]
23pub trait TypeAssoc<TS: TypeSystem> {
25 fn check(ts: TS) -> CXResult<()>;
26}
27
28#[doc(hidden)]
29pub trait Realize<F>
31where
32 F: ParameterizedFunc,
33{
34 fn realize(self) -> CXResult<F::Function>;
36}
37
38#[doc(hidden)]
39pub trait ParameterizedFunc {
44 type Function;
45 fn realize<T>() -> Self::Function
46 where
47 Self: ParameterizedOn<T>,
48 {
49 Self::parameterize()
50 }
51}
52
53#[doc(hidden)]
54pub trait ParameterizedOn<T>: ParameterizedFunc {
57 fn parameterize() -> Self::Function;
58}
59
60pub trait TypeConversion<T, U> {
62 fn convert(val: T) -> U;
63}
64
65pub trait Transport {
69 type TSS: TypeSystem;
70 type TSD: TypeSystem;
71 type S: Source;
72 type D: Destination;
73 type Error: From<ConnectorXError>
74 + From<<Self::S as Source>::Error>
75 + From<<Self::D as Destination>::Error>
76 + Send
77 + std::fmt::Debug;
78
79 fn convert_typesystem(ts: Self::TSS) -> CXResult<Self::TSD>;
82
83 fn convert_type<T1, T2>(val: T1) -> T2
86 where
87 Self: TypeConversion<T1, T2>,
88 {
89 <Self as TypeConversion<T1, T2>>::convert(val)
90 }
91
92 fn process<'s, 'd, 'r>(
96 ts1: Self::TSS,
97 ts2: Self::TSD,
98 src: &'r mut <<Self::S as Source>::Partition as SourcePartition>::Parser<'s>,
99 dst: &'r mut <Self::D as Destination>::Partition<'d>,
100 ) -> Result<(), Self::Error>
101 where
102 Self: 'd;
103
104 #[allow(clippy::type_complexity)]
105 fn processor<'s, 'd>(
106 ts1: Self::TSS,
107 ts2: Self::TSD,
108 ) -> CXResult<
109 fn(
110 src: &mut <<Self::S as Source>::Partition as SourcePartition>::Parser<'s>,
111 dst: &mut <Self::D as Destination>::Partition<'d>,
112 ) -> Result<(), Self::Error>,
113 >
114 where
115 Self: 'd;
116}
117
118#[doc(hidden)]
119pub fn process<'s, 'd, 'r, T1, T2, TP, S, D, ES, ED, ET>(
120 src: &'r mut <<S as Source>::Partition as SourcePartition>::Parser<'s>,
121 dst: &'r mut <D as Destination>::Partition<'d>,
122) -> Result<(), ET>
123where
124 T1: TypeAssoc<<S as Source>::TypeSystem>,
125 S: Source<Error = ES>,
126 <S as Source>::Partition: SourcePartition<Error = ES>,
127
128 <<S as Source>::Partition as SourcePartition>::Parser<'s>: Produce<'r, T1, Error = ES>,
129 ES: From<ConnectorXError> + Send,
130
131 T2: TypeAssoc<<D as Destination>::TypeSystem>,
132 D: Destination<Error = ED>,
133 <D as Destination>::Partition<'d>: Consume<T2, Error = ED>,
134 ED: From<ConnectorXError> + Send,
135
136 TP: TypeConversion<T1, T2>,
137 ET: From<ES> + From<ED>,
138{
139 let val: T1 = PartitionParser::parse(src)?;
140 let val: T2 = <TP as TypeConversion<T1, _>>::convert(val);
141 DestinationPartition::write(dst, val)?;
142 Ok(())
143}