Skip to content

Commit 674eed5

Browse files
refactor
1 parent 7af280d commit 674eed5

File tree

4 files changed

+37
-42
lines changed

4 files changed

+37
-42
lines changed

src/catalog/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,9 +261,9 @@ async fn process_single_partition(
261261
handle_existing_partition(
262262
pos,
263263
partition_changes,
264-
&mut meta.snapshot.manifest_list,
265264
storage,
266265
stream_name,
266+
meta,
267267
events_ingested,
268268
ingestion_size,
269269
storage_size,
@@ -292,14 +292,15 @@ async fn process_single_partition(
292292
async fn handle_existing_partition(
293293
pos: usize,
294294
partition_changes: Vec<manifest::File>,
295-
manifests: &mut [snapshot::ManifestItem],
296295
storage: Arc<dyn ObjectStorage>,
297296
stream_name: &str,
297+
meta: &mut ObjectStoreFormat,
298298
events_ingested: u64,
299299
ingestion_size: u64,
300300
storage_size: u64,
301301
partition_lower: DateTime<Utc>,
302302
) -> Result<Option<snapshot::ManifestItem>, ObjectStorageError> {
303+
let manifests = &mut meta.snapshot.manifest_list;
303304
let path = partition_path(
304305
stream_name,
305306
manifests[pos].time_lower_bound,
@@ -329,7 +330,7 @@ async fn handle_existing_partition(
329330
storage,
330331
stream_name,
331332
false,
332-
ObjectStoreFormat::default(), // We don't have meta here, use default
333+
meta.clone(),
333334
events_ingested,
334335
ingestion_size,
335336
storage_size,

src/handlers/http/ingest.rs

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::event::error::EventError;
2929
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
3030
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
3131
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
32+
use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion;
3233
use crate::handlers::{
3334
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
3435
STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
@@ -117,20 +118,7 @@ pub async fn ingest(
117118

118119
//if stream exists, fetch the stream log source
119120
//return error if the stream log source is otel traces or otel metrics
120-
let stream = match PARSEABLE.get_stream(&stream_name) {
121-
Ok(stream) => {
122-
stream
123-
.get_log_source()
124-
.iter()
125-
.find(|&stream_log_source_entry| {
126-
stream_log_source_entry.log_source_format != LogSource::OtelTraces
127-
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
128-
})
129-
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
130-
stream
131-
}
132-
Err(e) => return Err(PostError::from(e)),
133-
};
121+
let stream = validate_stream_for_ingestion(&stream_name)?;
134122

135123
PARSEABLE
136124
.add_update_log_source(&stream_name, log_source_entry)
@@ -409,20 +397,7 @@ pub async fn post_event(
409397

410398
//if stream exists, fetch the stream log source
411399
//return error if the stream log source is otel traces or otel metrics
412-
let stream = match PARSEABLE.get_stream(&stream_name) {
413-
Ok(stream) => {
414-
stream
415-
.get_log_source()
416-
.iter()
417-
.find(|&stream_log_source_entry| {
418-
stream_log_source_entry.log_source_format != LogSource::OtelTraces
419-
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
420-
})
421-
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
422-
stream
423-
}
424-
Err(e) => return Err(PostError::from(e)),
425-
};
400+
let stream = validate_stream_for_ingestion(&stream_name)?;
426401

427402
if stream.get_time_partition().is_some() {
428403
return Err(PostError::CustomError(

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use opentelemetry_proto::tonic::{
2323
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
2424
};
2525
use serde_json::Value;
26-
use std::collections::HashMap;
26+
use std::{collections::HashMap, sync::Arc};
2727
use tracing::warn;
2828

2929
use crate::{
@@ -39,7 +39,7 @@ use crate::{
3939
},
4040
},
4141
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
42-
parseable::PARSEABLE,
42+
parseable::{PARSEABLE, Stream},
4343
storage::StreamType,
4444
utils::json::{convert_array_to_object, flatten::convert_to_array},
4545
};
@@ -268,6 +268,29 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
268268
Ok(())
269269
}
270270

271+
pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<Arc<Stream>, PostError> {
272+
let stream = PARSEABLE.get_stream(stream_name)?;
273+
274+
// Validate that the stream's log source is compatible
275+
stream
276+
.get_log_source()
277+
.iter()
278+
.find(|&stream_log_source_entry| {
279+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
280+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
281+
})
282+
.ok_or(PostError::IncorrectLogFormat(stream_name.to_string()))?;
283+
284+
// Check for time partition
285+
if stream.get_time_partition().is_some() {
286+
return Err(PostError::CustomError(
287+
"Ingestion is not allowed to stream with time partition".to_string(),
288+
));
289+
}
290+
291+
Ok(stream)
292+
}
293+
271294
#[cfg(test)]
272295
mod tests {
273296
use super::*;

src/storage/object_storage.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,17 +114,13 @@ async fn upload_single_parquet_file(
114114
.expect("filename is valid string");
115115

116116
// Upload the file
117-
if let Err(e) = store
117+
store
118118
.upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path)
119119
.await
120-
{
121-
error!("Failed to upload file {filename:?}: {e}");
122-
return Ok(UploadResult {
123-
stats_calculated: false,
124-
file_path: path,
125-
manifest_file: None,
126-
});
127-
}
120+
.map_err(|e| {
121+
error!("Failed to upload file {filename:?} to {stream_relative_path}: {e}");
122+
ObjectStorageError::Custom(format!("Failed to upload {filename}: {e}"))
123+
})?;
128124

129125
// Update storage metrics
130126
update_storage_metrics(&path, &stream_name, filename)?;

0 commit comments

Comments
 (0)