Skip to content

Commit 917f913

Browse files
craig[bot]wenyihu6
andcommitted
Merge #156276
156276: workload/changefeed: allow changefeed-cursor r=wenyihu6 a=wenyihu6 Part of: https://cockroachlabs.atlassian.net/browse/CRDB-55203 Release note: none --- **workload/changefeed: allow changefeed-cursor** This commit adds a new option, changefeed-cursor, which allows specifying the timestamp after which the changefeed should start emitting events and trigger a catch-up scan. If not specified, the changefeed defaults to using the current cluster logical timestamp. This helps tests specify catch-up scan behavior. --- **workload/changefeed: add more logs** Co-authored-by: wenyihu6 <[email protected]>
2 parents 40466eb + f902e55 commit 917f913

File tree

3 files changed

+26
-3
lines changed

3 files changed

+26
-3
lines changed

pkg/workload/changefeeds/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//pkg/util/hlc",
10+
"//pkg/util/log",
1011
"//pkg/util/timeutil",
1112
"//pkg/workload",
1213
"//pkg/workload/histogram",

pkg/workload/changefeeds/changefeeds.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,24 @@ import (
1313
"time"
1414

1515
"github.com/cockroachdb/cockroach/pkg/util/hlc"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
1617
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1718
"github.com/cockroachdb/cockroach/pkg/workload"
1819
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
1920
"github.com/cockroachdb/errors"
2021
"github.com/jackc/pgx/v5"
2122
)
2223

24+
var logResolvedEvery = log.Every(10 * time.Second)
25+
2326
// AddChangefeedToQueryLoad augments the passed QueryLoad to contain an extra
2427
// worker to run a changefeed over the tables of the generator.
2528
func AddChangefeedToQueryLoad(
2629
ctx context.Context,
2730
gen workload.ConnFlagser,
2831
dbName string,
2932
resolvedTarget time.Duration,
33+
cursorStr string,
3034
urls []string,
3135
reg *histogram.Registry,
3236
ql *workload.QueryLoad,
@@ -88,8 +92,14 @@ func AddChangefeedToQueryLoad(
8892
return err
8993
}
9094

91-
var cursorStr string
92-
if err := conn.QueryRow(ctx, "SELECT cluster_logical_timestamp()").Scan(&cursorStr); err != nil {
95+
if cursorStr == "" {
96+
if err := conn.QueryRow(ctx, "SELECT cluster_logical_timestamp()").Scan(&cursorStr); err != nil {
97+
return err
98+
}
99+
}
100+
101+
epoch, err := hlc.ParseHLC(cursorStr)
102+
if err != nil {
93103
return err
94104
}
95105

@@ -132,10 +142,15 @@ func AddChangefeedToQueryLoad(
132142
return true
133143
}
134144
var rows pgx.Rows
145+
var changefeedStartTime time.Time
135146
maybeSetupRows := func() (done bool) {
136147
if rows != nil {
137148
return false
138149
}
150+
if changefeedStartTime.IsZero() {
151+
changefeedStartTime = timeutil.Now()
152+
}
153+
log.Dev.Infof(ctx, "creating changefeed after %s with stmt: %s with args %v", timeutil.Since(epoch.GoTime()), stmt, args)
139154
var err error
140155
rows, err = conn.Query(cfCtx, stmt, args...)
141156
return maybeMarkDone(err)
@@ -179,6 +194,11 @@ func AddChangefeedToQueryLoad(
179194
return errors.Errorf("resolved timestamp %s is less than last resolved timestamp %s", resolved, lastResolved)
180195
}
181196
lastResolved = resolved
197+
if !lastResolved.IsEmpty() {
198+
if logResolvedEvery.ShouldLog() {
199+
log.Dev.Infof(ctx, "received resolved timestamp: lag=%s, ts=%s, sinceStart=%s", timeutil.Since(lastResolved.GoTime()), lastResolved, timeutil.Since(changefeedStartTime))
200+
}
201+
}
182202
} else {
183203
return errors.Errorf("failed to parse CHANGEFEED event: %s", values[2])
184204
}

pkg/workload/cli/run.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ var (
116116
"changefeed-max-rate", 0, "Maximum frequency of changefeed ingestion. If 0, no limit.")
117117
changefeedResolvedTarget = runFlags.Duration("changefeed-resolved-target", 5*time.Second,
118118
"The target frequency of resolved messages. O to disable resolved reporting and accept server defaults.")
119+
changefeedCursor = runFlags.String("changefeed-cursor", "",
120+
"The cursor to start the changefeed from. If empty, the changefeed will start from the current cluster logical timestamp.")
119121
)
120122

121123
func init() {
@@ -507,7 +509,7 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
507509

508510
if *changefeed {
509511
log.Dev.Infof(ctx, "adding changefeed to query load...")
510-
err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, *changefeedResolvedTarget, urls, reg, &ops)
512+
err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, *changefeedResolvedTarget, *changefeedCursor, urls, reg, &ops)
511513
if err != nil && !*tolerateErrors {
512514
return errors.Wrapf(err, "failed to initialize changefeed")
513515
}

0 commit comments

Comments
 (0)