Skip to content

Commit 431f252

Browse files
author
Devdutt Shenoi
committed
Merge remote-tracking branch 'origin/main' into refactor
2 parents 920bcae + cf59e4d commit 431f252

File tree

5 files changed

+15
-10
lines changed

5 files changed

+15
-10
lines changed

src/connectors/kafka/processor.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@ impl ParseableSinkProcessor {
8181
custom_partition_values: HashMap::new(),
8282
stream_type: StreamType::UserDefined,
8383
}
84-
.process(&stream)
85-
.await?;
84+
.process(&stream)?;
8685

8786
Ok(())
8887
}

src/event/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub struct Event {
4646

4747
// Events holds the schema related to a each event for a single log stream
4848
impl Event {
49-
pub async fn process(self, stream: &Stream) -> Result<(), EventError> {
49+
pub fn process(self, stream: &Stream) -> Result<(), EventError> {
5050
let mut key = get_schema_key(&self.rb.schema().fields);
5151
if self.time_partition.is_some() {
5252
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();

src/handlers/http/ingest.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
9999
custom_partition_values: HashMap::new(),
100100
stream_type: StreamType::Internal,
101101
}
102-
.process(&stream)
103-
.await?;
102+
.process(&stream)?;
103+
104104
Ok(())
105105
}
106106

src/otel/traces.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,15 @@ fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
293293
span_record_json.extend(flatten_status(status));
294294
}
295295

296-
for span_json in &mut span_records_json {
297-
for (key, value) in &span_record_json {
298-
span_json.insert(key.clone(), value.clone());
296+
// if span_record.events is null, code should still flatten other elements in the span record - this is handled in the if block
297+
// else block handles the flattening the span record that includes events and links records in each span record
298+
if span_records_json.is_empty() {
299+
span_records_json = vec![span_record_json];
300+
} else {
301+
for span_json in &mut span_records_json {
302+
for (key, value) in &span_record_json {
303+
span_json.insert(key.clone(), value.clone());
304+
}
299305
}
300306
}
301307

src/parseable/streams.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,9 @@ impl Stream {
216216
custom_partition_values,
217217
stream_type: StreamType::UserDefined,
218218
}
219-
.process(self)
220-
.await?;
219+
.process(self)?;
221220
}
221+
222222
Ok(())
223223
}
224224

0 commit comments

Comments
 (0)