connectorx/transports/
mysql_arrowstream.rs1use crate::{
4 destinations::arrowstream::{
5 typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError,
6 },
7 impl_transport,
8 sources::mysql::{
9 BinaryProtocol, MySQLSource, MySQLSourceError, MySQLTypeSystem, TextProtocol,
10 },
11 typesystem::TypeConversion,
12};
13use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
14use num_traits::ToPrimitive;
15use rust_decimal::Decimal;
16use serde_json::{to_string, Value};
17use std::marker::PhantomData;
18use thiserror::Error;
19
20#[derive(Error, Debug)]
21pub enum MySQLArrowTransportError {
22 #[error(transparent)]
23 Source(#[from] MySQLSourceError),
24
25 #[error(transparent)]
26 Destination(#[from] ArrowDestinationError),
27
28 #[error(transparent)]
29 ConnectorX(#[from] crate::errors::ConnectorXError),
30}
31
/// Marker type naming the MySQL-to-Arrow transport.
///
/// `P` is the MySQL protocol marker (`BinaryProtocol` or `TextProtocol` in
/// this file). It is carried only through `PhantomData`, so the struct stores
/// no data of its own.
pub struct MySQLArrowTransport<P>(PhantomData<P>);
34
35impl_transport!(
36 name = MySQLArrowTransport<BinaryProtocol>,
37 error = MySQLArrowTransportError,
38 systems = MySQLTypeSystem => ArrowTypeSystem,
39 route = MySQLSource<BinaryProtocol> => ArrowDestination,
40 mappings = {
41 { Float[f32] => Float64[f64] | conversion auto }
42 { Double[f64] => Float64[f64] | conversion auto }
43 { Tiny[i8] => Boolean[bool] | conversion option }
44 { Short[i16] => Int64[i64] | conversion auto }
45 { Int24[i32] => Int64[i64] | conversion none }
46 { Long[i32] => Int64[i64] | conversion auto }
47 { LongLong[i64] => Int64[i64] | conversion auto }
48 { UTiny[u8] => Int64[i64] | conversion auto }
49 { UShort[u16] => Int64[i64] | conversion auto }
50 { ULong[u32] => Int64[i64] | conversion auto }
51 { UInt24[u32] => Int64[i64] | conversion none }
52 { ULongLong[u64] => Float64[f64] | conversion auto }
53 { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
54 { Time[NaiveTime] => Time64[NaiveTime] | conversion auto }
55 { Datetime[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto }
56 { Year[i16] => Int64[i64] | conversion none}
57 { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion none }
58 { Decimal[Decimal] => Float64[f64] | conversion option }
59 { VarChar[String] => LargeUtf8[String] | conversion auto }
60 { Char[String] => LargeUtf8[String] | conversion none }
61 { Enum[String] => LargeUtf8[String] | conversion none }
62 { TinyBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
63 { Blob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
64 { MediumBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
65 { LongBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
66 { Json[Value] => LargeUtf8[String] | conversion option }
67 { Bit[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
68 }
69);
70
71impl_transport!(
72 name = MySQLArrowTransport<TextProtocol>,
73 error = MySQLArrowTransportError,
74 systems = MySQLTypeSystem => ArrowTypeSystem,
75 route = MySQLSource<TextProtocol> => ArrowDestination,
76 mappings = {
77 { Float[f32] => Float64[f64] | conversion auto }
78 { Double[f64] => Float64[f64] | conversion auto }
79 { Tiny[i8] => Boolean[bool] | conversion option }
80 { Short[i16] => Int64[i64] | conversion auto }
81 { Int24[i32] => Int64[i64] | conversion none }
82 { Long[i32] => Int64[i64] | conversion auto }
83 { LongLong[i64] => Int64[i64] | conversion auto }
84 { UTiny[u8] => Int64[i64] | conversion auto }
85 { UShort[u16] => Int64[i64] | conversion auto }
86 { ULong[u32] => Int64[i64] | conversion auto }
87 { UInt24[u32] => Int64[i64] | conversion none }
88 { ULongLong[u64] => Float64[f64] | conversion auto }
89 { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
90 { Time[NaiveTime] => Time64[NaiveTime] | conversion auto }
91 { Datetime[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto }
92 { Year[i16] => Int64[i64] | conversion none}
93 { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion none }
94 { Decimal[Decimal] => Float64[f64] | conversion option }
95 { VarChar[String] => LargeUtf8[String] | conversion auto }
96 { Char[String] => LargeUtf8[String] | conversion none }
97 { Enum[String] => LargeUtf8[String] | conversion none }
98 { TinyBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
99 { Blob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
100 { MediumBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
101 { LongBlob[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
102 { Json[Value] => LargeUtf8[String] | conversion option }
103 { Bit[Vec<u8>] => LargeBinary[Vec<u8>] | conversion none }
104 }
105);
106
107impl<P> TypeConversion<Decimal, f64> for MySQLArrowTransport<P> {
108 fn convert(val: Decimal) -> f64 {
109 val.to_f64()
110 .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
111 }
112}
113
114impl<P> TypeConversion<Value, String> for MySQLArrowTransport<P> {
115 fn convert(val: Value) -> String {
116 to_string(&val).unwrap()
117 }
118}
119
120impl<P> TypeConversion<i8, bool> for MySQLArrowTransport<P> {
121 fn convert(val: i8) -> bool {
122 val != 0
123 }
124}