Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1963e11
feat: store `schema_version` to track feature support
Dec 27, 2024
acc6412
feat: perform generic flattening only if v1
Dec 26, 2024
b5ad7d3
fix: byte size calculation
Dec 27, 2024
9e773a9
don't clone
Dec 27, 2024
679d781
doc: explain purpose of schema versioning
Dec 28, 2024
7e6cb22
Merge branch 'main' into conditional
Dec 31, 2024
e58cf12
test: make clear what is breaking
Jan 1, 2025
4cd3fb8
test: revert to dd2b8b2
Jan 1, 2025
1a6c84d
fix and test timepartitioning in json
Jan 1, 2025
da98205
refactor: use `PostError`
Jan 1, 2025
46c2684
refactor: flatten out function
Jan 1, 2025
b374730
refactor: for readability
Jan 1, 2025
02c56ad
support RFC2822 timestamps
Jan 1, 2025
c6be638
field name should have date/time in it
Jan 1, 2025
c15acb6
refactor: iterating
Jan 1, 2025
8d202e1
feat: v1 timestamp and numbers as float64
Jan 1, 2025
ac338d9
refactor: `override_data_type`
Jan 2, 2025
970df04
fix: support casting all numbers as `Float64`
Jan 2, 2025
7102c79
fix: schema_version check before allowing all numbers as Float64
Jan 2, 2025
73b9080
test: v1 converts all numbers to Float64
Jan 2, 2025
a28f7bb
doc: improve readability of json.rs
Jan 2, 2025
203c1fc
refactor: flatten nesting
Jan 2, 2025
02fb350
fix: add schema version to LogStreamMetadata
nikhilsinhaparseable Jan 2, 2025
2de9597
fix: remove check for existing fields
nikhilsinhaparseable Jan 2, 2025
a55e14b
fix: load schema version, don't default
Jan 3, 2025
65470a3
refactor: use `..Default::default()` pattern
Jan 3, 2025
e5f025c
doc: explaining reason for using V0
Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 42 additions & 40 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::{EventFormat, Metadata, Tags};
use crate::utils::{arrow::get_field, json::flatten_json_body};
use crate::{
metadata::SchemaVersion,
utils::{arrow::get_field, json::flatten_json_body},
};

pub struct Event {
pub data: Value,
Expand All @@ -48,8 +51,9 @@ impl EventFormat for Event {
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(&self.data, None, None, None, false)?;
let data = flatten_json_body(self.data, None, None, None, schema_version, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
Expand All @@ -68,43 +72,38 @@ impl EventFormat for Event {
let mut is_first = false;
let schema = match derive_arrow_schema(stream_schema, fields) {
Ok(schema) => schema,
Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
Ok(mut infer_schema) => {
let new_infer_schema = super::super::format::update_field_type_in_schema(
Arc::new(infer_schema),
Some(stream_schema),
time_partition,
Some(&value_arr),
);
infer_schema = Schema::new(new_infer_schema.fields().clone());
if let Err(err) = Schema::try_merge(vec![
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
infer_schema.clone(),
]) {
return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err));
}
is_first = true;
infer_schema
.fields
.iter()
.filter(|field| !field.data_type().is_null())
.cloned()
.sorted_by(|a, b| a.name().cmp(b.name()))
.collect()
}
Err(err) => {
return Err(anyhow!(
"Could not infer schema for this event due to err {:?}",
err
))
}
},
Err(_) => {
let mut infer_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok))
.map_err(|err| {
anyhow!("Could not infer schema for this event due to err {:?}", err)
})?;
let new_infer_schema = super::update_field_type_in_schema(
Arc::new(infer_schema),
Some(stream_schema),
time_partition,
Some(&value_arr),
schema_version,
);
infer_schema = Schema::new(new_infer_schema.fields().clone());
Schema::try_merge(vec![
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
infer_schema.clone(),
]).map_err(|err| anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err))?;
is_first = true;
infer_schema
.fields
.iter()
.filter(|field| !field.data_type().is_null())
.cloned()
.sorted_by(|a, b| a.name().cmp(b.name()))
.collect()
}
};

if static_schema_flag.is_none()
&& value_arr
.iter()
.any(|value| fields_mismatch(&schema, value))
.any(|value| fields_mismatch(&schema, value, schema_version))
{
return Err(anyhow!(
"Could not process this event due to mismatch in datatype"
Expand Down Expand Up @@ -165,27 +164,30 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
Ok(keys)
}

fn fields_mismatch(schema: &[Arc<Field>], body: &Value) -> bool {
fn fields_mismatch(schema: &[Arc<Field>], body: &Value, schema_version: SchemaVersion) -> bool {
for (name, val) in body.as_object().expect("body is of object variant") {
if val.is_null() {
continue;
}
let Some(field) = get_field(schema, name) else {
return true;
};
if !valid_type(field.data_type(), val) {
if !valid_type(field.data_type(), val, schema_version) {
return true;
}
}
false
}

fn valid_type(data_type: &DataType, value: &Value) -> bool {
fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion) -> bool {
match data_type {
DataType::Boolean => value.is_boolean(),
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
DataType::Float16 | DataType::Float32 => value.is_f64(),
// All numbers can be cast as Float64 from schema version v1
DataType::Float64 if schema_version == SchemaVersion::V1 => value.is_number(),
DataType::Float64 if schema_version != SchemaVersion::V1 => value.is_f64(),
DataType::Utf8 => value.is_string(),
DataType::List(field) => {
let data_type = field.data_type();
Expand All @@ -194,7 +196,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
if elem.is_null() {
continue;
}
if !valid_type(data_type, elem) {
if !valid_type(data_type, elem, schema_version) {
return false;
}
}
Expand All @@ -212,7 +214,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
if value.is_null() {
continue;
}
if !valid_type(field.data_type(), value) {
if !valid_type(field.data_type(), value, schema_version) {
return false;
}
} else {
Expand Down
Loading
Loading