// connectorx/fed_rewriter.rs

1use crate::{
2    constants::{CX_REWRITER_PATH, J4RS_BASE_PATH},
3    prelude::*,
4};
5use fehler::throws;
6use j4rs::{ClasspathEntry, Instance, InvocationArg, Jvm, JvmBuilder};
7use log::debug;
8use std::collections::HashMap;
9use std::convert::TryFrom;
10use std::{env, fs};
11
/// One entry of the plan returned by [`rewrite_sql`].
///
/// Each field is filled from the corresponding getter of the Java plan object
/// (`getDBName`, `getAliasDBName`, `getSql`, `getCardinality`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Plan {
    /// Database name reported by `getDBName`.
    pub db_name: String,
    /// Database alias reported by `getAliasDBName`.
    pub db_alias: String,
    /// SQL text reported by `getSql`.
    pub sql: String,
    /// Cardinality reported by `getCardinality`.
    pub cardinality: usize,
}
18
19pub struct FederatedDataSourceInfo<'a> {
20    pub conn_str_info: Option<SourceConn>,
21    pub manual_info: Option<HashMap<String, Vec<String>>>,
22    pub is_local: bool,
23    pub jdbc_url: &'a str,
24    pub jdbc_driver: &'a str,
25}
26
27impl<'a> FederatedDataSourceInfo<'a> {
28    pub fn new_from_conn_str(
29        source_conn: SourceConn,
30        is_local: bool,
31        jdbc_url: &'a str,
32        jdbc_driver: &'a str,
33    ) -> Self {
34        Self {
35            conn_str_info: Some(source_conn),
36            manual_info: None,
37            is_local,
38            jdbc_url,
39            jdbc_driver,
40        }
41    }
42    pub fn new_from_manual_schema(
43        manual_schema: HashMap<String, Vec<String>>,
44        is_local: bool,
45    ) -> Self {
46        Self {
47            conn_str_info: None,
48            manual_info: Some(manual_schema),
49            is_local,
50            jdbc_url: "",
51            jdbc_driver: "",
52        }
53    }
54}
55
56#[throws(ConnectorXOutError)]
57fn init_jvm(j4rs_base: Option<&str>) -> Jvm {
58    let base = match j4rs_base {
59        Some(path) => fs::canonicalize(path)
60            .map_err(|_| ConnectorXOutError::FileNotFoundError(path.to_string()))?,
61        None => fs::canonicalize(J4RS_BASE_PATH)
62            .map_err(|_| ConnectorXOutError::FileNotFoundError(J4RS_BASE_PATH.to_string()))?,
63    };
64    debug!("j4rs base path: {:?}", base);
65
66    let rewriter_path = env::var("CX_REWRITER_PATH").unwrap_or(CX_REWRITER_PATH.to_string());
67    let path = fs::canonicalize(rewriter_path.as_str())
68        .map_err(|_| ConnectorXOutError::FileNotFoundError(rewriter_path))?;
69
70    debug!("rewriter path: {:?}", path);
71
72    let entry = ClasspathEntry::new(path.to_str().unwrap());
73    JvmBuilder::new()
74        .skip_setting_native_lib()
75        .classpath_entry(entry)
76        .with_base_path(base.to_str().unwrap())
77        .build()?
78}
79
80#[allow(dead_code)]
81#[throws(ConnectorXOutError)]
82fn create_sources(
83    jvm: &Jvm,
84    db_map: &HashMap<String, FederatedDataSourceInfo>,
85) -> (Instance, Instance) {
86    debug!("Could not find environment variable `FED_CONFIG_PATH`, use manual configuration (c++ API only)!");
87    let mut db_config = vec![];
88    let db_manual = jvm.create_instance("java.util.HashMap", InvocationArg::empty())?;
89
90    for (db_name, db_info) in db_map.iter() {
91        if db_info.manual_info.is_some() {
92            let manual_info = db_info.manual_info.as_ref().unwrap();
93            let schema_info = jvm.create_instance("java.util.HashMap", InvocationArg::empty())?;
94            for (name, columns) in manual_info {
95                let arr_instance = jvm.java_list("java.lang.String", columns.to_vec())?;
96                jvm.invoke(
97                    &schema_info,
98                    "put",
99                    &[
100                        InvocationArg::try_from(name).unwrap(),
101                        InvocationArg::try_from(arr_instance).unwrap(),
102                    ],
103                )?;
104            }
105            let fed_ds = jvm.create_instance(
106                "ai.dataprep.federated.FederatedDataSource",
107                &[
108                    InvocationArg::try_from(db_info.is_local).unwrap(),
109                    InvocationArg::try_from(schema_info).unwrap(),
110                ],
111            )?;
112            jvm.invoke(
113                &db_manual,
114                "put",
115                &[
116                    InvocationArg::try_from(db_name).unwrap(),
117                    InvocationArg::try_from(fed_ds).unwrap(),
118                ],
119            )?;
120        } else {
121            db_config.push(String::from(db_name));
122        }
123    }
124    let db_config = jvm.java_list("java.lang.String", db_config)?;
125    (db_config, db_manual)
126}
127
128#[allow(dead_code)]
129#[throws(ConnectorXOutError)]
130fn create_sources2(
131    jvm: &Jvm,
132    db_map: &HashMap<String, FederatedDataSourceInfo>,
133) -> (Instance, Instance) {
134    debug!("Found environment variable `FED_CONFIG_PATH`, use configurations!");
135    let mut dbs = vec![];
136    let db_manual = jvm.create_instance("java.util.HashMap", InvocationArg::empty())?;
137    for db in db_map.keys() {
138        dbs.push(String::from(db));
139    }
140    (jvm.java_list("java.lang.String", dbs)?, db_manual)
141}
142
143#[throws(ConnectorXOutError)]
144pub fn rewrite_sql(
145    sql: &str,
146    db_map: &HashMap<String, FederatedDataSourceInfo>,
147    j4rs_base: Option<&str>,
148    strategy: &str,
149) -> Vec<Plan> {
150    let jvm = init_jvm(j4rs_base)?;
151    debug!("init jvm successfully!");
152
153    let sql = InvocationArg::try_from(sql).unwrap();
154    let strategy = InvocationArg::try_from(strategy).unwrap();
155
156    let (db_config, db_manual) = match env::var("FED_CONFIG_PATH") {
157        Ok(_) => create_sources2(&jvm, db_map)?,
158        _ => create_sources(&jvm, db_map)?,
159    };
160    let rewriter = jvm.create_instance(
161        "ai.dataprep.accio.FederatedQueryRewriter",
162        InvocationArg::empty(),
163    )?;
164    let db_config = InvocationArg::try_from(db_config).unwrap();
165    let db_manual = InvocationArg::try_from(db_manual).unwrap();
166    let plan = jvm.invoke(&rewriter, "rewrite", &[sql, db_config, db_manual, strategy])?;
167
168    let count = jvm.invoke(&plan, "getCount", InvocationArg::empty())?;
169    let count: i32 = jvm.to_rust(count)?;
170    debug!("rewrite finished, got {} queries", count);
171
172    let mut fed_plan = vec![];
173    for i in 0..count {
174        let idx = [InvocationArg::try_from(i).unwrap().into_primitive()?];
175
176        let db = jvm.invoke(&plan, "getDBName", &idx)?;
177        let db: String = jvm.to_rust(db)?;
178
179        let alias_db = jvm.invoke(&plan, "getAliasDBName", &idx)?;
180        let alias_db: String = jvm.to_rust(alias_db)?;
181
182        let rewrite_sql = jvm.invoke(&plan, "getSql", &idx)?;
183        let rewrite_sql: String = jvm.to_rust(rewrite_sql)?;
184
185        let cardinality = jvm.invoke(&plan, "getCardinality", &idx)?;
186        let cardinality: usize = jvm.to_rust(cardinality)?;
187
188        debug!(
189            "{} - db: {}, alias: {}, cardinality: {}, rewrite sql: {}",
190            i, db, alias_db, cardinality, rewrite_sql
191        );
192        fed_plan.push(Plan {
193            db_name: db,
194            db_alias: alias_db,
195            sql: rewrite_sql,
196            cardinality,
197        });
198    }
199    fed_plan
200}