@@ -28,15 +28,14 @@ use once_cell::sync::Lazy;
2828use serde:: { Deserialize , Serialize } ;
2929use serde_json:: Error as SerdeError ;
3030use tokio:: sync:: RwLock ;
31- use tracing:: { trace, warn} ;
31+ use tracing:: { error , trace, warn} ;
3232
3333use crate :: {
3434 handlers:: http:: rbac:: RBACError , option:: CONFIG , query:: QUERY_SESSION , rbac:: map:: SessionKey ,
3535 storage:: ObjectStorageError , users:: filters:: FilterQuery , utils:: get_hash,
3636} ;
3737
3838pub mod correlation_utils;
39- pub mod http_handlers;
4039
4140pub static CORRELATIONS : Lazy < Correlation > = Lazy :: new ( Correlation :: default) ;
4241
@@ -55,7 +54,13 @@ impl Correlation {
5554 continue ;
5655 }
5756
58- let correlation: CorrelationConfig = serde_json:: from_slice ( & corr) ?;
57+ let correlation: CorrelationConfig = match serde_json:: from_slice ( & corr) {
58+ Ok ( c) => c,
59+ Err ( e) => {
60+ error ! ( "Unable to load correlation- {e}" ) ;
61+ continue ;
62+ } ,
63+ } ;
5964
6065 correlations. push ( correlation) ;
6166 }
@@ -192,7 +197,12 @@ impl CorrelationRequest {
192197 let ctx = & QUERY_SESSION ;
193198
194199 let h1: HashSet < & String > = self . table_configs . iter ( ) . map ( |t| & t. table_name ) . collect ( ) ;
195- let h2 = HashSet :: from ( [ & self . join_config . table_one , & self . join_config . table_two ] ) ;
200+ let h2: HashSet < & String > = self
201+ . join_config
202+ . join_conditions
203+ . iter ( )
204+ . map ( |j| & j. table_name )
205+ . collect ( ) ;
196206
197207 // check if table config tables are the same
198208 if h1. len ( ) != 2 {
@@ -223,13 +233,19 @@ impl CorrelationRequest {
223233 . iter ( )
224234 . map ( |c| c. as_str ( ) )
225235 . collect_vec ( ) ;
226- let join_field = if table_config. table_name == self . join_config . table_one {
227- & self . join_config . field_one
228- } else {
229- & self . join_config . field_two
230- } ;
231236
232- selected_fields. push ( join_field. as_str ( ) ) ;
237+ // unwrap because we have determined that the tables in table config are the same as those in join config
238+ let condition = self
239+ . join_config
240+ . join_conditions
241+ . iter ( )
242+ . find ( |j| j. table_name == table_config. table_name )
243+ . unwrap ( ) ;
244+ let join_field = condition. field . as_str ( ) ;
245+
246+ if !selected_fields. contains ( & join_field) {
247+ selected_fields. push ( join_field) ;
248+ }
233249
234250 // if this errors out then the table config is incorrect or join config is incorrect
235251 df. select_columns ( selected_fields. as_slice ( ) ) ?;
@@ -284,11 +300,15 @@ pub struct TableConfig {
284300 pub table_name : String ,
285301}
286302
303+ #[ derive( Debug , Clone , Serialize , Deserialize ) ]
304+ #[ serde( rename_all = "camelCase" ) ]
305+ pub struct JoinCondition {
306+ pub table_name : String ,
307+ pub field : String ,
308+ }
309+
287310#[ derive( Debug , Clone , Serialize , Deserialize ) ]
288311#[ serde( rename_all = "camelCase" ) ]
289312pub struct JoinConfig {
290- pub table_one : String ,
291- pub field_one : String ,
292- pub table_two : String ,
293- pub field_two : String ,
313+ pub join_conditions : Vec < JoinCondition > ,
294314}
0 commit comments