Skip to content

Commit 6a313da

Browse files
authored
ref(processor): Move more functions (#5322)
1 parent c7f1307 commit 6a313da

File tree

8 files changed

+434
-673
lines changed

8 files changed

+434
-673
lines changed
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
//! Dynamic sampling processor related code.
2+
use std::ops::ControlFlow;
3+
4+
use chrono::Utc;
5+
use relay_dynamic_config::ErrorBoundary;
6+
use relay_event_schema::protocol::Event;
7+
use relay_protocol::Annotated;
8+
use relay_sampling::config::RuleType;
9+
use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator};
10+
use relay_sampling::{DynamicSamplingContext, SamplingConfig};
11+
12+
use crate::processing::Context;
13+
use crate::utils::SamplingResult;
14+
15+
/// Computes the sampling decision on the incoming event
16+
pub async fn run(
17+
dsc: Option<&DynamicSamplingContext>,
18+
event: &mut Annotated<Event>,
19+
ctx: &Context<'_>,
20+
reservoir: Option<&ReservoirEvaluator<'_>>,
21+
) -> SamplingResult {
22+
let sampling_config = match ctx.project_info.config.sampling {
23+
Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config),
24+
_ => None,
25+
};
26+
27+
let root_state = ctx.sampling_project_info.as_ref();
28+
let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) {
29+
Some(ErrorBoundary::Ok(config)) if !config.unsupported() => Some(config),
30+
_ => None,
31+
};
32+
33+
compute_sampling_decision(
34+
ctx.config.processing_enabled(),
35+
reservoir,
36+
sampling_config,
37+
event.value(),
38+
root_config,
39+
dsc,
40+
)
41+
.await
42+
}
43+
44+
/// Computes the sampling decision on the incoming envelope.
45+
async fn compute_sampling_decision(
46+
processing_enabled: bool,
47+
reservoir: Option<&ReservoirEvaluator<'_>>,
48+
sampling_config: Option<&SamplingConfig>,
49+
event: Option<&Event>,
50+
root_sampling_config: Option<&SamplingConfig>,
51+
dsc: Option<&DynamicSamplingContext>,
52+
) -> SamplingResult {
53+
if (sampling_config.is_none() || event.is_none())
54+
&& (root_sampling_config.is_none() || dsc.is_none())
55+
{
56+
return SamplingResult::NoMatch;
57+
}
58+
59+
if sampling_config.is_some_and(|config| config.unsupported())
60+
|| root_sampling_config.is_some_and(|config| config.unsupported())
61+
{
62+
if processing_enabled {
63+
relay_log::error!("found unsupported rules even as processing relay");
64+
} else {
65+
return SamplingResult::NoMatch;
66+
}
67+
}
68+
69+
let mut evaluator = match reservoir {
70+
Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir),
71+
None => SamplingEvaluator::new(Utc::now()),
72+
};
73+
74+
if let (Some(event), Some(sampling_state)) = (event, sampling_config)
75+
&& let Some(seed) = event.id.value().map(|id| id.0)
76+
{
77+
let rules = sampling_state.filter_rules(RuleType::Transaction);
78+
evaluator = match evaluator.match_rules(seed, event, rules).await {
79+
ControlFlow::Continue(evaluator) => evaluator,
80+
ControlFlow::Break(sampling_match) => {
81+
return SamplingResult::Match(sampling_match);
82+
}
83+
}
84+
};
85+
86+
if let (Some(dsc), Some(sampling_state)) = (dsc, sampling_config) {
87+
let rules = sampling_state.filter_rules(RuleType::Project);
88+
evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules).await {
89+
ControlFlow::Continue(evaluator) => evaluator,
90+
ControlFlow::Break(sampling_match) => {
91+
return SamplingResult::Match(sampling_match);
92+
}
93+
}
94+
};
95+
96+
if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) {
97+
let rules = sampling_state.filter_rules(RuleType::Trace);
98+
return evaluator
99+
.match_rules(*dsc.trace_id, dsc, rules)
100+
.await
101+
.into();
102+
}
103+
104+
SamplingResult::NoMatch
105+
}
106+
107+
#[cfg(test)]
108+
mod tests {
109+
use std::collections::BTreeMap;
110+
111+
use relay_base_schema::events::EventType;
112+
use relay_base_schema::project::ProjectKey;
113+
use relay_event_schema::protocol::{EventId, LenientString};
114+
use relay_protocol::RuleCondition;
115+
use relay_sampling::config::{
116+
DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange,
117+
};
118+
use relay_sampling::evaluation::SamplingMatch;
119+
120+
use super::*;
121+
122+
fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event {
123+
Event {
124+
id: Annotated::new(EventId::new()),
125+
ty: Annotated::new(event_type),
126+
transaction: Annotated::new(transaction.to_owned()),
127+
release: Annotated::new(LenientString(release.to_owned())),
128+
..Event::default()
129+
}
130+
}
131+
132+
fn mock_dsc() -> DynamicSamplingContext {
133+
DynamicSamplingContext {
134+
trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
135+
public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(),
136+
release: Some("1.1.1".to_owned()),
137+
user: Default::default(),
138+
replay_id: None,
139+
environment: None,
140+
transaction: Some("transaction1".into()),
141+
sample_rate: Some(0.5),
142+
sampled: Some(true),
143+
other: BTreeMap::new(),
144+
}
145+
}
146+
147+
// Helper to extract the sampling match from SamplingResult if thats the variant.
148+
fn get_sampling_match(sampling_result: SamplingResult) -> SamplingMatch {
149+
if let SamplingResult::Match(sampling_match) = sampling_result {
150+
sampling_match
151+
} else {
152+
panic!()
153+
}
154+
}
155+
156+
#[tokio::test]
157+
async fn test_it_keeps_or_drops_transactions() {
158+
let event = Event {
159+
id: Annotated::new(EventId::new()),
160+
ty: Annotated::new(EventType::Transaction),
161+
transaction: Annotated::new("testing".to_owned()),
162+
..Event::default()
163+
};
164+
165+
for (sample_rate, should_keep) in [(0.0, false), (1.0, true)] {
166+
let sampling_config = SamplingConfig {
167+
rules: vec![SamplingRule {
168+
condition: RuleCondition::all(),
169+
sampling_value: SamplingValue::SampleRate { value: sample_rate },
170+
ty: RuleType::Transaction,
171+
id: RuleId(1),
172+
time_range: Default::default(),
173+
decaying_fn: DecayingFunction::Constant,
174+
}],
175+
..SamplingConfig::new()
176+
};
177+
178+
// TODO: This does not test if the sampling decision is actually applied. This should be
179+
// refactored to send a proper Envelope in and call process_state to cover the full
180+
// pipeline.
181+
let res = compute_sampling_decision(
182+
false,
183+
None,
184+
Some(&sampling_config),
185+
Some(&event),
186+
None,
187+
None,
188+
)
189+
.await;
190+
assert_eq!(res.decision().is_keep(), should_keep);
191+
}
192+
}
193+
194+
/// Happy path test for compute_sampling_decision.
195+
#[tokio::test]
196+
async fn test_compute_sampling_decision_matching() {
197+
for rule_type in [RuleType::Transaction, RuleType::Project] {
198+
let event = mocked_event(EventType::Transaction, "foo", "bar");
199+
let rule = SamplingRule {
200+
condition: RuleCondition::all(),
201+
sampling_value: SamplingValue::SampleRate { value: 1.0 },
202+
ty: rule_type,
203+
id: RuleId(0),
204+
time_range: TimeRange::default(),
205+
decaying_fn: Default::default(),
206+
};
207+
208+
let sampling_config = SamplingConfig {
209+
rules: vec![rule],
210+
..SamplingConfig::new()
211+
};
212+
213+
let res = compute_sampling_decision(
214+
false,
215+
None,
216+
Some(&sampling_config),
217+
Some(&event),
218+
None,
219+
Some(&mock_dsc()),
220+
)
221+
.await;
222+
assert!(res.is_match());
223+
}
224+
}
225+
226+
#[tokio::test]
227+
async fn test_matching_with_unsupported_rule() {
228+
let event = mocked_event(EventType::Transaction, "foo", "bar");
229+
let rule = SamplingRule {
230+
condition: RuleCondition::all(),
231+
sampling_value: SamplingValue::SampleRate { value: 1.0 },
232+
ty: RuleType::Transaction,
233+
id: RuleId(0),
234+
time_range: TimeRange::default(),
235+
decaying_fn: Default::default(),
236+
};
237+
238+
let unsupported_rule = SamplingRule {
239+
condition: RuleCondition::all(),
240+
sampling_value: SamplingValue::SampleRate { value: 1.0 },
241+
ty: RuleType::Unsupported,
242+
id: RuleId(0),
243+
time_range: TimeRange::default(),
244+
decaying_fn: Default::default(),
245+
};
246+
247+
let sampling_config = SamplingConfig {
248+
rules: vec![rule, unsupported_rule],
249+
..SamplingConfig::new()
250+
};
251+
252+
// Unsupported rule should result in no match if processing is not enabled.
253+
let res = compute_sampling_decision(
254+
false,
255+
None,
256+
Some(&sampling_config),
257+
Some(&event),
258+
None,
259+
None,
260+
)
261+
.await;
262+
assert!(res.is_no_match());
263+
264+
// Match if processing is enabled.
265+
let res =
266+
compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None)
267+
.await;
268+
assert!(res.is_match());
269+
}
270+
271+
#[tokio::test]
272+
async fn test_client_sample_rate() {
273+
let dsc = mock_dsc();
274+
275+
let rule = SamplingRule {
276+
condition: RuleCondition::all(),
277+
sampling_value: SamplingValue::SampleRate { value: 0.2 },
278+
ty: RuleType::Trace,
279+
id: RuleId(0),
280+
time_range: TimeRange::default(),
281+
decaying_fn: Default::default(),
282+
};
283+
284+
let sampling_config = SamplingConfig {
285+
rules: vec![rule],
286+
..SamplingConfig::new()
287+
};
288+
289+
let res =
290+
compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc))
291+
.await;
292+
293+
assert_eq!(get_sampling_match(res).sample_rate(), 0.2);
294+
}
295+
}

relay-server/src/processing/utils/event.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use relay_event_schema::processor::{self, ProcessingState};
2424
use relay_event_schema::protocol::IpAddr;
2525
use relay_event_schema::protocol::Span;
2626
use relay_event_schema::protocol::{Event, Metrics, OtelContext, RelayInfo};
27+
use relay_filter::FilterStatKey;
2728
use relay_metrics::MetricNamespace;
2829
use relay_protocol::Annotated;
2930
use relay_protocol::Empty;
@@ -318,6 +319,67 @@ pub fn normalize(
318319
Ok(event_fully_normalized)
319320
}
320321

322+
/// Status for applying some filters that don't drop the event.
323+
///
324+
/// The enum represents either the success of running all filters and keeping
325+
/// the event, [`FiltersStatus::Ok`], or not running all the filters because
326+
/// some are unsupported, [`FiltersStatus::Unsupported`].
327+
///
328+
/// If there are unsuppported filters, Relay should forward the event upstream
329+
/// so that a more up-to-date Relay can apply filters appropriately. Actions
330+
/// that depend on the outcome of event filtering, such as metric extraction,
331+
/// should be postponed until a filtering decision is made.
332+
#[must_use]
333+
pub enum FiltersStatus {
334+
/// All filters have been applied and the event should be kept.
335+
Ok,
336+
/// Some filters are not supported and were not applied.
337+
///
338+
/// Relay should forward events upstream for a more up-to-date Relay to apply these filters.
339+
/// Supported filters were applied and they don't reject the event.
340+
Unsupported,
341+
}
342+
343+
pub fn filter(
344+
headers: &EnvelopeHeaders,
345+
event: &mut Annotated<Event>,
346+
ctx: &Context,
347+
) -> Result<FiltersStatus, FilterStatKey> {
348+
let event = match event.value_mut() {
349+
Some(event) => event,
350+
// Some events are created by processing relays (e.g. unreal), so they do not yet
351+
// exist at this point in non-processing relays.
352+
None => return Ok(FiltersStatus::Ok),
353+
};
354+
355+
let client_ip = headers.meta().client_addr();
356+
let filter_settings = &ctx.project_info.config.filter_settings;
357+
358+
metric!(timer(RelayTimers::EventProcessingFiltering), {
359+
relay_filter::should_filter(
360+
event,
361+
client_ip,
362+
filter_settings,
363+
ctx.global_config.filters(),
364+
)
365+
})?;
366+
367+
// Don't extract metrics if relay can't apply generic filters. A filter
368+
// applied in another up-to-date relay in chain may need to drop the event,
369+
// and there should not be metrics from dropped events.
370+
// In processing relays, always extract metrics to avoid losing them.
371+
let supported_generic_filters = ctx.global_config.filters.is_ok()
372+
&& relay_filter::are_generic_filters_supported(
373+
ctx.global_config.filters().map(|f| f.version),
374+
ctx.project_info.config.filter_settings.generic.version,
375+
);
376+
if supported_generic_filters {
377+
Ok(FiltersStatus::Ok)
378+
} else {
379+
Ok(FiltersStatus::Unsupported)
380+
}
381+
}
382+
321383
/// New type representing the normalization state of the event.
322384
#[derive(Copy, Clone)]
323385
pub struct EventFullyNormalized(pub bool);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod dynamic_sampling;
12
pub mod event;
23
#[cfg(feature = "processing")]
34
pub mod store;

0 commit comments

Comments
 (0)