connectorx/transports/
mysql_arrow.rs
1use crate::{
4 destinations::arrow::{
5 typesystem::{ArrowTypeSystem, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
6 ArrowDestination, ArrowDestinationError,
7 },
8 impl_transport,
9 sources::mysql::{
10 BinaryProtocol, MySQLSource, MySQLSourceError, MySQLTypeSystem, TextProtocol,
11 },
12 typesystem::TypeConversion,
13};
14use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
15use num_traits::ToPrimitive;
16use rust_decimal::Decimal;
17use serde_json::{to_string, Value};
18use std::marker::PhantomData;
19use thiserror::Error;
20
21#[derive(Error, Debug)]
22pub enum MySQLArrowTransportError {
23 #[error(transparent)]
24 Source(#[from] MySQLSourceError),
25
26 #[error(transparent)]
27 Destination(#[from] ArrowDestinationError),
28
29 #[error(transparent)]
30 ConnectorX(#[from] crate::errors::ConnectorXError),
31}
32
/// Marker type naming the transport from a MySQL source to an Arrow destination.
///
/// `P` is the protocol marker; this file instantiates the transport for
/// `BinaryProtocol` and `TextProtocol`. The only field is a `PhantomData<P>`,
/// so the struct carries no runtime data.
pub struct MySQLArrowTransport<P>(PhantomData<P>);
35
36impl_transport!(
37 name = MySQLArrowTransport<BinaryProtocol>,
38 error = MySQLArrowTransportError,
39 systems = MySQLTypeSystem => ArrowTypeSystem,
40 route = MySQLSource<BinaryProtocol> => ArrowDestination,
41 mappings = {
42 { Float[f32] => Float64[f64] | conversion auto }
43 { Double[f64] => Float64[f64] | conversion auto }
44 { Tiny[i8] => Boolean[bool] | conversion option }
45 { Short[i16] => Int64[i64] | conversion auto }
46 { Int24[i32] => Int64[i64] | conversion none }
47 { Long[i32] => Int64[i64] | conversion auto }
48 { LongLong[i64] => Int64[i64] | conversion auto }
49 { UTiny[u8] => Int64[i64] | conversion auto }
50 { UShort[u16] => Int64[i64] | conversion auto }
51 { ULong[u32] => Int64[i64] | conversion auto }
52 { UInt24[u32] => Int64[i64] | conversion none }
53 { ULongLong[u64] => Float64[f64] | conversion auto }
54 { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
55 { Time[NaiveTime] => Time64Micro[NaiveTimeWrapperMicro] | conversion option }
56 { Datetime[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
57 { Year[i16] => Int64[i64] | conversion none}
58 { Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion none }
59 { Decimal[Decimal] => Float64[f64] | conversion option }
60 { VarChar[String] => LargeUtf8[String] | conversion auto }
61 { Char[String] => LargeUtf8[String] | conversion none }
62 { Enum[String] => LargeUtf8[String] | conversion none }
63 { TinyBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
64 { Blob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
65 { MediumBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
66 { LongBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
67 { Json[Value] => LargeUtf8[String] | conversion option }
68 { Bit[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
69 }
70);
71
72impl_transport!(
73 name = MySQLArrowTransport<TextProtocol>,
74 error = MySQLArrowTransportError,
75 systems = MySQLTypeSystem => ArrowTypeSystem,
76 route = MySQLSource<TextProtocol> => ArrowDestination,
77 mappings = {
78 { Float[f32] => Float64[f64] | conversion auto }
79 { Double[f64] => Float64[f64] | conversion auto }
80 { Tiny[i8] => Boolean[bool] | conversion option }
81 { Short[i16] => Int64[i64] | conversion auto }
82 { Int24[i32] => Int64[i64] | conversion none }
83 { Long[i32] => Int64[i64] | conversion auto }
84 { LongLong[i64] => Int64[i64] | conversion auto }
85 { UTiny[u8] => Int64[i64] | conversion auto }
86 { UShort[u16] => Int64[i64] | conversion auto }
87 { ULong[u32] => Int64[i64] | conversion auto }
88 { UInt24[u32] => Int64[i64] | conversion none }
89 { ULongLong[u64] => Float64[f64] | conversion auto }
90 { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
91 { Time[NaiveTime] => Time64Micro[NaiveTimeWrapperMicro] | conversion option }
92 { Datetime[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
93 { Year[i16] => Int64[i64] | conversion none}
94 { Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion none }
95 { Decimal[Decimal] => Float64[f64] | conversion option }
96 { VarChar[String] => LargeUtf8[String] | conversion auto }
97 { Char[String] => LargeUtf8[String] | conversion none }
98 { Enum[String] => LargeUtf8[String] | conversion none }
99 { TinyBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
100 { Blob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
101 { MediumBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
102 { LongBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
103 { Json[Value] => LargeUtf8[String] | conversion option }
104 { Bit[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
105 }
106);
107
108impl<P> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for MySQLArrowTransport<P> {
109 fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
110 NaiveTimeWrapperMicro(val)
111 }
112}
113
114impl<P> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for MySQLArrowTransport<P> {
115 fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
116 NaiveDateTimeWrapperMicro(val)
117 }
118}
119
120impl<P> TypeConversion<Decimal, f64> for MySQLArrowTransport<P> {
121 fn convert(val: Decimal) -> f64 {
122 val.to_f64()
123 .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
124 }
125}
126
127impl<P> TypeConversion<Value, String> for MySQLArrowTransport<P> {
128 fn convert(val: Value) -> String {
129 to_string(&val).unwrap()
130 }
131}
132
133impl<P> TypeConversion<i8, bool> for MySQLArrowTransport<P> {
134 fn convert(val: i8) -> bool {
135 val != 0
136 }
137}