@fabschn fabschn commented Oct 27, 2025

Previously, the number of SinkIOWorkers could only be defined in a cluster-wide setting.

This patch allows setting the number of SinkIOWorkers per changefeed.

If both values (cluster-wide and per-changefeed) are given, the smaller of the two is used.

Release note (sql change): The CREATE CHANGEFEED statement was extended with an optional num_sink_workers setting that can be used to set the number of SinkIOWorkers per changefeed. Note that the number of workers is capped by the cluster-wide setting if present.

Fixes: #154546

@fabschn fabschn requested a review from a team as a code owner October 27, 2025 07:17
@fabschn fabschn requested review from rharding6373 and removed request for a team October 27, 2025 07:17
blathers-crl bot commented Oct 27, 2025

Thank you for contributing to CockroachDB. Please ensure you have followed the guidelines for creating a PR.

My owl senses detect your PR is good for review. Please keep an eye out for any test failures in CI.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@blathers-crl blathers-crl bot added the O-community Originated from the community label Oct 27, 2025
cockroachlabs-cla-agent bot commented Oct 27, 2025

CLA assistant check
All committers have signed the CLA.

@asg0451 asg0451 requested review from aerfrei and removed request for rharding6373 October 27, 2025 13:56
Contributor

@aerfrei aerfrei left a comment

Thanks for submitting this! Looks good, left a few comments.

} else if clusterWorkers > 0 {
numWorkers = clusterWorkers
} else {
numWorkers = 0
Contributor

This looks like it would set numWorkers to 0, even if both the cluster setting and changefeed value were negative.

(Aside: Seems to me that we may not be currently respecting the negative value as documented which I can look at/file an issue for.)

Author

Indeed, the current implementation does not adhere to the documentation:

"the number of workers used by changefeeds when sending requests to the sink "+
"(currently the batching versions of webhook, pubsub, and kafka sinks that are "+
"enabled by changefeed.new__sink_enabled only): <0 disables, 0 assigns "+
"a reasonable default, >0 assigns the setting value",

When a value <0 is provided, a default will be set, see current implementation:

numWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV)
if numWorkers > 0 {
	return int(numWorkers)
}

idealNumber := runtime.GOMAXPROCS(0)
if idealNumber < 1 {
	return 1
}
if idealNumber > 32 {
	return 32
}
return idealNumber

Would you like to tackle this as part of this PR or create a separate issue for it?

Contributor

I don't want to micromanage here, but I think the max that you are now using here could use a comment. For the cases where one of changefeedWorkers and clusterWorkers is zero and the other is negative, or where both are negative, it may be useful to have the local numWorkers variable preserve that information if the current implementation doesn't respect it. I'd also like to see tests for these cases, e.g. what happens if one value is negative and the other positive.

P.S. Out of curiosity, are you using AI to generate this PR?

Author

> I don't want to micromanage here.

This is not how I interpret this or other comments. After all, you're the code owner and will maintain this code, so let's make sure that it lives up to your standards.

But could you help me understand what you'd expect here? My struggle with comments has been a recurring theme throughout this PR, so I'd appreciate some guidance.

> But for this case where one of changefeedWorkers and clusterWorkers is zero and one is negative or both are negative, it may become useful to have the local numWorkers variable maintain that information if the current implementation doesn't respect it.
> I'd like to see tests for these cases also e.g. what happens if one value is negative and the other positive,

I added cases for

  • one value being negative, the other positive -> asserting that the positive value is applied
  • both values being negative -> asserting that a runtime.GOMAXPROCS based default is applied
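
For readers following along, the cases discussed so far in this thread could be sketched roughly as below. This is only an illustration: `resolveWorkers` and `defaultWorkers` are hypothetical names, not the functions in this PR. The sketch assumes the behavior from the PR description (the smaller of two positive values wins) plus the GOMAXPROCS-based fallback quoted earlier, and it deliberately does not implement the documented "<0 disables" semantics.

```go
package main

import (
	"fmt"
	"runtime"
)

// defaultWorkers mirrors the GOMAXPROCS-based fallback quoted earlier in
// this thread, clamped to the range [1, 32].
func defaultWorkers() int {
	n := runtime.GOMAXPROCS(0)
	if n < 1 {
		return 1
	}
	if n > 32 {
		return 32
	}
	return n
}

// resolveWorkers is a hypothetical helper (not the PR's actual code) that
// combines the per-changefeed option with the cluster setting.
func resolveWorkers(changefeedWorkers, clusterWorkers int) int {
	switch {
	case changefeedWorkers > 0 && clusterWorkers > 0:
		// Both are set: the cluster setting acts as a cap.
		if changefeedWorkers < clusterWorkers {
			return changefeedWorkers
		}
		return clusterWorkers
	case changefeedWorkers > 0:
		return changefeedWorkers
	case clusterWorkers > 0:
		return clusterWorkers
	default:
		// Neither value is positive: fall back to the default.
		return defaultWorkers()
	}
}

func main() {
	fmt.Println(resolveWorkers(3, 10))  // both positive
	fmt.Println(resolveWorkers(-1, 7))  // one negative, one positive
	fmt.Println(resolveWorkers(-1, -1)) // both negative, GOMAXPROCS-based
}
```

Keeping the fallback in its own function makes it possible for a test to compare against the exact default rather than a range.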

Let me know if that matches your expectations. I have a hunch that I'm not really grasping the intention behind the first paragraph of the quote.

> P.S. Out of curiosity, are you using AI to generate this PR?

I used Claude Code to suggest where changes would be needed and where e.g. test cases should be added as I'm largely unfamiliar with the codebase. I had it then add some scaffolding in the form of function definitions and filled in the rest.
Also out of curiosity: what made you ask? Did you react to anything in the code or our conversation that had a "this is AI" flair to it?

@fabschn fabschn force-pushed the changefeed-num-sink-workers branch from 3a810a8 to 7213958 Compare October 27, 2025 20:05
blathers-crl bot commented Oct 27, 2025

Thank you for updating your pull request.

My owl senses detect your PR is good for review. Please keep an eye out for any test failures in CI.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

Author

@fabschn fabschn left a comment

Thanks for the swift review! I addressed the comments and amended my initial commit as described in the Submitting your contribution section of the contribution guidelines.

Let me know if you'd like to see any other changes.

sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)

// Test 1: Per-changefeed = 5, cluster setting = 0 -> should use 5.
Contributor

Nit: These comments also don't adhere to the style guide. There are other comments in the test files (including inline comments) that also do not.

Author

Removed the comments. As these tests are now implemented as subtests, the subtest names replaced the comments. Let me know if you'd like the subtest names to be changed.

require.NoError(t, foo1.Close())

// Test 2: Cluster setting = 10, per-changefeed = 3 → should use 3 (smaller).
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 10`)
Contributor

Nit: Please also implement these tests as separate subtests.

Author

Done.

func numSinkIOWorkers(cfg *execinfra.ServerConfig, opts changefeedbase.StatementOptions) int {
changefeedWorkers, err := opts.GetNumSinkWorkers()
if err != nil {
// Log error but continue with cluster setting.
Contributor

When do we expect errors here? I don't think trudging forward with 0 is the right behavior.

Author

GetNumSinkWorkers() returns an error in case the user sets num_sink_workers to a value that cannot be parsed as an integer.
I opted for this approach as a value of 0 would lead to a sensible default number of workers while still creating the sink. But you have more context, so please let me know what you'd rather expect to happen.

Would you rather propagate the error and not create the sink?
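
For comparison, a minimal sketch of the propagating variant could look like the following. `parseNumSinkWorkers` is a hypothetical stand-in for `GetNumSinkWorkers()`, and treating an empty string as "option not set" is an assumption made only for this example; the real option handling in the codebase may differ.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseNumSinkWorkers is a hypothetical stand-in for GetNumSinkWorkers.
// It returns an error for unparsable input instead of silently using 0.
func parseNumSinkWorkers(raw string) (int, error) {
	if raw == "" {
		// Assumption for this sketch: an empty value means "not set".
		return 0, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid num_sink_workers %q: %w", raw, err)
	}
	return n, nil
}

func main() {
	for _, raw := range []string{"", "4", "four"} {
		n, err := parseNumSinkWorkers(raw)
		if err != nil {
			// The caller would abort sink creation here.
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("workers:", n)
	}
}
```

With this shape, a typo in the option surfaces as an error at statement time rather than as an unexpected worker count later.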

result := numSinkIOWorkers(&execCfg.DistSQLSrv.ServerConfig, opts)

if test.expectUseDefault {
Contributor

In this case we should check that we're using the GOMAXPROCS-based default value exactly, not just that the result falls in a range from 1 to 32, since most of the test values fall in that range anyway.

Author

Updated.

require.NoError(t, foo4.Close())
}

// Test with sinks that support parallel IO.
Contributor

Is pubsub the only sink that supports parallel IO?

Author

As far as I understand, kafka, webhook, and pubsub support parallel IO (my assumption is based on numSinkIOWorkers being called during the creation of these three types of sink).

Let me know what the intention of your question is:

For now, I updated the comment to reflect that we're testing with "...a sink that supports...".
Alternatively, it can of course be tested with all sinks that support parallel IO if you think that this provides additional value.

clusterWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV)

// Apply precedence logic:
// 1. If both are positive, use the smaller value (cluster can cap)
Contributor

Nit: To be clear, I think this comment is also reading the code out loud. If you do include it, please write in full sentences, with periods and all that (it took me a sec to parse "cluster can cap").

Author

Removed the comment.

Contributor

aerfrei commented Oct 28, 2025

@fabschn please also take a look at the extended CI failure. It may be unrelated but if it is, please verify that and leave a comment here. Thanks so much!

@fabschn fabschn force-pushed the changefeed-num-sink-workers branch from 7213958 to d035bed Compare October 30, 2025 18:48
Author

fabschn commented Oct 30, 2025

> @fabschn please also take a look at the extended CI failure. It may be unrelated but if it is, please verify that and leave a comment here. Thanks so much!

I can't see details of the Bazel Extended CI job as I don't have access to TeamCity. Let me know if you can see them and notice that my changes have introduced an issue.

The Claude Code PR Review job fails with the following error:

Checking permissions for actor: fabschn
Permission level retrieved: read
Warning: Actor has insufficient permissions: read
Error: Prepare step failed with error: Actor does not have write permissions to the repository
Error: Process completed with exit code 1.

Development

Successfully merging this pull request may close these issues.

changefeedccl: make a changefeed setting for sink IO workers

3 participants