diff --git a/pkg/workload/changefeeds/BUILD.bazel b/pkg/workload/changefeeds/BUILD.bazel index cbdea764909e..9de703848afa 100644 --- a/pkg/workload/changefeeds/BUILD.bazel +++ b/pkg/workload/changefeeds/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/util/hlc", + "//pkg/util/log", "//pkg/util/timeutil", "//pkg/workload", "//pkg/workload/histogram", diff --git a/pkg/workload/changefeeds/changefeeds.go b/pkg/workload/changefeeds/changefeeds.go index 667e8396501b..c7014ae5946b 100644 --- a/pkg/workload/changefeeds/changefeeds.go +++ b/pkg/workload/changefeeds/changefeeds.go @@ -13,6 +13,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" @@ -20,6 +21,8 @@ import ( "github.com/jackc/pgx/v5" ) +var logResolvedEvery = log.Every(10 * time.Second) + // AddChangefeedToQueryLoad augments the passed QueryLoad to contain an extra // worker to run a changefeed over the tables of the generator. func AddChangefeedToQueryLoad( @@ -27,6 +30,7 @@ func AddChangefeedToQueryLoad( gen workload.ConnFlagser, dbName string, resolvedTarget time.Duration, + cursorStr string, urls []string, reg *histogram.Registry, ql *workload.QueryLoad, @@ -76,8 +80,14 @@ func AddChangefeedToQueryLoad( return err } - var cursorStr string - if err := conn.QueryRow(ctx, "SELECT cluster_logical_timestamp()").Scan(&cursorStr); err != nil { + if cursorStr == "" { + if err := conn.QueryRow(ctx, "SELECT cluster_logical_timestamp()").Scan(&cursorStr); err != nil { + return err + } + } + + epoch, err := hlc.ParseHLC(cursorStr) + if err != nil { return err } @@ -120,10 +130,15 @@ func AddChangefeedToQueryLoad( return true } var rows pgx.Rows + var changefeedStartTime time.Time maybeSetupRows := func() (done bool) { if rows != nil { return false } + if changefeedStartTime.IsZero() { + changefeedStartTime = timeutil.Now() + } + log.Dev.Infof(ctx, "creating changefeed after %s with stmt: %s with args %v", timeutil.Since(epoch.GoTime()), stmt, args) var err error rows, err = conn.Query(cfCtx, stmt, args...) return maybeMarkDone(err) @@ -167,6 +182,11 @@ func AddChangefeedToQueryLoad( return errors.Errorf("resolved timestamp %s is less than last resolved timestamp %s", resolved, lastResolved) } lastResolved = resolved + if !lastResolved.IsEmpty() { + if logResolvedEvery.ShouldLog() { + log.Dev.Infof(ctx, "received resolved timestamp: lag=%s, ts=%s, sinceStart=%s", timeutil.Since(lastResolved.GoTime()), lastResolved, timeutil.Since(changefeedStartTime)) + } + } } else { return errors.Errorf("failed to parse CHANGEFEED event: %s", values[2]) } diff --git a/pkg/workload/cli/run.go b/pkg/workload/cli/run.go index 59c5ff84e6d1..0c9e2f123ff4 100644 --- a/pkg/workload/cli/run.go +++ b/pkg/workload/cli/run.go @@ -116,6 +116,8 @@ var ( "changefeed-max-rate", 0, "Maximum frequency of changefeed ingestion. If 0, no limit.") changefeedResolvedTarget = runFlags.Duration("changefeed-resolved-target", 5*time.Second, "The target frequency of resolved messages. O to disable resolved reporting and accept server defaults.") + changefeedCursor = runFlags.String("changefeed-cursor", "", + "The cursor to start the changefeed from. If empty, the changefeed will start from the current cluster logical timestamp.") ) func init() { @@ -507,7 +509,7 @@ func runRun(gen workload.Generator, urls []string, dbName string) error { if *changefeed { log.Dev.Infof(ctx, "adding changefeed to query load...") - err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, *changefeedResolvedTarget, urls, reg, &ops) + err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, *changefeedResolvedTarget, *changefeedCursor, urls, reg, &ops) if err != nil && !*tolerateErrors { return errors.Wrapf(err, "failed to initialize changefeed") }