Commit c672d77

restrict for OSS
1 parent: eff0b73

File tree: 4 files changed, +100 −36 lines


src/handlers/http/ingest.rs

Lines changed: 47 additions & 22 deletions
@@ -117,22 +117,30 @@ pub async fn ingest(

     //if stream exists, fetch the stream log source
     //return error if the stream log source is otel traces or otel metrics
-    if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
-        stream
-            .get_log_source()
-            .iter()
-            .find(|&stream_log_source_entry| {
-                stream_log_source_entry.log_source_format != LogSource::OtelTraces
-                    && stream_log_source_entry.log_source_format != LogSource::OtelMetrics
-            })
-            .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
-    }
+    let stream = match PARSEABLE.get_stream(&stream_name) {
+        Ok(stream) => {
+            stream
+                .get_log_source()
+                .iter()
+                .find(|&stream_log_source_entry| {
+                    stream_log_source_entry.log_source_format != LogSource::OtelTraces
+                        && stream_log_source_entry.log_source_format != LogSource::OtelMetrics
+                })
+                .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
+            stream
+        }
+        Err(e) => return Err(PostError::from(e)),
+    };

     PARSEABLE
         .add_update_log_source(&stream_name, log_source_entry)
         .await?;

-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
+    if stream.get_time_partition().is_some() {
+        return Err(PostError::IngestionNotAllowedWithTimePartition);
+    }
+
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;

     Ok(HttpResponse::Ok().finish())
 }

@@ -255,6 +263,7 @@ async fn process_otel_content(
             stream_name,
             log_source,
             &p_custom_fields,
+            None,
         )
         .await?;
     } else if content_type == CONTENT_TYPE_PROTOBUF {

@@ -398,18 +407,31 @@ pub async fn post_event(

     //if stream exists, fetch the stream log source
     //return error if the stream log source is otel traces or otel metrics
-    if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
-        stream
-            .get_log_source()
-            .iter()
-            .find(|&stream_log_source_entry| {
-                stream_log_source_entry.log_source_format != LogSource::OtelTraces
-                    && stream_log_source_entry.log_source_format != LogSource::OtelMetrics
-            })
-            .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
-    }
+    let stream = match PARSEABLE.get_stream(&stream_name) {
+        Ok(stream) => {
+            stream
+                .get_log_source()
+                .iter()
+                .find(|&stream_log_source_entry| {
+                    stream_log_source_entry.log_source_format != LogSource::OtelTraces
+                        && stream_log_source_entry.log_source_format != LogSource::OtelMetrics
+                })
+                .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
+            stream
+        }
+        Err(e) => return Err(PostError::from(e)),
+    };

-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
+    let time_partition = stream.get_time_partition();
+
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        time_partition,
+    )
+    .await?;

     Ok(HttpResponse::Ok().finish())
 }

@@ -489,6 +511,8 @@ pub enum PostError {
     InvalidQueryParameter,
     #[error("Missing query parameter")]
     MissingQueryParameter,
+    #[error("Ingestion is not allowed to stream with time partition")]
+    IngestionNotAllowedWithTimePartition,
 }

 impl actix_web::ResponseError for PostError {

@@ -520,6 +544,7 @@ impl actix_web::ResponseError for PostError {
            PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
            PostError::InvalidQueryParameter => StatusCode::BAD_REQUEST,
            PostError::MissingQueryParameter => StatusCode::BAD_REQUEST,
+           PostError::IngestionNotAllowedWithTimePartition => StatusCode::BAD_REQUEST,
        }
    }
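
Net effect of the ingest.rs changes: the header-driven `ingest` handler now keeps the stream handle from the `match` and refuses streams that carry a time partition, returning the new `IngestionNotAllowedWithTimePartition` error as HTTP 400, while `post_event` reads the stream's time partition and forwards it into `flatten_and_push_logs`. A condensed sketch of the new guard in `ingest`, with the log-source validation and the rest of the handler elided (all names come from the diff above):

    // Condensed sketch of the guard added to `ingest`; log-source validation elided.
    let stream = match PARSEABLE.get_stream(&stream_name) {
        Ok(stream) => stream,
        Err(e) => return Err(PostError::from(e)),
    };
    if stream.get_time_partition().is_some() {
        // Time-partitioned streams must use the logstream endpoint (`post_event`) instead.
        return Err(PostError::IngestionNotAllowedWithTimePartition);
    }
    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;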

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 44 additions & 6 deletions
@@ -53,6 +53,7 @@ pub async fn flatten_and_push_logs(
     stream_name: &str,
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
+    time_partition: Option<String>,
 ) -> Result<(), PostError> {
     // Verify the dataset fields count
     verify_dataset_fields_count(stream_name)?;

@@ -63,30 +64,67 @@ pub async fn flatten_and_push_logs(
             let message: Message = serde_json::from_value(json)?;
             let flattened_kinesis_data = flatten_kinesis_logs(message).await?;
             let record = convert_to_array(flattened_kinesis_data)?;
-            push_logs(stream_name, record, log_source, p_custom_fields).await?;
+            push_logs(
+                stream_name,
+                record,
+                log_source,
+                p_custom_fields,
+                time_partition,
+            )
+            .await?;
         }
         LogSource::OtelLogs => {
             //custom flattening required for otel logs
             let logs: LogsData = serde_json::from_value(json)?;
             for record in flatten_otel_logs(&logs) {
-                push_logs(stream_name, record, log_source, p_custom_fields).await?;
+                push_logs(
+                    stream_name,
+                    record,
+                    log_source,
+                    p_custom_fields,
+                    time_partition.clone(),
+                )
+                .await?;
             }
         }
         LogSource::OtelTraces => {
             //custom flattening required for otel traces
             let traces: TracesData = serde_json::from_value(json)?;
             for record in flatten_otel_traces(&traces) {
-                push_logs(stream_name, record, log_source, p_custom_fields).await?;
+                push_logs(
+                    stream_name,
+                    record,
+                    log_source,
+                    p_custom_fields,
+                    time_partition.clone(),
+                )
+                .await?;
             }
         }
         LogSource::OtelMetrics => {
             //custom flattening required for otel metrics
             let metrics: MetricsData = serde_json::from_value(json)?;
             for record in flatten_otel_metrics(metrics) {
-                push_logs(stream_name, record, log_source, p_custom_fields).await?;
+                push_logs(
+                    stream_name,
+                    record,
+                    log_source,
+                    p_custom_fields,
+                    time_partition.clone(),
+                )
+                .await?;
             }
         }
-        _ => push_logs(stream_name, json, log_source, p_custom_fields).await?,
+        _ => {
+            push_logs(
+                stream_name,
+                json,
+                log_source,
+                p_custom_fields,
+                time_partition,
+            )
+            .await?
+        }
     }

     Ok(())

@@ -97,9 +135,9 @@ pub async fn push_logs(
     json: Value,
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
+    time_partition: Option<String>,
 ) -> Result<(), PostError> {
     let stream = PARSEABLE.get_stream(stream_name)?;
-    let time_partition = stream.get_time_partition();
     let time_partition_limit = PARSEABLE
         .get_stream(stream_name)?
         .get_time_partition_limit();
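
With these changes, `flatten_and_push_logs` and `push_logs` no longer look the time partition up from the stream; callers pass it in as an explicit `Option<String>`, and the per-record OTel loops `clone()` it because the option is moved by value on each call. A minimal sketch of calling the new signature (the stream name "demo" and the surrounding lookup are illustrative, not part of the commit):

    // Illustrative only: resolve the stream's time partition once, then hand it to
    // flatten_and_push_logs, which threads it through to every push_logs call.
    let time_partition = PARSEABLE.get_stream("demo")?.get_time_partition();
    flatten_and_push_logs(
        json,
        "demo",
        &LogSource::Json,
        &HashMap::new(),
        time_partition,
    )
    .await?;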

src/handlers/mod.rs

Lines changed: 8 additions & 8 deletions
@@ -25,14 +25,14 @@ pub mod http;
 pub mod livetail;

 pub const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
-pub const LOG_SOURCE_KEY: &str = "x-p-log-source";
-const EXTRACT_LOG_KEY: &str = "x-p-extract-log";
-const TIME_PARTITION_KEY: &str = "x-p-time-partition";
-const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
-const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
-const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
-const AUTHORIZATION_KEY: &str = "authorization";
-const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
+pub const LOG_SOURCE_KEY: &str = "x-p-log-source";
+pub const EXTRACT_LOG_KEY: &str = "x-p-extract-log";
+pub const TIME_PARTITION_KEY: &str = "x-p-time-partition";
+pub const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
+pub const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
+pub const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
+pub const AUTHORIZATION_KEY: &str = "authorization";
+pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
 pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
 pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
 const COOKIE_AGE_DAYS: usize = 7;
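
Making these header-key constants `pub` lets code outside `src/handlers` reference them, presumably in support of the OSS/enterprise split the commit message ("restrict for OSS") refers to. A minimal sketch of consuming two of them from another module, assuming the actix-web types already used by these handlers (the helper itself is hypothetical, only the constant paths come from this commit):

    use actix_web::HttpRequest;

    use crate::handlers::{STREAM_NAME_HEADER_KEY, TIME_PARTITION_KEY};

    // Hypothetical helper: pull the stream name and time-partition headers off a request.
    fn partition_headers(req: &HttpRequest) -> (Option<&str>, Option<&str>) {
        let stream = req
            .headers()
            .get(STREAM_NAME_HEADER_KEY)
            .and_then(|v| v.to_str().ok());
        let partition = req
            .headers()
            .get(TIME_PARTITION_KEY)
            .and_then(|v| v.to_str().ok());
        (stream, partition)
    }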

src/storage/field_stats.rs

Lines changed: 1 addition & 0 deletions
@@ -123,6 +123,7 @@ pub async fn calculate_field_stats(
         DATASET_STATS_STREAM_NAME,
         &LogSource::Json,
         &HashMap::new(),
+        None,
     )
     .await?;
     Ok(stats_calculated)
