connectorx/transports/
mssql_arrowstream.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//! Transport from MsSQL Source to Arrow Destination.

use crate::destinations::arrowstream::{ArrowDestination, ArrowDestinationError, ArrowTypeSystem};
use crate::sources::mssql::{FloatN, IntN, MsSQLSource, MsSQLSourceError, MsSQLTypeSystem};
use crate::typesystem::TypeConversion;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use num_traits::ToPrimitive;
use rust_decimal::Decimal;
use thiserror::Error;
use uuid::Uuid;

/// Convert MsSQL data types to Arrow data types.
pub struct MsSQLArrowTransport;

#[derive(Error, Debug)]
pub enum MsSQLArrowTransportError {
    #[error(transparent)]
    Source(#[from] MsSQLSourceError),

    #[error(transparent)]
    Destination(#[from] ArrowDestinationError),

    #[error(transparent)]
    ConnectorX(#[from] crate::errors::ConnectorXError),
}

impl_transport!(
    name = MsSQLArrowTransport,
    error = MsSQLArrowTransportError,
    systems = MsSQLTypeSystem => ArrowTypeSystem,
    route = MsSQLSource => ArrowDestination,
    mappings = {
        { Tinyint[u8]                   => Int64[i64]                | conversion auto }
        { Smallint[i16]                 => Int64[i64]                | conversion auto }
        { Int[i32]                      => Int64[i64]                | conversion auto }
        { Bigint[i64]                   => Int64[i64]                | conversion auto }
        { Intn[IntN]                    => Int64[i64]                | conversion option }
        { Float24[f32]                  => Float32[f32]              | conversion auto }
        { Float53[f64]                  => Float64[f64]              | conversion auto }
        { Floatn[FloatN]                => Float64[f64]              | conversion option }
        { Bit[bool]                     => Boolean[bool]             | conversion auto  }
        { Nvarchar[&'r str]             => LargeUtf8[String]         | conversion owned }
        { Varchar[&'r str]              => LargeUtf8[String]         | conversion none }
        { Nchar[&'r str]                => LargeUtf8[String]         | conversion none }
        { Char[&'r str]                 => LargeUtf8[String]         | conversion none }
        { Text[&'r str]                 => LargeUtf8[String]         | conversion none }
        { Ntext[&'r str]                => LargeUtf8[String]         | conversion none }
        { Binary[&'r [u8]]              => LargeBinary[Vec<u8>]      | conversion owned }
        { Varbinary[&'r [u8]]           => LargeBinary[Vec<u8>]      | conversion none }
        { Image[&'r [u8]]               => LargeBinary[Vec<u8>]      | conversion none }
        { Numeric[Decimal]              => Float64[f64]              | conversion option }
        { Decimal[Decimal]              => Float64[f64]              | conversion none }
        { Datetime[NaiveDateTime]       => Date64[NaiveDateTime]     | conversion auto }
        { Datetime2[NaiveDateTime]      => Date64[NaiveDateTime]     | conversion none }
        { Smalldatetime[NaiveDateTime]  => Date64[NaiveDateTime]     | conversion none }
        { Date[NaiveDate]               => Date32[NaiveDate]         | conversion auto }
        { Datetimeoffset[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion auto }
        { Uniqueidentifier[Uuid]        => LargeUtf8[String]         | conversion option }
        { Time[NaiveTime]               => Time64[NaiveTime]         | conversion auto }
        { SmallMoney[f32]               => Float32[f32]              | conversion none }
        { Money[f64]                    => Float64[f64]              | conversion none }
    }
);

impl TypeConversion<Uuid, String> for MsSQLArrowTransport {
    fn convert(val: Uuid) -> String {
        val.to_string()
    }
}

impl TypeConversion<IntN, i64> for MsSQLArrowTransport {
    fn convert(val: IntN) -> i64 {
        val.0
    }
}

impl TypeConversion<FloatN, f64> for MsSQLArrowTransport {
    fn convert(val: FloatN) -> f64 {
        val.0
    }
}

impl TypeConversion<Decimal, f64> for MsSQLArrowTransport {
    fn convert(val: Decimal) -> f64 {
        val.to_f64()
            .unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
    }
}