diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 191396660..6c5f6e398 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -68,6 +68,7 @@ impl ParseableSinkProcessor {
 
         let (rb, is_first) = batch_json_event.into_recordbatch(
             &schema,
+            Utc::now(),
             static_schema_flag,
             time_partition.as_ref(),
             schema_version,
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index c0a2ec323..e7f0707e8 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -26,7 +26,7 @@ use std::{
 use anyhow::{anyhow, Error as AnyError};
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::DateTime;
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
@@ -108,6 +108,7 @@ pub trait EventFormat: Sized {
     fn into_recordbatch(
         self,
         storage_schema: &HashMap<String, Arc<Field>>,
+        p_timestamp: DateTime<Utc>,
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
@@ -145,7 +146,7 @@ pub trait EventFormat: Sized {
             rb.schema(),
             &rb,
             &[0],
-            &[Arc::new(get_timestamp_array(rb.num_rows()))],
+            &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
         );
 
         Ok((rb, is_first))
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 6706470a2..0ca1f11a5 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -79,7 +79,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRe
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
     let size: usize = body.len();
-    let parsed_timestamp = Utc::now().naive_utc();
+    let now = Utc::now();
     let (rb, is_first) = {
         let body_val: Value = serde_json::from_slice(&body)?;
         let hash_map = PARSEABLE.streams.read().unwrap();
@@ -93,7 +93,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result
             .clone();
         let event = format::json::Event { data: body_val };
         // For internal streams, use old schema
-        event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
+        event.into_recordbatch(&schema, now, false, None, SchemaVersion::V0)?
     };
     event::Event {
         rb,
@@ -101,7 +101,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result
         origin_format: "json",
         origin_size: size as u64,
         is_first_event: is_first,
-        parsed_timestamp,
+        parsed_timestamp: now.naive_utc(),
         time_partition: None,
         custom_partition_values: HashMap::new(),
         stream_type: StreamType::Internal,
@@ -351,6 +351,7 @@ mod tests {
     use arrow::datatypes::Int64Type;
     use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
    use arrow_schema::{DataType, Field};
+    use chrono::Utc;
     use serde_json::json;
     use std::{collections::HashMap, sync::Arc};
 
@@ -392,8 +393,15 @@ mod tests {
             "b": "hello",
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 4);
@@ -419,8 +427,15 @@ mod tests {
             "c": null
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -450,7 +465,8 @@ mod tests {
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -480,7 +496,9 @@ mod tests {
             .into_iter(),
         );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
+        );
     }
 
     #[test]
@@ -496,7 +514,8 @@ mod tests {
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 1);
@@ -535,8 +554,15 @@ mod tests {
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -582,8 +608,15 @@ mod tests {
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -630,7 +663,8 @@ mod tests {
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -677,7 +711,9 @@ mod tests {
             .into_iter(),
        );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
+        );
     }
 
     #[test]
@@ -718,6 +754,7 @@ mod tests {
         let (rb, _) = into_event_batch(
             flattened_json,
             HashMap::default(),
+            Utc::now(),
             false,
             None,
             SchemaVersion::V0,
@@ -806,6 +843,7 @@ mod tests {
         let (rb, _) = into_event_batch(
             flattened_json,
             HashMap::default(),
+            Utc::now(),
             false,
             None,
             SchemaVersion::V1,
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 005b38a91..b6286d67b 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -96,6 +96,7 @@ async fn push_logs(
     let static_schema_flag = stream.get_static_schema_flag();
     let custom_partition = stream.get_custom_partition();
     let schema_version = stream.get_schema_version();
+    let p_timestamp = Utc::now();
 
     let data = if time_partition.is_some() || custom_partition.is_some() {
         convert_array_to_object(
@@ -121,7 +122,7 @@ async fn push_logs(
         let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
         let parsed_timestamp = match time_partition.as_ref() {
             Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
-            _ => Utc::now().naive_utc(),
+            _ => p_timestamp.naive_utc(),
         };
         let custom_partition_values = match custom_partition.as_ref() {
             Some(custom_partition) => {
@@ -144,6 +145,7 @@ async fn push_logs(
         let (rb, is_first_event) = into_event_batch(
             value,
             schema,
+            p_timestamp,
             static_schema_flag,
             time_partition.as_ref(),
             schema_version,
@@ -168,12 +170,14 @@ async fn push_logs(
 pub fn into_event_batch(
     data: Value,
     schema: HashMap<String, Arc<Field>>,
+    p_timestamp: DateTime<Utc>,
     static_schema_flag: bool,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let (rb, is_first) = json::Event { data }.into_recordbatch(
         &schema,
+        p_timestamp,
         static_schema_flag,
         time_partition,
         schema_version,
diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs
index 3cdc5193c..53e6437d6 100644
--- a/src/utils/arrow/mod.rs
+++ b/src/utils/arrow/mod.rs
@@ -45,7 +45,7 @@ use std::sync::Arc;
 use arrow_array::{Array, RecordBatch, TimestampMillisecondArray, UInt64Array};
 use arrow_schema::Schema;
 use arrow_select::take::take;
-use chrono::Utc;
+use chrono::{DateTime, Utc};
 use itertools::Itertools;
 
 pub mod batch_adapter;
@@ -133,8 +133,8 @@ pub fn get_field<'a>(
 /// # Returns
 ///
 /// A column in arrow, containing the current timestamp in millis.
-pub fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
-    TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
+pub fn get_timestamp_array(p_timestamp: DateTime<Utc>, size: usize) -> TimestampMillisecondArray {
+    TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), size)
 }
 
 pub fn reverse(rb: &RecordBatch) -> RecordBatch {
@@ -196,19 +196,19 @@ mod tests {
     #[test]
     fn test_timestamp_array_has_correct_size_and_value() {
         let size = 5;
-        let now = Utc::now().timestamp_millis();
+        let now = Utc::now();
 
-        let array = get_timestamp_array(size);
+        let array = get_timestamp_array(now, size);
 
         assert_eq!(array.len(), size);
         for i in 0..size {
-            assert!(array.value(i) >= now);
+            assert!(array.value(i) >= now.timestamp_millis());
         }
     }
 
     #[test]
     fn test_timestamp_array_with_zero_size() {
-        let array = get_timestamp_array(Utc::now(), 0);
+        let array = get_timestamp_array(Utc::now(), 0);
 
         assert_eq!(array.len(), 0);
         assert!(array.is_empty());
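
Taken together, these hunks capture `Utc::now()` once per request and thread it as `p_timestamp` through `into_event_batch`, `into_recordbatch`, and `get_timestamp_array`, instead of reading the clock at each call site. Below is a minimal standalone sketch of that pattern; it assumes only the `arrow-array` and `chrono` crates, and everything outside the `get_timestamp_array` signature is illustrative scaffolding, not Parseable code.

```rust
use arrow_array::{Array, TimestampMillisecondArray};
use chrono::{DateTime, Utc};

// Mirrors the patched `get_timestamp_array`: the timestamp is an argument,
// so the function no longer reads the clock itself.
fn get_timestamp_array(p_timestamp: DateTime<Utc>, size: usize) -> TimestampMillisecondArray {
    TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), size)
}

fn main() {
    // Capture the ingestion time exactly once per batch/request...
    let p_timestamp = Utc::now();

    // ...then reuse it for the timestamp column and for the partition
    // timestamp, so the two can never disagree within one batch.
    let array = get_timestamp_array(p_timestamp, 3);
    let parsed_timestamp = p_timestamp.naive_utc();

    // Every row carries the identical millisecond value.
    assert!(array
        .iter()
        .all(|v| v == Some(p_timestamp.timestamp_millis())));
    println!("all {} rows share timestamp {}", array.len(), parsed_timestamp);
}
```

Besides keeping the `p_timestamp` column consistent with `parsed_timestamp.naive_utc()` across a large batch, taking the timestamp as a parameter makes the helpers deterministic, which is why the updated tests can now pass `Utc::now()` (or any fixed instant) explicitly.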