diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 36d6cfc9be99..00939bfa02d3 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -387,7 +387,8 @@ sql.temp_object_cleaner.cleanup_interval duration 30m0s how often to clean up or sql.temp_object_cleaner.wait_interval duration 30m0s how long after creation a temporary object will be cleaned up application sql.log.all_statements.enabled (alias: sql.trace.log_statement_execute) boolean false set to true to enable logging of all executed statements application sql.trace.stmt.enable_threshold duration 0s enables tracing on all statements; statements executing for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold application -sql.trace.txn.enable_threshold duration 0s enables tracing on all transactions; transactions open for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting is coarser-grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries) application +sql.trace.txn.enable_threshold duration 0s enables transaction traces for transactions exceeding this duration, used with `sql.trace.txn.sample_rate` application +sql.trace.txn.sample_rate float 1 enables probabilistic transaction tracing. It should be used in conjunction with `sql.trace.txn.enable_threshold`. A percentage of transactions between 0 and 1.0 will have tracing enabled, and only those which exceed the configured threshold will be logged. application sql.ttl.changefeed_replication.disabled boolean false if true, deletes issued by TTL will not be replicated via changefeeds (this setting will be ignored by changefeeds that have the ignore_disable_changefeed_replication option set; such changefeeds will continue to replicate all TTL deletes) application sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job application sql.ttl.default_delete_rate_limit integer 100 default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. application diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 5fa95a16e295..61f5b5b2a35f 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -342,7 +342,8 @@
sql.temp_object_cleaner.wait_interval
duration30m0show long after creation a temporary object will be cleaned upServerless/Dedicated/Self-Hosted
sql.log.all_statements.enabled
(alias: sql.trace.log_statement_execute)
booleanfalseset to true to enable logging of all executed statementsServerless/Dedicated/Self-Hosted
sql.trace.stmt.enable_threshold
duration0senables tracing on all statements; statements executing for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_thresholdServerless/Dedicated/Self-Hosted -
sql.trace.txn.enable_threshold
duration0senables tracing on all transactions; transactions open for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting is coarser-grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries)Serverless/Dedicated/Self-Hosted +
sql.trace.txn.enable_threshold
duration0senables transaction traces for transactions exceeding this duration, used with `sql.trace.txn.sample_rate`Serverless/Dedicated/Self-Hosted +
sql.trace.txn.sample_rate
float1enables probabilistic transaction tracing. It should be used in conjunction with `sql.trace.txn.enable_threshold`. A percentage of transactions between 0 and 1.0 will have tracing enabled, and only those which exceed the configured threshold will be logged.Serverless/Dedicated/Self-Hosted
sql.ttl.changefeed_replication.disabled
booleanfalseif true, deletes issued by TTL will not be replicated via changefeeds (this setting will be ignored by changefeeds that have the ignore_disable_changefeed_replication option set; such changefeeds will continue to replicate all TTL deletes)Serverless/Dedicated/Self-Hosted
sql.ttl.default_delete_batch_size
integer100default amount of rows to delete in a single query during a TTL jobServerless/Dedicated/Self-Hosted
sql.ttl.default_delete_rate_limit
integer100default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit.Serverless/Dedicated/Self-Hosted diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 9a4023ec0f52..fb3fabd1a7c9 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -3664,6 +3664,7 @@ func (ex *connExecutor) execStmtInNoTxnState( ex.txnIsolationLevelToKV(ctx, s.Modes.Isolation), ex.omitInRangefeeds(), ex.bufferedWritesEnabled(ctx), + ex.rng.internal, ) case *tree.ShowCommitTimestamp: return ex.execShowCommitTimestampInNoTxnState(ctx, s, res) @@ -3698,6 +3699,7 @@ func (ex *connExecutor) execStmtInNoTxnState( ex.txnIsolationLevelToKV(ctx, tree.UnspecifiedIsolation), ex.omitInRangefeeds(), ex.bufferedWritesEnabled(ctx), + ex.rng.internal, ) } } @@ -3732,6 +3734,7 @@ func (ex *connExecutor) beginImplicitTxn( ex.txnIsolationLevelToKV(ctx, tree.UnspecifiedIsolation), ex.omitInRangefeeds(), ex.bufferedWritesEnabled(ctx), + ex.rng.internal, ) } diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index 03e142f21348..dcd0abded038 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -12,6 +12,7 @@ package sql import ( + "math/rand" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" @@ -121,6 +122,7 @@ type eventTxnStartPayload struct { isoLevel isolation.Level omitInRangefeeds bool bufferedWritesEnabled bool + rng *rand.Rand } // makeEventTxnStartPayload creates an eventTxnStartPayload. @@ -134,6 +136,7 @@ func makeEventTxnStartPayload( isoLevel isolation.Level, omitInRangefeeds bool, bufferedWritesEnabled bool, + rng *rand.Rand, ) eventTxnStartPayload { return eventTxnStartPayload{ pri: pri, @@ -145,6 +148,7 @@ func makeEventTxnStartPayload( isoLevel: isoLevel, omitInRangefeeds: omitInRangefeeds, bufferedWritesEnabled: bufferedWritesEnabled, + rng: rng, } } @@ -601,6 +605,7 @@ func noTxnToOpen(args fsm.Args) error { payload.isoLevel, payload.omitInRangefeeds, payload.bufferedWritesEnabled, + payload.rng, ) ts.setAdvanceInfo( advCode, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 0b40424e7103..9a40efaa9a89 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -260,22 +260,28 @@ var SecondaryTenantScatterEnabled = settings.RegisterBoolSetting( settings.WithName("sql.virtual_cluster.feature_access.manual_range_scatter.enabled"), ) -// traceTxnThreshold can be used to log SQL transactions that take -// longer than duration to complete. For example, traceTxnThreshold=1s -// will log the trace for any transaction that takes 1s or longer. To -// log traces for all transactions use traceTxnThreshold=1ns. Note -// that any positive duration will enable tracing and will slow down -// all execution because traces are gathered for all transactions even -// if they are not output. -var traceTxnThreshold = settings.RegisterDurationSetting( +// TraceTxnThreshold logs SQL transactions exceeding a duration, captured via +// probabilistic tracing. For example, with `sql.trace.txn.percent` set to 0.5, +// 50% of transactions are traced, and those exceeding this threshold are +// logged. +var TraceTxnThreshold = settings.RegisterDurationSetting( settings.ApplicationLevel, "sql.trace.txn.enable_threshold", - "enables tracing on all transactions; transactions open for longer than "+ - "this duration will have their trace logged (set to 0 to disable); "+ - "note that enabling this may have a negative performance impact; "+ - "this setting is coarser-grained than sql.trace.stmt.enable_threshold "+ - "because it applies to all statements within a transaction as well as "+ - "client communication (e.g. retries)", 0, + "enables transaction traces for transactions exceeding this duration, used "+ + "with `sql.trace.txn.sample_rate`", + 0, + settings.WithPublic) + +// TraceTxnSampleRate Enables probabilistic transaction tracing. +var TraceTxnSampleRate = settings.RegisterFloatSetting( + settings.ApplicationLevel, + "sql.trace.txn.sample_rate", + "enables probabilistic transaction tracing. It should be used in conjunction "+ + "with `sql.trace.txn.enable_threshold`. A percentage of transactions between 0 and 1.0 "+ + "will have tracing enabled, and only those which exceed the configured "+ + "threshold will be logged.", + 1.0, + settings.NonNegativeFloatWithMaximum(1.0), settings.WithPublic) // TraceStmtThreshold is identical to traceTxnThreshold except it applies to diff --git a/pkg/sql/explain_bundle_test.go b/pkg/sql/explain_bundle_test.go index ad499b0c1d0f..33485963e850 100644 --- a/pkg/sql/explain_bundle_test.go +++ b/pkg/sql/explain_bundle_test.go @@ -178,7 +178,9 @@ CREATE TABLE users(id UUID DEFAULT gen_random_uuid() PRIMARY KEY, promo_id INT R t.Run("basic when tracing already enabled", func(t *testing.T) { r.Exec(t, "SET CLUSTER SETTING sql.trace.txn.enable_threshold='100ms';") + r.Exec(t, "SET CLUSTER SETTING sql.trace.txn.sample_rate='1.0';") defer r.Exec(t, "SET CLUSTER SETTING sql.trace.txn.enable_threshold='0ms';") + defer r.Exec(t, "SET CLUSTER SETTING sql.trace.txn.sample_rate='0.0';") rows := r.QueryStr(t, "EXPLAIN ANALYZE (DEBUG) SELECT * FROM abc WHERE c=1") checkBundle( t, fmt.Sprint(rows), "public.abc", nil, false, /* expectErrors */ diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 4c72062fa720..a55e3ca11b2d 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -480,6 +480,7 @@ func (ie *InternalExecutor) newConnExecutorWithTxn( // TODO(yuzefovich): re-evaluate whether we want to allow buffered // writes for internal executor. false, /* bufferedWritesEnabled */ + ex.rng.internal, ) // Modify the Collection to match the parent executor's Collection. diff --git a/pkg/sql/trace_test.go b/pkg/sql/trace_test.go index e1fe5a485e0b..f9c395c8f2d3 100644 --- a/pkg/sql/trace_test.go +++ b/pkg/sql/trace_test.go @@ -10,6 +10,7 @@ import ( gosql "database/sql" "fmt" "net/url" + "regexp" "sort" "strings" "testing" @@ -20,13 +21,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/pgurlutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/stretchr/testify/require" ) @@ -584,3 +588,90 @@ func TestStatementThreshold(t *testing.T) { r.Exec(t, "select 1") // TODO(andrei): check the logs for traces somehow. } + +func TestTraceTxnSampleRateAndThreshold(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + Settings: settings, + }) + defer s.Stopper().Stop(ctx) + + appLogsSpy := logtestutils.NewLogSpy( + t, + // This string match is constructed from the log.SqlExec.Infof format + // string found in conn_executor_exec.go:logTraceAboveThreshold + logtestutils.MatchesF("exceeding threshold of"), + ) + cleanup := log.InterceptWith(ctx, appLogsSpy) + defer cleanup() + + for _, tc := range []struct { + name string + sampleRate float64 + threshold time.Duration + exptToTraceEventually bool + }{ + { + name: "no sample rate and no threshold", + sampleRate: 0.0, + threshold: 0 * time.Nanosecond, + exptToTraceEventually: false, + }, + { + name: "sample rate 1.0 and threshold 1ns should trace", + sampleRate: 1.0, + threshold: 1 * time.Nanosecond, + exptToTraceEventually: true, + }, + { + name: "sample rate 0.0 and threshold 1ns should not trace", + sampleRate: 0.0, + threshold: 1 * time.Nanosecond, + exptToTraceEventually: false, + }, + { + name: "sample rate 1.0 and threshold 0ns should not trace", + sampleRate: 1.0, + threshold: 0 * time.Nanosecond, + exptToTraceEventually: false, + }, + { + name: "sample rate 0.5 and threshold 1ns should trace eventually", + sampleRate: 0.5, + threshold: 1 * time.Nanosecond, + exptToTraceEventually: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + sql.TraceTxnThreshold.Override(ctx, &settings.SV, tc.threshold) + sql.TraceTxnSampleRate.Override(ctx, &settings.SV, tc.sampleRate) + log.FlushAllSync() + appLogsSpy.Reset() + r := sqlutils.MakeSQLRunner(db) + + if tc.exptToTraceEventually { + testutils.SucceedsSoon(t, func() error { + r.Exec(t, "SELECT pg_sleep(0.01)") + log.FlushAllSync() + if !appLogsSpy.Has(logtestutils.MatchesF(regexp.QuoteMeta("ExecStmt: SELECT pg_sleep(0.01)"))) { + return errors.New("no sql txn log found (tracing did not happen)") + } + return nil + }) + } else { + r.Exec(t, "SELECT pg_sleep(0.01)") + log.FlushAllSync() + + spyLogs := appLogsSpy.ReadAll() + if appLogsSpy.Has(logtestutils.MatchesF(regexp.QuoteMeta("ExecStmt: SELECT pg_sleep(0.01)"))) { + t.Fatalf("sql txn log found (tracing happened when it should not have): %v", spyLogs) + } + } + }) + } +} diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index d4cbcb752bec..1cd1d3e84fed 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -7,6 +7,7 @@ package sql import ( "context" + "math/rand" "sync/atomic" "time" @@ -94,6 +95,11 @@ type txnState struct { // txn context. txnCancelFn context.CancelFunc + // shouldRecord is used to indicate whether this transaction should record a + // trace. This is set to true if we have a positive sample rate and a + // positive duration trigger for logging. + shouldRecord bool + // recordingThreshold, is not zero, indicates that sp is recording and that // the recording should be dumped to the log if execution of the transaction // took more than this. @@ -193,6 +199,7 @@ func (ts *txnState) resetForNewSQLTxn( isoLevel isolation.Level, omitInRangefeeds bool, bufferedWritesEnabled bool, + rng *rand.Rand, ) (txnID uuid.UUID) { // Reset state vars to defaults. ts.sqlTimestamp = sqlTimestamp @@ -206,8 +213,12 @@ func (ts *txnState) resetForNewSQLTxn( alreadyRecording := tranCtx.sessionTracing.Enabled() ctx, cancelFn := context.WithCancel(connCtx) var sp *tracing.Span - duration := traceTxnThreshold.Get(&tranCtx.settings.SV) - if alreadyRecording || duration > 0 { + duration := TraceTxnThreshold.Get(&tranCtx.settings.SV) + + sampleRate := TraceTxnSampleRate.Get(&tranCtx.settings.SV) + ts.shouldRecord = sampleRate > 0 && duration > 0 && rng.Float64() < sampleRate + + if alreadyRecording || ts.shouldRecord { ts.Ctx, sp = tracing.EnsureChildSpan(ctx, tranCtx.tracer, opName, tracing.WithRecording(tracingpb.RecordingVerbose)) } else if ts.testingForceRealTracingSpans { @@ -220,7 +231,7 @@ func (ts *txnState) resetForNewSQLTxn( sp.SetTag("implicit", attribute.StringValue("true")) } - if !alreadyRecording && (duration > 0) { + if !alreadyRecording && ts.shouldRecord { ts.recordingThreshold = duration ts.recordingStart = timeutil.Now() } @@ -285,7 +296,7 @@ func (ts *txnState) finishSQLTxn() (txnID uuid.UUID, commitTimestamp hlc.Timesta ts.mon.Stop(ts.Ctx) sp := tracing.SpanFromContext(ts.Ctx) - if ts.recordingThreshold > 0 { + if ts.shouldRecord { if elapsed := timeutil.Since(ts.recordingStart); elapsed >= ts.recordingThreshold { logTraceAboveThreshold(ts.Ctx, sp.GetRecording(sp.RecordingType()), /* recording */ @@ -302,6 +313,7 @@ func (ts *txnState) finishSQLTxn() (txnID uuid.UUID, commitTimestamp hlc.Timesta ts.txnCancelFn() } ts.Ctx = nil + ts.shouldRecord = false ts.recordingThreshold = 0 return func() (txnID uuid.UUID, timestamp hlc.Timestamp) { ts.mu.Lock() diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index 0ccd9d1c69f9..1b6c896e6bff 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -213,6 +214,7 @@ func TestTransitions(t *testing.T) { ctx := context.Background() stopper := stop.NewStopper() defer stopper.Stop(ctx) + rng, _ := randutil.NewTestRand() dummyRewCap := rewindCapability{rewindPos: CmdPos(12)} testCon := makeTestContext(stopper) tranCtx := transitionCtx{ @@ -282,7 +284,7 @@ func TestTransitions(t *testing.T) { ev: eventTxnStart{ImplicitTxn: fsm.True}, evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), nil /* historicalTimestamp */, tranCtx, sessiondatapb.Normal, isolation.Serializable, - false /* omitInRangefeeds */, false, /* bufferedWritesEnabled */ + false /* omitInRangefeeds */, false /* bufferedWritesEnabled */, rng, ), expState: stateOpen{ImplicitTxn: fsm.True, WasUpgraded: fsm.False}, expAdv: expAdvance{ @@ -309,7 +311,7 @@ func TestTransitions(t *testing.T) { ev: eventTxnStart{ImplicitTxn: fsm.False}, evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), nil /* historicalTimestamp */, tranCtx, sessiondatapb.Normal, isolation.Serializable, - false /* omitInRangefeeds */, false, /* bufferedWritesEnabled */ + false /* omitInRangefeeds */, false /* bufferedWritesEnabled */, rng, ), expState: stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.False}, expAdv: expAdvance{ diff --git a/pkg/util/log/logtestutils/BUILD.bazel b/pkg/util/log/logtestutils/BUILD.bazel index bdea6158a9e1..a7e8ed6f0f87 100644 --- a/pkg/util/log/logtestutils/BUILD.bazel +++ b/pkg/util/log/logtestutils/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "logtestutils", srcs = [ + "log_spy.go", "log_test_utils.go", "structured_log_spy.go", "telemetry_logging_test_utils.go", diff --git a/pkg/util/log/logtestutils/log_spy.go b/pkg/util/log/logtestutils/log_spy.go new file mode 100644 index 000000000000..4e0a6a1b4159 --- /dev/null +++ b/pkg/util/log/logtestutils/log_spy.go @@ -0,0 +1,139 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package logtestutils + +import ( + "context" + "encoding/json" + "math" + "regexp" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// LogSpy is a simple test utility which intercepts and stores +// log entries for testing purposes. It is not be used outside +// of tests. +type LogSpy struct { + testState *testing.T + + filters []func(entry logpb.Entry) bool + + mu struct { + syncutil.Mutex + + logs []logpb.Entry + } +} + +// NewLogSpy takes a test state and list of filters and returns +// a new LogSpy. +func NewLogSpy(t *testing.T, filters ...func(entry logpb.Entry) bool) *LogSpy { + return &LogSpy{ + testState: t, + filters: filters, + mu: struct { + syncutil.Mutex + logs []logpb.Entry + }{ + logs: []logpb.Entry{}, + }, + } +} + +// Intercept satisfies the log.Interceptor interface. +// It parses the log entry, checks if it passes the filters if +// any exist and if it does, stores it in the logs slice. +func (s *LogSpy) Intercept(body []byte) { + s.mu.Lock() + defer s.mu.Unlock() + + var entry logpb.Entry + err := json.Unmarshal(body, &entry) + + if err != nil { + s.testState.Fatal(err) + } + + for _, filter := range s.filters { + if !filter(entry) { + return + } + } + + s.mu.logs = append(s.mu.logs, entry) +} + +// ReadAll consumes all logs contained within the spy and returns +// them as a list. Once the logs are consumed they cannot be read again. +func (s *LogSpy) ReadAll() []logpb.Entry { + s.mu.Lock() + defer s.mu.Unlock() + + return s.readLocked(math.MaxUint32) +} + +// Reset clears the logs contained within the spy. +func (s *LogSpy) Reset() { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.logs = []logpb.Entry{} +} + +// ReadAll consumes the specified number of logs contained within +// the spy and returns them as a list. Once the logs are consumed +// they cannot be read again. +func (s *LogSpy) Read(n int) []logpb.Entry { + s.mu.Lock() + defer s.mu.Unlock() + + return s.readLocked(n) +} + +func (s *LogSpy) readLocked(n int) []logpb.Entry { + if n > len(s.mu.logs) { + n = len(s.mu.logs) + } + + entries := s.mu.logs[:n] + s.mu.logs = s.mu.logs[n:] + + return entries +} + +// Has checks whether the spy has any log which passes the passed +// in filters. +func (s *LogSpy) Has(filters ...func(logpb.Entry) bool) bool { + s.mu.Lock() + defer s.mu.Unlock() + +logLoop: + for _, entry := range s.mu.logs { + for _, f := range filters { + if !f(entry) { + continue logLoop + } + } + // only gets here if all filters matched this log + return true + } + return false +} + +// MatchesF returns a filter that matches log entries which contain +// the input pattern. +func MatchesF(pattern string) func(entry logpb.Entry) bool { + return func(entry logpb.Entry) bool { + exists, err := regexp.MatchString(pattern, entry.Message) + if err != nil { + log.Errorf(context.Background(), "failed to match regex %s: %s", pattern, err) + } + return exists + } +}