Skip to content

Commit b7dd383

Browse files
author
Devdutt Shenoi
committed
refactor: retire ingest utils
1 parent 2376fa5 commit b7dd383

File tree

4 files changed

+206
-261
lines changed

4 files changed

+206
-261
lines changed

src/handlers/http/ingest.rs

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ use crate::utils::header_parsing::ParseHeaderError;
4444
use crate::utils::json::flatten::JsonFlattenError;
4545

4646
use super::logstream::error::{CreateStreamError, StreamError};
47-
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
4847
use super::users::dashboards::DashboardError;
4948
use super::users::filters::FiltersError;
5049

@@ -70,7 +69,10 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7069
.get(LOG_SOURCE_KEY)
7170
.and_then(|h| h.to_str().ok())
7271
.map_or(LogSource::default(), LogSource::from);
73-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
72+
PARSEABLE
73+
.get_stream(&stream_name)?
74+
.flatten_and_push_logs(json, &log_source)
75+
.await?;
7476

7577
Ok(HttpResponse::Ok().finish())
7678
}
@@ -123,11 +125,12 @@ pub async fn handle_otel_logs_ingestion(
123125
PARSEABLE
124126
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
125127
.await?;
128+
let stream = PARSEABLE.get_stream(&stream_name)?;
126129

127130
//custom flattening required for otel logs
128131
let logs: LogsData = serde_json::from_value(json)?;
129132
for record in flatten_otel_logs(&logs) {
130-
push_logs(&stream_name, record, &log_source).await?;
133+
stream.push_logs(record, &log_source).await?;
131134
}
132135

133136
Ok(HttpResponse::Ok().finish())
@@ -158,11 +161,12 @@ pub async fn handle_otel_metrics_ingestion(
158161
LogSource::OtelMetrics,
159162
)
160163
.await?;
164+
let stream = PARSEABLE.get_stream(&stream_name)?;
161165

162166
//custom flattening required for otel metrics
163167
let metrics: MetricsData = serde_json::from_value(json)?;
164168
for record in flatten_otel_metrics(metrics) {
165-
push_logs(&stream_name, record, &log_source).await?;
169+
stream.push_logs(record, &log_source).await?;
166170
}
167171

168172
Ok(HttpResponse::Ok().finish())
@@ -190,11 +194,12 @@ pub async fn handle_otel_traces_ingestion(
190194
PARSEABLE
191195
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
192196
.await?;
197+
let stream = PARSEABLE.get_stream(&stream_name)?;
193198

194199
//custom flattening required for otel traces
195200
let traces: TracesData = serde_json::from_value(json)?;
196201
for record in flatten_otel_traces(&traces) {
197-
push_logs(&stream_name, record, &log_source).await?;
202+
stream.push_logs(record, &log_source).await?;
198203
}
199204

200205
Ok(HttpResponse::Ok().finish())
@@ -236,7 +241,10 @@ pub async fn post_event(
236241
.get(LOG_SOURCE_KEY)
237242
.and_then(|h| h.to_str().ok())
238243
.map_or(LogSource::default(), LogSource::from);
239-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
244+
PARSEABLE
245+
.get_stream(&stream_name)?
246+
.flatten_and_push_logs(json, &log_source)
247+
.await?;
240248

241249
Ok(HttpResponse::Ok().finish())
242250
}
@@ -349,7 +357,7 @@ mod tests {
349357
use std::{collections::HashMap, sync::Arc};
350358

351359
use crate::{
352-
handlers::http::modal::utils::ingest_utils::into_event_batch,
360+
event::format::{json, EventFormat},
353361
metadata::SchemaVersion,
354362
utils::json::{convert_array_to_object, flatten::convert_to_array},
355363
};
@@ -386,8 +394,9 @@ mod tests {
386394
"b": "hello",
387395
});
388396

389-
let (rb, _) =
390-
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
397+
let (rb, _) = json::Event { data: json }
398+
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
399+
.unwrap();
391400

392401
assert_eq!(rb.num_rows(), 1);
393402
assert_eq!(rb.num_columns(), 4);
@@ -413,8 +422,9 @@ mod tests {
413422
"c": null
414423
});
415424

416-
let (rb, _) =
417-
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
425+
let (rb, _) = json::Event { data: json }
426+
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
427+
.unwrap();
418428

419429
assert_eq!(rb.num_rows(), 1);
420430
assert_eq!(rb.num_columns(), 3);
@@ -444,7 +454,9 @@ mod tests {
444454
.into_iter(),
445455
);
446456

447-
let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
457+
let (rb, _) = json::Event { data: json }
458+
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
459+
.unwrap();
448460

449461
assert_eq!(rb.num_rows(), 1);
450462
assert_eq!(rb.num_columns(), 3);
@@ -474,7 +486,9 @@ mod tests {
474486
.into_iter(),
475487
);
476488

477-
assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
489+
assert!(json::Event { data: json }
490+
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
491+
.is_err());
478492
}
479493

480494
#[test]
@@ -490,7 +504,9 @@ mod tests {
490504
.into_iter(),
491505
);
492506

493-
let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
507+
let (rb, _) = json::Event { data: json }
508+
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
509+
.unwrap();
494510

495511
assert_eq!(rb.num_rows(), 1);
496512
assert_eq!(rb.num_columns(), 1);
@@ -529,8 +545,9 @@ mod tests {
529545
},
530546
]);
531547

532-
let (rb, _) =
533-
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
548+
let (rb, _) = json::Event { data: json }
549+
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
550+
.unwrap();
534551

535552
assert_eq!(rb.num_rows(), 3);
536553
assert_eq!(rb.num_columns(), 4);
@@ -576,8 +593,9 @@ mod tests {
576593
},
577594
]);
578595

579-
let (rb, _) =
580-
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
596+
let (rb, _) = json::Event { data: json }
597+
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
598+
.unwrap();
581599

582600
assert_eq!(rb.num_rows(), 3);
583601
assert_eq!(rb.num_columns(), 4);
@@ -624,7 +642,9 @@ mod tests {
624642
.into_iter(),
625643
);
626644

627-
let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
645+
let (rb, _) = json::Event { data: json }
646+
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
647+
.unwrap();
628648

629649
assert_eq!(rb.num_rows(), 3);
630650
assert_eq!(rb.num_columns(), 4);
@@ -671,7 +691,9 @@ mod tests {
671691
.into_iter(),
672692
);
673693

674-
assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
694+
assert!(json::Event { data: json }
695+
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
696+
.is_err());
675697
}
676698

677699
#[test]
@@ -709,13 +731,10 @@ mod tests {
709731
)
710732
.unwrap();
711733

712-
let (rb, _) = into_event_batch(
713-
flattened_json,
714-
HashMap::default(),
715-
false,
716-
None,
717-
SchemaVersion::V0,
718-
)
734+
let (rb, _) = json::Event {
735+
data: flattened_json,
736+
}
737+
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
719738
.unwrap();
720739
assert_eq!(rb.num_rows(), 4);
721740
assert_eq!(rb.num_columns(), 5);
@@ -797,13 +816,10 @@ mod tests {
797816
)
798817
.unwrap();
799818

800-
let (rb, _) = into_event_batch(
801-
flattened_json,
802-
HashMap::default(),
803-
false,
804-
None,
805-
SchemaVersion::V1,
806-
)
819+
let (rb, _) = json::Event {
820+
data: flattened_json,
821+
}
822+
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
807823
.unwrap();
808824

809825
assert_eq!(rb.num_rows(), 4);

0 commit comments

Comments
 (0)