Skip to content

Commit a5e857c

Browse files
committed
refactor: modified struct
1 parent e60a884 commit a5e857c

File tree

5 files changed

+140
-52
lines changed

5 files changed

+140
-52
lines changed

src/correlation/correlation_utils.rs

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,24 @@
1616
*
1717
*/
1818

19-
use datafusion::common::tree_node::TreeNode;
19+
use itertools::Itertools;
2020

21-
use crate::{
22-
query::{TableScanVisitor, QUERY_SESSION},
23-
rbac::{
21+
use crate::rbac::{
2422
map::SessionKey,
2523
role::{Action, Permission},
2624
Users,
27-
},
28-
};
25+
};
2926

30-
use super::CorrelationError;
31-
32-
async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, CorrelationError> {
33-
let session_state = QUERY_SESSION.state();
34-
let raw_logical_plan = session_state
35-
.create_logical_plan(query)
36-
.await
37-
.map_err(|err| CorrelationError::AnyhowError(err.into()))?;
38-
39-
let mut visitor = TableScanVisitor::default();
40-
let _ = raw_logical_plan.visit(&mut visitor);
41-
Ok(visitor)
42-
}
27+
use super::{CorrelationError, TableConfig};
4328

4429
pub async fn user_auth_for_query(
4530
session_key: &SessionKey,
46-
query: &str,
31+
table_configs: &[TableConfig],
4732
) -> Result<(), CorrelationError> {
48-
let tables = get_tables_from_query(query).await?;
33+
let tables = table_configs.iter().map(|t| &t.table_name).collect_vec();
4934
let permissions = Users.get_permissions(session_key);
5035

51-
for table_name in tables.into_inner().iter() {
36+
for table_name in tables {
5237
let mut authorized = false;
5338

5439
// in permission check if user can run query on the stream.

src/correlation/http_handlers.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use relative_path::RelativePathBuf;
2323
use crate::{
2424
option::CONFIG,
2525
storage::{CORRELATION_DIRECTORY, PARSEABLE_ROOT_DIRECTORY},
26-
utils::{actix::extract_session_key_from_req, uid::Uid},
26+
utils::actix::extract_session_key_from_req,
2727
};
2828

2929
use super::{
@@ -53,7 +53,7 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
5353

5454
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
5555

56-
if user_auth_for_query(&session_key, &correlation.query)
56+
if user_auth_for_query(&session_key, &correlation.table_configs)
5757
.await
5858
.is_ok()
5959
{
@@ -68,10 +68,10 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, Corre
6868
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
6969

7070
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
71-
let correlation: CorrelationConfig = correlation_request.into();
7271

73-
// validate user's query auth
74-
user_auth_for_query(&session_key, &correlation.query).await?;
72+
correlation_request.validate(&session_key).await?;
73+
74+
let correlation: CorrelationConfig = correlation_request.into();
7575

7676
// Save to disk
7777
let store = CONFIG.storage().get_object_store();
@@ -82,7 +82,7 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, Corre
8282

8383
Ok(format!(
8484
"Saved correlation with ID- {}",
85-
correlation.id.to_string()
85+
correlation.id
8686
))
8787
}
8888

@@ -95,17 +95,14 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
9595
.get("correlation_id")
9696
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
9797

98-
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
98+
// validate whether user has access to this correlation object or not
99+
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
100+
user_auth_for_query(&session_key, &correlation.table_configs).await?;
99101

100-
// validate user's query auth
101-
user_auth_for_query(&session_key, &correlation_request.query).await?;
102+
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
103+
correlation_request.validate(&session_key).await?;
102104

103-
let correlation: CorrelationConfig = CorrelationConfig {
104-
version: correlation_request.version,
105-
id: Uid::from_string(correlation_id)
106-
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?,
107-
query: correlation_request.query,
108-
};
105+
let correlation = correlation_request.generate_correlation_config(correlation_id.to_owned());
109106

110107
// Save to disk
111108
let store = CONFIG.storage().get_object_store();
@@ -116,7 +113,7 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
116113

117114
Ok(format!(
118115
"Modified correlation with ID- {}",
119-
correlation.id.to_string()
116+
correlation.id
120117
))
121118
}
122119

@@ -132,14 +129,14 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError
132129
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
133130

134131
// validate user's query auth
135-
user_auth_for_query(&session_key, &correlation.query).await?;
132+
user_auth_for_query(&session_key, &correlation.table_configs).await?;
136133

137134
// Delete from disk
138135
let store = CONFIG.storage().get_object_store();
139136
let path = RelativePathBuf::from_iter([
140137
PARSEABLE_ROOT_DIRECTORY,
141138
CORRELATION_DIRECTORY,
142-
&format!("{}", correlation.id),
139+
&correlation.id.to_string(),
143140
]);
144141
store.delete_object(&path).await?;
145142

src/correlation/mod.rs

Lines changed: 116 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
*
1717
*/
1818

19+
use std::collections::HashSet;
20+
1921
use actix_web::http::header::ContentType;
22+
use chrono::Utc;
2023
use correlation_utils::user_auth_for_query;
24+
use datafusion::error::DataFusionError;
2125
use http::StatusCode;
2226
use itertools::Itertools;
2327
use once_cell::sync::Lazy;
@@ -27,8 +31,8 @@ use tokio::sync::RwLock;
2731
use tracing::{trace, warn};
2832

2933
use crate::{
30-
handlers::http::rbac::RBACError, option::CONFIG, rbac::map::SessionKey,
31-
storage::ObjectStorageError, utils::uid::Uid,
34+
handlers::http::rbac::RBACError, option::CONFIG, query::QUERY_SESSION, rbac::map::SessionKey,
35+
storage::ObjectStorageError, users::filters::FilterQuery, utils::get_hash,
3236
};
3337

3438
pub mod correlation_utils;
@@ -69,7 +73,10 @@ impl Correlation {
6973

7074
let mut user_correlations = vec![];
7175
for c in correlations {
72-
if user_auth_for_query(session_key, &c.query).await.is_ok() {
76+
if user_auth_for_query(session_key, &c.table_configs)
77+
.await
78+
.is_ok()
79+
{
7380
user_correlations.push(c);
7481
}
7582
}
@@ -83,7 +90,7 @@ impl Correlation {
8390
let read = self.0.read().await;
8491
let correlation = read
8592
.iter()
86-
.find(|c| c.id.to_string() == correlation_id)
93+
.find(|c| c.id == correlation_id)
8794
.cloned();
8895

8996
if let Some(c) = correlation {
@@ -110,7 +117,7 @@ impl Correlation {
110117
let index = read_access
111118
.iter()
112119
.enumerate()
113-
.find(|(_, c)| c.id.to_string() == correlation_id)
120+
.find(|(_, c)| c.id == correlation_id)
114121
.to_owned();
115122

116123
if let Some((index, _)) = index {
@@ -126,6 +133,7 @@ impl Correlation {
126133
}
127134

128135
#[derive(Debug, Clone, Serialize, Deserialize)]
136+
#[serde(rename_all = "camelCase")]
129137
pub enum CorrelationVersion {
130138
V1,
131139
}
@@ -134,8 +142,12 @@ pub enum CorrelationVersion {
134142
#[serde(rename_all = "camelCase")]
135143
pub struct CorrelationConfig {
136144
pub version: CorrelationVersion,
137-
pub id: Uid,
138-
pub query: String,
145+
pub id: String,
146+
pub table_configs: Vec<TableConfig>,
147+
pub join_config: JoinConfig,
148+
pub filter: Option<FilterQuery>,
149+
pub start_time: Option<String>,
150+
pub end_time: Option<String>,
139151
}
140152

141153
impl CorrelationConfig {}
@@ -144,16 +156,91 @@ impl CorrelationConfig {}
144156
#[serde(rename_all = "camelCase")]
145157
pub struct CorrelationRequest {
146158
pub version: CorrelationVersion,
147-
pub query: String,
159+
pub table_configs: Vec<TableConfig>,
160+
pub join_config: JoinConfig,
161+
pub filter: Option<FilterQuery>,
162+
pub start_time: Option<String>,
163+
pub end_time: Option<String>,
148164
}
149165

150166
impl From<CorrelationRequest> for CorrelationConfig {
151167
fn from(val: CorrelationRequest) -> Self {
152168
Self {
153169
version: val.version,
154-
id: crate::utils::uid::gen(),
155-
query: val.query,
170+
id: get_hash(Utc::now().timestamp_micros().to_string().as_str()),
171+
table_configs: val.table_configs,
172+
join_config: val.join_config,
173+
filter: val.filter,
174+
start_time: val.start_time,
175+
end_time: val.end_time,
176+
}
177+
}
178+
}
179+
180+
impl CorrelationRequest {
181+
pub fn generate_correlation_config(self, id: String) -> CorrelationConfig {
182+
CorrelationConfig {
183+
version: self.version,
184+
id,
185+
table_configs: self.table_configs,
186+
join_config: self.join_config,
187+
filter: self.filter,
188+
start_time: self.start_time,
189+
end_time: self.end_time,
190+
}
191+
}
192+
193+
/// This function will validate the TableConfigs, JoinConfig, and user auth
194+
pub async fn validate(&self, session_key: &SessionKey) -> Result<(), CorrelationError> {
195+
let ctx = &QUERY_SESSION;
196+
197+
let mut h1 = HashSet::new();
198+
199+
h1 = self.table_configs.iter().map(|t| &t.table_name).collect();
200+
let h2 = HashSet::from([&self.join_config.table_one, &self.join_config.table_two]);
201+
202+
// check if table config tables are the same
203+
if h1.len() != 2 {
204+
return Err(CorrelationError::Metadata(
205+
"Must provide config for two unique tables",
206+
));
207+
}
208+
209+
// check that the tables mentioned in join config are
210+
// the same as those in table config
211+
if h1 != h2 {
212+
return Err(CorrelationError::Metadata(
213+
"Must provide same tables for join config and table config",
214+
));
215+
}
216+
217+
// check if user has access to table
218+
user_auth_for_query(session_key, &self.table_configs).await?;
219+
220+
// to validate table config, we need to check whether the mentioned fields
221+
// are present in the table or not
222+
for table_config in self.table_configs.iter() {
223+
// table config check
224+
let df = ctx.table(&table_config.table_name).await?;
225+
226+
let mut selected_fields = table_config
227+
.selected_fields
228+
.iter()
229+
.map(|c| c.as_str())
230+
.collect_vec();
231+
let join_field = if table_config.table_name == self.join_config.table_one {
232+
&self.join_config.field_one
233+
} else {
234+
&self.join_config.field_two
235+
};
236+
237+
selected_fields.push(join_field.as_str());
238+
239+
// if this errors out then the table config is incorrect or join config is incorrect
240+
df.select_columns(selected_fields.as_slice())?;
156241
}
242+
243+
Ok(())
157244
}
158245
}
159246

@@ -171,6 +258,8 @@ pub enum CorrelationError {
171258
AnyhowError(#[from] anyhow::Error),
172259
#[error("Unauthorized")]
173260
Unauthorized,
261+
#[error("DataFusion Error: {0}")]
262+
DataFusion(#[from] DataFusionError),
174263
}
175264

176265
impl actix_web::ResponseError for CorrelationError {
@@ -182,6 +271,7 @@ impl actix_web::ResponseError for CorrelationError {
182271
Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND,
183272
Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
184273
Self::Unauthorized => StatusCode::BAD_REQUEST,
274+
Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR,
185275
}
186276
}
187277

@@ -191,3 +281,19 @@ impl actix_web::ResponseError for CorrelationError {
191281
.body(self.to_string())
192282
}
193283
}
284+
285+
#[derive(Debug, Clone, Serialize, Deserialize)]
286+
#[serde(rename_all = "camelCase")]
287+
pub struct TableConfig {
288+
pub selected_fields: Vec<String>,
289+
pub table_name: String,
290+
}
291+
292+
#[derive(Debug, Clone, Serialize, Deserialize)]
293+
#[serde(rename_all = "camelCase")]
294+
pub struct JoinConfig {
295+
pub table_one: String,
296+
pub field_one: String,
297+
pub table_two: String,
298+
pub field_two: String,
299+
}

src/handlers/http/modal/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl Server {
194194
),
195195
)
196196
.service(
197-
web::resource("/correlation/{correlation_id}")
197+
web::resource("/{correlation_id}")
198198
.route(
199199
web::get()
200200
.to(correlation::http_handlers::get)

src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,7 @@ pub trait ObjectStorage: Send + Sync + 'static {
640640
let path = RelativePathBuf::from_iter([
641641
PARSEABLE_ROOT_DIRECTORY,
642642
CORRELATION_DIRECTORY,
643-
&format!("{}", correlation.id),
643+
&format!("{}.json",correlation.id.to_string()),
644644
]);
645645
self.put_object(&path, to_bytes(correlation)).await?;
646646
Ok(())

0 commit comments

Comments
 (0)