Skip to content

Commit 3a810a8

Browse files
committed
changefeedccl: changefeed-specific option for num of sink workers
Previously, the number of SinkIOWorkers could only be defined via a cluster-wide setting. This patch allows setting the number of SinkIOWorkers per changefeed. If both (cluster-wide, per-changefeed) values are given, the min of the two will be used. Release note (sql change): The CREATE CHANGEFEED statement was extended with an optional `num_sink_workers` setting that can be used to set the number of SinkIOWorkers per changefeed. Note that the number of workers is capped by the cluster-wide setting if present. Fixes: #154546
1 parent c4dbc61 commit 3a810a8

File tree

5 files changed

+299
-6
lines changed

5 files changed

+299
-6
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12785,3 +12785,79 @@ func TestCreateTableLevelChangefeedWithDBPrivilege(t *testing.T) {
1278512785
}
1278612786
cdcTest(t, testFn, feedTestEnterpriseSinks)
1278712787
}
12788+
12789+
// TestChangefeedNumSinkWorkersPrecedence verifies that the num_sink_workers
12790+
// option works correctly and that the precedence logic (smaller value wins)
12791+
// is applied when both cluster setting and per-changefeed option are set.
12792+
func TestChangefeedNumSinkWorkersPrecedence(t *testing.T) {
12793+
defer leaktest.AfterTest(t)()
12794+
defer log.Scope(t).Close(t)
12795+
12796+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12797+
registry := s.Server.JobRegistry().(*jobs.Registry)
12798+
metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics
12799+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12800+
12801+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
12802+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
12803+
12804+
// Test 1: Per-changefeed option only (cluster setting = 0)
12805+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 0`)
12806+
foo1 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH num_sink_workers = '5'`)
12807+
defer closeFeed(t, foo1)
12808+
12809+
testutils.SucceedsSoon(t, func() error {
12810+
workers := metrics.ParallelIOWorkers.Value()
12811+
if workers != 5 {
12812+
return errors.Newf("expected 5 workers, got %d", workers)
12813+
}
12814+
return nil
12815+
})
12816+
require.NoError(t, foo1.Close())
12817+
12818+
// Test 2: Cluster setting = 10, per-changefeed = 3 → should use 3 (smaller)
12819+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 10`)
12820+
foo2 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH num_sink_workers = '3'`)
12821+
defer closeFeed(t, foo2)
12822+
12823+
testutils.SucceedsSoon(t, func() error {
12824+
workers := metrics.ParallelIOWorkers.Value()
12825+
if workers != 3 {
12826+
return errors.Newf("expected 3 workers (smaller of 10 and 3), got %d", workers)
12827+
}
12828+
return nil
12829+
})
12830+
require.NoError(t, foo2.Close())
12831+
12832+
// Test 3: Cluster setting = 2, per-changefeed = 8 → should use 2 (smaller)
12833+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 2`)
12834+
foo3 := feed(t, f, `CREATE CHANGEFEED FOR foo WITH num_sink_workers = '8'`)
12835+
defer closeFeed(t, foo3)
12836+
12837+
testutils.SucceedsSoon(t, func() error {
12838+
workers := metrics.ParallelIOWorkers.Value()
12839+
if workers != 2 {
12840+
return errors.Newf("expected 2 workers (smaller of 2 and 8), got %d", workers)
12841+
}
12842+
return nil
12843+
})
12844+
require.NoError(t, foo3.Close())
12845+
12846+
// Test 4: Cluster setting only (no per-changefeed option)
12847+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 7`)
12848+
foo4 := feed(t, f, `CREATE CHANGEFEED FOR foo`)
12849+
defer closeFeed(t, foo4)
12850+
12851+
testutils.SucceedsSoon(t, func() error {
12852+
workers := metrics.ParallelIOWorkers.Value()
12853+
if workers != 7 {
12854+
return errors.Newf("expected 7 workers (from cluster setting), got %d", workers)
12855+
}
12856+
return nil
12857+
})
12858+
require.NoError(t, foo4.Close())
12859+
}
12860+
12861+
// Test with sinks that support parallel IO
12862+
cdcTest(t, testFn, feedTestForceSink("pubsub"))
12863+
}

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ const (
115115
OptLaggingRangesPollingInterval = `lagging_ranges_polling_interval`
116116
OptIgnoreDisableChangefeedReplication = `ignore_disable_changefeed_replication`
117117
OptEncodeJSONValueNullAsObject = `encode_json_value_null_as_object`
118+
OptNumSinkWorkers = `num_sink_workers`
118119
// TODO(#142273): look into whether we want to add headers to pub/sub, and other
119120
// sinks as well (eg cloudstorage, webhook, ..). Currently it's kafka-only.
120121
OptHeadersJSONColumnName = `headers_json_column_name`
@@ -411,6 +412,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
411412
OptLaggingRangesPollingInterval: durationOption,
412413
OptIgnoreDisableChangefeedReplication: flagOption,
413414
OptEncodeJSONValueNullAsObject: flagOption,
415+
OptNumSinkWorkers: stringOption,
414416
OptEnrichedProperties: csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
415417
OptHeadersJSONColumnName: stringOption,
416418
OptExtraHeaders: jsonOption,
@@ -428,6 +430,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
428430
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
429431
OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
430432
OptIgnoreDisableChangefeedReplication, OptEncodeJSONValueNullAsObject, OptEnrichedProperties,
433+
OptNumSinkWorkers,
431434
)
432435

433436
// SQLValidOptions is options exclusive to SQL sink
@@ -1203,6 +1206,27 @@ func (s StatementOptions) GetPTSExpiration() (time.Duration, error) {
12031206
return *exp, nil
12041207
}
12051208

1209+
// GetNumSinkWorkers returns the number of sink IO workers to use.
1210+
// Returns 0 if not set (which means use a reasonable default).
1211+
// Negative values disable sink IO workers.
1212+
func (s StatementOptions) GetNumSinkWorkers() (int64, error) {
1213+
v, ok := s.m[OptNumSinkWorkers]
1214+
if !ok {
1215+
return 0, nil
1216+
}
1217+
// Parse as int64
1218+
var result int64
1219+
if _, err := fmt.Sscanf(v, "%d", &result); err != nil {
1220+
return 0, errors.Newf("invalid integer value for %s: %q", OptNumSinkWorkers, v)
1221+
}
1222+
// Verify it's a clean parse by formatting back and comparing
1223+
// This rejects values like "3.14" which Sscanf would truncate to 3
1224+
if fmt.Sprintf("%d", result) != v {
1225+
return 0, errors.Newf("invalid integer value for %s: %q", OptNumSinkWorkers, v)
1226+
}
1227+
return result, nil
1228+
}
1229+
12061230
// ForceKeyInValue sets the encoding option KeyInValue to true and then validates the
12071231
// resoluting encoding options.
12081232
func (s StatementOptions) ForceKeyInValue() error {

pkg/ccl/changefeedccl/changefeedbase/options_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,68 @@ func TestOptionsValidations(t *testing.T) {
5151
}
5252
}
5353

54+
func TestNumSinkWorkersOption(t *testing.T) {
55+
defer leaktest.AfterTest(t)()
56+
defer log.Scope(t).Close(t)
57+
58+
tests := []struct {
59+
name string
60+
input map[string]string
61+
expected int64
62+
expectErr bool
63+
}{
64+
{
65+
name: "positive value",
66+
input: map[string]string{"num_sink_workers": "5"},
67+
expected: 5,
68+
expectErr: false,
69+
},
70+
{
71+
name: "zero value (default)",
72+
input: map[string]string{"num_sink_workers": "0"},
73+
expected: 0,
74+
expectErr: false,
75+
},
76+
{
77+
name: "negative value (disable)",
78+
input: map[string]string{"num_sink_workers": "-1"},
79+
expected: -1,
80+
expectErr: false,
81+
},
82+
{
83+
name: "not set",
84+
input: map[string]string{},
85+
expected: 0,
86+
expectErr: false,
87+
},
88+
{
89+
name: "invalid non-integer",
90+
input: map[string]string{"num_sink_workers": "abc"},
91+
expected: 0,
92+
expectErr: true,
93+
},
94+
{
95+
name: "invalid float",
96+
input: map[string]string{"num_sink_workers": "3.14"},
97+
expected: 0,
98+
expectErr: true,
99+
},
100+
}
101+
102+
for _, test := range tests {
103+
t.Run(test.name, func(t *testing.T) {
104+
o := MakeStatementOptions(test.input)
105+
val, err := o.GetNumSinkWorkers()
106+
if test.expectErr {
107+
require.Error(t, err)
108+
} else {
109+
require.NoError(t, err)
110+
require.Equal(t, test.expected, val)
111+
}
112+
})
113+
}
114+
}
115+
54116
func TestEncodingOptionsValidations(t *testing.T) {
55117
defer leaktest.AfterTest(t)()
56118
defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/sink.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ func getSink(
259259
}
260260
if KafkaV2Enabled.Get(&serverCfg.Settings.SV) {
261261
return makeKafkaSinkV2(ctx, &changefeedbase.SinkURL{URL: u}, targets, sinkOpts,
262-
numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
262+
numSinkIOWorkers(serverCfg, opts), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
263263
serverCfg.Settings, metricsBuilder, kafkaSinkV2Knobs{})
264264
} else {
265265
return makeKafkaSink(ctx, &changefeedbase.SinkURL{URL: u}, targets, sinkOpts, serverCfg.Settings, metricsBuilder)
@@ -283,7 +283,7 @@ func getSink(
283283
}
284284
return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
285285
return makeWebhookSink(ctx, &changefeedbase.SinkURL{URL: u}, encodingOpts, webhookOpts,
286-
numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
286+
numSinkIOWorkers(serverCfg, opts), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
287287
metricsBuilder, serverCfg.Settings)
288288
})
289289
case isPubsubSink(u):
@@ -292,7 +292,7 @@ func getSink(
292292
testingKnobs = knobs
293293
}
294294
return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), targets,
295-
opts.IsSet(changefeedbase.OptUnordered), numSinkIOWorkers(serverCfg),
295+
opts.IsSet(changefeedbase.OptUnordered), numSinkIOWorkers(serverCfg, opts),
296296
newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
297297
metricsBuilder, serverCfg.Settings, testingKnobs)
298298
case isCloudStorageSink(u):
@@ -446,9 +446,11 @@ type encDatumRowBuffer []rowenc.EncDatumRow
446446
func (b *encDatumRowBuffer) IsEmpty() bool {
447447
return b == nil || len(*b) == 0
448448
}
449+
449450
func (b *encDatumRowBuffer) Push(r rowenc.EncDatumRow) {
450451
*b = append(*b, r)
451452
}
453+
452454
func (b *encDatumRowBuffer) Pop() rowenc.EncDatumRow {
453455
ret := (*b)[0]
454456
*b = (*b)[1:]
@@ -745,7 +747,7 @@ func getSinkConfigFromJson(
745747
) (batchCfg sinkBatchConfig, retryCfg retry.Options, err error) {
746748
retryCfg = defaultRetryConfig()
747749

748-
var cfg = baseConfig
750+
cfg := baseConfig
749751
cfg.Retry.Max = jsonMaxRetries(retryCfg.MaxRetries)
750752
cfg.Retry.Backoff = jsonDuration(retryCfg.InitialBackoff)
751753
if jsonStr != `` {
@@ -802,8 +804,38 @@ func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error {
802804
return nil
803805
}
804806

805-
func numSinkIOWorkers(cfg *execinfra.ServerConfig) int {
806-
numWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV)
807+
func numSinkIOWorkers(cfg *execinfra.ServerConfig, opts changefeedbase.StatementOptions) int {
808+
// Get per-changefeed option
809+
changefeedWorkers, err := opts.GetNumSinkWorkers()
810+
if err != nil {
811+
// Log error but continue with cluster setting
812+
log.Changefeed.Warningf(context.Background(), "error getting num_sink_workers option: %v", err)
813+
changefeedWorkers = 0
814+
}
815+
816+
// Get cluster setting
817+
clusterWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV)
818+
819+
// Apply precedence logic:
820+
// 1. If both are positive, use the smaller value (cluster can cap)
821+
// 2. If one is positive and the other is <=0, use the positive value
822+
// 3. If both are <=0, use automatic default based on GOMAXPROCS
823+
824+
var numWorkers int64
825+
if changefeedWorkers > 0 && clusterWorkers > 0 {
826+
if changefeedWorkers < clusterWorkers {
827+
numWorkers = changefeedWorkers
828+
} else {
829+
numWorkers = clusterWorkers
830+
}
831+
} else if changefeedWorkers > 0 {
832+
numWorkers = changefeedWorkers
833+
} else if clusterWorkers > 0 {
834+
numWorkers = clusterWorkers
835+
} else {
836+
numWorkers = 0
837+
}
838+
807839
if numWorkers > 0 {
808840
return int(numWorkers)
809841
}

pkg/ccl/changefeedccl/sink_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
2323
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2424
"github.com/cockroachdb/cockroach/pkg/security/username"
25+
"github.com/cockroachdb/cockroach/pkg/sql"
2526
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2627
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
2728
"github.com/cockroachdb/cockroach/pkg/testutils/pgurlutils"
@@ -1014,3 +1015,101 @@ func TestChangefeedConsistentPartitioning(t *testing.T) {
10141015
}
10151016

10161017
}
1018+
1019+
func TestNumSinkIOWorkers(t *testing.T) {
1020+
defer leaktest.AfterTest(t)()
1021+
defer log.Scope(t).Close(t)
1022+
1023+
ctx := context.Background()
1024+
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
1025+
defer s.Stopper().Stop(ctx)
1026+
1027+
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
1028+
1029+
tests := []struct {
1030+
name string
1031+
clusterSetting int64
1032+
changefeedOption string // empty means not set
1033+
expectedWorkers int
1034+
expectUseDefault bool // true if we expect GOMAXPROCS-based default
1035+
}{
1036+
{
1037+
name: "both zero - use default",
1038+
clusterSetting: 0,
1039+
changefeedOption: "0",
1040+
expectUseDefault: true,
1041+
},
1042+
{
1043+
name: "cluster positive, option not set - use cluster",
1044+
clusterSetting: 10,
1045+
changefeedOption: "",
1046+
expectedWorkers: 10,
1047+
},
1048+
{
1049+
name: "cluster zero, option positive - use option",
1050+
clusterSetting: 0,
1051+
changefeedOption: "15",
1052+
expectedWorkers: 15,
1053+
},
1054+
{
1055+
name: "both positive, option smaller - use option",
1056+
clusterSetting: 10,
1057+
changefeedOption: "5",
1058+
expectedWorkers: 5,
1059+
},
1060+
{
1061+
name: "both positive, cluster smaller - use cluster",
1062+
clusterSetting: 10,
1063+
changefeedOption: "15",
1064+
expectedWorkers: 10,
1065+
},
1066+
{
1067+
name: "cluster negative - use default",
1068+
clusterSetting: -1,
1069+
changefeedOption: "",
1070+
expectUseDefault: true,
1071+
},
1072+
{
1073+
name: "option negative - use default",
1074+
clusterSetting: 0,
1075+
changefeedOption: "-1",
1076+
expectUseDefault: true,
1077+
},
1078+
{
1079+
name: "both negative - use default",
1080+
clusterSetting: -1,
1081+
changefeedOption: "-1",
1082+
expectUseDefault: true,
1083+
},
1084+
{
1085+
name: "option not set, cluster zero - use default",
1086+
clusterSetting: 0,
1087+
changefeedOption: "",
1088+
expectUseDefault: true,
1089+
},
1090+
}
1091+
1092+
for _, test := range tests {
1093+
t.Run(test.name, func(t *testing.T) {
1094+
// Set cluster setting
1095+
changefeedbase.SinkIOWorkers.Override(ctx, &execCfg.Settings.SV, test.clusterSetting)
1096+
1097+
// Create statement options
1098+
optsMap := make(map[string]string)
1099+
if test.changefeedOption != "" {
1100+
optsMap[changefeedbase.OptNumSinkWorkers] = test.changefeedOption
1101+
}
1102+
opts := changefeedbase.MakeStatementOptions(optsMap)
1103+
1104+
result := numSinkIOWorkers(&execCfg.DistSQLSrv.ServerConfig, opts)
1105+
1106+
if test.expectUseDefault {
1107+
// Should be between 1 and 32 (GOMAXPROCS-based)
1108+
require.GreaterOrEqual(t, result, 1)
1109+
require.LessOrEqual(t, result, 32)
1110+
} else {
1111+
require.Equal(t, test.expectedWorkers, result)
1112+
}
1113+
})
1114+
}
1115+
}

0 commit comments

Comments
 (0)