connectorx/transports/
mssql_arrowstream.rs

1//! Transport from MsSQL Source to Arrow Destination.
2
3use crate::destinations::arrowstream::{ArrowDestination, ArrowDestinationError, ArrowTypeSystem};
4use crate::sources::mssql::{FloatN, IntN, MsSQLSource, MsSQLSourceError, MsSQLTypeSystem};
5use crate::typesystem::TypeConversion;
6use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
7use num_traits::ToPrimitive;
8use rust_decimal::Decimal;
9use thiserror::Error;
10use uuid_old::Uuid;
11
12/// Convert MsSQL data types to Arrow data types.
13pub struct MsSQLArrowTransport;
14
15#[derive(Error, Debug)]
16pub enum MsSQLArrowTransportError {
17    #[error(transparent)]
18    Source(#[from] MsSQLSourceError),
19
20    #[error(transparent)]
21    Destination(#[from] ArrowDestinationError),
22
23    #[error(transparent)]
24    ConnectorX(#[from] crate::errors::ConnectorXError),
25}
26
27impl_transport!(
28    name = MsSQLArrowTransport,
29    error = MsSQLArrowTransportError,
30    systems = MsSQLTypeSystem => ArrowTypeSystem,
31    route = MsSQLSource => ArrowDestination,
32    mappings = {
33        { Tinyint[u8]                   => Int64[i64]                | conversion auto }
34        { Smallint[i16]                 => Int64[i64]                | conversion auto }
35        { Int[i32]                      => Int64[i64]                | conversion auto }
36        { Bigint[i64]                   => Int64[i64]                | conversion auto }
37        { Intn[IntN]                    => Int64[i64]                | conversion option }
38        { Float24[f32]                  => Float32[f32]              | conversion auto }
39        { Float53[f64]                  => Float64[f64]              | conversion auto }
40        { Floatn[FloatN]                => Float64[f64]              | conversion option }
41        { Bit[bool]                     => Boolean[bool]             | conversion auto  }
42        { Nvarchar[&'r str]             => LargeUtf8[String]         | conversion owned }
43        { Varchar[&'r str]              => LargeUtf8[String]         | conversion none }
44        { Nchar[&'r str]                => LargeUtf8[String]         | conversion none }
45        { Char[&'r str]                 => LargeUtf8[String]         | conversion none }
46        { Text[&'r str]                 => LargeUtf8[String]         | conversion none }
47        { Ntext[&'r str]                => LargeUtf8[String]         | conversion none }
48        { Binary[&'r [u8]]              => LargeBinary[Vec<u8>]      | conversion owned }
49        { Varbinary[&'r [u8]]           => LargeBinary[Vec<u8>]      | conversion none }
50        { Image[&'r [u8]]               => LargeBinary[Vec<u8>]      | conversion none }
51        { Numeric[Decimal]              => Float64[f64]              | conversion option }
52        { Decimal[Decimal]              => Float64[f64]              | conversion none }
53        { Datetime[NaiveDateTime]       => Date64[NaiveDateTime]     | conversion auto }
54        { Datetime2[NaiveDateTime]      => Date64[NaiveDateTime]     | conversion none }
55        { Smalldatetime[NaiveDateTime]  => Date64[NaiveDateTime]     | conversion none }
56        { Date[NaiveDate]               => Date32[NaiveDate]         | conversion auto }
57        { Datetimeoffset[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion auto }
58        { Uniqueidentifier[Uuid]        => LargeUtf8[String]         | conversion option }
59        { Time[NaiveTime]               => Time64[NaiveTime]         | conversion auto }
60        { SmallMoney[f32]               => Float32[f32]              | conversion none }
61        { Money[f64]                    => Float64[f64]              | conversion none }
62    }
63);
64
65impl TypeConversion<Uuid, String> for MsSQLArrowTransport {
66    fn convert(val: Uuid) -> String {
67        val.to_string()
68    }
69}
70
71impl TypeConversion<IntN, i64> for MsSQLArrowTransport {
72    fn convert(val: IntN) -> i64 {
73        val.0
74    }
75}
76
77impl TypeConversion<FloatN, f64> for MsSQLArrowTransport {
78    fn convert(val: FloatN) -> f64 {
79        val.0
80    }
81}
82
83impl TypeConversion<Decimal, f64> for MsSQLArrowTransport {
84    fn convert(val: Decimal) -> f64 {
85        val.to_f64()
86            .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
87    }
88}