Skip to content

Commit 7af280d

Browse files
time filter for all tables in query
1 parent 95aacf6 commit 7af280d

File tree

5 files changed

+75
-86
lines changed

5 files changed

+75
-86
lines changed

src/alerts/alerts_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -107,7 +107,7 @@ async fn execute_local_query(
107107
filter_tag: None,
108108
};
109109

110-
let (records, _) = execute(query, &tables[0], false)
110+
let (records, _) = execute(query, false)
111111
.await
112112
.map_err(|err| AlertError::CustomError(format!("Failed to execute query: {err}")))?;
113113

src/handlers/airplane.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -205,7 +205,7 @@ impl FlightService for AirServiceImpl {
205205
})?;
206206
let time = Instant::now();
207207

208-
let (records, _) = execute(query, &stream_name, false)
208+
let (records, _) = execute(query, false)
209209
.await
210210
.map_err(|err| Status::internal(err.to_string()))?;
211211

src/handlers/http/query.rs

Lines changed: 15 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -92,12 +92,9 @@ pub async fn get_records_and_fields(
9292
let creds = extract_session_key_from_req(req)?;
9393
let permissions = Users.get_permissions(&creds);
9494

95-
let table_name = tables
96-
.first()
97-
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
9895
user_auth_for_datasets(&permissions, &tables).await?;
9996

100-
let (records, fields) = execute(query, table_name, false).await?;
97+
let (records, fields) = execute(query, false).await?;
10198

10299
let records = match records {
103100
Either::Left(vec_rb) => vec_rb,
@@ -121,28 +118,28 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
121118
let creds = extract_session_key_from_req(&req)?;
122119
let permissions = Users.get_permissions(&creds);
123120

124-
let table_name = tables
125-
.first()
126-
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
127121
user_auth_for_datasets(&permissions, &tables).await?;
128122
let time = Instant::now();
129123

130124
// if the query is `select count(*) from <dataset>`
131125
// we use the `get_bin_density` method to get the count of records in the dataset
132126
// instead of executing the query using datafusion
133127
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
134-
return handle_count_query(&query_request, table_name, column_name, time).await;
128+
let table = tables
129+
.first()
130+
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
131+
return handle_count_query(&query_request, table, column_name, time).await;
135132
}
136133

137134
// if the query request has streaming = false (default)
138135
// we use datafusion's `execute` method to get the records
139136
if !query_request.streaming {
140-
return handle_non_streaming_query(query, table_name, &query_request, time).await;
137+
return handle_non_streaming_query(query, tables, &query_request, time).await;
141138
}
142139

143140
// if the query request has streaming = true
144141
// we use datafusion's `execute_stream` method to get the records
145-
handle_streaming_query(query, table_name, &query_request, time).await
142+
handle_streaming_query(query, tables, &query_request, time).await
146143
}
147144

148145
/// Handles count queries (e.g., `SELECT COUNT(*) FROM <dataset-name>`)
@@ -211,11 +208,12 @@ async fn handle_count_query(
211208
/// - `HttpResponse` with the full query result as a JSON object.
212209
async fn handle_non_streaming_query(
213210
query: LogicalQuery,
214-
table_name: &str,
211+
table_name: Vec<String>,
215212
query_request: &Query,
216213
time: Instant,
217214
) -> Result<HttpResponse, QueryError> {
218-
let (records, fields) = execute(query, table_name, query_request.streaming).await?;
215+
let first_table_name = table_name[0].clone();
216+
let (records, fields) = execute(query, query_request.streaming).await?;
219217
let records = match records {
220218
Either::Left(rbs) => rbs,
221219
Either::Right(_) => {
@@ -228,7 +226,7 @@ async fn handle_non_streaming_query(
228226
let time = time.elapsed().as_secs_f64();
229227

230228
QUERY_EXECUTE_TIME
231-
.with_label_values(&[table_name])
229+
.with_label_values(&[&first_table_name])
232230
.observe(time);
233231
let response = QueryResponse {
234232
records,
@@ -259,11 +257,12 @@ async fn handle_non_streaming_query(
259257
/// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array.
260258
async fn handle_streaming_query(
261259
query: LogicalQuery,
262-
table_name: &str,
260+
table_name: Vec<String>,
263261
query_request: &Query,
264262
time: Instant,
265263
) -> Result<HttpResponse, QueryError> {
266-
let (records_stream, fields) = execute(query, table_name, query_request.streaming).await?;
264+
let first_table_name = table_name[0].clone();
265+
let (records_stream, fields) = execute(query, query_request.streaming).await?;
267266
let records_stream = match records_stream {
268267
Either::Left(_) => {
269268
return Err(QueryError::MalformedQuery(
@@ -275,7 +274,7 @@ async fn handle_streaming_query(
275274
let total_time = format!("{:?}", time.elapsed());
276275
let time = time.elapsed().as_secs_f64();
277276
QUERY_EXECUTE_TIME
278-
.with_label_values(&[table_name])
277+
.with_label_values(&[&first_table_name])
279278
.observe(time);
280279

281280
let send_null = query_request.send_null;

src/query/mod.rs

Lines changed: 58 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ use chrono::NaiveDateTime;
2525
use chrono::{DateTime, Duration, Utc};
2626
use datafusion::arrow::record_batch::RecordBatch;
2727
use datafusion::catalog::resolve_table_references;
28-
use datafusion::common::tree_node::{Transformed, TreeNode};
28+
use datafusion::common::tree_node::Transformed;
2929
use datafusion::error::DataFusionError;
3030
use datafusion::execution::disk_manager::DiskManagerConfig;
3131
use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder};
@@ -56,7 +56,7 @@ use crate::catalog::Snapshot as CatalogSnapshot;
5656
use crate::catalog::column::{Int64Type, TypedStatistics};
5757
use crate::catalog::manifest::Manifest;
5858
use crate::catalog::snapshot::Snapshot;
59-
use crate::event;
59+
use crate::event::{self, DEFAULT_TIMESTAMP_KEY};
6060
use crate::handlers::http::query::QueryError;
6161
use crate::option::Mode;
6262
use crate::parseable::PARSEABLE;
@@ -77,7 +77,6 @@ pub static QUERY_RUNTIME: Lazy<Runtime> =
7777
/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
7878
pub async fn execute(
7979
query: Query,
80-
stream_name: &str,
8180
is_streaming: bool,
8281
) -> Result<
8382
(
@@ -86,9 +85,8 @@ pub async fn execute(
8685
),
8786
ExecuteError,
8887
> {
89-
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
9088
QUERY_RUNTIME
91-
.spawn(async move { query.execute(time_partition.as_ref(), is_streaming).await })
89+
.spawn(async move { query.execute(is_streaming).await })
9290
.await
9391
.expect("The Join should have been successful")
9492
}
@@ -180,7 +178,6 @@ impl Query {
180178
/// if streaming is false, it returns a vector of record batches
181179
pub async fn execute(
182180
&self,
183-
time_partition: Option<&String>,
184181
is_streaming: bool,
185182
) -> Result<
186183
(
@@ -190,7 +187,7 @@ impl Query {
190187
ExecuteError,
191188
> {
192189
let df = QUERY_SESSION
193-
.execute_logical_plan(self.final_logical_plan(time_partition))
190+
.execute_logical_plan(self.final_logical_plan())
194191
.await?;
195192

196193
let fields = df
@@ -214,19 +211,16 @@ impl Query {
214211
Ok((results, fields))
215212
}
216213

217-
pub async fn get_dataframe(
218-
&self,
219-
time_partition: Option<&String>,
220-
) -> Result<DataFrame, ExecuteError> {
214+
pub async fn get_dataframe(&self) -> Result<DataFrame, ExecuteError> {
221215
let df = QUERY_SESSION
222-
.execute_logical_plan(self.final_logical_plan(time_partition))
216+
.execute_logical_plan(self.final_logical_plan())
223217
.await?;
224218

225219
Ok(df)
226220
}
227221

228222
/// return logical plan with all time filters applied through
229-
fn final_logical_plan(&self, time_partition: Option<&String>) -> LogicalPlan {
223+
fn final_logical_plan(&self) -> LogicalPlan {
230224
// see https://github.com/apache/arrow-datafusion/pull/8400
231225
// this can be eliminated in later version of datafusion but with slight caveat
232226
// transform cannot modify stringified plans by itself
@@ -238,7 +232,6 @@ impl Query {
238232
plan.plan.as_ref().clone(),
239233
self.time_range.start.naive_utc(),
240234
self.time_range.end.naive_utc(),
241-
time_partition,
242235
);
243236
LogicalPlan::Explain(Explain {
244237
verbose: plan.verbose,
@@ -257,7 +250,6 @@ impl Query {
257250
x,
258251
self.time_range.start.naive_utc(),
259252
self.time_range.end.naive_utc(),
260-
time_partition,
261253
)
262254
.data
263255
}
@@ -586,66 +578,69 @@ fn transform(
586578
plan: LogicalPlan,
587579
start_time: NaiveDateTime,
588580
end_time: NaiveDateTime,
589-
time_partition: Option<&String>,
590581
) -> Transformed<LogicalPlan> {
591-
plan.transform(&|plan| match plan {
592-
LogicalPlan::TableScan(table) => {
593-
let mut new_filters = vec![];
594-
if !table_contains_any_time_filters(&table, time_partition) {
595-
let mut _start_time_filter: Expr;
596-
let mut _end_time_filter: Expr;
597-
match time_partition {
598-
Some(time_partition) => {
599-
_start_time_filter =
600-
PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
601-
.binary_expr(Expr::Column(Column::new(
602-
Some(table.table_name.to_owned()),
603-
time_partition.clone(),
604-
)));
605-
_end_time_filter =
606-
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
607-
.binary_expr(Expr::Column(Column::new(
608-
Some(table.table_name.to_owned()),
609-
time_partition,
610-
)));
611-
}
612-
None => {
613-
_start_time_filter =
614-
PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
615-
.binary_expr(Expr::Column(Column::new(
616-
Some(table.table_name.to_owned()),
617-
event::DEFAULT_TIMESTAMP_KEY,
618-
)));
619-
_end_time_filter =
620-
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
621-
.binary_expr(Expr::Column(Column::new(
622-
Some(table.table_name.to_owned()),
623-
event::DEFAULT_TIMESTAMP_KEY,
624-
)));
625-
}
582+
plan.transform_up_with_subqueries(&|plan| {
583+
match plan {
584+
LogicalPlan::TableScan(table) => {
585+
// Get the specific time partition for this stream
586+
let time_partition = PARSEABLE
587+
.get_stream(&table.table_name.to_string())
588+
.ok()
589+
.and_then(|stream| stream.get_time_partition());
590+
591+
let mut new_filters = vec![];
592+
if !table_contains_any_time_filters(&table, time_partition.as_ref()) {
593+
let default_timestamp = DEFAULT_TIMESTAMP_KEY.to_string();
594+
let time_column = time_partition.as_ref().unwrap_or(&default_timestamp);
595+
596+
// Create time filters with table-qualified column names
597+
let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(
598+
start_time,
599+
))
600+
.binary_expr(Expr::Column(Column::new(
601+
Some(table.table_name.to_owned()),
602+
time_column.clone(),
603+
)));
604+
605+
let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(
606+
end_time,
607+
))
608+
.binary_expr(Expr::Column(Column::new(
609+
Some(table.table_name.to_owned()),
610+
time_column.clone(),
611+
)));
612+
613+
new_filters.push(start_time_filter);
614+
new_filters.push(end_time_filter);
626615
}
627616

628-
new_filters.push(_start_time_filter);
629-
new_filters.push(_end_time_filter);
617+
let new_filter = new_filters.into_iter().reduce(and);
618+
if let Some(new_filter) = new_filter {
619+
let filter =
620+
Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table)))
621+
.unwrap();
622+
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
623+
} else {
624+
Ok(Transformed::no(LogicalPlan::TableScan(table)))
625+
}
630626
}
631-
let new_filter = new_filters.into_iter().reduce(and);
632-
if let Some(new_filter) = new_filter {
633-
let filter =
634-
Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap();
635-
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
636-
} else {
637-
Ok(Transformed::no(LogicalPlan::TableScan(table)))
627+
_ => {
628+
// For all other plan types, continue the transformation recursively
629+
// This ensures that subqueries and other nested plans are also transformed
630+
Ok(Transformed::no(plan))
638631
}
639632
}
640-
x => Ok(Transformed::no(x)),
641633
})
642-
.expect("transform only transforms the tablescan")
634+
.expect("transform processes all plan nodes")
643635
}
644636

645637
fn table_contains_any_time_filters(
646638
table: &datafusion::logical_expr::TableScan,
647639
time_partition: Option<&String>,
648640
) -> bool {
641+
let default_timestamp = DEFAULT_TIMESTAMP_KEY.to_string();
642+
let time_column = time_partition.unwrap_or(&default_timestamp);
643+
649644
table
650645
.filters
651646
.iter()
@@ -658,8 +653,7 @@ fn table_contains_any_time_filters(
658653
})
659654
.any(|expr| {
660655
matches!(&*expr.left, Expr::Column(Column { name, .. })
661-
if (time_partition.is_some_and(|field| field == name) ||
662-
(time_partition.is_none() && name == event::DEFAULT_TIMESTAMP_KEY)))
656+
if name == time_column)
663657
})
664658
}
665659

src/query/stream_schema_provider.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -487,10 +487,6 @@ impl TableProvider for StandardTableProvider {
487487
.map_err(|err| DataFusionError::Plan(err.to_string()))?;
488488
let time_partition = object_store_format.time_partition;
489489
let mut time_filters = extract_primary_filter(filters, &time_partition);
490-
if time_filters.is_empty() {
491-
return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
492-
}
493-
494490
if is_within_staging_window(&time_filters) {
495491
self.get_staging_execution_plan(
496492
&mut execution_plans,

0 commit comments

Comments (0)