diff --git a/gemfiles/standard.rb b/gemfiles/standard.rb index c8065b3a1b..0d534f78d8 100644 --- a/gemfiles/standard.rb +++ b/gemfiles/standard.rb @@ -4,6 +4,7 @@ def standard_dependencies gem 'yard', '>= 0.9.35' gem 'ffi' + gem 'opentelemetry-sdk' group :development, :testing do gem 'jruby-openssl', platforms: :jruby diff --git a/lib/mongo.rb b/lib/mongo.rb index c866ad1a9e..0e62e92415 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -33,6 +33,7 @@ autoload :CGI, 'cgi' require 'bson' +require 'opentelemetry-api' require 'mongo/id' require 'mongo/bson' @@ -74,6 +75,7 @@ require 'mongo/socket' require 'mongo/srv' require 'mongo/timeout' +require 'mongo/tracing' require 'mongo/uri' require 'mongo/version' require 'mongo/write_concern' diff --git a/lib/mongo/client.rb b/lib/mongo/client.rb index 70f5768628..904524c360 100644 --- a/lib/mongo/client.rb +++ b/lib/mongo/client.rb @@ -112,6 +112,7 @@ class Client :ssl_verify_hostname, :ssl_verify_ocsp_endpoint, :timeout_ms, + :tracing, :truncate_logs, :user, :wait_queue_timeout, @@ -437,6 +438,20 @@ def hash # See Ruby's Zlib module for valid levels. # @option options [ Hash ] :resolv_options For internal driver use only. # Options to pass through to Resolv::DNS constructor for SRV lookups. + # @option options [ Hash ] :tracing OpenTelemetry tracing options. + # - :enabled => Boolean, whether to enable OpenTelemetry tracing. The default + # value is nil that means that the configuration will be taken from the + # OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED environment variable. + # - :tracer => OpenTelemetry::Trace::Tracer, the tracer to use for + # tracing. Must be an implementation of OpenTelemetry::Trace::Tracer + # interface. + # - :query_text_max_length => Integer, the maximum length of the query text + # to be included in the span attributes. If the query text exceeds this + # length, it will be truncated. Value 0 means no query text + # will be included in the span attributes. The default value is nil that + # means that the configuration will be taken from the + # OTEL_RUBY_INSTRUMENTATION_MONGODB_QUERY_TEXT_MAX_LENGTH environment + # variable. # @option options [ Hash ] :auto_encryption_options Auto-encryption related # options. # - :key_vault_client => Client | nil, a client connected to the MongoDB @@ -574,8 +589,12 @@ def initialize(addresses_or_uri, options = nil) @connect_lock = Mutex.new @connect_lock.synchronize do - @cluster = Cluster.new(addresses, @monitoring, - cluster_options.merge(srv_uri: srv_uri)) + @cluster = Cluster.new( + addresses, + @monitoring, + tracer, + cluster_options.merge(srv_uri: srv_uri) + ) end begin @@ -1195,6 +1214,15 @@ def timeout_sec end end + def tracer + tracing_opts = @options[:tracing] || {} + @tracer ||= Tracing.create_tracer( + enabled: tracing_opts[:enabled], + query_text_max_length: tracing_opts[:query_text_max_length], + otel_tracer: tracing_opts[:tracer], + ) + end + private # Attempts to parse the given list of addresses, using the provided options. diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index 46e9f556f1..30a7e56583 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -117,7 +117,7 @@ class Cluster # - *:deprecation_errors* -- boolean # # @since 2.0.0 - def initialize(seeds, monitoring, options = Options::Redacted.new) + def initialize(seeds, monitoring, tracer = nil, options = Options::Redacted.new) if seeds.nil? raise ArgumentError, 'Seeds cannot be nil' end @@ -136,6 +136,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) @update_lock = Mutex.new @servers = [] @monitoring = monitoring + @tracer = tracer @event_listeners = Event::Listeners.new @app_metadata = Server::AppMetadata.new(@options.merge(purpose: :application)) @monitor_app_metadata = Server::Monitor::AppMetadata.new(@options.merge(purpose: :monitor)) @@ -309,6 +310,8 @@ def self.create(client, monitoring: nil) # @return [ Monitoring ] monitoring The monitoring. attr_reader :monitoring + attr_reader :tracer + # @return [ Object ] The cluster topology. attr_reader :topology diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index b9cbefee0c..dee8a85da6 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -57,6 +57,8 @@ class Collection # Delegate to the cluster for the next primary. def_delegators :cluster, :next_primary + def_delegators :client, :tracer + # Options that can be updated on a new Collection instance via the #with method. # # @since 2.1.0 @@ -865,19 +867,22 @@ def insert_one(document, opts = {}) session: session, operation_timeouts: operation_timeouts(opts) ) - write_with_retry(write_concern, context: context) do |connection, txn_num, context| - Operation::Insert.new( - :documents => [ document ], - :db_name => database.name, - :coll_name => name, - :write_concern => write_concern, - :bypass_document_validation => !!opts[:bypass_document_validation], - :options => opts, - :id_generator => client.options[:id_generator], - :session => session, - :txn_num => txn_num, - :comment => opts[:comment] - ).execute_with_connection(connection, context: context) + operation = Operation::Insert.new( + :documents => [ document ], + :db_name => database.name, + :coll_name => name, + :write_concern => write_concern, + :bypass_document_validation => !!opts[:bypass_document_validation], + :options => opts, + :id_generator => client.options[:id_generator], + :session => session, + :comment => opts[:comment] + ) + tracer.trace_operation(operation, context) do + write_with_retry(write_concern, context: context) do |connection, txn_num, context| + operation.txn_num = txn_num + operation.execute_with_connection(connection, context: context) + end end end end diff --git a/lib/mongo/collection/view.rb b/lib/mongo/collection/view.rb index fc33d85b75..e7c221f0a8 100644 --- a/lib/mongo/collection/view.rb +++ b/lib/mongo/collection/view.rb @@ -72,6 +72,8 @@ class View # Delegate to the cluster for the next primary. def_delegators :cluster, :next_primary + def_delegators :client, :tracer + alias :selector :filter # @return [ Integer | nil | The timeout_ms value that was passed as an diff --git a/lib/mongo/collection/view/aggregation.rb b/lib/mongo/collection/view/aggregation.rb index f80a4f491b..5c263faeda 100644 --- a/lib/mongo/collection/view/aggregation.rb +++ b/lib/mongo/collection/view/aggregation.rb @@ -25,11 +25,14 @@ class View # # @since 2.0.0 class Aggregation + extend Forwardable include Behavior # @return [ Array ] pipeline The aggregation pipeline. attr_reader :pipeline + def_delegators :view, :tracer + # Initialize the aggregation for the provided collection view, pipeline # and options. # @@ -80,7 +83,7 @@ def new(options) Aggregation.new(view, pipeline, options) end - def initial_query_op(session, read_preference) + def initial_query_op(session, read_preference = nil) Operation::Aggregate.new(aggregate_spec(session, read_preference)) end diff --git a/lib/mongo/collection/view/aggregation/behavior.rb b/lib/mongo/collection/view/aggregation/behavior.rb index 349b82e4bc..db881106a7 100644 --- a/lib/mongo/collection/view/aggregation/behavior.rb +++ b/lib/mongo/collection/view/aggregation/behavior.rb @@ -88,7 +88,7 @@ def server_selector @view.send(:server_selector) end - def aggregate_spec(session, read_preference) + def aggregate_spec(session, read_preference = nil) Builder::Aggregation.new( pipeline, view, diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index 99133c5e9f..0f88e8eec7 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -88,19 +88,21 @@ def select_cursor(session) operation_timeouts: operation_timeouts, view: self ) - - if respond_to?(:write?, true) && write? - server = server_selector.select_server(cluster, nil, session, write_aggregation: true) - result = send_initial_query(server, context) - - if use_query_cache? - CachingCursor.new(view, result, server, session: session, context: context) + op = initial_query_op(session) + tracer.trace_operation(op, context) do + if respond_to?(:write?, true) && write? + server = server_selector.select_server(cluster, nil, session, write_aggregation: true) + result = send_initial_query(server, context) + + if use_query_cache? + CachingCursor.new(view, result, server, session: session, context: context) + else + Cursor.new(view, result, server, session: session, context: context) + end else - Cursor.new(view, result, server, session: session, context: context) - end - else - read_with_retry_cursor(session, server_selector, view, context: context) do |server| - send_initial_query(server, context) + read_with_retry_cursor(session, server_selector, view, context: context) do |server| + send_initial_query(server, context) + end end end end diff --git a/lib/mongo/operation/insert/op_msg.rb b/lib/mongo/operation/insert/op_msg.rb index 39b299ef76..4f5acec971 100644 --- a/lib/mongo/operation/insert/op_msg.rb +++ b/lib/mongo/operation/insert/op_msg.rb @@ -34,8 +34,10 @@ class OpMsg < OpMsgBase private def get_result(connection, context, options = {}) - # This is a Mongo::Operation::Insert::Result - Result.new(*dispatch_message(connection, context), @ids, context: context) + message = build_message(connection, context) + connection.tracer.trace_command(message, context, connection) do + Result.new(*dispatch_message(message, connection, context), @ids, context: context) + end end def selector(connection) diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 041e4d1e5b..85865c989a 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -104,12 +104,14 @@ def result_class end def get_result(connection, context, options = {}) - result_class.new(*dispatch_message(connection, context, options), context: context, connection: connection) + message = build_message(connection, context) + connection.tracer.trace_command(message, context, connection) do + result_class.new(*dispatch_message(message, connection, context, options), context: context, connection: connection) + end end # Returns a Protocol::Message or nil as reply. - def dispatch_message(connection, context, options = {}) - message = build_message(connection, context) + def dispatch_message(message, connection, context, options = {}) message = message.maybe_encrypt(connection, context) reply = connection.dispatch([ message ], context, options) [reply, connection.description, connection.global_id] diff --git a/lib/mongo/operation/shared/specifiable.rb b/lib/mongo/operation/shared/specifiable.rb index afc799f46e..dd2e37369e 100644 --- a/lib/mongo/operation/shared/specifiable.rb +++ b/lib/mongo/operation/shared/specifiable.rb @@ -233,7 +233,7 @@ def documents # # @since 2.0.0 def coll_name - spec.fetch(COLL_NAME) + spec[COLL_NAME] end # The id of the cursor created on the server. @@ -526,6 +526,10 @@ def txn_num @spec[:txn_num] end + def txn_num=(num) + @spec[:txn_num] = num + end + # The command. # # @return [ Hash ] The command. diff --git a/lib/mongo/server.rb b/lib/mongo/server.rb index c00285034e..edf2eb3b4f 100644 --- a/lib/mongo/server.rb +++ b/lib/mongo/server.rb @@ -218,7 +218,8 @@ def compressor # @api private def_delegators :cluster, :monitor_app_metadata, - :push_monitor_app_metadata + :push_monitor_app_metadata, + :tracer def_delegators :features, :check_driver_support! diff --git a/lib/mongo/server/connection.rb b/lib/mongo/server/connection.rb index f9874764cf..4c0024c529 100644 --- a/lib/mongo/server/connection.rb +++ b/lib/mongo/server/connection.rb @@ -139,6 +139,8 @@ def initialize(server, options = {}) # across all connections. attr_reader :global_id + def_delegators :server, :tracer + # The connection pool from which this connection was created. # May be nil. # @@ -388,6 +390,17 @@ def record_checkin! self end + def transport + return nil if @socket.nil? + + case @socket + when Mongo::Socket::Unix + :unix + else + :tcp + end + end + private def deliver(message, client, options = {}) diff --git a/lib/mongo/session.rb b/lib/mongo/session.rb index be9c1f2a42..f9cf3de279 100644 --- a/lib/mongo/session.rb +++ b/lib/mongo/session.rb @@ -130,6 +130,8 @@ def snapshot? # @since 2.5.0 attr_reader :operation_time + def_delegators :client, :tracer + # Sets the dirty state to the given value for the underlying server # session. If there is no server session, this does nothing. # diff --git a/lib/mongo/tracing.rb b/lib/mongo/tracing.rb new file mode 100644 index 0000000000..25a0e7eeac --- /dev/null +++ b/lib/mongo/tracing.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Mongo + module Tracing + def create_tracer(enabled: nil, query_text_max_length: nil, otel_tracer: nil) + OpenTelemetry::Tracer.new( + enabled: enabled, + query_text_max_length: query_text_max_length, + otel_tracer: otel_tracer, + ) + end + module_function :create_tracer + end +end + +require 'mongo/tracing/open_telemetry' diff --git a/lib/mongo/tracing/open_telemetry.rb b/lib/mongo/tracing/open_telemetry.rb new file mode 100644 index 0000000000..702d2885c4 --- /dev/null +++ b/lib/mongo/tracing/open_telemetry.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +# Copyright (C) 2025-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Tracing + module OpenTelemetry + end + end +end + +require 'mongo/tracing/open_telemetry/command_tracer' +require 'mongo/tracing/open_telemetry/operation_tracer' +require 'mongo/tracing/open_telemetry/tracer' diff --git a/lib/mongo/tracing/open_telemetry/command_tracer.rb b/lib/mongo/tracing/open_telemetry/command_tracer.rb new file mode 100644 index 0000000000..9157c97b78 --- /dev/null +++ b/lib/mongo/tracing/open_telemetry/command_tracer.rb @@ -0,0 +1,140 @@ +# frozen_string_literal: true + +# Copyright (C) 2025-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Tracing + module OpenTelemetry + # CommandTracer is responsible for tracing MongoDB server commands using OpenTelemetry. + # + # @api private + class CommandTracer + extend Forwardable + + def_delegators :@parent_tracer, + :cursor_context_map, + :parent_context_for, + :transaction_context_map, + :transaction_map_key + + def initialize(otel_tracer, parent_tracer, query_text_max_length: 0) + @otel_tracer = otel_tracer + @parent_tracer = parent_tracer + @query_text_max_length = query_text_max_length + end + + def trace_command(message, operation_context, connection) + parent_context = parent_context_for(operation_context, cursor_id(message)) + span = @otel_tracer.start_span( + command_span_name(message), + attributes: span_attributes(message, connection), + with_parent: parent_context, + kind: :client + ) + ::OpenTelemetry::Trace.with_span(span) do |s, c| + # TODO: process cursor context if applicable + yield.tap do |result| + process_cursor_context(result, cursor_id(message), c, s) + end + end + rescue Exception => e + span&.record_exception(e) + span&.status = ::OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") + raise e + ensure + span&.finish + end + + private + + def span_attributes(message, connection) + { + 'db.system' => 'mongodb', + 'db.namespace' => database(message), + 'db.collection.name' => collection_name(message), + 'db.command.name' => command_name(message), + 'db.query.summary' => command_span_name(message), + 'server.port' => connection.address.port, + 'server.address' => connection.address.host, + 'network.transport' => connection.transport.to_s, + 'db.mongodb.server_connection_id' => connection.server.description.server_connection_id, + 'db.mongodb.driver_connection_id' => connection.id, + 'db.mongodb.cursor_id' => cursor_id(message), + 'db.query.text' => query_text(message) + }.compact + end + + def process_cursor_context(result, cursor_id, context, span) + if result.has_cursor_id? && result.cursor_id.positive? + span.set_attribute('db.mongodb.cursor_id', result.cursor_id) + end + end + + def command_span_name(message) + if (coll_name = collection_name(message)) + "#{command_name(message)} #{database(message)}.#{coll_name}" + else + "#{command_name(message)} #{database(message)}" + end + end + + def collection_name(message) + case message.documents.first.keys.first + when 'getMore' + message.documents.first['collection'].to_s + else + message.documents.first.values.first.to_s + end + end + + def command_name(message) + message.documents.first.keys.first.to_s + end + + def database(message) + message.documents.first['$db'].to_s + end + + def query_text? + @query_text_max_length.positive? + end + + def cursor_id(message) + if command_name(message) == 'getMore' + message.documents.first['getMore'].value + end + end + + EXCLUDED_KEYS = %w[lsid $db $clusterTime signature].freeze + ELLIPSES = '...' + + def query_text(message) + return unless query_text? + + text = message + .documents + .first + .reject { |key, _| EXCLUDED_KEYS.include?(key) } + .to_json + if text.length > @query_text_max_length + "#{text[0...@query_text_max_length]}#{ELLIPSES}" + else + text + end + end + end + end + end +end diff --git a/lib/mongo/tracing/open_telemetry/operation_tracer.rb b/lib/mongo/tracing/open_telemetry/operation_tracer.rb new file mode 100644 index 0000000000..1db2b68a73 --- /dev/null +++ b/lib/mongo/tracing/open_telemetry/operation_tracer.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +# Copyright (C) 2025-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Tracing + module OpenTelemetry + # OperationTracer is responsible for tracing MongoDB driver operations using OpenTelemetry. + # + # @api private + class OperationTracer + extend Forwardable + + def_delegators :@parent_tracer, + :cursor_context_map, + :parent_context_for, + :transaction_context_map, + :transaction_map_key + + def initialize(otel_tracer, parent_tracer) + @otel_tracer = otel_tracer + @parent_tracer = parent_tracer + end + + def trace_operation(operation, operation_context) + parent_context = parent_context_for(operation_context, operation.cursor_id) + span = @otel_tracer.start_span( + operation_span_name(operation), + attributes: span_attributes(operation), + with_parent: parent_context, + kind: :client + ) + ::OpenTelemetry::Trace.with_span(span) do |s, c| + yield.tap do |result| + process_cursor_context(result, operation.cursor_id, c, s) + end + end + rescue Exception => e + span&.record_exception(e) + span&.status = ::OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") + raise e + ensure + span&.finish + end + + private + + def operation_name(operation) + operation.class.name.split('::').last.downcase + end + + def span_attributes(operation) + { + 'db.system' => 'mongodb', + 'db.namespace' => operation.db_name.to_s, + 'db.collection.name' => operation.coll_name.to_s, + 'db.operation.name' => operation_name(operation), + 'db.operation.summary' => operation_span_name(operation), + 'db.mongodb.cursor_id' => operation.cursor_id, + }.compact + end + + def process_cursor_context(result, cursor_id, context, span) + return unless result.is_a?(Cursor) + + if result.id.zero? + # If the cursor is closed, remove it from the context map. + cursor_context_map.delete(cursor_id) + elsif result.id && cursor_id.nil? + # New cursor created, store its context. + cursor_context_map[result.id] = context + span.set_attribute('db.mongodb.cursor_id', result.id) + end + end + + def operation_span_name(operation) + if operation.coll_name + "#{operation_name(operation)} #{operation.db_name}.#{operation.coll_name}" + else + "#{operation_name(operation)} #{operation.db_name}" + end + end + end + end + end +end diff --git a/lib/mongo/tracing/open_telemetry/tracer.rb b/lib/mongo/tracing/open_telemetry/tracer.rb new file mode 100644 index 0000000000..f9b61bb7f8 --- /dev/null +++ b/lib/mongo/tracing/open_telemetry/tracer.rb @@ -0,0 +1,117 @@ +# frozen_string_literal: true + +# Copyright (C) 2025-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Tracing + module OpenTelemetry + class Tracer + # @return [ OpenTelemetry::Trace::Tracer ] the OpenTelemetry tracer implementation + # used to create spans for MongoDB operations and commands. + # + # @api private + attr_reader :otel_tracer + + # Initializes a new OpenTelemetry tracer. + # + # @param enabled [ Boolean | nil ] whether OpenTelemetry is enabled or not. + # If nil, it will check the environment variable + # OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED. + # @param otel_tracer [ OpenTelemetry::Trace::Tracer | nil ] the OpenTelemetry tracer + # implementation to use. If nil, it will use the default tracer from + # OpenTelemetry's tracer provider. + def initialize(enabled: nil, query_text_max_length: nil, otel_tracer: nil) + @enabled = if enabled.nil? + %w[true 1 yes].include?(ENV['OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED']&.downcase) + else + enabled + end + @query_text_max_length = if query_text_max_length.nil? + ENV['OTEL_RUBY_INSTRUMENTATION_MONGODB_QUERY_TEXT_MAX_LENGTH'].to_i + else + query_text_max_length + end + @otel_tracer = otel_tracer || initialize_tracer + @operation_tracer = OperationTracer.new(@otel_tracer, self) + @command_tracer = CommandTracer.new(@otel_tracer, self, query_text_max_length: @query_text_max_length) + end + + # Whether OpenTelemetry is enabled or not. + # + # # @return [Boolean] true if OpenTelemetry is enabled, false otherwise. + def enabled? + @enabled + end + + def trace_operation(operation, operation_context, &block) + return yield unless enabled? + + @operation_tracer.trace_operation(operation, operation_context, &block) + end + + def trace_command(message, operation_context, connection, &block) + return yield unless enabled? + + @command_tracer.trace_command(message, operation_context, connection, &block) + end + + def cursor_context_map + @cursor_context_map ||= {} + end + + def cursor_map_key(session, cursor_id) + return if cursor_id.nil? || session.nil? + + "#{session.session_id['id'].to_uuid}-#{cursor_id}" + end + + def parent_context_for(operation_context, cursor_id) + if (key = transaction_map_key(operation_context.session)) + transaction_context_map[key] + elsif (_key = cursor_map_key(operation_context.session, cursor_id)) + # We return nil here unless we decide how to nest cursor operations. + nil + # cursor_context_map[key] + end + end + + def transaction_context_map + @transaction_context_map ||= {} + end + + def transaction_map_key(session) + return if session.nil? || session.implicit? || !session.in_transaction? + + "#{session.session_id['id'].to_uuid}-#{session.txn_num}" + end + + private + + def initialize_tracer + if enabled? + # Obtain the proper tracer from OpenTelemetry's tracer provider. + ::OpenTelemetry.tracer_provider.tracer( + 'mongo-ruby-driver', + Mongo::VERSION + ) + else + # No-op tracer when OpenTelemetry is not enabled. + ::OpenTelemetry::Trace::Tracer.new + end + end + end + end + end +end diff --git a/spec/lite_spec_helper.rb b/spec/lite_spec_helper.rb index 486d9c4235..ddd8b25efd 100644 --- a/spec/lite_spec_helper.rb +++ b/spec/lite_spec_helper.rb @@ -94,6 +94,7 @@ module Mrss require 'support/json_ext_formatter' require 'support/sdam_formatter_integration' require 'support/background_thread_registry' +require 'support/tracing' require 'mrss/session_registry' require 'support/local_resource_registry' diff --git a/spec/mongo/tracer/open_telemetry_spec.rb b/spec/mongo/tracer/open_telemetry_spec.rb new file mode 100644 index 0000000000..a7d115a9a0 --- /dev/null +++ b/spec/mongo/tracer/open_telemetry_spec.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Mongo::Tracer::OpenTelemetry do + describe '#initialize' do + it 'disables OpenTelemetry by default' do + tracer = described_class.new + expect(tracer.enabled?).to be false + end + + it 'disables OpenTelemetry when the environment variable is not set' do + allow(ENV).to receive(:[]).with('OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED').and_return(nil) + tracer = described_class.new + expect(tracer.enabled?).to be false + end + + %w[ true 1 yes ].each do |value| + it "enables OpenTelemetry when the environment variable is set to '#{value}'" do + allow(ENV).to receive(:[]).with('OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED').and_return(value) + tracer = described_class.new + expect(tracer.enabled?).to be true + end + end + end +end diff --git a/spec/runners/unified.rb b/spec/runners/unified.rb index 042b1c3947..a58a469d9c 100644 --- a/spec/runners/unified.rb +++ b/spec/runners/unified.rb @@ -99,6 +99,7 @@ def define_unified_spec_tests(base_path, paths, expect_failure: false) test.run test.assert_outcome test.assert_events + test.assert_tracing_messages test.cleanup end end diff --git a/spec/runners/unified/assertions.rb b/spec/runners/unified/assertions.rb index 69cb2282ad..98850dcd56 100644 --- a/spec/runners/unified/assertions.rb +++ b/spec/runners/unified/assertions.rb @@ -140,9 +140,14 @@ def assert_documents_match(actual, expected) end end - def assert_document_matches(actual, expected, msg) - unless actual == expected - raise Error::ResultMismatch, "#{msg} does not match" + def assert_document_matches(actual, expected, msg, as_root: false) + if !as_root && actual.keys.to_set != expected.keys.to_set + raise Error::ResultMismatch, "#{msg} keys do not match: expected #{expected.keys}, actual #{actual.keys}" + end + expected.each do |key, expected_value| + raise Error::ResultMismatch, "#{msg} has no key #{key}" unless actual.key?(key) + actual_value = actual[key] + assert_value_matches(actual_value, expected_value, "#{msg} key #{key}") end end @@ -383,6 +388,14 @@ def assert_value_matches(actual, expected, msg) if actual.nil? || actual >= expected_v raise Error::ResultMismatch, "Actual value #{actual} should be less than #{expected_v}" end + when '$$matchAsDocument' + actual_v = BSON::ExtJSON.parse(actual) + match_as_root = false + if expected_v.keys.first == '$$matchAsRoot' + expected_v = expected_v.values.first + match_as_root = true + end + assert_document_matches(actual_v, expected_v, msg, as_root: match_as_root) else raise NotImplementedError, "Unknown operator #{operator}" end @@ -392,5 +405,44 @@ def assert_value_matches(actual, expected, msg) end end end + + def assert_tracing_messages + return unless @expected_tracing_messages + @expected_tracing_messages.each do |spec| + spec = UsingHash[spec] + client_id = spec.use!('client') + client = entities.get(:client, client_id) + tracer = @tracers.fetch(client) + expected_spans = spec.use!('spans') + ignore_extra_spans = if ignore = spec.use('ignoreExtraSpans') + # Ruby treats 0 as truthy, whereas the spec tests use it as falsy. + ignore == 0 ? false : ignore + else + false + end + actual_spans = tracer.span_hierarchy + if (!ignore_extra_spans && actual_spans.length != expected_spans.length) || + (ignore_extra_spans && actual_spans.length < expected_spans.length) + raise Error::ResultMismatch, "Span count mismatch: expected #{expected_spans.length}, actual #{actual_spans.length}\nExpected: #{expected_spans}\nActual: #{actual_spans}" + end + expected_spans.each_with_index do |expected, i| + assert_span_matches(actual_spans[i], expected) + end + end + end + + def assert_span_matches(actual, expected) + assert_eq(actual.name, expected.use!('name'), 'Span name does not match') + expected_attributes = UsingHash[expected.use!('tags')] + expected_attributes.each do |key, value| + actual_value = actual.attributes[key] + assert_value_matches(actual_value, value, "Span attribute #{key}") + end + + expected_nested_spans = expected.use('nested') || [] + expected_nested_spans.each_with_index do |nested_expected, i| + assert_span_matches(actual.nested[i], nested_expected) + end + end end end diff --git a/spec/runners/unified/test.rb b/spec/runners/unified/test.rb index 32b0ba0a82..8c40a37579 100644 --- a/spec/runners/unified/test.rb +++ b/spec/runners/unified/test.rb @@ -37,6 +37,8 @@ def initialize(spec, **opts) @description = @test_spec.use('description') @outcome = @test_spec.use('outcome') @expected_events = @test_spec.use('expectEvents') + @expected_tracing_messages = @test_spec.use('expectTracingMessages') + @expected_spans = @test_spec.use('expectSpans') @skip_reason = @test_spec.use('skipReason') if req = @test_spec.use('runOnRequirements') @reqs = req.map { |r| Mongo::CRUD::Requirement.new(r) } @@ -54,6 +56,7 @@ def initialize(spec, **opts) end @test_spec.freeze @subscribers = {} + @tracers = {} @observe_sensitive = {} @options = opts end @@ -195,9 +198,24 @@ def generate_entities(es) end end + observe_tracing_messages = spec.use('observeTracingMessages') + tracer = ::Tracing::Tracer.new + if observe_tracing_messages + opts[:tracing] = { + enabled: true, + tracer: tracer, + } + if observe_tracing_messages['enableCommandPayload'] + # Set the maximum length of the query text to reasonably high + # value so that we can capture the full query text + opts[:tracing][:query_text_max_length] = 4096 + end + end + create_client(**opts).tap do |client| @observe_sensitive[id] = spec.use('observeSensitiveCommands') @subscribers[client] ||= subscriber + @tracers[client] ||= tracer end when 'database' client = entities.get(:client, spec.use!('client')) diff --git a/spec/spec_tests/data/open_telemetry/README.md b/spec/spec_tests/data/open_telemetry/README.md new file mode 100644 index 0000000000..c3d14f6c93 --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/README.md @@ -0,0 +1,34 @@ +# OpenTelemetry Tests + +______________________________________________________________________ + +## Testing + +### Automated Tests + +The YAML and JSON files in this directory are platform-independent tests meant to exercise a driver's implementation of +the OpenTelemetry specification. These tests utilize the +[Unified Test Format](../../unified-test-format/unified-test-format.md). + +For each test, create a MongoClient, configure it to enable tracing. + +```yaml +createEntities: + - client: + id: client0 + observeTracingMessages: + enableCommandPayload: true +``` + +These tests require the ability to collect tracing [spans](../open-telemetry.md#span) data in a structured form as +described in the +[Unified Test Format specification.expectTracingMessages](../../unified-test-format/unified-test-format.md#expectTracingMessages). +For example the Java driver uses [Micrometer](https://jira.mongodb.org/browse/JAVA-5732) to collect tracing spans. + +```yaml +expectTracingMessages: + client: client0 + ignoreExtraSpans: false + spans: + ... +``` diff --git a/spec/spec_tests/data/open_telemetry/cursor/cursor.yml b/spec/spec_tests/data/open_telemetry/cursor/cursor.yml new file mode 100644 index 0000000000..54675a8a67 --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/cursor/cursor.yml @@ -0,0 +1,131 @@ +description: cursor retrieval +schemaVersion: '1.26' +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: cursor + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: test +initialData: + - collectionName: test + databaseName: cursor + documents: + - { _id: 1 } + - { _id: 2 } + - { _id: 3 } + - { _id: 4 } + - { _id: 5 } + - { _id: 6 } +tests: + - description: find with a cursor + operations: + - name: find + object: *collection0 + arguments: + filter: { _id: { $gt: 1 } } + batchSize: 2 + expectResult: + - { _id: 2 } + - { _id: 3 } + - { _id: 4 } + - { _id: 5 } + - { _id: 6 } + expectTracingMessages: + client: *client0 + ignoreExtraSpans: false + spans: + - name: find cursor.test + tags: + db.system: mongodb + db.namespace: cursor + db.collection.name: test + db.operation.name: find + db.operation.summary: find cursor.test + nested: + - name: command find + tags: + db.system: mongodb + db.namespace: cursor + db.collection.name: cursor.$cmd + db.command.name: find + network.transport: tcp + db.mongodb.cursor_id: { $$exists: false } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + server.type: { $$type: string } + db.query.summary: find + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + find: test + filter: { _id: { $gt: 1 } } + batchSize: 2 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] + + - name: command getMore + tags: + db.system: mongodb + db.namespace: cursor + db.collection.name: cursor.$cmd + db.command.name: getMore + network.transport: tcp + db.mongodb.cursor_id: { $$type: [ 'int', 'long' ] } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + server.type: { $$type: string } + db.query.summary: getMore + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + getMore: { $$type: long } + collection: test + batchSize: 2 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] + - name: command getMore + tags: + db.system: mongodb + db.namespace: cursor + db.collection.name: cursor.$cmd + db.command.name: getMore + network.transport: tcp + db.mongodb.cursor_id: { $$type: [ 'int', 'long' ] } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + server.type: { $$type: string } + db.query.summary: getMore + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + getMore: { $$type: long } + collection: test + batchSize: 2 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] diff --git a/spec/spec_tests/data/open_telemetry/operation/find.yml b/spec/spec_tests/data/open_telemetry/operation/find.yml new file mode 100644 index 0000000000..292c641755 --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/operation/find.yml @@ -0,0 +1,64 @@ +description: operation find +schemaVersion: '1.26' +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: operation-find + - collection: + id: &collection0 collection0 + database: database0 + collectionName: &collection0Name test +initialData: + - collectionName: test + databaseName: operation-find + documents: [] +tests: + - description: find an element + operations: + - name: find + object: *collection0 + arguments: { filter: { x: 1 } } + + expectTracingMessages: + - client: *client0 + ignoreExtraSpans: false + spans: + - name: find operation-find.test + tags: + db.system: mongodb + db.namespace: operation-find + db.collection.name: test + db.operation.name: find + db.operation.summary: find operation-find.test + nested: + - name: find operation-find.test + tags: + db.system: mongodb + db.namespace: operation-find + db.collection.name: *collection0Name + db.command.name: find + network.transport: tcp + db.mongodb.cursor_id: { $$exists: false } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + db.query.summary: find operation-find.test + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + find: test + filter: + x: 1 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] diff --git a/spec/spec_tests/data/open_telemetry/operation/find_retries.yml b/spec/spec_tests/data/open_telemetry/operation/find_retries.yml new file mode 100644 index 0000000000..5daebe5fdb --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/operation/find_retries.yml @@ -0,0 +1,104 @@ +description: operation find retrying failed command +schemaVersion: '1.26' +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: operation-find-retries + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: test +initialData: + - collectionName: test + databaseName: operation-find-retries + documents: [] +tests: + - description: find an element + operations: + - name: failPoint + object: testRunner + arguments: + client: *client0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ find ] + errorCode: 89 + errorLabels: [ RetryableWriteError ] + + - name: find + object: *collection0 + arguments: + filter: { x: 1 } + expectTracingMessages: + client: *client0 + ignoreExtraSpans: true + spans: + - name: find operation-find-retries.test + tags: + db.system: mongodb + db.namespace: operation-find-retries + db.collection.name: test + db.operation.name: find + db.operation.summary: find operation-find-retries.test + nested: + - name: command find + tags: + db.system: mongodb + db.namespace: operation-find-retries + db.collection.name: operation-find-retries.$cmd + db.command.name: find + network.transport: tcp + db.mongodb.cursor_id: { $$exists: false } + db.response.status_code: { $$exists: false } + exception.message: { $$type: string } + exception.type: { $$type: string } + exception.stacktrace: { $$type: string } + server.address: { $$type: string } + server.port: { $$type: ['long', 'string'] } + server.type: { $$type: string } + db.query.summary: find + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + find: test + filter: + x: 1 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] + - name: command find + tags: + db.system: mongodb + db.namespace: operation-find-retries + db.collection.name: operation-find-retries.$cmd + db.command.name: find + network.transport: tcp + db.mongodb.cursor_id: { $$exists: false } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + server.type: { $$type: string } + db.query.summary: find + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + find: test + filter: + x: 1 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] + diff --git a/spec/spec_tests/data/open_telemetry/operation/insert.yml b/spec/spec_tests/data/open_telemetry/operation/insert.yml new file mode 100644 index 0000000000..0a02a6128b --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/operation/insert.yml @@ -0,0 +1,61 @@ +description: operation insert +schemaVersion: '1.26' +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: operation-insert + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: test +initialData: + - collectionName: test + databaseName: operation-insert + documents: [ ] +tests: + - description: insert one element + operations: + - object: *collection0 + name: insertOne + arguments: { document: { _id: 1 } } + + expectTracingMessages: + client: *client0 + ignoreExtraSpans: false + spans: + - name: insert operation-insert.test + tags: + db.system: mongodb + db.namespace: operation-insert + db.collection.name: test + db.operation.name: insert + db.operation.summary: insert operation-insert.test + nested: + - name: command insert + tags: + db.system: mongodb + db.namespace: operation-insert + server.address: { $$type: string } + server.port: { $$type: [ 'long', 'string' ] } + server.type: { $$type: string } + db.query.summary: insert + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + insert: test + ordered: true + txnNumber: 1 + documents: + - _id: 1 + + outcome: + - collectionName: test + databaseName: operation-insert + documents: + - _id: 1 diff --git a/spec/spec_tests/data/open_telemetry/transaction/transaction.yml b/spec/spec_tests/data/open_telemetry/transaction/transaction.yml new file mode 100644 index 0000000000..eb76cc5fd7 --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/transaction/transaction.yml @@ -0,0 +1,112 @@ +description: transaction spans +schemaVersion: '1.26' +runOnRequirements: + - minServerVersion: '4.0' + topologies: + - replicaset + - minServerVersion: '4.1.8' + topologies: + - sharded + - load-balanced +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: transaction-tests + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: test + - session: + id: &session0 session0 + client: client0 +initialData: + - collectionName: test + databaseName: transaction-tests + documents: [] +tests: + - description: observeTracingMessages around transaction + operations: + - object: *session0 + name: startTransaction + - object: *collection0 + name: insertOne + arguments: + session: *session0 + document: + _id: 1 + - object: *session0 + name: commitTransaction + - name: find + object: *collection0 + arguments: { filter: { x: 1 } } + + expectTracingMessages: + client: *client0 + ignoreExtraSpans: false + spans: + - name: transaction + tags: + db.system: mongodb + nested: + - name: insert transaction-tests.test + tags: + db.system: mongodb + db.namespace: transaction-tests + db.collection.name: test + db.operation.name: insert + db.operation.summary: insert transaction-tests.test + nested: + - name: command insert + tags: + db.system: mongodb + db.namespace: transaction-tests + server.address: { $$type: string } + server.port: { $$type: ['long', 'string'] } + server.type: { $$type: string } + db.query.summary: insert + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + insert: test + ordered: true + txnNumber: 1 + startTransaction: true + autocommit: false + documents: + - _id: 1 + db.mongodb.lsid: { $$sessionLsid: *session0 } + - name: commitTransaction admin.$cmd + tags: + db.system: mongodb + nested: + - name: command commitTransaction + tags: + db.system: mongodb + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + commitTransaction: 1 + txnNumber: 1 + autocommit: false + - name: find transaction-tests.test + tags: {} + nested: + - name: command find + tags: + db.system: mongodb + db.namespace: transaction-tests + server.address: { $$type: string } + server.port: { $$type: ['long', 'string'] } + server.type: { $$type: string } + db.query.summary: find + outcome: + - collectionName: test + databaseName: transaction-tests + documents: + - _id: 1 diff --git a/spec/spec_tests/open_telemetry_spec.rb b/spec/spec_tests/open_telemetry_spec.rb new file mode 100644 index 0000000000..040574597d --- /dev/null +++ b/spec/spec_tests/open_telemetry_spec.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true +# rubocop:todo all + +require 'spec_helper' + +require 'runners/unified' + +base = "#{CURRENT_PATH}/spec_tests/data/open_telemetry" +OTEL_UNIFIED_TESTS = Dir.glob("#{base}/**/*.yml").sort + +describe 'OpenTelemetry unified spec tests' do + define_unified_spec_tests(base, OTEL_UNIFIED_TESTS) +end diff --git a/spec/support/tracing.rb b/spec/support/tracing.rb new file mode 100644 index 0000000000..ec542bafc0 --- /dev/null +++ b/spec/support/tracing.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +module Tracing + Error = Class.new(StandardError) + + class Span + + attr_reader :tracer, :name, :attributes, :events, :with_parent, :kind, :finished, :nested + + attr_accessor :status + + def initialize(tracer, name, attributes = {}, with_parent: nil, kind: :internal) + @tracer = tracer + @name = name + @attributes = attributes + @events = [] + @with_parent = with_parent + @kind = kind + @finished = false + @nested = [] + end + + def set_attribute(key, value) + @attributes[key] = value + end + + def add_event(name, attributes: {}) + event_attributes = { 'event.name' => name } + event_attributes.merge!(attributes) unless attributes.nil? + @events << event_attributes + end + + def record_exception(exception, attributes: nil) + event_attributes = { + 'exception.type' => exception.class.to_s, + 'exception.message' => exception.message, + 'exception.stacktrace' => exception.full_message(highlight: false, order: :top).encode('UTF-8', invalid: :replace, undef: :replace, replace: '�') + } + event_attributes.merge!(attributes) unless attributes.nil? + add_event('exception', attributes: event_attributes) + end + + def finish + raise Tracing::Error, 'Span already finished' if @finished + @finished = true + tracer.finish_span(self) + end + end + + class Tracer + + attr_reader :spans + + def initialize + @spans = [] + @stack = [] + end + + def start_span(name, attributes: {}, with_parent: nil, kind: :internal) + parent = if with_parent.nil? + @stack.last + else + with_parent + end + Span.new(self, name, attributes, with_parent: parent, kind: kind).tap do |span| + @spans << span + @stack << span + end + end + + def finish_span(span) + raise Error, 'Span not found' unless @spans.include?(span) + @stack.pop if @stack.last == span + end + + def span_hierarchy + hierarchy = {} + @spans.each do |span| + if span.with_parent.nil? + hierarchy[span.object_id] = span + elsif (parent = hierarchy[span.with_parent.object_id]) + parent.nested << span + else + raise Error, "Parent span not found for span #{span.name}" + end + end + hierarchy.values + end + end +end