Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/workload/changefeeds/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/timeutil",
"//pkg/workload",
"//pkg/workload/histogram",
Expand Down
24 changes: 22 additions & 2 deletions pkg/workload/changefeeds/changefeeds.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,24 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v5"
)

var logResolvedEvery = log.Every(10 * time.Second)

// AddChangefeedToQueryLoad augments the passed QueryLoad to contain an extra
// worker to run a changefeed over the tables of the generator.
func AddChangefeedToQueryLoad(
ctx context.Context,
gen workload.ConnFlagser,
dbName string,
resolvedTarget time.Duration,
cursorStr string,
urls []string,
reg *histogram.Registry,
ql *workload.QueryLoad,
Expand Down Expand Up @@ -76,8 +80,14 @@ func AddChangefeedToQueryLoad(
return err
}

var cursorStr string
if err := conn.QueryRow(ctx, "SELECT cluster_logical_timestamp()").Scan(&cursorStr); err != nil {
if cursorStr == "" {
if err := conn.QueryRow(ctx, "SELECT cluster_logical_timestamp()").Scan(&cursorStr); err != nil {
return err
}
}

epoch, err := hlc.ParseHLC(cursorStr)
if err != nil {
return err
}

Expand Down Expand Up @@ -120,10 +130,15 @@ func AddChangefeedToQueryLoad(
return true
}
var rows pgx.Rows
var changefeedStartTime time.Time
maybeSetupRows := func() (done bool) {
if rows != nil {
return false
}
if changefeedStartTime.IsZero() {
changefeedStartTime = timeutil.Now()
}
log.Dev.Infof(ctx, "creating changefeed after %s with stmt: %s with args %v", timeutil.Since(epoch.GoTime()), stmt, args)
var err error
rows, err = conn.Query(cfCtx, stmt, args...)
return maybeMarkDone(err)
Expand Down Expand Up @@ -167,6 +182,11 @@ func AddChangefeedToQueryLoad(
return errors.Errorf("resolved timestamp %s is less than last resolved timestamp %s", resolved, lastResolved)
}
lastResolved = resolved
if !lastResolved.IsEmpty() {
if logResolvedEvery.ShouldLog() {
log.Dev.Infof(ctx, "received resolved timestamp: lag=%s, ts=%s, sinceStart=%s", timeutil.Since(lastResolved.GoTime()), lastResolved, timeutil.Since(changefeedStartTime))
}
}
} else {
return errors.Errorf("failed to parse CHANGEFEED event: %s", values[2])
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/workload/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ var (
"changefeed-max-rate", 0, "Maximum frequency of changefeed ingestion. If 0, no limit.")
changefeedResolvedTarget = runFlags.Duration("changefeed-resolved-target", 5*time.Second,
"The target frequency of resolved messages. O to disable resolved reporting and accept server defaults.")
changefeedCursor = runFlags.String("changefeed-cursor", "",
"The cursor to start the changefeed from. If empty, the changefeed will start from the current cluster logical timestamp.")
)

func init() {
Expand Down Expand Up @@ -507,7 +509,7 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {

if *changefeed {
log.Dev.Infof(ctx, "adding changefeed to query load...")
err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, *changefeedResolvedTarget, urls, reg, &ops)
err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, *changefeedResolvedTarget, *changefeedCursor, urls, reg, &ops)
if err != nil && !*tolerateErrors {
return errors.Wrapf(err, "failed to initialize changefeed")
}
Expand Down
Loading