reject event if fields count exceeds 250 #1311
Changes from 6 commits
```diff
@@ -368,6 +368,15 @@ pub struct Options {
     #[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
     pub ms_clarity_tag: Option<String>,
 
+    #[arg(
+        long,
+        env = "P_DATASET_FIELDS_ALLOWED_LIMIT",
+        default_value = "250",
+        value_parser = validation::validate_dataset_fields_allowed_limit,
+        help = "allowed limit for fields count in a dataset"
```
Review suggestion (help text):

```diff
-        help = "allowed limit for fields count in a dataset"
+        help = "total number of fields recommended in a dataset"
```
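For readers less familiar with clap, here is a minimal, self-contained sketch of the same wiring (a hypothetical standalone program, not the PR's code): the field name mirrors the later references to `PARSEABLE.options.dataset_fields_allowed_limit`, the inlined parser stands in for `validation::validate_dataset_fields_allowed_limit`, and clap's `env` feature is assumed to be enabled.

```rust
use clap::Parser;

// Hypothetical, trimmed-down Options struct showing how the new flag, env var,
// default value and value parser fit together.
#[derive(Parser, Debug)]
struct Options {
    #[arg(
        long,
        env = "P_DATASET_FIELDS_ALLOWED_LIMIT",
        default_value = "250",
        value_parser = validate_dataset_fields_allowed_limit,
        help = "allowed limit for fields count in a dataset"
    )]
    dataset_fields_allowed_limit: usize,
}

// Stand-in for validation::validate_dataset_fields_allowed_limit: accepts 1..=250.
fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
    match s.parse::<usize>() {
        Ok(n) if (1..=250).contains(&n) => Ok(n),
        Ok(_) => Err("It should be between 1 and 250".to_string()),
        Err(_) => Err("It should be given as integer value".to_string()),
    }
}

fn main() {
    // e.g. `P_DATASET_FIELDS_ALLOWED_LIMIT=200 ./demo` or `./demo --dataset-fields-allowed-limit 200`
    let options = Options::parse();
    println!("dataset fields limit: {}", options.dataset_fields_allowed_limit);
}
```

With this shape, clap derives the `--dataset-fields-allowed-limit` flag from the field name, falls back to the environment variable, and applies the default of 250 when neither is given.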
```diff
@@ -467,6 +467,8 @@ pub enum PostError {
     KnownFormat(#[from] known_schema::Error),
     #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
     IncorrectLogFormat(String),
+    #[error("Ingestion has been stopped for dataset {0} as fields count {1} exceeds the allowed limit of {2}. Please create a new dataset.")]
+    FieldsLimitExceeded(String, usize, usize),
```
Review suggestion (rename the new variant):

```diff
-    FieldsLimitExceeded(String, usize, usize),
+    FieldsCountLimitExceeded(String, usize, usize),
```

Review suggestion (matching arm in the status-code mapping):

```diff
-            PostError::FieldsLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
+            PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
```
```diff
@@ -47,13 +47,18 @@ use crate::{
 const IGNORE_HEADERS: [&str; 3] = [STREAM_NAME_HEADER_KEY, LOG_SOURCE_KEY, EXTRACT_LOG_KEY];
 const MAX_CUSTOM_FIELDS: usize = 10;
 const MAX_FIELD_VALUE_LENGTH: usize = 100;
+// Maximum allowed count for fields in a dataset
+pub const DATASET_FIELDS_ALLOWED_LIMIT: usize = 250;
 
 pub async fn flatten_and_push_logs(
     json: Value,
     stream_name: &str,
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
 ) -> Result<(), PostError> {
+    // Verify the dataset fields count
+    verify_dataset_fields_count(stream_name)?;
+
     match log_source {
         LogSource::Kinesis => {
             //custom flattening required for Amazon Kinesis

@@ -205,6 +210,39 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, String> {
     p_custom_fields
 }
 
+fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
+    let fields_count = PARSEABLE
+        .get_stream(stream_name)?
+        .get_schema()
+        .fields()
+        .len();
+    let dataset_fields_warn_threshold = 0.8 * DATASET_FIELDS_ALLOWED_LIMIT as f64;
```
Review suggestion (derive the threshold from the configurable limit instead of the constant):

```diff
-    let dataset_fields_warn_threshold = 0.8 * DATASET_FIELDS_ALLOWED_LIMIT as f64;
+    let dataset_fields_warn_threshold = 0.8 * PARSEABLE.options.dataset_fields_allowed_limit as f64;
```

Review suggestion (warning message wording):

```diff
-            "Fields count {0} for dataset {1} has exceeded the warning threshold of {2} fields, Parseable recommends creating a new dataset.",
+            "Total fields in dataset {0} has reached the warning threshold of {1}. Ingestion will not be possible after reaching {2} fields. We recommend creating a new dataset.",
```

Review suggestion (warning arguments):

```diff
-            dataset_fields_warn_threshold);
+            stream_name,
+            fields_count,
+            PARSEABLE.options.dataset_fields_allowed_limit);
```
Review comment: Please make this a string and use the same string that is put in the Error definition.
Review suggestion (return the renamed variant):

```diff
-        return Err(PostError::FieldsLimitExceeded(
+        return Err(PostError::FieldsCountLimitExceeded(
```
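Putting the suggestions in this thread together, the verification helper could end up looking roughly like the sketch below: the warning threshold is derived from the configurable `PARSEABLE.options.dataset_fields_allowed_limit`, the warning text follows the suggested wording, and the renamed variant is returned once the limit is crossed. This is an illustration of the requested changes, not the merged code; the `tracing::warn!` call and the exact control flow are assumptions.

```rust
// Sketch only: verify_dataset_fields_count with the review suggestions applied.
fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
    let fields_count = PARSEABLE
        .get_stream(stream_name)?
        .get_schema()
        .fields()
        .len();

    let dataset_fields_allowed_limit = PARSEABLE.options.dataset_fields_allowed_limit;
    let dataset_fields_warn_threshold = (0.8 * dataset_fields_allowed_limit as f64) as usize;

    // Warn once the schema crosses 80% of the configured limit.
    if fields_count > dataset_fields_warn_threshold {
        tracing::warn!(
            "Total fields in dataset {0} has reached the warning threshold of {1}. \
             Ingestion will not be possible after reaching {2} fields. \
             We recommend creating a new dataset.",
            stream_name,
            fields_count,
            dataset_fields_allowed_limit
        );
    }

    // Reject the event outright once the configured limit is exceeded.
    if fields_count > dataset_fields_allowed_limit {
        return Err(PostError::FieldsCountLimitExceeded(
            stream_name.to_string(),
            fields_count,
            dataset_fields_allowed_limit,
        ));
    }

    Ok(())
}
```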
```diff
@@ -91,6 +91,7 @@ pub mod validation {
         path::{Path, PathBuf},
     };
 
+    use crate::handlers::http::modal::utils::ingest_utils::DATASET_FIELDS_ALLOWED_LIMIT;
     use path_clean::PathClean;
 
     use super::{Compression, Mode};

@@ -173,4 +174,19 @@ pub mod validation {
             Err("Invalid value for max disk usage. It should be given as 90.0 for 90%".to_string())
         }
     }
 
+    pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
+        if let Ok(size) = s.parse::<usize>() {
+            if (1..=DATASET_FIELDS_ALLOWED_LIMIT).contains(&size) {
+                Ok(size)
+            } else {
+                Err(format!(
+                    "Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be between 1 and {}",
```
| "Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be between 1 and {}", | |
| "Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be between 1 and {}", |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Err("Invalid value for P_DATASET_FIELDS_ALLOWED_LIMIT. It should be given as integer value".to_string()) | |
| Err("Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be given as integer value".to_string()) |
```diff
@@ -15,27 +15,33 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  *
  */
 
+use super::otel_utils::collect_json_from_values;
+use super::otel_utils::convert_epoch_nano_to_timestamp;
+use super::otel_utils::insert_attributes;
 use opentelemetry_proto::tonic::logs::v1::LogRecord;
 use opentelemetry_proto::tonic::logs::v1::LogsData;
 use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
 use opentelemetry_proto::tonic::logs::v1::SeverityNumber;
 use serde_json::Map;
 use serde_json::Value;
 
-use super::otel_utils::add_other_attributes_if_not_empty;
-use super::otel_utils::collect_json_from_values;
-use super::otel_utils::convert_epoch_nano_to_timestamp;
-use super::otel_utils::insert_attributes;
-use super::otel_utils::merge_attributes_in_json;
 
-pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [
+pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 16] = [
```
Review thread on the OTEL_LOG_KNOWN_FIELD_LIST change:

Member: What is the use of this list now? We're defaulting to separate columns, right?

Author: This list was initially maintained to store the known field list along with the known log format name in the stream info, with the idea that the UI could use the fields list to apply quick filters.

Member: Identifying useful columns is difficult; the chances of getting it wrong are high. On the UX side we're working on a way that lets the user decide what is important for them.

Author: We can create an issue and work on it in a separate PR. A similar change needs to be made wherever we detect the schema and add fields to stream_info, not just for OTEL.
The diff then continues with the expanded field list and the removal of the other_attributes handling:

```diff
     "scope_name",
     "scope_version",
     "scope_log_schema_url",
     "scope_dropped_attributes_count",
     "resource_dropped_attributes_count",
     "schema_url",
     "time_unix_nano",
     "observed_time_unix_nano",
     "severity_number",
     "severity_text",
     "body",
     "flags",
     "log_record_dropped_attributes_count",
     "span_id",
     "trace_id",
     "event_name",
 ];
 /// otel log event has severity number
 /// there is a mapping of severity number to severity text provided in proto

@@ -60,7 +66,6 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
 /// this function is called recursively for each log record object in the otel logs
 pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
     let mut log_record_json: Map<String, Value> = Map::new();
-    let mut other_attributes = Map::new();
     log_record_json.insert(
         "time_unix_nano".to_string(),
         Value::String(convert_epoch_nano_to_timestamp(

@@ -83,11 +88,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
             log_record_json.insert(key.to_owned(), body_json[key].to_owned());
         }
     }
-    insert_attributes(
-        &mut log_record_json,
-        &log_record.attributes,
-        &mut other_attributes,
-    );
+    insert_attributes(&mut log_record_json, &log_record.attributes);
     log_record_json.insert(
         "log_record_dropped_attributes_count".to_string(),
         Value::Number(log_record.dropped_attributes_count.into()),

@@ -106,9 +107,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
         Value::String(hex::encode(&log_record.trace_id)),
     );
 
-    // Add the `other_attributes` to the log record json
-    add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes);
 
     log_record_json
 }

@@ -117,18 +115,13 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
 fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
     let mut vec_scope_log_json = Vec::new();
     let mut scope_log_json = Map::new();
-    let mut other_attributes = Map::new();
     if let Some(scope) = &scope_log.scope {
         scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
         scope_log_json.insert(
             "scope_version".to_string(),
             Value::String(scope.version.clone()),
         );
-        insert_attributes(
-            &mut scope_log_json,
-            &scope.attributes,
-            &mut other_attributes,
-        );
+        insert_attributes(&mut scope_log_json, &scope.attributes);
         scope_log_json.insert(
             "scope_dropped_attributes_count".to_string(),
             Value::Number(scope.dropped_attributes_count.into()),

@@ -146,26 +139,17 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
         vec_scope_log_json.push(combined_json);
     }
 
-    // Add the `other_attributes` to the scope log json
-    merge_attributes_in_json(other_attributes, &mut vec_scope_log_json);
 
     vec_scope_log_json
 }
 
 /// this function performs the custom flattening of the otel logs
 /// and returns a `Vec` of `Value::Object` of the flattened json
 pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
     let mut vec_otel_json = Vec::new();
 
     for record in &message.resource_logs {
         let mut resource_log_json = Map::new();
-        let mut other_attributes = Map::new();
         if let Some(resource) = &record.resource {
-            insert_attributes(
-                &mut resource_log_json,
-                &resource.attributes,
-                &mut other_attributes,
-            );
+            insert_attributes(&mut resource_log_json, &resource.attributes);
             resource_log_json.insert(
                 "resource_dropped_attributes_count".to_string(),
                 Value::Number(resource.dropped_attributes_count.into()),

@@ -176,19 +160,18 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
         for scope_log in &record.scope_logs {
             vec_resource_logs_json.extend(flatten_scope_log(scope_log));
         }
 
         resource_log_json.insert(
             "schema_url".to_string(),
             Value::String(record.schema_url.clone()),
         );
 
         for resource_logs_json in &mut vec_resource_logs_json {
             resource_logs_json.extend(resource_log_json.clone());
         }
 
-        // Add the `other_attributes` to the resource log json
-        merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json);
 
         vec_otel_json.extend(vec_resource_logs_json);
         vec_otel_json.push(Value::Object(resource_logs_json.clone()));
     }
     }
     vec_otel_json.into_iter().map(Value::Object).collect()
 
     vec_otel_json
 }
```
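For context on how this flattener is typically driven, here is a hedged sketch of turning an OTLP/JSON logs payload into the flattened records. It assumes opentelemetry_proto is compiled with its serde support, which the JSON deserialization below requires; error handling is left to the caller.

```rust
use opentelemetry_proto::tonic::logs::v1::LogsData;
use serde_json::Value;

// Sketch: deserialize an OTLP/JSON logs payload and flatten it into one JSON
// object per log record, with resource- and scope-level fields merged in.
fn otel_logs_body_to_json(body: &[u8]) -> Result<Vec<Value>, serde_json::Error> {
    let logs_data: LogsData = serde_json::from_slice(body)?;
    Ok(flatten_otel_logs(&logs_data))
}
```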