diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 7e910805e..6d1cf3419 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -30,7 +30,10 @@ use std::{collections::HashMap, sync::Arc}; use tracing::error; use super::{EventFormat, Metadata, Tags}; -use crate::utils::{arrow::get_field, json::flatten_json_body}; +use crate::{ + metadata::SchemaVersion, + utils::{arrow::get_field, json::flatten_json_body}, +}; pub struct Event { pub data: Value, @@ -48,8 +51,9 @@ impl EventFormat for Event { schema: &HashMap>, static_schema_flag: Option<&String>, time_partition: Option<&String>, + schema_version: SchemaVersion, ) -> Result<(Self::Data, Vec>, bool, Tags, Metadata), anyhow::Error> { - let data = flatten_json_body(&self.data, None, None, None, false)?; + let data = flatten_json_body(self.data, None, None, None, schema_version, false)?; let stream_schema = schema; // incoming event may be a single json or a json array @@ -68,43 +72,38 @@ impl EventFormat for Event { let mut is_first = false; let schema = match derive_arrow_schema(stream_schema, fields) { Ok(schema) => schema, - Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) { - Ok(mut infer_schema) => { - let new_infer_schema = super::super::format::update_field_type_in_schema( - Arc::new(infer_schema), - Some(stream_schema), - time_partition, - Some(&value_arr), - ); - infer_schema = Schema::new(new_infer_schema.fields().clone()); - if let Err(err) = Schema::try_merge(vec![ - Schema::new(stream_schema.values().cloned().collect::()), - infer_schema.clone(), - ]) { - return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err)); - } - is_first = true; - infer_schema - .fields - .iter() - .filter(|field| !field.data_type().is_null()) - .cloned() - .sorted_by(|a, b| a.name().cmp(b.name())) - .collect() - } - Err(err) => { - return Err(anyhow!( - "Could not infer schema for this event due to err {:?}", - err - )) - } - }, + Err(_) => { + let mut infer_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)) + .map_err(|err| { + anyhow!("Could not infer schema for this event due to err {:?}", err) + })?; + let new_infer_schema = super::update_field_type_in_schema( + Arc::new(infer_schema), + Some(stream_schema), + time_partition, + Some(&value_arr), + schema_version, + ); + infer_schema = Schema::new(new_infer_schema.fields().clone()); + Schema::try_merge(vec![ + Schema::new(stream_schema.values().cloned().collect::()), + infer_schema.clone(), + ]).map_err(|err| anyhow!("Could not merge schema of this event with that of the existing stream. 
{:?}", err))?; + is_first = true; + infer_schema + .fields + .iter() + .filter(|field| !field.data_type().is_null()) + .cloned() + .sorted_by(|a, b| a.name().cmp(b.name())) + .collect() + } }; if static_schema_flag.is_none() && value_arr .iter() - .any(|value| fields_mismatch(&schema, value)) + .any(|value| fields_mismatch(&schema, value, schema_version)) { return Err(anyhow!( "Could not process this event due to mismatch in datatype" @@ -165,7 +164,7 @@ fn collect_keys<'a>(values: impl Iterator) -> Result], body: &Value) -> bool { +fn fields_mismatch(schema: &[Arc], body: &Value, schema_version: SchemaVersion) -> bool { for (name, val) in body.as_object().expect("body is of object variant") { if val.is_null() { continue; @@ -173,19 +172,22 @@ fn fields_mismatch(schema: &[Arc], body: &Value) -> bool { let Some(field) = get_field(schema, name) else { return true; }; - if !valid_type(field.data_type(), val) { + if !valid_type(field.data_type(), val, schema_version) { return true; } } false } -fn valid_type(data_type: &DataType, value: &Value) -> bool { +fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion) -> bool { match data_type { DataType::Boolean => value.is_boolean(), DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(), DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(), - DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(), + DataType::Float16 | DataType::Float32 => value.is_f64(), + // All numbers can be cast as Float64 from schema version v1 + DataType::Float64 if schema_version == SchemaVersion::V1 => value.is_number(), + DataType::Float64 if schema_version != SchemaVersion::V1 => value.is_f64(), DataType::Utf8 => value.is_string(), DataType::List(field) => { let data_type = field.data_type(); @@ -194,7 +196,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool { if elem.is_null() { continue; } - if !valid_type(data_type, elem) { + if !valid_type(data_type, elem, schema_version) { return false; } } @@ -212,7 +214,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool { if value.is_null() { continue; } - if !valid_type(field.data_type(), value) { + if !valid_type(field.data_type(), value, schema_version) { return false; } } else { diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 029d218ea..b3cb8e4dd 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -17,7 +17,10 @@ * */ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use anyhow::{anyhow, Error as AnyError}; use arrow_array::{RecordBatch, StringArray}; @@ -25,12 +28,17 @@ use arrow_schema::{DataType, Field, Schema, TimeUnit}; use chrono::DateTime; use serde_json::Value; -use crate::utils::{self, arrow::get_field}; +use crate::{ + metadata::SchemaVersion, + utils::{self, arrow::get_field}, +}; use super::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY}; pub mod json; +static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"]; + type Tags = String; type Metadata = String; type EventSchema = Vec>; @@ -45,6 +53,7 @@ pub trait EventFormat: Sized { schema: &HashMap>, static_schema_flag: Option<&String>, time_partition: Option<&String>, + schema_version: SchemaVersion, ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>; fn decode(data: Self::Data, schema: Arc) -> Result; @@ -54,10 +63,16 @@ pub trait EventFormat: Sized { storage_schema: &HashMap>, 
static_schema_flag: Option<&String>, time_partition: Option<&String>, + schema_version: SchemaVersion, ) -> Result<(RecordBatch, bool), AnyError> { - let (data, mut schema, is_first, tags, metadata) = - self.to_data(storage_schema, static_schema_flag, time_partition)?; - + let (data, mut schema, is_first, tags, metadata) = self.to_data( + storage_schema, + static_schema_flag, + time_partition, + schema_version, + )?; + + // DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names if get_field(&schema, DEFAULT_TAGS_KEY).is_some() { return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)); }; @@ -101,7 +116,8 @@ pub trait EventFormat: Sized { if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) { return Err(anyhow!("Schema mismatch")); } - new_schema = update_field_type_in_schema(new_schema, None, time_partition, None); + new_schema = + update_field_type_in_schema(new_schema, None, time_partition, None, schema_version); let rb = Self::decode(data, new_schema.clone())?; let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows())); let metadata_arr = @@ -125,20 +141,14 @@ pub trait EventFormat: Sized { if static_schema_flag.is_none() { return true; } - for (field_name, field) in new_schema - .fields() - .iter() - .map(|field| (field.name().to_owned(), field.clone())) - .collect::>>() - { - if let Some(storage_field) = storage_schema.get(&field_name) { - if field_name != *storage_field.name() { - return false; - } - if field.data_type() != storage_field.data_type() { - return false; - } - } else { + for field in new_schema.fields() { + let Some(storage_field) = storage_schema.get(field.name()) else { + return false; + }; + if field.name() != storage_field.name() { + return false; + } + if field.data_type() != storage_field.data_type() { return false; } } @@ -146,49 +156,43 @@ pub trait EventFormat: Sized { } } -pub fn get_existing_fields( +pub fn get_existing_field_names( inferred_schema: Arc, existing_schema: Option<&HashMap>>, -) -> Vec> { - let mut existing_fields = Vec::new(); +) -> HashSet { + let mut existing_field_names = HashSet::new(); + let Some(existing_schema) = existing_schema else { + return existing_field_names; + }; for field in inferred_schema.fields.iter() { - if existing_schema.map_or(false, |schema| schema.contains_key(field.name())) { - existing_fields.push(field.clone()); + if existing_schema.contains_key(field.name()) { + existing_field_names.insert(field.name().to_owned()); } } - existing_fields + existing_field_names } -pub fn get_existing_timestamp_fields( +pub fn override_existing_timestamp_fields( existing_schema: &HashMap>, -) -> Vec> { - let mut timestamp_fields = Vec::new(); - - for field in existing_schema.values() { - if let DataType::Timestamp(TimeUnit::Millisecond, None) = field.data_type() { - timestamp_fields.push(field.clone()); - } - } - - timestamp_fields -} - -pub fn override_timestamp_fields( inferred_schema: Arc, - existing_timestamp_fields: &[Arc], ) -> Arc { - let timestamp_field_names: Vec<&str> = existing_timestamp_fields - .iter() - .map(|field| field.name().as_str()) + let timestamp_field_names: HashSet = existing_schema + .values() + .filter_map(|field| { + if let DataType::Timestamp(TimeUnit::Millisecond, None) = field.data_type() { + Some(field.name().to_owned()) + } else { + None + } + }) .collect(); - let updated_fields: Vec> = inferred_schema .fields() .iter() .map(|field| { - if timestamp_field_names.contains(&field.name().as_str()) 
{ + if timestamp_field_names.contains(field.name()) { Arc::new(Field::new( field.name(), DataType::Timestamp(TimeUnit::Millisecond, None), @@ -208,45 +212,38 @@ pub fn update_field_type_in_schema( existing_schema: Option<&HashMap>>, time_partition: Option<&String>, log_records: Option<&Vec>, + schema_version: SchemaVersion, ) -> Arc { let mut updated_schema = inferred_schema.clone(); + let existing_field_names = get_existing_field_names(inferred_schema.clone(), existing_schema); if let Some(existing_schema) = existing_schema { - let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema)); - let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema); // overriding known timestamp fields which were inferred as string fields - updated_schema = override_timestamp_fields(updated_schema, &existing_timestamp_fields); - let existing_field_names: Vec = existing_fields - .iter() - .map(|field| field.name().clone()) - .collect(); + updated_schema = override_existing_timestamp_fields(existing_schema, updated_schema); + } - if let Some(log_records) = log_records { - for log_record in log_records { - updated_schema = Arc::new(update_data_type_to_datetime( - (*updated_schema).clone(), - log_record.clone(), - existing_field_names.clone(), - )); - } + if let Some(log_records) = log_records { + for log_record in log_records { + updated_schema = + override_data_type(updated_schema.clone(), log_record.clone(), schema_version); } } - if time_partition.is_none() { + let Some(time_partition) = time_partition else { return updated_schema; - } + }; let new_schema: Vec = updated_schema .fields() .iter() .map(|field| { - if field.name() == time_partition.unwrap() { - if field.data_type() == &DataType::Utf8 { - let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None); - Field::new(field.name().clone(), new_data_type, true) - } else { - Field::new(field.name(), field.data_type().clone(), true) - } + // time_partition field not present in existing schema with string type data as timestamp + if field.name() == time_partition + && !existing_field_names.contains(field.name()) + && field.data_type() == &DataType::Utf8 + { + let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None); + Field::new(field.name(), new_data_type, true) } else { Field::new(field.name(), field.data_type().clone(), true) } @@ -255,33 +252,50 @@ pub fn update_field_type_in_schema( Arc::new(Schema::new(new_schema)) } -pub fn update_data_type_to_datetime( - schema: Schema, - value: Value, - ignore_field_names: Vec, -) -> Schema { - let new_schema: Vec = schema +// From Schema v1 onwards, convert json fields with name containig "date"/"time" and having +// a string value parseable into timestamp as timestamp type and all numbers as float64. 
+pub fn override_data_type( + inferred_schema: Arc, + log_record: Value, + schema_version: SchemaVersion, +) -> Arc { + let Value::Object(map) = log_record else { + return inferred_schema; + }; + let updated_schema: Vec = inferred_schema .fields() .iter() .map(|field| { - if field.data_type() == &DataType::Utf8 && !ignore_field_names.contains(field.name()) { - if let Value::Object(map) = &value { - if let Some(Value::String(s)) = map.get(field.name()) { - if DateTime::parse_from_rfc3339(s).is_ok() { - // Update the field's data type to Timestamp - return Field::new( - field.name().clone(), - DataType::Timestamp(TimeUnit::Millisecond, None), - true, - ); - } - } + let field_name = field.name().as_str(); + match (schema_version, map.get(field.name())) { + // in V1 for new fields in json named "time"/"date" or such and having inferred + // type string, that can be parsed as timestamp, use the timestamp type. + // NOTE: support even more datetime string formats + (SchemaVersion::V1, Some(Value::String(s))) + if TIME_FIELD_NAME_PARTS + .iter() + .any(|part| field_name.to_lowercase().contains(part)) + && field.data_type() == &DataType::Utf8 + && (DateTime::parse_from_rfc3339(s).is_ok() + || DateTime::parse_from_rfc2822(s).is_ok()) => + { + // Update the field's data type to Timestamp + Field::new( + field_name, + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ) + } + // in V1 for new fields in json with inferred type number, cast as float64. + (SchemaVersion::V1, Some(Value::Number(_))) if field.data_type().is_numeric() => { + // Update the field's data type to Float64 + Field::new(field_name, DataType::Float64, true) } + // Return the original field if no update is needed + _ => Field::new(field_name, field.data_type().clone(), true), } - // Return the original field if no update is needed - Field::new(field.name(), field.data_type().clone(), true) }) .collect(); - Schema::new(new_schema) + Arc::new(Schema::new(updated_schema)) } diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 900219a01..ad6fd0089 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -28,7 +28,7 @@ use crate::event::{ use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage; use crate::handlers::STREAM_NAME_HEADER_KEY; use crate::metadata::error::stream_info::MetadataError; -use crate::metadata::STREAM_INFO; +use crate::metadata::{SchemaVersion, STREAM_INFO}; use crate::option::{Mode, CONFIG}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; @@ -84,7 +84,8 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< tags: String::default(), metadata: String::default(), }; - event.into_recordbatch(&schema, None, None)? + // For internal streams, use old schema + event.into_recordbatch(&schema, None, None, SchemaVersion::V0)? 
}; event::Event { rb, @@ -273,32 +274,34 @@ mod tests { use std::{collections::HashMap, sync::Arc}; use actix_web::test::TestRequest; - use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray}; + use arrow::datatypes::Int64Type; + use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray}; use arrow_schema::{DataType, Field}; use serde_json::json; use crate::{ event, handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS}, + metadata::SchemaVersion, }; trait TestExt { - fn as_int64_arr(&self) -> &Int64Array; - fn as_float64_arr(&self) -> &Float64Array; - fn as_utf8_arr(&self) -> &StringArray; + fn as_int64_arr(&self) -> Option<&Int64Array>; + fn as_float64_arr(&self) -> Option<&Float64Array>; + fn as_utf8_arr(&self) -> Option<&StringArray>; } impl TestExt for ArrayRef { - fn as_int64_arr(&self) -> &Int64Array { - self.as_any().downcast_ref().unwrap() + fn as_int64_arr(&self) -> Option<&Int64Array> { + self.as_any().downcast_ref() } - fn as_float64_arr(&self) -> &Float64Array { - self.as_any().downcast_ref().unwrap() + fn as_float64_arr(&self) -> Option<&Float64Array> { + self.as_any().downcast_ref() } - fn as_utf8_arr(&self) -> &StringArray { - self.as_any().downcast_ref().unwrap() + fn as_utf8_arr(&self) -> Option<&StringArray> { + self.as_any().downcast_ref() } } @@ -319,32 +322,42 @@ mod tests { .append_header((PREFIX_META.to_string() + "C", "meta1")) .to_http_request(); - let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + HashMap::default(), + None, + None, + SchemaVersion::V0, + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 6); assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), &Int64Array::from_iter([1]) ); assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), &StringArray::from_iter_values(["hello"]) ); assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr(), + rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), &Float64Array::from_iter([4.23]) ); assert_eq!( rb.column_by_name(event::DEFAULT_TAGS_KEY) .unwrap() - .as_utf8_arr(), + .as_utf8_arr() + .unwrap(), &StringArray::from_iter_values(["a=tag1"]) ); assert_eq!( rb.column_by_name(event::DEFAULT_METADATA_KEY) .unwrap() - .as_utf8_arr(), + .as_utf8_arr() + .unwrap(), &StringArray::from_iter_values(["c=meta1"]) ); } @@ -359,16 +372,24 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + HashMap::default(), + None, + None, + SchemaVersion::V0, + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), &Int64Array::from_iter([1]) ); assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), &StringArray::from_iter_values(["hello"]) ); } @@ -391,16 +412,16 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap(); + let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); assert_eq!( - 
rb.column_by_name("a").unwrap().as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), &Int64Array::from_iter([1]) ); assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), &StringArray::from_iter_values(["hello"]) ); } @@ -423,7 +444,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!(into_event_batch(&req, &json, schema, None, None).is_err()); + assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err()); } #[test] @@ -441,7 +462,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap(); + let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -453,7 +474,15 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!(into_event_batch(&req, &json, HashMap::default(), None, None).is_err()) + assert!(into_event_batch( + &req, + &json, + HashMap::default(), + None, + None, + SchemaVersion::V0 + ) + .is_err()) } #[test] @@ -476,7 +505,15 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + HashMap::default(), + None, + None, + SchemaVersion::V0, + ) + .unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -489,15 +526,15 @@ mod tests { assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true)); assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), &Int64Array::from(vec![None, Some(1), Some(1)]) ); assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) ); assert_eq!( - rb.column_by_name("c").unwrap().as_int64_arr(), + rb.column_by_name("c").unwrap().as_int64_arr().unwrap(), &Int64Array::from(vec![None, Some(1), None]) ); } @@ -524,20 +561,28 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + HashMap::default(), + None, + None, + SchemaVersion::V0, + ) + .unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), &Int64Array::from(vec![None, Some(1), Some(1)]) ); assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) ); assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr(), + rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), &Float64Array::from(vec![None, Some(1.22), None,]) ); } @@ -572,20 +617,20 @@ mod tests { ); let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap(); + let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), 
&Int64Array::from(vec![None, Some(1), Some(1)]) ); assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) ); assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr(), + rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), &Float64Array::from(vec![None, Some(1.22), None,]) ); } @@ -621,7 +666,7 @@ mod tests { .into_iter(), ); - assert!(into_event_batch(&req, &json, schema, None, None).is_err()); + assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err()); } #[test] @@ -649,16 +694,24 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap(); + let (rb, _) = into_event_batch( + &req, + &json, + HashMap::default(), + None, + None, + SchemaVersion::V0, + ) + .unwrap(); assert_eq!(rb.num_rows(), 4); assert_eq!(rb.num_columns(), 7); assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) ); assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), &StringArray::from(vec![ Some("hello"), Some("hello"), @@ -668,13 +721,93 @@ mod tests { ); assert_eq!( - rb.column_by_name("c_a").unwrap().as_int64_arr(), - &Int64Array::from(vec![None, None, Some(1), Some(1)]) + rb.column_by_name("c_a") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &ListArray::from_iter_primitive::(vec![ + None, + None, + Some(vec![Some(1i64)]), + Some(vec![Some(1)]) + ]) + ); + + assert_eq!( + rb.column_by_name("c_b") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &ListArray::from_iter_primitive::(vec![ + None, + None, + None, + Some(vec![Some(2i64)]) + ]) + ); + } + + #[test] + fn arr_obj_with_nested_type_v1() { + let json = json!([ + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + "c": [{"a": 1}] + }, + { + "a": 1, + "b": "hello", + "c": [{"a": 1, "b": 2}] + }, + ]); + + let req = TestRequest::default().to_http_request(); + + let (rb, _) = into_event_batch( + &req, + &json, + HashMap::default(), + None, + None, + SchemaVersion::V1, + ) + .unwrap(); + + assert_eq!(rb.num_rows(), 4); + assert_eq!(rb.num_columns(), 7); + assert_eq!( + rb.column_by_name("a").unwrap().as_float64_arr().unwrap(), + &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from(vec![ + Some("hello"), + Some("hello"), + Some("hello"), + Some("hello") + ]) + ); + + assert_eq!( + rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(), + &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) ); assert_eq!( - rb.column_by_name("c_b").unwrap().as_int64_arr(), - &Int64Array::from(vec![None, None, None, Some(2)]) + rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(), + &Float64Array::from(vec![None, None, None, Some(2.0)]) ); } } diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index dad5df00e..ef7386eea 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -26,10 +26,10 @@ use super::modal::utils::logstream_utils::{ use super::query::update_schema_when_distributed; use crate::alerts::Alerts; use crate::catalog::get_first_event; -use 
crate::event::format::update_data_type_to_datetime; +use crate::event::format::override_data_type; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; -use crate::metadata::STREAM_INFO; +use crate::metadata::{SchemaVersion, STREAM_INFO}; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; use crate::option::{Mode, CONFIG}; use crate::stats::{event_labels_date, storage_size_labels_date, Stats}; @@ -111,9 +111,9 @@ pub async fn detect_schema(body: Bytes) -> Result { } }; - let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap(); + let mut schema = Arc::new(infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap()); for log_record in log_records { - schema = update_data_type_to_datetime(schema, log_record, Vec::new()); + schema = override_data_type(schema, log_record, SchemaVersion::V1); } Ok((web::Json(schema), StatusCode::OK)) } @@ -517,6 +517,7 @@ pub async fn create_stream( static_schema_flag.to_string(), static_schema, stream_type, + SchemaVersion::V1, // New stream ); } Err(err) => { diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 6a2424aeb..69d22c2e5 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -19,21 +19,23 @@ use std::{collections::HashMap, sync::Arc}; use actix_web::HttpRequest; +use anyhow::anyhow; use arrow_schema::Field; use bytes::Bytes; use chrono::{DateTime, NaiveDateTime, Utc}; +use itertools::Itertools; use serde_json::Value; use crate::{ event::{ - self, format::{self, EventFormat}, + Event, }, handlers::{ http::{ingest::PostError, kinesis}, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR, }, - metadata::STREAM_INFO, + metadata::{SchemaVersion, STREAM_INFO}, storage::StreamType, utils::{header_parsing::collect_labelled_headers, json::convert_array_to_object}, }; @@ -69,153 +71,70 @@ pub async fn push_logs( let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?; let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?; + let schema_version = STREAM_INFO.get_schema_version(stream_name)?; let body_val: Value = serde_json::from_slice(body)?; - let size: usize = body.len(); - let mut parsed_timestamp = Utc::now().naive_utc(); - if time_partition.is_none() { - if custom_partition.is_none() { - let size = size as u64; - create_process_record_batch( - stream_name, - req, - body_val, - static_schema_flag.as_ref(), - None, - parsed_timestamp, - &HashMap::new(), - size, - ) - .await?; - } else { - let data = convert_array_to_object(&body_val, None, None, custom_partition.as_ref())?; - let custom_partition = custom_partition.unwrap(); - let custom_partition_list = custom_partition.split(',').collect::>(); - - for value in data { - let custom_partition_values = - get_custom_partition_values(&value, &custom_partition_list); - - let size = value.to_string().into_bytes().len() as u64; - create_process_record_batch( - stream_name, - req, - value, - static_schema_flag.as_ref(), - None, - parsed_timestamp, - &custom_partition_values, - size, - ) - .await?; + let data = convert_array_to_object( + body_val, + time_partition.as_ref(), + time_partition_limit, + custom_partition.as_ref(), + schema_version, + )?; + + for value in data { + let origin_size = 
serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length + let parsed_timestamp = match time_partition.as_ref() { + Some(time_partition) => get_parsed_timestamp(&value, time_partition)?, + _ => Utc::now().naive_utc(), + }; + let custom_partition_values = match custom_partition.as_ref() { + Some(custom_partition) => { + let custom_partitions = custom_partition.split(',').collect_vec(); + get_custom_partition_values(&value, &custom_partitions) } - } - } else if custom_partition.is_none() { - let data = convert_array_to_object( - &body_val, - time_partition.as_ref(), - time_partition_limit, - None, - )?; - for value in data { - parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref()); - let size = value.to_string().into_bytes().len() as u64; - create_process_record_batch( - stream_name, - req, - value, - static_schema_flag.as_ref(), - time_partition.as_ref(), - parsed_timestamp, - &HashMap::new(), - size, - ) - .await?; - } - } else { - let data = convert_array_to_object( - &body_val, + None => HashMap::new(), + }; + let schema = STREAM_INFO + .read() + .unwrap() + .get(stream_name) + .ok_or(PostError::StreamNotFound(stream_name.to_owned()))? + .schema + .clone(); + let (rb, is_first_event) = into_event_batch( + req, + &value, + schema, + static_schema_flag.as_ref(), time_partition.as_ref(), - time_partition_limit, - custom_partition.as_ref(), + schema_version, )?; - let custom_partition = custom_partition.unwrap(); - let custom_partition_list = custom_partition.split(',').collect::>(); - for value in data { - let custom_partition_values = - get_custom_partition_values(&value, &custom_partition_list); - - parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref()); - let size = value.to_string().into_bytes().len() as u64; - create_process_record_batch( - stream_name, - req, - value, - static_schema_flag.as_ref(), - time_partition.as_ref(), - parsed_timestamp, - &custom_partition_values, - size, - ) - .await?; + Event { + rb, + stream_name: stream_name.to_owned(), + origin_format: "json", + origin_size, + is_first_event, + parsed_timestamp, + time_partition: time_partition.clone(), + custom_partition_values, + stream_type: StreamType::UserDefined, } + .process() + .await?; } Ok(()) } -#[allow(clippy::too_many_arguments)] -pub async fn create_process_record_batch( - stream_name: &str, - req: &HttpRequest, - value: Value, - static_schema_flag: Option<&String>, - time_partition: Option<&String>, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - origin_size: u64, -) -> Result<(), PostError> { - let (rb, is_first_event) = - get_stream_schema(stream_name, req, &value, static_schema_flag, time_partition)?; - event::Event { - rb, - stream_name: stream_name.to_owned(), - origin_format: "json", - origin_size, - is_first_event, - parsed_timestamp, - time_partition: time_partition.cloned(), - custom_partition_values: custom_partition_values.clone(), - stream_type: StreamType::UserDefined, - } - .process() - .await?; - - Ok(()) -} - -pub fn get_stream_schema( - stream_name: &str, - req: &HttpRequest, - body: &Value, - static_schema_flag: Option<&String>, - time_partition: Option<&String>, -) -> Result<(arrow_array::RecordBatch, bool), PostError> { - let hash_map = STREAM_INFO.read().unwrap(); - let schema = hash_map - .get(stream_name) - .ok_or(PostError::StreamNotFound(stream_name.to_owned()))? 
- .schema - .clone(); - into_event_batch(req, body, schema, static_schema_flag, time_partition) -} - pub fn into_event_batch( req: &HttpRequest, body: &Value, schema: HashMap>, static_schema_flag: Option<&String>, time_partition: Option<&String>, + schema_version: SchemaVersion, ) -> Result<(arrow_array::RecordBatch, bool), PostError> { let tags = collect_labelled_headers(req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(req, PREFIX_META, SEPARATOR)?; @@ -224,7 +143,8 @@ pub fn into_event_batch( tags, metadata, }; - let (rb, is_first) = event.into_recordbatch(&schema, static_schema_flag, time_partition)?; + let (rb, is_first) = + event.into_recordbatch(&schema, static_schema_flag, time_partition, schema_version)?; Ok((rb, is_first)) } @@ -248,15 +168,48 @@ pub fn get_custom_partition_values( custom_partition_values } -pub fn get_parsed_timestamp(body: &Value, time_partition: Option<&String>) -> NaiveDateTime { - let body_timestamp = body.get(time_partition.unwrap()); - let parsed_timestamp = body_timestamp - .unwrap() - .to_owned() - .as_str() - .unwrap() - .parse::>() - .unwrap() - .naive_utc(); - parsed_timestamp +fn get_parsed_timestamp(body: &Value, time_partition: &str) -> Result { + let current_time = body.get(time_partition).ok_or_else(|| { + anyhow!( + "Missing field for time partition from json: {:?}", + time_partition + ) + })?; + let parsed_time: DateTime = serde_json::from_value(current_time.clone())?; + + Ok(parsed_time.naive_utc()) +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use serde_json::json; + + use super::*; + + #[test] + fn parse_time_parition_from_value() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = get_parsed_timestamp(&json, "timestamp"); + + let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap(); + assert_eq!(parsed.unwrap(), expected); + } + + #[test] + fn time_parition_not_in_json() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = get_parsed_timestamp(&json, "timestamp"); + + matches!(parsed, Err(PostError::Invalid(_))); + } + + #[test] + fn time_parition_not_parseable_as_datetime() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = get_parsed_timestamp(&json, "timestamp"); + + matches!(parsed, Err(PostError::SerdeError(_))); + } } diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 1c9583fc5..d3fdd46eb 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -29,7 +29,7 @@ use crate::{ CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, }, - metadata::{self, STREAM_INFO}, + metadata::{self, SchemaVersion, STREAM_INFO}, option::{Mode, CONFIG}, static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}, storage::{LogStream, ObjectStoreFormat, StreamType}, @@ -426,6 +426,7 @@ pub async fn create_stream( static_schema_flag.to_string(), static_schema, stream_type, + SchemaVersion::V1, // New stream ); } Err(err) => { @@ -474,6 +475,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or(""); let stream_type = stream_metadata.stream_type.as_deref().unwrap_or(""); + let schema_version = stream_metadata.schema_version; 
metadata::STREAM_INFO.add_stream( stream_name.to_string(), @@ -484,6 +486,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< static_schema_flag.to_string(), static_schema, stream_type, + schema_version, ); } else { return Ok(false); diff --git a/src/kafka.rs b/src/kafka.rs index e6e0c8d1c..f65b954c6 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -226,12 +226,14 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> { let time_partition = STREAM_INFO.get_time_partition(stream_name)?; let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; + let schema_version = STREAM_INFO.get_schema_version(stream_name)?; let (rb, is_first) = event .into_recordbatch( &schema, static_schema_flag.as_ref(), time_partition.as_ref(), + schema_version, ) .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?; diff --git a/src/metadata.rs b/src/metadata.rs index 5a5ff9bb1..4657c8aa5 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -21,6 +21,7 @@ use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; use chrono::{Local, NaiveDateTime}; use itertools::Itertools; use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; use std::num::NonZeroU32; @@ -28,13 +29,14 @@ use std::sync::{Arc, RwLock}; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; use crate::alerts::Alerts; +use crate::catalog::snapshot::ManifestItem; use crate::metrics::{ fetch_stats_from_storage, EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, }; use crate::storage::retention::Retention; -use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir, StreamType}; +use crate::storage::{ObjectStorage, ObjectStoreFormat, StorageDir, StreamType}; use crate::utils::arrow::MergedRecordReader; use derive_more::{Deref, DerefMut}; @@ -45,8 +47,23 @@ pub static STREAM_INFO: Lazy = Lazy::new(StreamInfo::default); #[derive(Debug, Deref, DerefMut, Default)] pub struct StreamInfo(RwLock>); +/// In order to support backward compatability with streams created before v1.6.4, +/// we will consider past versions of stream schema to be v0. Streams created with +/// v1.6.4+ will be v1. +#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] +#[non_exhaustive] +#[serde(rename_all = "lowercase")] +pub enum SchemaVersion { + #[default] + V0, + /// Applies generic JSON flattening, ignores null data, handles all numbers as + /// float64 and uses the timestamp type to store compatible time information. 
+ V1, +} + #[derive(Debug, Default)] pub struct LogStreamMetadata { + pub schema_version: SchemaVersion, pub schema: HashMap>, pub alerts: Alerts, pub retention: Option, @@ -145,6 +162,13 @@ impl StreamInfo { .map(|metadata| metadata.retention.clone()) } + pub fn get_schema_version(&self, stream_name: &str) -> Result { + let map = self.read().expect(LOCK_EXPECT); + map.get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.schema_version) + } + pub fn schema(&self, stream_name: &str) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); let schema = map @@ -250,6 +274,7 @@ impl StreamInfo { static_schema_flag: String, static_schema: HashMap>, stream_type: &str, + schema_version: SchemaVersion, ) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { @@ -280,6 +305,7 @@ impl StreamInfo { static_schema }, stream_type: Some(stream_type.to_string()), + schema_version, ..Default::default() }; map.insert(stream_name, metadata); @@ -290,48 +316,6 @@ impl StreamInfo { map.remove(stream_name); } - #[allow(dead_code)] - pub async fn upsert_stream_info( - &self, - storage: &(impl ObjectStorage + ?Sized), - stream: LogStream, - ) -> Result<(), LoadError> { - let alerts = storage.get_alerts(&stream.name).await?; - - let schema = storage.upsert_schema_to_storage(&stream.name).await?; - let meta = storage.upsert_stream_metadata(&stream.name).await?; - let retention = meta.retention; - let schema = update_schema_from_staging(&stream.name, schema); - let schema = HashMap::from_iter( - schema - .fields - .iter() - .map(|v| (v.name().to_owned(), v.clone())), - ); - - let metadata = LogStreamMetadata { - schema, - alerts, - retention, - cache_enabled: meta.cache_enabled, - created_at: meta.created_at, - first_event_at: meta.first_event_at, - time_partition: meta.time_partition, - time_partition_limit: meta - .time_partition_limit - .and_then(|limit| limit.parse().ok()), - custom_partition: meta.custom_partition, - static_schema_flag: meta.static_schema_flag, - hot_tier_enabled: meta.hot_tier_enabled, - stream_type: meta.stream_type, - }; - - let mut map = self.write().expect(LOCK_EXPECT); - - map.insert(stream.name, metadata); - Ok(()) - } - pub fn list_streams(&self) -> Vec { self.read() .expect(LOCK_EXPECT) @@ -408,18 +392,17 @@ pub async fn update_data_type_time_partition( storage: &(impl ObjectStorage + ?Sized), stream_name: &str, schema: Schema, - meta: ObjectStoreFormat, + time_partition: Option<&String>, ) -> anyhow::Result { let mut schema = schema.clone(); - if meta.time_partition.is_some() { - let time_partition = meta.time_partition.unwrap(); - if let Ok(time_partition_field) = schema.field_with_name(&time_partition) { + if let Some(time_partition) = time_partition { + if let Ok(time_partition_field) = schema.field_with_name(time_partition) { if time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None) { let mut fields = schema .fields() .iter() - .filter(|field| *field.name() != time_partition) + .filter(|field| field.name() != time_partition) .cloned() .collect::>>(); let time_partition_field = Arc::new(Field::new( @@ -442,18 +425,33 @@ pub async fn load_stream_metadata_on_server_start( schema: Schema, stream_metadata_value: Value, ) -> Result<(), LoadError> { - let mut meta: ObjectStoreFormat = ObjectStoreFormat::default(); - if !stream_metadata_value.is_null() { - meta = - serde_json::from_slice(&serde_json::to_vec(&stream_metadata_value).unwrap()).unwrap(); - } + 
let ObjectStoreFormat { + schema_version, + created_at, + first_event_at, + retention, + cache_enabled, + snapshot, + stats, + time_partition, + time_partition_limit, + custom_partition, + static_schema_flag, + hot_tier_enabled, + stream_type, + .. + } = if !stream_metadata_value.is_null() { + serde_json::from_slice(&serde_json::to_vec(&stream_metadata_value).unwrap()).unwrap() + } else { + ObjectStoreFormat::default() + }; let schema = - update_data_type_time_partition(storage, stream_name, schema, meta.clone()).await?; + update_data_type_time_partition(storage, stream_name, schema, time_partition.as_ref()) + .await?; storage.put_schema(stream_name, &schema).await?; //load stats from storage - let stats = meta.stats; fetch_stats_from_storage(stream_name, stats).await; - load_daily_metrics(&meta, stream_name); + load_daily_metrics(&snapshot.manifest_list, stream_name); let alerts = storage.get_alerts(stream_name).await?; let schema = update_schema_from_staging(stream_name, schema); @@ -465,20 +463,19 @@ pub async fn load_stream_metadata_on_server_start( ); let metadata = LogStreamMetadata { + schema_version, schema, alerts, - retention: meta.retention, - cache_enabled: meta.cache_enabled, - created_at: meta.created_at, - first_event_at: meta.first_event_at, - time_partition: meta.time_partition, - time_partition_limit: meta - .time_partition_limit - .and_then(|limit| limit.parse().ok()), - custom_partition: meta.custom_partition, - static_schema_flag: meta.static_schema_flag, - hot_tier_enabled: meta.hot_tier_enabled, - stream_type: meta.stream_type, + retention, + cache_enabled, + created_at, + first_event_at, + time_partition, + time_partition_limit: time_partition_limit.and_then(|limit| limit.parse().ok()), + custom_partition, + static_schema_flag, + hot_tier_enabled, + stream_type, }; let mut map = STREAM_INFO.write().expect(LOCK_EXPECT); @@ -488,8 +485,7 @@ pub async fn load_stream_metadata_on_server_start( Ok(()) } -fn load_daily_metrics(meta: &ObjectStoreFormat, stream_name: &str) { - let manifests = &meta.snapshot.manifest_list; +fn load_daily_metrics(manifests: &Vec, stream_name: &str) { for manifest in manifests { let manifest_date = manifest.time_lower_bound.date_naive().to_string(); let events_ingested = manifest.events_ingested; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 9b4c6163b..a43cbc780 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -17,7 +17,9 @@ */ use crate::{ - catalog::snapshot::Snapshot, metadata::error::stream_info::MetadataError, stats::FullStats, + catalog::snapshot::Snapshot, + metadata::{error::stream_info::MetadataError, SchemaVersion}, + stats::FullStats, }; use chrono::Local; @@ -78,6 +80,9 @@ pub const CURRENT_SCHEMA_VERSION: &str = "v5"; pub struct ObjectStoreFormat { /// Version of schema registry pub version: String, + /// Version of schema, defaults to v0 if not set + #[serde(default)] + pub schema_version: SchemaVersion, /// Version for change in the way how parquet are generated/stored. 
#[serde(rename = "objectstore-format")] pub objectstore_format: String, @@ -177,6 +182,7 @@ impl Default for ObjectStoreFormat { fn default() -> Self { Self { version: CURRENT_SCHEMA_VERSION.to_string(), + schema_version: SchemaVersion::V1, // Newly created streams should be v1 objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), stream_type: Some(StreamType::UserDefined.to_string()), created_at: Local::now().to_rfc3339(), @@ -196,13 +202,6 @@ impl Default for ObjectStoreFormat { } } -impl ObjectStoreFormat { - fn set_id(&mut self, id: String) { - self.owner.id.clone_from(&id); - self.owner.group = id; - } -} - #[derive(serde::Serialize, PartialEq)] pub struct LogStream { pub name: String, diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b5b1e74f5..272ab47bb 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -21,13 +21,15 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - ALERT_FILE_NAME, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, - PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + Owner, ALERT_FILE_NAME, CORRELATIONS_ROOT_DIRECTORY, MANIFEST_FILE, + PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, + STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; +use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; use crate::option::Mode; use crate::{ @@ -153,28 +155,22 @@ pub trait ObjectStorage: Send + Sync + 'static { schema: Arc, stream_type: &str, ) -> Result { - let mut format = ObjectStoreFormat::default(); - format.set_id(CONFIG.parseable.username.clone()); - let permission = Permisssion::new(CONFIG.parseable.username.clone()); - format.permissions = vec![permission]; - format.created_at = Local::now().to_rfc3339(); - format.stream_type = Some(stream_type.to_string()); - if time_partition.is_empty() { - format.time_partition = None; - } else { - format.time_partition = Some(time_partition.to_string()); - } - format.time_partition_limit = time_partition_limit.map(|limit| limit.to_string()); - if custom_partition.is_empty() { - format.custom_partition = None; - } else { - format.custom_partition = Some(custom_partition.to_string()); - } - if static_schema_flag != "true" { - format.static_schema_flag = None; - } else { - format.static_schema_flag = Some(static_schema_flag.to_string()); - } + let format = ObjectStoreFormat { + created_at: Local::now().to_rfc3339(), + permissions: vec![Permisssion::new(CONFIG.parseable.username.clone())], + stream_type: Some(stream_type.to_string()), + time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()), + time_partition_limit: time_partition_limit.map(|limit| limit.to_string()), + custom_partition: (!custom_partition.is_empty()).then(|| custom_partition.to_string()), + static_schema_flag: (static_schema_flag == "true") + .then(|| static_schema_flag.to_string()), + schema_version: SchemaVersion::V1, // NOTE: Newly created streams are all V1 + owner: Owner { + id: CONFIG.parseable.username.clone(), + group: CONFIG.parseable.username.clone(), + }, + ..Default::default() + }; let format_json = to_bytes(&format); self.put_object(&schema_path(stream_name), 
to_bytes(&schema)) .await?; diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index 298954972..afd17ace6 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -280,15 +280,15 @@ pub fn flatten_array_objects( /// 2. `[{"a": 1}, {"b": 2}]` ~> `[{"a": 1}, {"b": 2}]` /// 3. `[{"a": [{"b": 1}, {"c": 2}]}]` ~> `[{"a": {"b": 1)}}, {"a": {"c": 2)}}]` /// 3. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> `[{"a": {"b":1}, "d": {"e":4}}, {"a": {"c":2}, "d": {"e":4}}]` -pub fn flatten_json(value: &Value) -> Vec { +fn flattening_helper(value: &Value) -> Vec { match value { - Value::Array(arr) => arr.iter().flat_map(flatten_json).collect(), + Value::Array(arr) => arr.iter().flat_map(flattening_helper).collect(), Value::Object(map) => map .iter() .fold(vec![Map::new()], |results, (key, val)| match val { Value::Array(arr) => arr .iter() - .flat_map(flatten_json) + .flat_map(flattening_helper) .flat_map(|flattened_item| { results.iter().map(move |result| { let mut new_obj = result.clone(); @@ -297,7 +297,7 @@ pub fn flatten_json(value: &Value) -> Vec { }) }) .collect(), - Value::Object(_) => flatten_json(val) + Value::Object(_) => flattening_helper(val) .iter() .flat_map(|nested_result| { results.iter().map(move |result| { @@ -323,9 +323,9 @@ pub fn flatten_json(value: &Value) -> Vec { } // Converts a Vector of values into a `Value::Array`, as long as all of them are objects -pub fn convert_to_array(flattened: Vec) -> Result { - let mut result = Vec::new(); - for item in flattened { +pub fn generic_flattening(json: Value) -> Result { + let mut flattened = Vec::new(); + for item in flattening_helper(&json) { let mut map = Map::new(); let Some(item) = item.as_object() else { return Err(JsonFlattenError::ExpectedObjectInArray); @@ -333,9 +333,10 @@ pub fn convert_to_array(flattened: Vec) -> Result, time_partition_limit: Option, custom_partition: Option<&String>, + schema_version: SchemaVersion, validation_required: bool, ) -> Result { - let mut nested_value = flatten::convert_to_array(flatten::flatten_json(body))?; + let mut nested_value = if schema_version == SchemaVersion::V1 { + flatten::generic_flattening(body)? + } else { + body + }; flatten::flatten( &mut nested_value, @@ -45,16 +52,18 @@ pub fn flatten_json_body( } pub fn convert_array_to_object( - body: &Value, + body: Value, time_partition: Option<&String>, time_partition_limit: Option, custom_partition: Option<&String>, + schema_version: SchemaVersion, ) -> Result, anyhow::Error> { let data = flatten_json_body( body, time_partition, time_partition_limit, custom_partition, + schema_version, true, )?; let value_arr = match data {
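
The v1 relaxation in `valid_type` is the core of the "all numbers become Float64" behaviour introduced by this diff: once a stream is on SchemaVersion::V1, any JSON number is acceptable for a Float64 column, because ingestion casts integers to float64 anyway. Below is a minimal standalone sketch of that check, not the crate's code; `SchemaVersion` is re-declared locally (and the real enum carries more attributes) so the snippet compiles with only serde_json.

use serde_json::{json, Value};

#[derive(Clone, Copy, PartialEq, Eq)]
enum SchemaVersion { V0, V1 }

/// Mirrors the Float64 arm of `valid_type`: v1 accepts any JSON number,
/// v0 keeps the old behaviour and only accepts real floats.
fn float64_column_accepts(value: &Value, version: SchemaVersion) -> bool {
    match version {
        SchemaVersion::V1 => value.is_number(),
        SchemaVersion::V0 => value.is_f64(),
    }
}

fn main() {
    assert!(float64_column_accepts(&json!(1), SchemaVersion::V1));   // integer is fine on v1
    assert!(float64_column_accepts(&json!(1.5), SchemaVersion::V0)); // float always fine
    assert!(!float64_column_accepts(&json!(1), SchemaVersion::V0));  // integer rejected on v0
}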
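
The other v1 heuristic, in `override_data_type`, promotes a string field to Timestamp(Millisecond) when the field name contains "time" or "date" and the sampled value parses as RFC 3339 or RFC 2822. A hedged sketch of just that predicate follows, using chrono the same way the diff does; the function name is mine, not the crate's.

use chrono::DateTime;

fn looks_like_timestamp(field_name: &str, sample: &str) -> bool {
    let name = field_name.to_lowercase();
    // same name parts as TIME_FIELD_NAME_PARTS in the diff
    ["time", "date"].iter().any(|part| name.contains(part))
        && (DateTime::parse_from_rfc3339(sample).is_ok()
            || DateTime::parse_from_rfc2822(sample).is_ok())
}

fn main() {
    assert!(looks_like_timestamp("created_date", "2024-05-15T15:30:00Z"));
    assert!(looks_like_timestamp("event_time", "Wed, 15 May 2024 15:30:00 +0000"));
    assert!(!looks_like_timestamp("message", "2024-05-15T15:30:00Z")); // name gives no hint
    assert!(!looks_like_timestamp("update_time", "soon"));             // value not a datetime
}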
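
`push_logs` now derives `parsed_timestamp` per event from the time-partition column via `get_parsed_timestamp`, which deserializes the JSON value straight into a chrono DateTime<Utc>. A self-contained approximation is shown below; the error type, helper name and the field name "timestamp" are illustrative, and it assumes chrono is built with its serde feature, as the diff's use of `serde_json::from_value` implies.

use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime, Utc};
use serde_json::{json, Value};

fn parsed_timestamp(body: &Value, time_partition: &str) -> anyhow::Result<NaiveDateTime> {
    let raw = body
        .get(time_partition)
        .ok_or_else(|| anyhow!("Missing field for time partition from json: {time_partition:?}"))?;
    // chrono's serde support parses RFC 3339 strings such as "2025-05-15T15:30:00Z".
    let parsed: DateTime<Utc> = serde_json::from_value(raw.clone())?;
    Ok(parsed.naive_utc())
}

fn main() -> anyhow::Result<()> {
    let event = json!({"timestamp": "2025-05-15T15:30:00Z", "msg": "hello"});
    assert_eq!(parsed_timestamp(&event, "timestamp")?.to_string(), "2025-05-15 15:30:00");
    assert!(parsed_timestamp(&event, "created_at").is_err()); // time-partition field absent
    Ok(())
}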
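
Backward compatibility hinges on `#[serde(default)]` on `ObjectStoreFormat::schema_version` together with `SchemaVersion` defaulting to V0: stream metadata written before this change has no "schema_version" key and deserializes as V0, while newly created streams are stamped V1. A small round-trip sketch, where `StreamMeta` is a stand-in rather than the real ObjectStoreFormat:

use serde::{Deserialize, Serialize};

#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
enum SchemaVersion {
    #[default]
    V0,
    V1,
}

#[derive(Deserialize, Serialize)]
struct StreamMeta {
    version: String,
    #[serde(default)]
    schema_version: SchemaVersion,
}

fn main() {
    // Old metadata file: no "schema_version" key, falls back to v0.
    let old: StreamMeta = serde_json::from_str(r#"{ "version": "v5" }"#).unwrap();
    assert_eq!(old.schema_version, SchemaVersion::V0);

    // Newly created streams serialize the lowercase tag.
    let new = StreamMeta { version: "v5".to_string(), schema_version: SchemaVersion::V1 };
    assert!(serde_json::to_string(&new).unwrap().contains(r#""schema_version":"v1""#));
}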