2 changes: 2 additions & 0 deletions src/event/format/json.rs
@@ -149,6 +149,7 @@ impl EventFormat for Event {
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
+ p_custom_fields: &HashMap<String, String>,
) -> Result<super::Event, anyhow::Error> {
let custom_partition_values = match custom_partitions.as_ref() {
Some(custom_partition) => {
@@ -168,6 +169,7 @@ impl EventFormat for Event {
static_schema_flag,
time_partition,
schema_version,
+ p_custom_fields,
)?;

Ok(super::Event {
26 changes: 7 additions & 19 deletions src/event/format/mod.rs
@@ -33,7 +33,7 @@ use serde_json::Value;
use crate::{
metadata::SchemaVersion,
storage::StreamType,
- utils::arrow::{get_field, get_timestamp_array, replace_columns},
+ utils::arrow::{add_parseable_fields, get_field},
};

use super::{Event, DEFAULT_TIMESTAMP_KEY};
@@ -145,9 +145,10 @@ pub trait EventFormat: Sized {
static_schema_flag: bool,
time_partition: Option<&String>,
schema_version: SchemaVersion,
+ p_custom_fields: &HashMap<String, String>,
) -> Result<(RecordBatch, bool), AnyError> {
let p_timestamp = self.get_p_timestamp();
- let (data, mut schema, is_first) = self.to_data(
+ let (data, schema, is_first) = self.to_data(
storage_schema,
time_partition,
schema_version,
@@ -161,16 +162,6 @@
));
};

- // add the p_timestamp field to the event schema to the 0th index
- schema.insert(
- 0,
- Arc::new(Field::new(
- DEFAULT_TIMESTAMP_KEY,
- DataType::Timestamp(TimeUnit::Millisecond, None),
- true,
- )),
- );
-
// prepare the record batch and new fields to be added
let mut new_schema = Arc::new(Schema::new(schema));
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -179,13 +170,9 @@
new_schema =
update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);

- let mut rb = Self::decode(data, new_schema.clone())?;
- rb = replace_columns(
- rb.schema(),
- &rb,
- &[0],
- &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
- );
+ let rb = Self::decode(data, new_schema.clone())?;
+
+ let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;

Ok((rb, is_first))
}
@@ -223,6 +210,7 @@ pub trait EventFormat: Sized {
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
+ p_custom_fields: &HashMap<String, String>,
) -> Result<Event, AnyError>;
}

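`add_parseable_fields` (imported above from `utils::arrow`) replaces the deleted `schema.insert(0, ...)` / `replace_columns` pair and also attaches the new custom-field columns, but its body lives outside this diff. A minimal sketch of the idea, assuming it prepends a constant `p_timestamp` column and appends one constant Utf8 column per custom field — names, ordering, and error handling here are illustrative guesses, not the crate's actual implementation:

use std::{collections::HashMap, sync::Arc};

use arrow_array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
use chrono::{DateTime, Utc};

use crate::event::DEFAULT_TIMESTAMP_KEY;

// Sketch only: prepend p_timestamp at index 0, then append one Utf8
// column per custom field, each value repeated for every row.
pub fn add_parseable_fields(
    rb: RecordBatch,
    p_timestamp: DateTime<Utc>,
    p_custom_fields: &HashMap<String, String>,
) -> Result<RecordBatch, ArrowError> {
    let num_rows = rb.num_rows();
    let mut fields: Vec<Field> = rb
        .schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    let mut columns: Vec<ArrayRef> = rb.columns().to_vec();

    // constant timestamp column at index 0, one value per row
    fields.insert(
        0,
        Field::new(
            DEFAULT_TIMESTAMP_KEY,
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
    );
    columns.insert(
        0,
        Arc::new(TimestampMillisecondArray::from_value(
            p_timestamp.timestamp_millis(),
            num_rows,
        )),
    );

    // sort keys so the appended column order stays stable across batches
    let mut custom: Vec<(&String, &String)> = p_custom_fields.iter().collect();
    custom.sort();
    for (key, value) in custom {
        fields.push(Field::new(key, DataType::Utf8, true));
        columns.push(Arc::new(StringArray::from(vec![value.as_str(); num_rows])));
    }

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}

Whatever the real signature looks like, folding both the timestamp and the custom fields into one helper avoids the old decode-then-overwrite dance on column 0.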
3 changes: 3 additions & 0 deletions src/event/mod.rs
@@ -35,6 +35,9 @@ use chrono::NaiveDateTime;
use std::collections::HashMap;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
+ pub const USER_AGENT_KEY: &str = "p_user_agent";
+ pub const SOURCE_IP_KEY: &str = "p_src_ip";
+ pub const FORMAT_KEY: &str = "p_format";

#[derive(Clone)]
pub struct Event {
3 changes: 2 additions & 1 deletion src/handlers/http/audit.rs
@@ -23,6 +23,7 @@ use actix_web::{
middleware::Next,
};
use actix_web_httpauth::extractors::basic::BasicAuth;
+ use http::header::USER_AGENT;
use ulid::Ulid;

use crate::{
@@ -85,7 +86,7 @@ pub async fn audit_log_middleware(
)
.with_user_agent(
req.headers()
- .get("User-Agent")
+ .get(USER_AGENT)
.and_then(|a| a.to_str().ok())
.unwrap_or_default(),
)
80 changes: 63 additions & 17 deletions src/handlers/http/ingest.rs
@@ -41,7 +41,7 @@ use crate::utils::header_parsing::ParseHeaderError;
use crate::utils::json::flatten::JsonFlattenError;

use super::logstream::error::{CreateStreamError, StreamError};
- use super::modal::utils::ingest_utils::flatten_and_push_logs;
+ use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;

@@ -72,6 +72,8 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
return Err(PostError::OtelNotSupported);
}

+ let p_custom_fields = get_custom_fields_from_header(req);
+
let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
PARSEABLE
.create_stream_if_not_exists(
@@ -81,7 +83,7 @@
)
.await?;

- flatten_and_push_logs(json, &stream_name, &log_source).await?;
+ flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -102,6 +104,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
None,
SchemaVersion::V0,
StreamType::Internal,
+ &HashMap::new(),
)?
.process()?;

@@ -143,8 +146,9 @@ pub async fn handle_otel_logs_ingestion(
vec![log_source_entry],
)
.await?;
+ let p_custom_fields = get_custom_fields_from_header(req);

- flatten_and_push_logs(json, &stream_name, &log_source).await?;
+ flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -166,6 +170,7 @@
if log_source != LogSource::OtelMetrics {
return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
}
+
let stream_name = stream_name.to_str().unwrap().to_owned();
let log_source_entry = LogSourceEntry::new(
log_source.clone(),
@@ -182,7 +187,9 @@
)
.await?;

- flatten_and_push_logs(json, &stream_name, &log_source).await?;
+ let p_custom_fields = get_custom_fields_from_header(req);
+
+ flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -222,7 +229,9 @@
)
.await?;

- flatten_and_push_logs(json, &stream_name, &log_source).await?;
+ let p_custom_fields = get_custom_fields_from_header(req);
+
+ flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -271,7 +280,8 @@ pub async fn post_event(
return Err(PostError::OtelNotSupported);
}

- flatten_and_push_logs(json, &stream_name, &log_source).await?;
+ let p_custom_fields = get_custom_fields_from_header(req);
+ flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
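`get_custom_fields_from_header` comes from `ingest_utils` and is not shown in this diff. A rough sketch of what it plausibly does with the constants added in `src/event/mod.rs` — the exact headers consulted, and whether `p_format` is fed from a header such as `X-P-Log-Source`, are assumptions:

use std::collections::HashMap;

use actix_web::HttpRequest;
use http::header::USER_AGENT;

use crate::event::{FORMAT_KEY, SOURCE_IP_KEY, USER_AGENT_KEY};

// Sketch only: collect per-request metadata into the map that
// eventually becomes p_* columns on every event in the batch.
pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
    let mut p_custom_fields = HashMap::new();

    // p_user_agent from the standard User-Agent header
    if let Some(agent) = req.headers().get(USER_AGENT).and_then(|v| v.to_str().ok()) {
        p_custom_fields.insert(USER_AGENT_KEY.to_string(), agent.to_string());
    }

    // p_src_ip from the peer/forwarded address
    if let Some(ip) = req.connection_info().realip_remote_addr() {
        p_custom_fields.insert(SOURCE_IP_KEY.to_string(), ip.to_string());
    }

    // p_format: the header name below is a guess, not confirmed by this diff
    if let Some(format) = req
        .headers()
        .get("x-p-log-source")
        .and_then(|v| v.to_str().ok())
    {
        p_custom_fields.insert(FORMAT_KEY.to_string(), format.to_string());
    }

    p_custom_fields
}

Taking `req` by value matches the call sites above (`get_custom_fields_from_header(req)`); actix's `HttpRequest` is cheap to move since it is reference-counted internally.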
@@ -421,7 +431,13 @@ mod tests {
});

let (rb, _) = json::Event::new(json)
- .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+ .into_recordbatch(
+ &HashMap::default(),
+ false,
+ None,
+ SchemaVersion::V0,
+ &HashMap::new(),
+ )
.unwrap();

assert_eq!(rb.num_rows(), 1);
@@ -449,7 +465,13 @@
});

let (rb, _) = json::Event::new(json)
- .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+ .into_recordbatch(
+ &HashMap::default(),
+ false,
+ None,
+ SchemaVersion::V0,
+ &HashMap::new(),
+ )
.unwrap();

assert_eq!(rb.num_rows(), 1);
@@ -481,7 +503,7 @@
);

let (rb, _) = json::Event::new(json)
- .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+ .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.unwrap();

assert_eq!(rb.num_rows(), 1);
@@ -513,7 +535,7 @@
);

assert!(json::Event::new(json)
- .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+ .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.is_err());
}

@@ -531,7 +553,7 @@
);

let (rb, _) = json::Event::new(json)
- .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+ .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.unwrap();

assert_eq!(rb.num_rows(), 1);
@@ -572,7 +594,13 @@
]);

let (rb, _) = json::Event::new(json)
- .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+ .into_recordbatch(
+ &HashMap::default(),
+ false,
+ None,
+ SchemaVersion::V0,
+ &HashMap::new(),
+ )
.unwrap();

assert_eq!(rb.num_rows(), 3);
@@ -620,7 +648,13 @@
]);

let (rb, _) = json::Event::new(json)
- .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+ .into_recordbatch(
+ &HashMap::default(),
+ false,
+ None,
+ SchemaVersion::V0,
+ &HashMap::new(),
+ )
.unwrap();

assert_eq!(rb.num_rows(), 3);
@@ -669,7 +703,7 @@
);

let (rb, _) = json::Event::new(json)
- .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+ .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.unwrap();

assert_eq!(rb.num_rows(), 3);
@@ -718,7 +752,7 @@
);

assert!(json::Event::new(json)
- .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+ .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.is_err());
}

@@ -758,7 +792,13 @@
.unwrap();

let (rb, _) = json::Event::new(flattened_json)
- .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+ .into_recordbatch(
+ &HashMap::default(),
+ false,
+ None,
+ SchemaVersion::V0,
+ &HashMap::new(),
+ )
.unwrap();
assert_eq!(rb.num_rows(), 4);
assert_eq!(rb.num_columns(), 5);
@@ -841,7 +881,13 @@
.unwrap();

let (rb, _) = json::Event::new(flattened_json)
- .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
+ .into_recordbatch(
+ &HashMap::default(),
+ false,
+ None,
+ SchemaVersion::V1,
+ &HashMap::new(),
+ )
.unwrap();

assert_eq!(rb.num_rows(), 4);
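Every updated test passes `&HashMap::new()`, so none of them pins down what a populated map produces. If `add_parseable_fields` behaves like the sketch above, a test along these lines would cover it (hypothetical, not part of this PR; it assumes `USER_AGENT_KEY` is in scope and that each custom field appends exactly one column):

#[test]
fn custom_fields_become_extra_columns() {
    let json = json!({"a": 1});

    let mut p_custom_fields = HashMap::new();
    p_custom_fields.insert(USER_AGENT_KEY.to_string(), "curl/8.5.0".to_string());

    let (rb, _) = json::Event::new(json)
        .into_recordbatch(
            &HashMap::default(),
            false,
            None,
            SchemaVersion::V0,
            &p_custom_fields,
        )
        .unwrap();

    assert_eq!(rb.num_rows(), 1);
    // one JSON key ("a") + p_timestamp + p_user_agent
    assert_eq!(rb.num_columns(), 3);
}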