diff --git a/relay-server/src/processing/utils/dynamic_sampling.rs b/relay-server/src/processing/utils/dynamic_sampling.rs new file mode 100644 index 0000000000..31e69185b6 --- /dev/null +++ b/relay-server/src/processing/utils/dynamic_sampling.rs @@ -0,0 +1,295 @@ +//! Dynamic sampling processor related code. +use std::ops::ControlFlow; + +use chrono::Utc; +use relay_dynamic_config::ErrorBoundary; +use relay_event_schema::protocol::Event; +use relay_protocol::Annotated; +use relay_sampling::config::RuleType; +use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator}; +use relay_sampling::{DynamicSamplingContext, SamplingConfig}; + +use crate::processing::Context; +use crate::utils::SamplingResult; + +/// Computes the sampling decision on the incoming event +pub async fn run( + dsc: Option<&DynamicSamplingContext>, + event: &mut Annotated, + ctx: &Context<'_>, + reservoir: Option<&ReservoirEvaluator<'_>>, +) -> SamplingResult { + let sampling_config = match ctx.project_info.config.sampling { + Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config), + _ => None, + }; + + let root_state = ctx.sampling_project_info.as_ref(); + let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) { + Some(ErrorBoundary::Ok(config)) if !config.unsupported() => Some(config), + _ => None, + }; + + compute_sampling_decision( + ctx.config.processing_enabled(), + reservoir, + sampling_config, + event.value(), + root_config, + dsc, + ) + .await +} + +/// Computes the sampling decision on the incoming envelope. 
+async fn compute_sampling_decision( + processing_enabled: bool, + reservoir: Option<&ReservoirEvaluator<'_>>, + sampling_config: Option<&SamplingConfig>, + event: Option<&Event>, + root_sampling_config: Option<&SamplingConfig>, + dsc: Option<&DynamicSamplingContext>, +) -> SamplingResult { + if (sampling_config.is_none() || event.is_none()) + && (root_sampling_config.is_none() || dsc.is_none()) + { + return SamplingResult::NoMatch; + } + + if sampling_config.is_some_and(|config| config.unsupported()) + || root_sampling_config.is_some_and(|config| config.unsupported()) + { + if processing_enabled { + relay_log::error!("found unsupported rules even as processing relay"); + } else { + return SamplingResult::NoMatch; + } + } + + let mut evaluator = match reservoir { + Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir), + None => SamplingEvaluator::new(Utc::now()), + }; + + if let (Some(event), Some(sampling_state)) = (event, sampling_config) + && let Some(seed) = event.id.value().map(|id| id.0) + { + let rules = sampling_state.filter_rules(RuleType::Transaction); + evaluator = match evaluator.match_rules(seed, event, rules).await { + ControlFlow::Continue(evaluator) => evaluator, + ControlFlow::Break(sampling_match) => { + return SamplingResult::Match(sampling_match); + } + } + }; + + if let (Some(dsc), Some(sampling_state)) = (dsc, sampling_config) { + let rules = sampling_state.filter_rules(RuleType::Project); + evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules).await { + ControlFlow::Continue(evaluator) => evaluator, + ControlFlow::Break(sampling_match) => { + return SamplingResult::Match(sampling_match); + } + } + }; + + if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) { + let rules = sampling_state.filter_rules(RuleType::Trace); + return evaluator + .match_rules(*dsc.trace_id, dsc, rules) + .await + .into(); + } + + SamplingResult::NoMatch +} + +#[cfg(test)] +mod tests { + use 
std::collections::BTreeMap; + + use relay_base_schema::events::EventType; + use relay_base_schema::project::ProjectKey; + use relay_event_schema::protocol::{EventId, LenientString}; + use relay_protocol::RuleCondition; + use relay_sampling::config::{ + DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange, + }; + use relay_sampling::evaluation::SamplingMatch; + + use super::*; + + fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event { + Event { + id: Annotated::new(EventId::new()), + ty: Annotated::new(event_type), + transaction: Annotated::new(transaction.to_owned()), + release: Annotated::new(LenientString(release.to_owned())), + ..Event::default() + } + } + + fn mock_dsc() -> DynamicSamplingContext { + DynamicSamplingContext { + trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(), + public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(), + release: Some("1.1.1".to_owned()), + user: Default::default(), + replay_id: None, + environment: None, + transaction: Some("transaction1".into()), + sample_rate: Some(0.5), + sampled: Some(true), + other: BTreeMap::new(), + } + } + + // Helper to extract the sampling match from SamplingResult if that's the variant.
+ fn get_sampling_match(sampling_result: SamplingResult) -> SamplingMatch { + if let SamplingResult::Match(sampling_match) = sampling_result { + sampling_match + } else { + panic!() + } + } + + #[tokio::test] + async fn test_it_keeps_or_drops_transactions() { + let event = Event { + id: Annotated::new(EventId::new()), + ty: Annotated::new(EventType::Transaction), + transaction: Annotated::new("testing".to_owned()), + ..Event::default() + }; + + for (sample_rate, should_keep) in [(0.0, false), (1.0, true)] { + let sampling_config = SamplingConfig { + rules: vec![SamplingRule { + condition: RuleCondition::all(), + sampling_value: SamplingValue::SampleRate { value: sample_rate }, + ty: RuleType::Transaction, + id: RuleId(1), + time_range: Default::default(), + decaying_fn: DecayingFunction::Constant, + }], + ..SamplingConfig::new() + }; + + // TODO: This does not test if the sampling decision is actually applied. This should be + // refactored to send a proper Envelope in and call process_state to cover the full + // pipeline. + let res = compute_sampling_decision( + false, + None, + Some(&sampling_config), + Some(&event), + None, + None, + ) + .await; + assert_eq!(res.decision().is_keep(), should_keep); + } + } + + /// Happy path test for compute_sampling_decision. 
+ #[tokio::test] + async fn test_compute_sampling_decision_matching() { + for rule_type in [RuleType::Transaction, RuleType::Project] { + let event = mocked_event(EventType::Transaction, "foo", "bar"); + let rule = SamplingRule { + condition: RuleCondition::all(), + sampling_value: SamplingValue::SampleRate { value: 1.0 }, + ty: rule_type, + id: RuleId(0), + time_range: TimeRange::default(), + decaying_fn: Default::default(), + }; + + let sampling_config = SamplingConfig { + rules: vec![rule], + ..SamplingConfig::new() + }; + + let res = compute_sampling_decision( + false, + None, + Some(&sampling_config), + Some(&event), + None, + Some(&mock_dsc()), + ) + .await; + assert!(res.is_match()); + } + } + + #[tokio::test] + async fn test_matching_with_unsupported_rule() { + let event = mocked_event(EventType::Transaction, "foo", "bar"); + let rule = SamplingRule { + condition: RuleCondition::all(), + sampling_value: SamplingValue::SampleRate { value: 1.0 }, + ty: RuleType::Transaction, + id: RuleId(0), + time_range: TimeRange::default(), + decaying_fn: Default::default(), + }; + + let unsupported_rule = SamplingRule { + condition: RuleCondition::all(), + sampling_value: SamplingValue::SampleRate { value: 1.0 }, + ty: RuleType::Unsupported, + id: RuleId(0), + time_range: TimeRange::default(), + decaying_fn: Default::default(), + }; + + let sampling_config = SamplingConfig { + rules: vec![rule, unsupported_rule], + ..SamplingConfig::new() + }; + + // Unsupported rule should result in no match if processing is not enabled. + let res = compute_sampling_decision( + false, + None, + Some(&sampling_config), + Some(&event), + None, + None, + ) + .await; + assert!(res.is_no_match()); + + // Match if processing is enabled. 
+ let res = + compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None) + .await; + assert!(res.is_match()); + } + + #[tokio::test] + async fn test_client_sample_rate() { + let dsc = mock_dsc(); + + let rule = SamplingRule { + condition: RuleCondition::all(), + sampling_value: SamplingValue::SampleRate { value: 0.2 }, + ty: RuleType::Trace, + id: RuleId(0), + time_range: TimeRange::default(), + decaying_fn: Default::default(), + }; + + let sampling_config = SamplingConfig { + rules: vec![rule], + ..SamplingConfig::new() + }; + + let res = + compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc)) + .await; + + assert_eq!(get_sampling_match(res).sample_rate(), 0.2); + } +} diff --git a/relay-server/src/processing/utils/event.rs b/relay-server/src/processing/utils/event.rs index 8847e7050f..b583943b26 100644 --- a/relay-server/src/processing/utils/event.rs +++ b/relay-server/src/processing/utils/event.rs @@ -24,6 +24,7 @@ use relay_event_schema::processor::{self, ProcessingState}; use relay_event_schema::protocol::IpAddr; use relay_event_schema::protocol::Span; use relay_event_schema::protocol::{Event, Metrics, OtelContext, RelayInfo}; +use relay_filter::FilterStatKey; use relay_metrics::MetricNamespace; use relay_protocol::Annotated; use relay_protocol::Empty; @@ -318,6 +319,67 @@ pub fn normalize( Ok(event_fully_normalized) } +/// Status for applying some filters that don't drop the event. +/// +/// The enum represents either the success of running all filters and keeping +/// the event, [`FiltersStatus::Ok`], or not running all the filters because +/// some are unsupported, [`FiltersStatus::Unsupported`]. +/// +/// If there are unsupported filters, Relay should forward the event upstream +/// so that a more up-to-date Relay can apply filters appropriately. Actions +/// that depend on the outcome of event filtering, such as metric extraction, +/// should be postponed until a filtering decision is made.
+#[must_use] +pub enum FiltersStatus { + /// All filters have been applied and the event should be kept. + Ok, + /// Some filters are not supported and were not applied. + /// + /// Relay should forward events upstream for a more up-to-date Relay to apply these filters. + /// Supported filters were applied and they don't reject the event. + Unsupported, +} + +pub fn filter( + headers: &EnvelopeHeaders, + event: &mut Annotated, + ctx: &Context, +) -> Result { + let event = match event.value_mut() { + Some(event) => event, + // Some events are created by processing relays (e.g. unreal), so they do not yet + // exist at this point in non-processing relays. + None => return Ok(FiltersStatus::Ok), + }; + + let client_ip = headers.meta().client_addr(); + let filter_settings = &ctx.project_info.config.filter_settings; + + metric!(timer(RelayTimers::EventProcessingFiltering), { + relay_filter::should_filter( + event, + client_ip, + filter_settings, + ctx.global_config.filters(), + ) + })?; + + // Don't extract metrics if relay can't apply generic filters. A filter + // applied in another up-to-date relay in chain may need to drop the event, + // and there should not be metrics from dropped events. + // In processing relays, always extract metrics to avoid losing them. + let supported_generic_filters = ctx.global_config.filters.is_ok() + && relay_filter::are_generic_filters_supported( + ctx.global_config.filters().map(|f| f.version), + ctx.project_info.config.filter_settings.generic.version, + ); + if supported_generic_filters { + Ok(FiltersStatus::Ok) + } else { + Ok(FiltersStatus::Unsupported) + } +} + /// New type representing the normalization state of the event. 
#[derive(Copy, Clone)] pub struct EventFullyNormalized(pub bool); diff --git a/relay-server/src/processing/utils/mod.rs b/relay-server/src/processing/utils/mod.rs index 6260af3ff6..27d396f7ea 100644 --- a/relay-server/src/processing/utils/mod.rs +++ b/relay-server/src/processing/utils/mod.rs @@ -1,3 +1,4 @@ +pub mod dynamic_sampling; pub mod event; #[cfg(feature = "processing")] pub mod store; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 7fd5486655..7534a10d34 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -53,14 +53,14 @@ use crate::processing::sessions::SessionsProcessor; use crate::processing::spans::SpansProcessor; use crate::processing::trace_metrics::TraceMetricsProcessor; use crate::processing::utils::event::{ - EventFullyNormalized, EventMetricsExtracted, SpansExtracted, event_category, event_type, + EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category, + event_type, }; use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets}; use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome}; -use crate::services::processor::event::FiltersStatus; use crate::services::projects::cache::ProjectCacheHandle; use crate::services::projects::project::{ProjectInfo, ProjectState}; use crate::services::upstream::{ @@ -69,7 +69,6 @@ use crate::services::upstream::{ use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult}; use crate::{http, processing}; -use relay_base_schema::organization::OrganizationId; use relay_threading::AsyncPool; #[cfg(feature = "processing")] use { @@ -178,29 +177,9 @@ macro_rules! 
processing_group { /// Should be used only with groups which are responsible for processing envelopes with events. pub trait EventProcessing {} -/// A trait for processing groups that can be dynamically sampled. -pub trait Sampling { - /// Whether dynamic sampling should run under the given project's conditions. - fn supports_sampling(project_info: &ProjectInfo) -> bool; - - /// Whether reservoir sampling applies to this processing group (a.k.a. data type). - fn supports_reservoir_sampling() -> bool; -} - processing_group!(TransactionGroup, Transaction); impl EventProcessing for TransactionGroup {} -impl Sampling for TransactionGroup { - fn supports_sampling(project_info: &ProjectInfo) -> bool { - // For transactions, we require transaction metrics to be enabled before sampling. - matches!(&project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled()) - } - - fn supports_reservoir_sampling() -> bool { - true - } -} - processing_group!(ErrorGroup, Error); impl EventProcessing for ErrorGroup {} @@ -213,17 +192,6 @@ processing_group!(LogGroup, Log, Nel); processing_group!(TraceMetricGroup, TraceMetric); processing_group!(SpanGroup, Span); -impl Sampling for SpanGroup { - fn supports_sampling(project_info: &ProjectInfo) -> bool { - // If no metrics could be extracted, do not sample anything. 
- matches!(&project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported()) - } - - fn supports_reservoir_sampling() -> bool { - false - } -} - processing_group!(ProfileChunkGroup, ProfileChunk); processing_group!(MetricsGroup, Metrics); processing_group!(ForwardUnknownGroup, ForwardUnknown); @@ -1510,12 +1478,15 @@ impl EnvelopeProcessorService { &ctx, &self.inner.geoip_lookup, )?; - let filter_run = event::filter( - managed_envelope, + let filter_run = processing::utils::event::filter( + managed_envelope.envelope().headers(), &mut event, - ctx.project_info, - &self.inner.global_config.current(), - )?; + &ctx, + ) + .map_err(|err| { + managed_envelope.reject(Outcome::Filtered(err.clone())); + ProcessingError::EventFiltered(err) + })?; if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) { dynamic_sampling::tag_error_with_sampling_decision( @@ -1628,32 +1599,36 @@ impl EnvelopeProcessorService { &self.inner.geoip_lookup, )?; - let filter_run = event::filter( - managed_envelope, + let filter_run = processing::utils::event::filter( + managed_envelope.envelope().headers(), &mut event, - ctx.project_info, - ctx.global_config, - )?; + &ctx, + ) + .map_err(|err| { + managed_envelope.reject(Outcome::Filtered(err.clone())); + ProcessingError::EventFiltered(err) + })?; // Always run dynamic sampling on processing Relays, // but delay decision until inbound filters have been fully processed. - let run_dynamic_sampling = - matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled(); - - let reservoir = self.new_reservoir_evaluator( - managed_envelope.scoping().organization_id, - reservoir_counters, - ); + // Also, we require transaction metrics to be enabled before sampling. 
+ let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok) + || self.inner.config.processing_enabled()) + && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled()); let sampling_result = match run_dynamic_sampling { true => { - dynamic_sampling::run( - managed_envelope, + #[allow(unused_mut)] + let mut reservoir = ReservoirEvaluator::new(Arc::clone(reservoir_counters)); + #[cfg(feature = "processing")] + if let Some(quotas_client) = self.inner.quotas_client.as_ref() { + reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client); + } + processing::utils::dynamic_sampling::run( + managed_envelope.envelope().headers().dsc(), &mut event, - ctx.config, - ctx.project_info, - ctx.sampling_project_info, - &reservoir, + &ctx, + Some(&reservoir), ) .await } @@ -1721,7 +1696,6 @@ impl EnvelopeProcessorService { // Unconditionally scrub to make sure PII is removed as early as possible. event::scrub(&mut event, ctx.project_info)?; - // TODO: remove once `relay.drop-transaction-attachments` has graduated. attachment::scrub(managed_envelope, ctx.project_info); if_processing!(self.inner.config, { @@ -1953,13 +1927,11 @@ impl EnvelopeProcessorService { /// Processes standalone spans. /// /// This function does *not* run for spans extracted from transactions. 
- #[allow(clippy::too_many_arguments)] async fn process_standalone_spans( &self, managed_envelope: &mut TypedEnvelope, _project_id: ProjectId, ctx: processing::Context<'_>, - _reservoir_counters: &ReservoirCounters, ) -> Result, ProcessingError> { let mut extracted_metrics = ProcessingExtractedMetrics::new(); @@ -1967,11 +1939,6 @@ impl EnvelopeProcessorService { span::convert_otel_traces_data(managed_envelope); if_processing!(self.inner.config, { - let reservoir = self.new_reservoir_evaluator( - managed_envelope.scoping().organization_id, - _reservoir_counters, - ); - span::process( managed_envelope, &mut Annotated::empty(), @@ -1979,7 +1946,6 @@ impl EnvelopeProcessorService { _project_id, ctx, &self.inner.geoip_lookup, - &reservoir, ) .await; }); @@ -2105,12 +2071,7 @@ impl EnvelopeProcessorService { self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx) .await } - ProcessingGroup::Span => run!( - process_standalone_spans, - project_id, - ctx, - reservoir_counters - ), + ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx), ProcessingGroup::ProfileChunk => { run!(process_profile_chunks, ctx) } @@ -3072,22 +3033,6 @@ impl EnvelopeProcessorService { EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(), } } - - fn new_reservoir_evaluator( - &self, - _organization_id: OrganizationId, - reservoir_counters: &ReservoirCounters, - ) -> ReservoirEvaluator<'_> { - #[cfg_attr(not(feature = "processing"), expect(unused_mut))] - let mut reservoir = ReservoirEvaluator::new(Arc::clone(reservoir_counters)); - - #[cfg(feature = "processing")] - if let Some(quotas_client) = self.inner.quotas_client.as_ref() { - reservoir.set_redis(_organization_id, quotas_client); - } - - reservoir - } } impl Service for EnvelopeProcessorService { diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index 4764880d0d..55f10643e7 100644 --- 
a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -1,24 +1,19 @@ //! Dynamic sampling processor related code. -use std::ops::ControlFlow; -use chrono::Utc; use relay_config::Config; use relay_dynamic_config::ErrorBoundary; use relay_event_schema::protocol::{Contexts, Event, TraceContext}; use relay_protocol::{Annotated, Empty}; use relay_quotas::DataCategory; -use relay_sampling::config::RuleType; -use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator}; -use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use crate::envelope::ItemType; use crate::managed::TypedEnvelope; use crate::services::outcome::Outcome; use crate::services::processor::{ - EventProcessing, Sampling, SpansExtracted, TransactionGroup, event_category, + EventProcessing, SpansExtracted, TransactionGroup, event_category, }; use crate::services::projects::project::ProjectInfo; -use crate::utils::{self, SamplingResult}; +use crate::utils::{self}; /// Ensures there is a valid dynamic sampling context and corresponding project state. 
/// @@ -75,46 +70,6 @@ pub fn validate_and_set_dsc<'a, T>( None } -/// Computes the sampling decision on the incoming event -pub async fn run( - managed_envelope: &mut TypedEnvelope, - event: &mut Annotated, - config: &Config, - project_info: &ProjectInfo, - sampling_project_info: Option<&ProjectInfo>, - reservoir: &ReservoirEvaluator<'_>, -) -> SamplingResult -where - Group: Sampling, -{ - if !Group::supports_sampling(project_info) { - return SamplingResult::Pending; - } - - let sampling_config = match project_info.config.sampling { - Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config), - _ => None, - }; - - let root_state = sampling_project_info.as_ref(); - let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) { - Some(ErrorBoundary::Ok(config)) if !config.unsupported() => Some(config), - _ => None, - }; - - let reservoir = Group::supports_reservoir_sampling().then_some(reservoir); - - compute_sampling_decision( - config.processing_enabled(), - reservoir, - sampling_config, - event.value(), - root_config, - managed_envelope.envelope().dsc(), - ) - .await -} - /// Apply the dynamic sampling decision from `compute_sampling_decision`. pub fn drop_unsampled_items( managed_envelope: &mut TypedEnvelope, @@ -169,69 +124,6 @@ pub fn drop_unsampled_items( } } -/// Computes the sampling decision on the incoming envelope. 
-async fn compute_sampling_decision( - processing_enabled: bool, - reservoir: Option<&ReservoirEvaluator<'_>>, - sampling_config: Option<&SamplingConfig>, - event: Option<&Event>, - root_sampling_config: Option<&SamplingConfig>, - dsc: Option<&DynamicSamplingContext>, -) -> SamplingResult { - if (sampling_config.is_none() || event.is_none()) - && (root_sampling_config.is_none() || dsc.is_none()) - { - return SamplingResult::NoMatch; - } - - if sampling_config.is_some_and(|config| config.unsupported()) - || root_sampling_config.is_some_and(|config| config.unsupported()) - { - if processing_enabled { - relay_log::error!("found unsupported rules even as processing relay"); - } else { - return SamplingResult::NoMatch; - } - } - - let mut evaluator = match reservoir { - Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir), - None => SamplingEvaluator::new(Utc::now()), - }; - - if let (Some(event), Some(sampling_state)) = (event, sampling_config) - && let Some(seed) = event.id.value().map(|id| id.0) - { - let rules = sampling_state.filter_rules(RuleType::Transaction); - evaluator = match evaluator.match_rules(seed, event, rules).await { - ControlFlow::Continue(evaluator) => evaluator, - ControlFlow::Break(sampling_match) => { - return SamplingResult::Match(sampling_match); - } - } - }; - - if let (Some(dsc), Some(sampling_state)) = (dsc, sampling_config) { - let rules = sampling_state.filter_rules(RuleType::Project); - evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules).await { - ControlFlow::Continue(evaluator) => evaluator, - ControlFlow::Break(sampling_match) => { - return SamplingResult::Match(sampling_match); - } - } - }; - - if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) { - let rules = sampling_state.filter_rules(RuleType::Trace); - return evaluator - .match_rules(*dsc.trace_id, dsc, rules) - .await - .into(); - } - - SamplingResult::NoMatch -} - /// Runs dynamic sampling on an incoming error and 
tags it in case of successful sampling /// decision. /// @@ -283,69 +175,26 @@ pub async fn tag_error_with_sampling_decision( #[cfg(test)] mod tests { use std::collections::BTreeMap; - use std::sync::Arc; - use bytes::Bytes; - use relay_base_schema::events::EventType; use relay_base_schema::project::ProjectKey; use relay_cogs::Token; - use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig}; - use relay_event_schema::protocol::{EventId, LenientString}; + use relay_event_schema::protocol::EventId; use relay_protocol::RuleCondition; - use relay_sampling::config::{ - DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange, - }; - use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision, SamplingMatch}; + use relay_sampling::config::{RuleId, RuleType, SamplingRule, SamplingValue}; + use relay_sampling::evaluation::ReservoirCounters; + use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use relay_system::Addr; use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; use crate::managed::ManagedEnvelope; use crate::processing; - use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, SpanGroup, Submit}; + use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit}; use crate::services::projects::project::ProjectInfo; - use crate::testutils::{create_test_processor, new_envelope, state_with_rule_and_condition}; + use crate::testutils::create_test_processor; use super::*; - fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event { - Event { - id: Annotated::new(EventId::new()), - ty: Annotated::new(event_type), - transaction: Annotated::new(transaction.to_owned()), - release: Annotated::new(LenientString(release.to_owned())), - ..Event::default() - } - } - - fn dummy_reservoir() -> ReservoirEvaluator<'static> { - ReservoirEvaluator::new(ReservoirCounters::default()) - } - - fn test_dsc() -> DynamicSamplingContext { - 
DynamicSamplingContext { - trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(), - public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(), - release: Some("1.1.1".to_owned()), - user: Default::default(), - replay_id: None, - environment: None, - transaction: Some("transaction1".into()), - sample_rate: Some(0.5), - sampled: Some(true), - other: BTreeMap::new(), - } - } - - // Helper to extract the sampling match from SamplingResult if thats the variant. - fn get_sampling_match(sampling_result: SamplingResult) -> SamplingMatch { - if let SamplingResult::Match(sampling_match) = sampling_result { - sampling_match - } else { - panic!() - } - } - /// Always sets the processing item type to event. async fn process_envelope_with_root_project_state( envelope: Box, @@ -423,136 +272,6 @@ mod tests { assert!(event.contexts.value().is_none()); } - #[tokio::test] - async fn test_it_keeps_or_drops_transactions() { - let event = Event { - id: Annotated::new(EventId::new()), - ty: Annotated::new(EventType::Transaction), - transaction: Annotated::new("testing".to_owned()), - ..Event::default() - }; - - for (sample_rate, should_keep) in [(0.0, false), (1.0, true)] { - let sampling_config = SamplingConfig { - rules: vec![SamplingRule { - condition: RuleCondition::all(), - sampling_value: SamplingValue::SampleRate { value: sample_rate }, - ty: RuleType::Transaction, - id: RuleId(1), - time_range: Default::default(), - decaying_fn: DecayingFunction::Constant, - }], - ..SamplingConfig::new() - }; - - // TODO: This does not test if the sampling decision is actually applied. This should be - // refactored to send a proper Envelope in and call process_state to cover the full - // pipeline. 
- let res = compute_sampling_decision( - false, - None, - Some(&sampling_config), - Some(&event), - None, - None, - ) - .await; - assert_eq!(res.decision().is_keep(), should_keep); - } - } - - #[tokio::test] - async fn test_dsc_respects_metrics_extracted() { - relay_test::setup(); - let outcome_aggregator = Addr::dummy(); - - let config = Arc::new( - Config::from_json_value(serde_json::json!({ - "processing": { - "enabled": true, - "kafka_config": [], - } - })) - .unwrap(), - ); - - let get_test_params = |version: Option| { - let event = Event { - id: Annotated::new(EventId::new()), - ty: Annotated::new(EventType::Transaction), - transaction: Annotated::new("testing".to_owned()), - ..Event::default() - }; - - let mut project_info = state_with_rule_and_condition( - Some(0.0), - RuleType::Transaction, - RuleCondition::all(), - ); - - if let Some(version) = version { - project_info.config.transaction_metrics = - ErrorBoundary::Ok(relay_dynamic_config::TransactionMetricsConfig { - version, - ..Default::default() - }) - .into(); - } - - let envelope = new_envelope(false, "foo"); - let managed_envelope: TypedEnvelope = ( - ManagedEnvelope::new(envelope, outcome_aggregator.clone()), - ProcessingGroup::Transaction, - ) - .try_into() - .unwrap(); - - let event = Annotated::from(event); - (managed_envelope, event, project_info) - }; - - let reservoir = dummy_reservoir(); - - // None represents no TransactionMetricsConfig, DS will not be run - let (mut managed_envelope, mut event, project_info) = get_test_params(None); - let sampling_result = run( - &mut managed_envelope, - &mut event, - &config, - &project_info, - None, - &reservoir, - ) - .await; - assert_eq!(sampling_result.decision(), SamplingDecision::Keep); - - // Current version is 3, so it won't run DS if it's outdated - let (mut managed_envelope, mut event, project_info) = get_test_params(Some(2)); - let sampling_result = run( - &mut managed_envelope, - &mut event, - &config, - &project_info, - None, - &reservoir, - 
) - .await; - assert_eq!(sampling_result.decision(), SamplingDecision::Keep); - - // Dynamic sampling is run, as the transaction metrics version is up to date. - let (mut managed_envelope, mut event, project_info) = get_test_params(Some(3)); - let sampling_result = run( - &mut managed_envelope, - &mut event, - &config, - &project_info, - None, - &reservoir, - ) - .await; - assert_eq!(sampling_result.decision(), SamplingDecision::Drop); - } - fn project_state_with_single_rule(sample_rate: f64) -> ProjectInfo { let sampling_config = SamplingConfig { rules: vec![SamplingRule { @@ -571,6 +290,21 @@ mod tests { sampling_project_state } + fn mock_dsc() -> DynamicSamplingContext { + DynamicSamplingContext { + trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(), + public_key: ProjectKey::parse("abd0f232775f45feab79864e580d160b").unwrap(), + release: Some("1.1.1".to_owned()), + user: Default::default(), + replay_id: None, + environment: None, + transaction: Some("transaction1".into()), + sample_rate: Some(0.5), + sampled: Some(true), + other: BTreeMap::new(), + } + } + #[tokio::test] async fn test_error_is_tagged_correctly_if_trace_sampling_result_is_some() { let event_id = EventId::new(); @@ -579,7 +313,7 @@ mod tests { .unwrap(); let request_meta = RequestMeta::new(dsn); let mut envelope = Envelope::from_request(Some(event_id), request_meta); - envelope.set_dsc(test_dsc()); + envelope.set_dsc(mock_dsc()); envelope.add_item(mocked_error_item()); // We test with sample rate equal to 100%. @@ -643,191 +377,4 @@ mod tests { let trace_context = event.context::().unwrap(); assert!(trace_context.sampled.value().unwrap()); } - - /// Happy path test for compute_sampling_decision. 
- #[tokio::test] - async fn test_compute_sampling_decision_matching() { - for rule_type in [RuleType::Transaction, RuleType::Project] { - let event = mocked_event(EventType::Transaction, "foo", "bar"); - let rule = SamplingRule { - condition: RuleCondition::all(), - sampling_value: SamplingValue::SampleRate { value: 1.0 }, - ty: rule_type, - id: RuleId(0), - time_range: TimeRange::default(), - decaying_fn: Default::default(), - }; - - let sampling_config = SamplingConfig { - rules: vec![rule], - ..SamplingConfig::new() - }; - - let res = compute_sampling_decision( - false, - None, - Some(&sampling_config), - Some(&event), - None, - Some(&test_dsc()), - ) - .await; - assert!(res.is_match()); - } - } - - #[tokio::test] - async fn test_matching_with_unsupported_rule() { - let event = mocked_event(EventType::Transaction, "foo", "bar"); - let rule = SamplingRule { - condition: RuleCondition::all(), - sampling_value: SamplingValue::SampleRate { value: 1.0 }, - ty: RuleType::Transaction, - id: RuleId(0), - time_range: TimeRange::default(), - decaying_fn: Default::default(), - }; - - let unsupported_rule = SamplingRule { - condition: RuleCondition::all(), - sampling_value: SamplingValue::SampleRate { value: 1.0 }, - ty: RuleType::Unsupported, - id: RuleId(0), - time_range: TimeRange::default(), - decaying_fn: Default::default(), - }; - - let sampling_config = SamplingConfig { - rules: vec![rule, unsupported_rule], - ..SamplingConfig::new() - }; - - // Unsupported rule should result in no match if processing is not enabled. - let res = compute_sampling_decision( - false, - None, - Some(&sampling_config), - Some(&event), - None, - None, - ) - .await; - assert!(res.is_no_match()); - - // Match if processing is enabled. 
- let res = - compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None) - .await; - assert!(res.is_match()); - } - - #[tokio::test] - async fn test_client_sample_rate() { - let dsc = test_dsc(); - - let rule = SamplingRule { - condition: RuleCondition::all(), - sampling_value: SamplingValue::SampleRate { value: 0.2 }, - ty: RuleType::Trace, - id: RuleId(0), - time_range: TimeRange::default(), - decaying_fn: Default::default(), - }; - - let sampling_config = SamplingConfig { - rules: vec![rule], - ..SamplingConfig::new() - }; - - let res = - compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc)) - .await; - - assert_eq!(get_sampling_match(res).sample_rate(), 0.2); - } - - async fn run_with_reservoir_rule(processing_group: ProcessingGroup) -> SamplingResult - where - Group: Sampling + TryFrom, - { - let project_info = { - let mut info = ProjectInfo::default(); - info.config.transaction_metrics = Some(ErrorBoundary::Ok(TransactionMetricsConfig { - version: 1, - ..Default::default() - })); - info - }; - - let bytes = Bytes::from( - r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b"}}"#, - ); - let envelope = Envelope::parse_bytes(bytes).unwrap(); - let config = Config::default(); - - let mut managed_envelope: TypedEnvelope = ( - ManagedEnvelope::new(envelope, Addr::dummy()), - processing_group, - ) - .try_into() - .unwrap(); - - let mut event = Annotated::new(Event::default()); - - let sampling_project_info = { - let mut state = ProjectInfo::default(); - state.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig::default()); - state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig { - version: 2, - rules: vec![ - // Set up a reservoir (only used for transactions): - SamplingRule { - condition: RuleCondition::all(), - sampling_value: SamplingValue::Reservoir { limit: 100 
}, - ty: RuleType::Trace, - id: RuleId(1), - time_range: Default::default(), - decaying_fn: Default::default(), - }, - // Reject everything that does not go into the reservoir: - SamplingRule { - condition: RuleCondition::all(), - sampling_value: SamplingValue::SampleRate { value: 0.0 }, - ty: RuleType::Trace, - id: RuleId(2), - time_range: Default::default(), - decaying_fn: Default::default(), - }, - ], - rules_v2: vec![], - })); - Some(state) - }; - - let reservoir = dummy_reservoir(); - run::( - &mut managed_envelope, - &mut event, - &config, - &project_info, - sampling_project_info.as_ref(), - &reservoir, - ) - .await - } - - #[tokio::test] - async fn test_reservoir_applied_for_transactions() { - let result = - run_with_reservoir_rule::(ProcessingGroup::Transaction).await; - // Default sampling rate is 0.0, but transaction is retained because of reservoir: - assert_eq!(result.decision(), SamplingDecision::Keep); - } - - #[tokio::test] - async fn test_reservoir_not_applied_for_spans() { - let result = run_with_reservoir_rule::(ProcessingGroup::Span).await; - // Default sampling rate is 0.0, and the reservoir does not apply to spans: - assert_eq!(result.decision(), SamplingDecision::Drop); - } } diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 3de3c29910..f782ad6c78 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -4,7 +4,6 @@ use std::error::Error; use relay_base_schema::events::EventType; use relay_config::Config; -use relay_dynamic_config::GlobalConfig; use relay_event_schema::processor::{self, ProcessingState}; use relay_event_schema::protocol::{ Breadcrumb, Csp, Event, ExpectCt, ExpectStaple, Hpkp, LenientString, Metrics, @@ -18,7 +17,6 @@ use serde_json::Value as SerdeValue; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::managed::TypedEnvelope; -use 
crate::services::outcome::Outcome; use crate::services::processor::{ EventFullyNormalized, EventMetricsExtracted, EventProcessing, ExtractedEvent, ProcessingError, SpansExtracted, event_type, @@ -142,67 +140,6 @@ pub fn extract( }) } -/// Status for applying some filters that don't drop the event. -/// -/// The enum represents either the success of running all filters and keeping -/// the event, [`FiltersStatus::Ok`], or not running all the filters because -/// some are unsupported, [`FiltersStatus::Unsupported`]. -/// -/// If there are unsuppported filters, Relay should forward the event upstream -/// so that a more up-to-date Relay can apply filters appropriately. Actions -/// that depend on the outcome of event filtering, such as metric extraction, -/// should be postponed until a filtering decision is made. -#[must_use] -pub enum FiltersStatus { - /// All filters have been applied and the event should be kept. - Ok, - /// Some filters are not supported and were not applied. - /// - /// Relay should forward events upstream for a more up-to-date Relay to apply these filters. - /// Supported filters were applied and they don't reject the event. - Unsupported, -} - -pub fn filter( - managed_envelope: &mut TypedEnvelope, - event: &mut Annotated, - project_info: &ProjectInfo, - global_config: &GlobalConfig, -) -> Result { - let event = match event.value_mut() { - Some(event) => event, - // Some events are created by processing relays (e.g. unreal), so they do not yet - // exist at this point in non-processing relays. 
- None => return Ok(FiltersStatus::Ok), - }; - - let client_ip = managed_envelope.envelope().meta().client_addr(); - let filter_settings = &project_info.config.filter_settings; - - metric!(timer(RelayTimers::EventProcessingFiltering), { - relay_filter::should_filter(event, client_ip, filter_settings, global_config.filters()) - .map_err(|err| { - managed_envelope.reject(Outcome::Filtered(err.clone())); - ProcessingError::EventFiltered(err) - }) - })?; - - // Don't extract metrics if relay can't apply generic filters. A filter - // applied in another up-to-date relay in chain may need to drop the event, - // and there should not be metrics from dropped events. - // In processing relays, always extract metrics to avoid losing them. - let supported_generic_filters = global_config.filters.is_ok() - && relay_filter::are_generic_filters_supported( - global_config.filters().map(|f| f.version), - project_info.config.filter_settings.generic.version, - ); - if supported_generic_filters { - Ok(FiltersStatus::Ok) - } else { - Ok(FiltersStatus::Unsupported) - } -} - /// Apply data privacy rules to the event payload. /// /// This uses both the general `datascrubbing_settings`, as well as the the PII rules. 
diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index f0d85ef1b4..a86512edaf 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -9,10 +9,11 @@ use crate::processing::utils::event::extract_transaction_span; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::{ EventMetricsExtracted, ProcessingError, ProcessingExtractedMetrics, SpanGroup, SpansExtracted, - TransactionGroup, dynamic_sampling, event_type, + TransactionGroup, event_type, }; use crate::services::projects::project::ProjectInfo; use crate::statsd::RelayCounters; +use crate::utils::SamplingResult; use crate::{processing, utils}; use chrono::{DateTime, Utc}; use relay_base_schema::events::EventType; @@ -39,7 +40,6 @@ use relay_metrics::{FractionUnit, MetricNamespace, MetricUnit, UnixTimestamp}; use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Empty, Value}; use relay_quotas::DataCategory; -use relay_sampling::evaluation::ReservoirEvaluator; #[derive(thiserror::Error, Debug)] enum ValidationError { @@ -59,7 +59,6 @@ enum ValidationError { MissingExclusiveTime, } -#[allow(clippy::too_many_arguments)] pub async fn process( managed_envelope: &mut TypedEnvelope, event: &mut Annotated, @@ -67,21 +66,25 @@ pub async fn process( project_id: ProjectId, ctx: processing::Context<'_>, geo_lookup: &GeoIpLookup, - reservoir_counters: &ReservoirEvaluator<'_>, ) { use relay_event_normalization::RemoveOtherProcessor; - // We only implement trace-based sampling rules for now, which can be computed - // once for all spans in the envelope. - let sampling_result = dynamic_sampling::run( - managed_envelope, - event, - ctx.config, - ctx.project_info, - ctx.sampling_project_info, - reservoir_counters, - ) - .await; + // If no metrics could be extracted, do not sample anything. 
+ let should_sample = matches!(&ctx.project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported()); + let sampling_result = match should_sample { + true => { + // We only implement trace-based sampling rules for now, which can be computed + // once for all spans in the envelope. + processing::utils::dynamic_sampling::run( + managed_envelope.envelope().headers().dsc(), + event, + &ctx, + None, + ) + .await + } + false => SamplingResult::Pending, + }; relay_statsd::metric!( counter(RelayCounters::SamplingDecision) += 1, diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 16cc3f1e66..4e751ca893 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -3,12 +3,9 @@ use std::sync::Arc; use bytes::Bytes; use relay_cogs::Cogs; use relay_config::Config; -use relay_dynamic_config::ErrorBoundary; use relay_event_schema::protocol::EventId; -use relay_protocol::RuleCondition; -use relay_sampling::config::{DecayingFunction, RuleId, RuleType, SamplingRule, SamplingValue}; -use relay_sampling::{DynamicSamplingContext, SamplingConfig}; +use relay_sampling::DynamicSamplingContext; use relay_system::Addr; #[cfg(feature = "processing")] use relay_system::Service; @@ -24,34 +21,8 @@ use crate::services::global_config::GlobalConfigHandle; use crate::services::global_rate_limits::GlobalRateLimitsService; use crate::services::processor::{self, EnvelopeProcessorService, EnvelopeProcessorServicePool}; use crate::services::projects::cache::ProjectCacheHandle; -use crate::services::projects::project::ProjectInfo; use crate::utils::ThreadPoolBuilder; -pub fn state_with_rule_and_condition( - sample_rate: Option, - rule_type: RuleType, - condition: RuleCondition, -) -> ProjectInfo { - let rules = match sample_rate { - Some(sample_rate) => vec![SamplingRule { - condition, - sampling_value: SamplingValue::SampleRate { value: sample_rate }, - ty: rule_type, - id: RuleId(1), - time_range: Default::default(), - decaying_fn: 
DecayingFunction::Constant, - }], - None => Vec::new(), - }; - - let mut state = ProjectInfo::default(); - state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig { - rules, - ..SamplingConfig::new() - })); - state -} - pub fn create_sampling_context(sample_rate: Option) -> DynamicSamplingContext { DynamicSamplingContext { trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),