// connectorx/transports/mssql_arrow.rs
use crate::destinations::arrow::{
4 typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
5 ArrowDestination, ArrowDestinationError, ArrowTypeSystem,
6};
7use crate::sources::mssql::{FloatN, IntN, MsSQLSource, MsSQLSourceError, MsSQLTypeSystem};
8use crate::typesystem::TypeConversion;
9use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
10use num_traits::ToPrimitive;
11use rust_decimal::Decimal;
12use thiserror::Error;
13use uuid_old::Uuid;
14
/// Marker type naming the MsSQL → Arrow transport.
///
/// It carries no data. It is passed as `name =` to the `impl_transport!`
/// invocation in this file and is the type on which the hand-written
/// `TypeConversion` impls in this file are defined.
pub struct MsSQLArrowTransport;
17
18#[derive(Error, Debug)]
19pub enum MsSQLArrowTransportError {
20 #[error(transparent)]
21 Source(#[from] MsSQLSourceError),
22
23 #[error(transparent)]
24 Destination(#[from] ArrowDestinationError),
25
26 #[error(transparent)]
27 ConnectorX(#[from] crate::errors::ConnectorXError),
28}
29
30impl_transport!(
31 name = MsSQLArrowTransport,
32 error = MsSQLArrowTransportError,
33 systems = MsSQLTypeSystem => ArrowTypeSystem,
34 route = MsSQLSource => ArrowDestination,
35 mappings = {
36 { Tinyint[u8] => Int64[i64] | conversion auto }
37 { Smallint[i16] => Int64[i64] | conversion auto }
38 { Int[i32] => Int64[i64] | conversion auto }
39 { Bigint[i64] => Int64[i64] | conversion auto }
40 { Intn[IntN] => Int64[i64] | conversion option }
41 { Float24[f32] => Float32[f32] | conversion auto }
42 { Float53[f64] => Float64[f64] | conversion auto }
43 { Floatn[FloatN] => Float64[f64] | conversion option }
44 { Bit[bool] => Boolean[bool] | conversion auto }
45 { Nvarchar[&'r str] => LargeUtf8[String] | conversion owned }
46 { Varchar[&'r str] => LargeUtf8[String] | conversion none }
47 { Nchar[&'r str] => LargeUtf8[String] | conversion none }
48 { Char[&'r str] => LargeUtf8[String] | conversion none }
49 { Text[&'r str] => LargeUtf8[String] | conversion none }
50 { Ntext[&'r str] => LargeUtf8[String] | conversion none }
51 { Binary[&'r [u8]] => LargeBinary[Vec<u8>] | conversion owned }
52 { Varbinary[&'r [u8]] => LargeBinary[Vec<u8>] | conversion none }
53 { Image[&'r [u8]] => LargeBinary[Vec<u8>] | conversion none }
54 { Numeric[Decimal] => Float64[f64] | conversion option }
55 { Decimal[Decimal] => Float64[f64] | conversion none }
56 { Datetime[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
57 { Datetime2[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion none }
58 { Smalldatetime[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion none }
59 { Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
60 { Datetimeoffset[DateTime<Utc>] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option }
61 { Uniqueidentifier[Uuid] => LargeUtf8[String] | conversion option }
62 { Time[NaiveTime] => Time64Micro[NaiveTimeWrapperMicro] | conversion option }
63 { SmallMoney[f32] => Float32[f32] | conversion none }
64 { Money[f64] => Float64[f64] | conversion none }
65 }
66);
67
68impl TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for MsSQLArrowTransport {
69 fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
70 NaiveTimeWrapperMicro(val)
71 }
72}
73
74impl TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro> for MsSQLArrowTransport {
75 fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
76 NaiveDateTimeWrapperMicro(val)
77 }
78}
79
80impl TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for MsSQLArrowTransport {
81 fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
82 DateTimeWrapperMicro(val)
83 }
84}
85
86impl TypeConversion<Uuid, String> for MsSQLArrowTransport {
87 fn convert(val: Uuid) -> String {
88 val.to_string()
89 }
90}
91
92impl TypeConversion<IntN, i64> for MsSQLArrowTransport {
93 fn convert(val: IntN) -> i64 {
94 val.0
95 }
96}
97
98impl TypeConversion<FloatN, f64> for MsSQLArrowTransport {
99 fn convert(val: FloatN) -> f64 {
100 val.0
101 }
102}
103
104impl TypeConversion<Decimal, f64> for MsSQLArrowTransport {
105 fn convert(val: Decimal) -> f64 {
106 val.to_f64()
107 .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
108 }
109}