1use crate::{
2 constants::{CX_REWRITER_PATH, J4RS_BASE_PATH},
3 prelude::*,
4};
5use fehler::throws;
6use j4rs::{ClasspathEntry, Instance, InvocationArg, Jvm, JvmBuilder};
7use log::debug;
8use std::collections::HashMap;
9use std::convert::TryFrom;
10use std::{env, fs};
11
/// One per-database sub-query produced by `rewrite_sql`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Plan {
    /// Database name reported by the Java rewriter (`getDBName`).
    pub db_name: String,
    /// Database alias reported by the Java rewriter (`getAliasDBName`).
    pub db_alias: String,
    /// Rewritten SQL for this database (`getSql`).
    pub sql: String,
    /// Cardinality value reported by the Java rewriter (`getCardinality`).
    pub cardinality: usize,
}
18
19pub struct FederatedDataSourceInfo<'a> {
20 pub conn_str_info: Option<SourceConn>,
21 pub manual_info: Option<HashMap<String, Vec<String>>>,
22 pub is_local: bool,
23 pub jdbc_url: &'a str,
24 pub jdbc_driver: &'a str,
25}
26
27impl<'a> FederatedDataSourceInfo<'a> {
28 pub fn new_from_conn_str(
29 source_conn: SourceConn,
30 is_local: bool,
31 jdbc_url: &'a str,
32 jdbc_driver: &'a str,
33 ) -> Self {
34 Self {
35 conn_str_info: Some(source_conn),
36 manual_info: None,
37 is_local,
38 jdbc_url,
39 jdbc_driver,
40 }
41 }
42 pub fn new_from_manual_schema(
43 manual_schema: HashMap<String, Vec<String>>,
44 is_local: bool,
45 ) -> Self {
46 Self {
47 conn_str_info: None,
48 manual_info: Some(manual_schema),
49 is_local,
50 jdbc_url: "",
51 jdbc_driver: "",
52 }
53 }
54}
55
56#[throws(ConnectorXOutError)]
57fn init_jvm(j4rs_base: Option<&str>) -> Jvm {
58 let base = match j4rs_base {
59 Some(path) => fs::canonicalize(path)
60 .map_err(|_| ConnectorXOutError::FileNotFoundError(path.to_string()))?,
61 None => fs::canonicalize(J4RS_BASE_PATH)
62 .map_err(|_| ConnectorXOutError::FileNotFoundError(J4RS_BASE_PATH.to_string()))?,
63 };
64 debug!("j4rs base path: {:?}", base);
65
66 let rewriter_path = env::var("CX_REWRITER_PATH").unwrap_or(CX_REWRITER_PATH.to_string());
67 let path = fs::canonicalize(rewriter_path.as_str())
68 .map_err(|_| ConnectorXOutError::FileNotFoundError(rewriter_path))?;
69
70 debug!("rewriter path: {:?}", path);
71
72 let entry = ClasspathEntry::new(path.to_str().unwrap());
73 JvmBuilder::new()
74 .skip_setting_native_lib()
75 .classpath_entry(entry)
76 .with_base_path(base.to_str().unwrap())
77 .build()?
78}
79
80#[allow(dead_code)]
81#[throws(ConnectorXOutError)]
82fn create_sources(
83 jvm: &Jvm,
84 db_map: &HashMap<String, FederatedDataSourceInfo>,
85) -> (Instance, Instance) {
86 debug!("Could not find environment variable `FED_CONFIG_PATH`, use manual configuration (c++ API only)!");
87 let mut db_config = vec![];
88 let db_manual = jvm.create_instance("java.util.HashMap", InvocationArg::empty())?;
89
90 for (db_name, db_info) in db_map.iter() {
91 if db_info.manual_info.is_some() {
92 let manual_info = db_info.manual_info.as_ref().unwrap();
93 let schema_info = jvm.create_instance("java.util.HashMap", InvocationArg::empty())?;
94 for (name, columns) in manual_info {
95 let arr_instance = jvm.java_list("java.lang.String", columns.to_vec())?;
96 jvm.invoke(
97 &schema_info,
98 "put",
99 &[
100 InvocationArg::try_from(name).unwrap(),
101 InvocationArg::try_from(arr_instance).unwrap(),
102 ],
103 )?;
104 }
105 let fed_ds = jvm.create_instance(
106 "ai.dataprep.federated.FederatedDataSource",
107 &[
108 InvocationArg::try_from(db_info.is_local).unwrap(),
109 InvocationArg::try_from(schema_info).unwrap(),
110 ],
111 )?;
112 jvm.invoke(
113 &db_manual,
114 "put",
115 &[
116 InvocationArg::try_from(db_name).unwrap(),
117 InvocationArg::try_from(fed_ds).unwrap(),
118 ],
119 )?;
120 } else {
121 db_config.push(String::from(db_name));
122 }
123 }
124 let db_config = jvm.java_list("java.lang.String", db_config)?;
125 (db_config, db_manual)
126}
127
128#[allow(dead_code)]
129#[throws(ConnectorXOutError)]
130fn create_sources2(
131 jvm: &Jvm,
132 db_map: &HashMap<String, FederatedDataSourceInfo>,
133) -> (Instance, Instance) {
134 debug!("Found environment variable `FED_CONFIG_PATH`, use configurations!");
135 let mut dbs = vec![];
136 let db_manual = jvm.create_instance("java.util.HashMap", InvocationArg::empty())?;
137 for db in db_map.keys() {
138 dbs.push(String::from(db));
139 }
140 (jvm.java_list("java.lang.String", dbs)?, db_manual)
141}
142
143#[throws(ConnectorXOutError)]
144pub fn rewrite_sql(
145 sql: &str,
146 db_map: &HashMap<String, FederatedDataSourceInfo>,
147 j4rs_base: Option<&str>,
148 strategy: &str,
149) -> Vec<Plan> {
150 let jvm = init_jvm(j4rs_base)?;
151 debug!("init jvm successfully!");
152
153 let sql = InvocationArg::try_from(sql).unwrap();
154 let strategy = InvocationArg::try_from(strategy).unwrap();
155
156 let (db_config, db_manual) = match env::var("FED_CONFIG_PATH") {
157 Ok(_) => create_sources2(&jvm, db_map)?,
158 _ => create_sources(&jvm, db_map)?,
159 };
160 let rewriter = jvm.create_instance(
161 "ai.dataprep.accio.FederatedQueryRewriter",
162 InvocationArg::empty(),
163 )?;
164 let db_config = InvocationArg::try_from(db_config).unwrap();
165 let db_manual = InvocationArg::try_from(db_manual).unwrap();
166 let plan = jvm.invoke(&rewriter, "rewrite", &[sql, db_config, db_manual, strategy])?;
167
168 let count = jvm.invoke(&plan, "getCount", InvocationArg::empty())?;
169 let count: i32 = jvm.to_rust(count)?;
170 debug!("rewrite finished, got {} queries", count);
171
172 let mut fed_plan = vec![];
173 for i in 0..count {
174 let idx = [InvocationArg::try_from(i).unwrap().into_primitive()?];
175
176 let db = jvm.invoke(&plan, "getDBName", &idx)?;
177 let db: String = jvm.to_rust(db)?;
178
179 let alias_db = jvm.invoke(&plan, "getAliasDBName", &idx)?;
180 let alias_db: String = jvm.to_rust(alias_db)?;
181
182 let rewrite_sql = jvm.invoke(&plan, "getSql", &idx)?;
183 let rewrite_sql: String = jvm.to_rust(rewrite_sql)?;
184
185 let cardinality = jvm.invoke(&plan, "getCardinality", &idx)?;
186 let cardinality: usize = jvm.to_rust(cardinality)?;
187
188 debug!(
189 "{} - db: {}, alias: {}, cardinality: {}, rewrite sql: {}",
190 i, db, alias_db, cardinality, rewrite_sql
191 );
192 fed_plan.push(Plan {
193 db_name: db,
194 db_alias: alias_db,
195 sql: rewrite_sql,
196 cardinality,
197 });
198 }
199 fed_plan
200}