Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions src/frontend/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,24 @@ use warp::multipart::{FormData, Part};
use warp::reply::{with_header, Response};
use warp::{hyper::header, hyper::StatusCode, Filter, Reply};

use super::http_utils::{handle_rejection, into_response, ApiError};
use crate::auth::{token_to_principal, AccessPolicy, Action, UserContext};
use crate::catalog::DEFAULT_DB;
use crate::config::schema::{AccessSettings, MEBIBYTES};
use crate::frontend::profile_utils::Timer;
use crate::{
config::schema::{str_to_hex_hash, HttpFrontend},
context::{
is_read_only, is_statement_read_only, DefaultSeafowlContext, SeafowlContext,
},
};

use super::http_utils::{handle_rejection, into_response, ApiError};

const QUERY_HEADER: &str = "X-Seafowl-Query";
const BEARER_PREFIX: &str = "Bearer ";
// We have a very lax CORS on this, so we don't mind browsers
// caching it for as long as possible.
const CORS_MAXAGE: u32 = 86400;
const QUERY_TIME_HEADER: &str = "X-Seafowl-Query-Time";

// Vary on Origin, as warp's CORS responds with Access-Control-Allow-Origin: [origin],
// so we can't cache the response in the browser if the origin changes.
Expand Down Expand Up @@ -155,6 +156,9 @@ pub async fn uncached_read_write_query(
query: String,
mut context: Arc<DefaultSeafowlContext>,
) -> Result<Response, ApiError> {
let mut timer = Timer::new();
timer.start_timer();

// If a specific DB name was used as a parameter in the route, scope the context to it,
// effectively making it the default DB for the duration of the session.
if database_name != context.database {
Expand Down Expand Up @@ -215,6 +219,12 @@ pub async fn uncached_read_write_query(
.headers_mut()
.insert(header::CONTENT_TYPE, content_type_with_schema(schema));
}

let elapsed = timer.formatted_elapsed();
response
.headers_mut()
.insert(QUERY_TIME_HEADER, elapsed.parse().unwrap());

Ok(response)
}

Expand Down Expand Up @@ -285,6 +295,9 @@ pub async fn cached_read_query(
if_none_match: Option<String>,
mut context: Arc<DefaultSeafowlContext>,
) -> Result<Response, ApiError> {
let mut timer = Timer::new();
timer.start_timer();

// Ignore dots at the end
let query_or_hash = query_or_hash.split('.').next().unwrap();

Expand Down Expand Up @@ -346,6 +359,10 @@ pub async fn cached_read_query(
let schema = physical.schema().clone();
let mut response = plan_to_response(context, physical).await?;

let elapsed = timer.formatted_elapsed();
response
.headers_mut()
.insert(QUERY_TIME_HEADER, elapsed.parse().unwrap());
response
.headers_mut()
.insert(header::ETAG, etag.parse().unwrap());
Expand Down Expand Up @@ -477,7 +494,6 @@ pub fn filters(
.max_age(CORS_MAXAGE);

let log = warp::log(module_path!());

// Cached read query
let ctx = context.clone();
let cached_read_query_route = warp::path!(String / "q" / String)
Expand Down Expand Up @@ -605,10 +621,10 @@ pub mod tests {

use crate::catalog::DEFAULT_DB;
use crate::config::schema::{str_to_hex_hash, HttpFrontend};
use crate::testutils::schema_from_header;
use crate::testutils::{assert_header_is_float, schema_from_header};
use crate::{
context::{test_utils::in_memory_context, DefaultSeafowlContext, SeafowlContext},
frontend::http::{filters, QUERY_HEADER},
frontend::http::{filters, QUERY_HEADER, QUERY_TIME_HEADER},
};

fn http_config_from_access_policy_and_cache_control(
Expand Down Expand Up @@ -1518,7 +1534,10 @@ pub mod tests {
)]
#[case::uncached_post("POST", "/q")]
#[tokio::test]
async fn test_http_type_conversion(#[case] method: &str, #[case] path: &str) {
async fn test_http_type_conversion_and_timing_header(
#[case] method: &str,
#[case] path: &str,
) {
let context = Arc::new(in_memory_context().await);
let handler = filters(
context.clone(),
Expand Down Expand Up @@ -1572,5 +1591,11 @@ SELECT
"#
)
);

// Assert the "request-to-response" time header is present
assert!(resp.headers().contains_key(QUERY_TIME_HEADER));
// Assert that it's a float
let header_value = resp.headers().get(QUERY_TIME_HEADER).unwrap();
assert_header_is_float(header_value);
}
}
1 change: 1 addition & 0 deletions src/frontend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod http;
pub mod http_utils;
#[cfg(feature = "frontend-postgres")]
pub mod postgres;
pub mod profile_utils;
34 changes: 34 additions & 0 deletions src/frontend/profile_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::time::{Duration, Instant};

// Simple timer intended for profiling
// Lazily inits. Returns `Duration`s or raw string (in ms)
// Use Option as a safeguard because apparently previous Rust versions panicked
// when current time was earlier than self.

#[derive(Default)]
pub struct Timer {
start_time: Option<Instant>,
}

impl Timer {
pub fn new() -> Self {
Self { start_time: None }
}

pub fn start_timer(&mut self) {
self.start_time = Some(Instant::now())
}

pub fn elapsed(&self) -> Option<Duration> {
self.start_time.map(|start| start.elapsed())
}

pub fn formatted_elapsed(&self) -> String {
self.elapsed()
.map(|duration| {
let millis = duration.as_millis();
format!("{}", millis)
})
.unwrap_or_else(String::new)
}
}
7 changes: 7 additions & 0 deletions src/testutils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cmp::min;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::Int32Array;
Expand Down Expand Up @@ -161,3 +162,9 @@ pub fn schema_from_header(headers: &HeaderMap<HeaderValue>) -> Schema {

schema_from_json(&schema_json).expect("arrow schema reconstructable from JSON")
}

pub fn assert_header_is_float(header: &HeaderValue) -> bool {
let float_str = header.to_str().unwrap();
let parsed_float = f64::from_str(float_str).unwrap();
parsed_float.is_finite()
}