diff --git a/changelog.d/24025_opentelemetry_instrumentation_scope_partitioning.feature.md b/changelog.d/24025_opentelemetry_instrumentation_scope_partitioning.feature.md
new file mode 100644
index 0000000000000..346d153376dd2
--- /dev/null
+++ b/changelog.d/24025_opentelemetry_instrumentation_scope_partitioning.feature.md
@@ -0,0 +1,3 @@
+The `opentelemetry` sink now supports an `instrumentation_scope` partitioning strategy that improves batching and throughput for OTLP data. Instead of partitioning by URI and headers, this strategy groups events by their InstrumentationScope (name + version), so multiple ResourceLogs/ResourceMetrics/ResourceSpans that share an instrumentation scope can be batched together. This addresses the poor batching efficiency seen when all events target the same endpoint, reducing request overhead.
+
+authors: Sambhram1
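
For illustration only (not part of the patch): a minimal sketch of the grouping behavior the new strategy enables. The record type, scope values, and field names below are made-up stand-ins for OTLP data; the point is that keying batches by `(scope name, scope version)` yields one batch per scope instead of one undifferentiated bucket per endpoint.

```rust
use std::collections::BTreeMap;

/// Stand-in for an OTLP record carrying its instrumentation scope (illustrative only).
struct Record {
    scope_name: String,
    scope_version: String,
    body: String,
}

fn main() {
    let records = vec![
        Record { scope_name: "my.library".into(), scope_version: "1.0.0".into(), body: "a".into() },
        Record { scope_name: "my.library".into(), scope_version: "1.0.0".into(), body: "b".into() },
        Record { scope_name: "other.lib".into(), scope_version: "2.3.1".into(), body: "c".into() },
    ];

    // Group by (scope name, scope version), mirroring the `instrumentation_scope` strategy.
    let mut batches: BTreeMap<(String, String), Vec<&Record>> = BTreeMap::new();
    for record in &records {
        batches
            .entry((record.scope_name.clone(), record.scope_version.clone()))
            .or_default()
            .push(record);
    }

    // Two batches here: ("my.library", "1.0.0") with two records and ("other.lib", "2.3.1")
    // with one. Under the legacy `uri_headers` strategy all three records would land in the
    // same single bucket, because they all target the same endpoint.
    for ((name, version), batch) in &batches {
        println!("scope {name}@{version}: {} record(s)", batch.len());
    }
}
```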
diff --git a/src/sinks/opentelemetry/config.rs b/src/sinks/opentelemetry/config.rs
new file mode 100644
index 0000000000000..54469bd4f258d
--- /dev/null
+++ b/src/sinks/opentelemetry/config.rs
@@ -0,0 +1,259 @@
+//! Configuration for the OpenTelemetry sink with custom partitioning strategies.
+
+use std::collections::BTreeMap;
+
+use http::StatusCode;
+use hyper::Body;
+use vector_config::configurable_component;
+use vector_lib::codecs::encoding::{Framer, Serializer};
+
+use super::sink::OpenTelemetrySink;
+use crate::{
+    codecs::EncodingConfigWithFraming,
+    http::{Auth, HttpClient, MaybeAuth},
+    sinks::{
+        http::{
+            config::{validate_headers, validate_payload_wrapper, HttpMethod, HttpSinkConfig},
+            encoder::HttpEncoder,
+            request_builder::HttpRequestBuilder,
+            service::{HttpService, HttpSinkRequestBuilder},
+            sink::PartitionKey as HttpPartitionKey,
+        },
+        prelude::*,
+        util::{
+            http::{http_response_retry_logic, HttpRequest, OrderedHeaderName, RequestConfig},
+            RealtimeSizeBasedDefaultBatchSettings, UriSerde,
+        },
+    },
+};
+
+/// Partitioning strategy for OpenTelemetry events.
+///
+/// This determines how events are grouped into batches for transmission.
+#[configurable_component]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum PartitionStrategy {
+    /// Partition by URI and headers.
+    ///
+    /// This is the legacy behavior that partitions events based on the
+    /// templated URI and headers. It can lead to poor batching for OTLP
+    /// data, where all events typically go to the same endpoint.
+    #[default]
+    UriHeaders,
+
+    /// Partition by InstrumentationScope.
+    ///
+    /// Groups events by their OTLP InstrumentationScope (name + version).
+    /// This allows multiple ResourceLogs/ResourceMetrics/ResourceSpans with
+    /// the same instrumentation scope to be batched together efficiently,
+    /// improving throughput and reducing request overhead.
+    ///
+    /// This is the recommended strategy for OTLP data.
+    InstrumentationScope,
+}
+
+/// Configuration options specific to the OpenTelemetry sink.
+#[configurable_component]
+#[derive(Clone, Debug)]
+pub struct OpenTelemetryOptions {
+    /// The partitioning strategy for batching events.
+    ///
+    /// This determines how events are grouped into batches before transmission.
+    /// Using `instrumentation_scope` can significantly improve batching efficiency
+    /// for OTLP data.
+    #[serde(default)]
+    #[configurable(metadata(docs::examples = "instrumentation_scope"))]
+    pub partition_strategy: PartitionStrategy,
+}
+
+impl Default for OpenTelemetryOptions {
+    fn default() -> Self {
+        Self {
+            partition_strategy: PartitionStrategy::InstrumentationScope,
+        }
+    }
+}
+
+/// Build an OpenTelemetry sink from HTTP sink configuration with custom partitioning.
+pub async fn build_opentelemetry_sink(
+    http_config: &HttpSinkConfig,
+    opentelemetry_options: &OpenTelemetryOptions,
+    cx: SinkContext,
+) -> crate::Result<(VectorSink, Healthcheck)> {
+    let batch_settings = http_config.batch.validate()?.into_batcher_settings()?;
+
+    let encoder = http_config.build_encoder()?;
+    let transformer = http_config.encoding.transformer();
+
+    let mut request = http_config.request.clone();
+    request.add_old_option(http_config.headers.clone());
+
+    validate_headers(&request.headers, http_config.auth.is_some())?;
+    let (static_headers, template_headers) = request.split_headers();
+
+    let (payload_prefix, payload_suffix) = validate_payload_wrapper(
+        &http_config.payload_prefix,
+        &http_config.payload_suffix,
+        &encoder,
+    )?;
+
+    let client = build_http_client(http_config, &cx)?;
+
+    let healthcheck = match cx.healthcheck.uri {
+        Some(healthcheck_uri) => {
+            healthcheck(healthcheck_uri, http_config.auth.clone(), client.clone()).boxed()
+        }
+        None => future::ok(()).boxed(),
+    };
+
+    let content_type = determine_content_type(&encoder);
+
+    let request_builder = HttpRequestBuilder {
+        encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix),
+        compression: http_config.compression,
+    };
+
+    let content_encoding = http_config.compression.is_compressed().then(|| {
+        http_config
+            .compression
+            .content_encoding()
+            .expect("Encoding should be specified for compression.")
+            .to_string()
+    });
+
+    let converted_static_headers = convert_headers(static_headers)?;
+
+    let http_sink_request_builder = HttpSinkRequestBuilder::new(
+        http_config.method,
+        http_config.auth.clone(),
+        converted_static_headers,
+        content_type,
+        content_encoding,
+    );
+
+    let service = build_service(http_config, client, http_sink_request_builder).await?;
+
+    let request_limits = http_config.request.tower.into_settings();
+
+    let service = ServiceBuilder::new()
+        .settings(request_limits, http_response_retry_logic())
+        .service(service);
+
+    let sink = OpenTelemetrySink::new(
+        service,
+        http_config.uri.clone(),
+        template_headers,
+        batch_settings,
+        request_builder,
+        opentelemetry_options.partition_strategy,
+    );
+
+    Ok((VectorSink::from_event_streamsink(sink), healthcheck))
+}
+
+fn build_http_client(config: &HttpSinkConfig, cx: &SinkContext) -> crate::Result<HttpClient> {
+    let tls = TlsSettings::from_options(config.tls.as_ref())?;
+    Ok(HttpClient::new(tls, cx.proxy())?)
+}
+
+async fn healthcheck(uri: UriSerde, auth: Option<Auth>, client: HttpClient) -> crate::Result<()> {
+    let auth = auth.choose_one(&uri.auth)?;
+    let uri = uri.with_default_parts();
+    let mut request = http::Request::head(&uri.uri)
+        .body(Body::empty())
+        .unwrap();
+
+    if let Some(auth) = auth {
+        auth.apply(&mut request);
+    }
+
+    let response = client.send(request).await?;
+
+    match response.status() {
+        StatusCode::OK => Ok(()),
+        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
+    }
+}
+
+fn determine_content_type(encoder: &Encoder<Framer>) -> Option<String> {
+    use Framer::*;
+    use Serializer::*;
+    use vector_lib::codecs::CharacterDelimitedEncoder;
+
+    match (encoder.serializer(), encoder.framer()) {
+        (RawMessage(_) | Text(_), _) => Some("text/plain".to_owned()),
+        (Json(_), NewlineDelimited(_)) => Some("application/x-ndjson".to_owned()),
+        (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
+            Some("application/json".to_owned())
+        }
+        #[cfg(feature = "codecs-opentelemetry")]
+        (Otlp(_), _) => Some("application/x-protobuf".to_owned()),
+        _ => None,
+    }
+}
+
+fn convert_headers(
+    static_headers: BTreeMap<String, String>,
+) -> crate::Result<indexmap::IndexMap<OrderedHeaderName, http::HeaderValue>> {
+    static_headers
+        .into_iter()
+        .map(|(name, value)| -> crate::Result<_> {
+            let header_name = http::HeaderName::from_bytes(name.as_bytes())
+                .map(OrderedHeaderName::from)?;
+            let header_value = http::HeaderValue::try_from(value)?;
+            Ok((header_name, header_value))
+        })
+        .collect::<Result<indexmap::IndexMap<_, _>, _>>()
+}
+
+#[cfg(feature = "aws-core")]
+async fn build_service(
+    config: &HttpSinkConfig,
+    client: HttpClient,
+    http_sink_request_builder: HttpSinkRequestBuilder,
+) -> crate::Result<impl Service<HttpRequest<HttpPartitionKey>, Response = http::Response<Body>, Error = crate::Error>>
+{
+    use crate::{aws::AwsAuthentication, sinks::util::http::SigV4Config};
+    use aws_config::meta::region::ProvideRegion;
+    use aws_types::region::Region;
+    use vector_lib::config::proxy::ProxyConfig;
+
+    match &config.auth {
+        Some(Auth::Aws { auth, service }) => {
+            let default_region = crate::aws::region_provider(&ProxyConfig::default(), None)?
+                .region()
+                .await;
+            let region = (match &auth {
+                AwsAuthentication::AccessKey { region, .. } => region.clone(),
+                AwsAuthentication::File { .. } => None,
+                AwsAuthentication::Role { region, .. } => region.clone(),
+                AwsAuthentication::Default { region, .. } => region.clone(),
+            })
+            .map_or(default_region, |r| Some(Region::new(r.to_string())))
+            .expect("Region must be specified");
+
+            Ok(HttpService::new_with_sig_v4(
+                client,
+                http_sink_request_builder,
+                SigV4Config {
+                    shared_credentials_provider: auth
+                        .credentials_provider(region.clone(), &ProxyConfig::default(), None)
+                        .await?,
+                    region: region.clone(),
+                    service: service.clone(),
+                },
+            ))
+        }
+        _ => Ok(HttpService::new(client, http_sink_request_builder)),
+    }
+}
+
+#[cfg(not(feature = "aws-core"))]
+async fn build_service(
+    _config: &HttpSinkConfig,
+    client: HttpClient,
+    http_sink_request_builder: HttpSinkRequestBuilder,
+) -> crate::Result<impl Service<HttpRequest<HttpPartitionKey>, Response = http::Response<Body>, Error = crate::Error>>
+{
+    Ok(HttpService::new(client, http_sink_request_builder))
+}
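
As an aside (illustrative, not part of the patch): because the enum above uses `#[serde(rename_all = "snake_case")]`, the strings users put in their sink configuration are the snake_case variant names. The standalone mirror type and the use of `serde`/`serde_json` below are assumptions for demonstration only; the real type is the `PartitionStrategy` defined in `config.rs`.

```rust
use serde::Deserialize;

// Standalone mirror of the patch's `PartitionStrategy`, used only to demonstrate
// the accepted configuration strings; it is not the type defined in the sink.
#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
enum PartitionStrategyMirror {
    UriHeaders,
    InstrumentationScope,
}

fn main() {
    // "instrumentation_scope" and "uri_headers" are the two values a config may use.
    let parsed: PartitionStrategyMirror =
        serde_json::from_str("\"instrumentation_scope\"").expect("valid variant name");
    assert_eq!(parsed, PartitionStrategyMirror::InstrumentationScope);

    let parsed: PartitionStrategyMirror =
        serde_json::from_str("\"uri_headers\"").expect("valid variant name");
    assert_eq!(parsed, PartitionStrategyMirror::UriHeaders);
}
```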
diff --git a/src/sinks/opentelemetry/mod.rs b/src/sinks/opentelemetry/mod.rs
index 88963f8603cde..3e63ce09044ce 100644
--- a/src/sinks/opentelemetry/mod.rs
+++ b/src/sinks/opentelemetry/mod.rs
@@ -1,3 +1,6 @@
+mod config;
+mod sink;
+
 use indoc::indoc;
 use vector_config::component::GenerateConfig;
 use vector_lib::{
@@ -16,6 +19,7 @@ use crate::{
         http::config::{HttpMethod, HttpSinkConfig},
     },
 };
+use config::OpenTelemetryOptions;
 
 /// Configuration for the `OpenTelemetry` sink.
 #[configurable_component(sink("opentelemetry", "Deliver OTLP data over HTTP."))]
@@ -24,6 +28,11 @@ pub struct OpenTelemetryConfig {
     /// Protocol configuration
     #[configurable(derived)]
     protocol: Protocol,
+
+    /// OpenTelemetry-specific options
+    #[serde(default)]
+    #[configurable(derived)]
+    opentelemetry: OpenTelemetryOptions,
 }
 
 /// The protocol used to send data to OpenTelemetry.
@@ -78,7 +87,9 @@ impl GenerateConfig for OpenTelemetryConfig {
 impl SinkConfig for OpenTelemetryConfig {
     async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
         match &self.protocol {
-            Protocol::Http(config) => config.build(cx).await,
+            Protocol::Http(config) => {
+                config::build_opentelemetry_sink(config, &self.opentelemetry, cx).await
+            }
         }
     }
 
diff --git a/src/sinks/opentelemetry/sink.rs b/src/sinks/opentelemetry/sink.rs
new file mode 100644
index 0000000000000..3342fc3a98ffe
--- /dev/null
+++ b/src/sinks/opentelemetry/sink.rs
@@ -0,0 +1,273 @@
+//! Implementation of the OpenTelemetry sink with custom partitioning strategies.
+
+use std::collections::BTreeMap;
+
+use super::config::PartitionStrategy;
+use crate::sinks::{
+    http::{
+        batch::HttpBatchSizer, request_builder::HttpRequestBuilder,
+        sink::PartitionKey as HttpPartitionKey,
+    },
+    prelude::*,
+    util::http::HttpRequest,
+};
+
+pub(super) struct OpenTelemetrySink<S> {
+    service: S,
+    uri: Template,
+    headers: BTreeMap<String, Template>,
+    batch_settings: BatcherSettings,
+    request_builder: HttpRequestBuilder,
+    partition_strategy: PartitionStrategy,
+}
+
+impl<S> OpenTelemetrySink<S>
+where
+    S: Service<HttpRequest<HttpPartitionKey>> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    /// Creates a new `OpenTelemetrySink`.
+    pub(super) const fn new(
+        service: S,
+        uri: Template,
+        headers: BTreeMap<String, Template>,
+        batch_settings: BatcherSettings,
+        request_builder: HttpRequestBuilder,
+        partition_strategy: PartitionStrategy,
+    ) -> Self {
+        Self {
+            service,
+            uri,
+            headers,
+            batch_settings,
+            request_builder,
+            partition_strategy,
+        }
+    }
+
+    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        let batch_sizer = HttpBatchSizer {
+            encoder: self.request_builder.encoder.encoder.clone(),
+        };
+
+        let partitioner = OtelKeyPartitioner::new(
+            self.uri,
+            self.headers,
+            self.partition_strategy,
+        );
+
+        input
+            // Batch the input stream with size calculation based on the configured codec.
+            .batched_partitioned(partitioner, || {
+                self.batch_settings.as_item_size_config(batch_sizer.clone())
+            })
+            .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
+            // Build requests with the default concurrency limit.
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
+            // Filter out any errors that occurred during request building.
+            .filter_map(|request| async move {
+                match request {
+                    Err(error) => {
+                        emit!(SinkRequestBuildError { error });
+                        None
+                    }
+                    Ok(req) => Some(req),
+                }
+            })
+            // Generate the driver that will send requests and handle retries,
+            // event finalization, and logging/internal metric reporting.
+            .into_driver(self.service)
+            .run()
+            .await
+    }
+}
+
+#[async_trait::async_trait]
+impl<S> StreamSink<Event> for OpenTelemetrySink<S>
+where
+    S: Service<HttpRequest<HttpPartitionKey>> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    async fn run(
+        self: Box<Self>,
+        input: futures_util::stream::BoxStream<'_, Event>,
+    ) -> Result<(), ()> {
+        self.run_inner(input).await
+    }
+}
+
+/// Partition key for OpenTelemetry events.
+///
+/// This key type supports multiple partitioning strategies:
+/// - `UriHeaders`: Partitions by URI and headers (legacy HTTP sink behavior)
+/// - `InstrumentationScope`: Partitions by OTLP InstrumentationScope (name + version)
+#[derive(Eq, PartialEq, Clone, Debug, Hash)]
+pub enum PartitionKey {
+    /// Partition by URI and headers (legacy behavior)
+    UriHeaders(HttpPartitionKey),
+    /// Partition by InstrumentationScope (name + version)
+    InstrumentationScope {
+        uri: String,
+        headers: BTreeMap<String, String>,
+        scope_name: String,
+        scope_version: String,
+    },
+}
+
+impl PartitionKey {
+    /// Get the URI from the partition key.
+    pub fn uri(&self) -> &str {
+        match self {
+            PartitionKey::UriHeaders(key) => &key.uri,
+            PartitionKey::InstrumentationScope { uri, .. } => uri,
+        }
+    }
+
+    /// Get the headers from the partition key.
+    pub fn headers(&self) -> &BTreeMap<String, String> {
+        match self {
+            PartitionKey::UriHeaders(key) => &key.headers,
+            PartitionKey::InstrumentationScope { headers, .. } => headers,
+        }
+    }
+}
+
+// Implement conversion to HttpPartitionKey for compatibility with the HTTP request builder.
+impl From<PartitionKey> for HttpPartitionKey {
+    fn from(key: PartitionKey) -> Self {
+        match key {
+            PartitionKey::UriHeaders(http_key) => http_key,
+            PartitionKey::InstrumentationScope { uri, headers, .. } => {
+                HttpPartitionKey { uri, headers }
+            }
+        }
+    }
+}
+
+struct OtelKeyPartitioner {
+    uri: Template,
+    headers: BTreeMap<String, Template>,
+    strategy: PartitionStrategy,
+}
+
+impl OtelKeyPartitioner {
+    const fn new(
+        uri: Template,
+        headers: BTreeMap<String, Template>,
+        strategy: PartitionStrategy,
+    ) -> Self {
+        Self {
+            uri,
+            headers,
+            strategy,
+        }
+    }
+
+    fn extract_scope_info(&self, event: &Event) -> Option<(String, String)> {
+        // Extract the instrumentation scope from event metadata.
+        // The scope is stored at the metadata paths `opentelemetry.scope.name`
+        // and `opentelemetry.scope.version`.
+        match event {
+            Event::Log(log) => {
+                let scope_name = log
+                    .metadata()
+                    .value()
+                    .get(path!("opentelemetry", "scope", "name"))
+                    .and_then(|v| v.as_str())
+                    .unwrap_or_default()
+                    .to_string();
+
+                let scope_version = log
+                    .metadata()
+                    .value()
+                    .get(path!("opentelemetry", "scope", "version"))
+                    .and_then(|v| v.as_str())
+                    .unwrap_or_default()
+                    .to_string();
+
+                Some((scope_name, scope_version))
+            }
+            Event::Trace(trace) => {
+                let scope_name = trace
+                    .metadata()
+                    .value()
+                    .get(path!("opentelemetry", "scope", "name"))
+                    .and_then(|v| v.as_str())
+                    .unwrap_or_default()
+                    .to_string();
+
+                let scope_version = trace
+                    .metadata()
+                    .value()
+                    .get(path!("opentelemetry", "scope", "version"))
+                    .and_then(|v| v.as_str())
+                    .unwrap_or_default()
+                    .to_string();
+
+                Some((scope_name, scope_version))
+            }
+            Event::Metric(_) => {
+                // For metrics, check tags instead:
+                // OTLP metrics store the scope in tags as "scope.name" and "scope.version".
+                None // Will be handled separately for metrics.
+            }
+        }
+    }
+}
+
+impl Partitioner for OtelKeyPartitioner {
+    type Item = Event;
+    type Key = Option<PartitionKey>;
+
+    fn partition(&self, event: &Event) -> Self::Key {
+        // First, render the URI and headers.
+        let uri = self
+            .uri
+            .render_string(event)
+            .map_err(|error| {
+                emit!(TemplateRenderingError {
+                    error,
+                    field: Some("uri"),
+                    drop_event: true,
+                });
+            })
+            .ok()?;
+
+        let mut headers = BTreeMap::new();
+        for (name, template) in &self.headers {
+            let value = template
+                .render_string(event)
+                .map_err(|error| {
+                    emit!(TemplateRenderingError {
+                        error,
+                        field: Some("headers"),
+                        drop_event: true,
+                    });
+                })
+                .ok()?;
+            headers.insert(name.clone(), value);
+        }
+
+        // Choose the partition key according to the configured strategy.
+        match self.strategy {
+            PartitionStrategy::UriHeaders => {
+                Some(PartitionKey::UriHeaders(HttpPartitionKey { uri, headers }))
+            }
+            PartitionStrategy::InstrumentationScope => {
+                let (scope_name, scope_version) = self.extract_scope_info(event)?;
+                Some(PartitionKey::InstrumentationScope {
+                    uri,
+                    headers,
+                    scope_name,
+                    scope_version,
+                })
+            }
+        }
+    }
+}
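
To make the metadata contract concrete, here is a standalone sketch (not part of the patch) of the scope extraction and key construction that `extract_scope_info` and `partition` perform. A plain nested map stands in for Vector's event metadata `Value`, the URI is a made-up example value, and the empty-string fallback mirrors the behavior above; all names and types in this block are illustrative assumptions.

```rust
use std::collections::BTreeMap;

/// Stand-in for nested event metadata: "opentelemetry" -> "scope" -> {"name", "version"}.
type Metadata = BTreeMap<String, BTreeMap<String, BTreeMap<String, String>>>;

/// Simplified partition key mirroring `PartitionKey::InstrumentationScope`.
#[derive(Debug, PartialEq, Eq, Hash)]
struct ScopeKey {
    uri: String,
    scope_name: String,
    scope_version: String,
}

/// Read `opentelemetry.scope.<field>`, falling back to an empty string when missing.
fn scope_field(metadata: &Metadata, field: &str) -> String {
    metadata
        .get("opentelemetry")
        .and_then(|otel| otel.get("scope"))
        .and_then(|scope| scope.get(field))
        .cloned()
        .unwrap_or_default()
}

fn partition_key(uri: &str, metadata: &Metadata) -> ScopeKey {
    ScopeKey {
        uri: uri.to_string(),
        scope_name: scope_field(metadata, "name"),
        scope_version: scope_field(metadata, "version"),
    }
}

fn main() {
    let mut scope = BTreeMap::new();
    scope.insert("name".to_string(), "my.library".to_string());
    scope.insert("version".to_string(), "1.0.0".to_string());
    let mut otel = BTreeMap::new();
    otel.insert("scope".to_string(), scope);
    let mut metadata: Metadata = BTreeMap::new();
    metadata.insert("opentelemetry".to_string(), otel);

    // Events with the same scope name and version map to the same key, and therefore
    // into the same batch, regardless of how many resources they are spread across.
    let key = partition_key("http://collector:4318/v1/logs", &metadata);
    println!("{key:?}");
}
```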