Skip to content

KAFKA-19078: Automatic controller addition to cluster metadata partition #19589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: trunk
Choose a base branch
from

Conversation

kevin-wu24
Copy link
Contributor

@kevin-wu24 kevin-wu24 commented Apr 28, 2025

Add the controller.quorum.auto.join.enable configuration. When enabled
with KIP-853 supported, follower controllers who are observers (their
replica id + directory id are not in the voter set) will:

  • Automatically remove voter set entries which match their replica id
    but not directory id by sending the RemoveVoterRPC to the leader.
  • Automatically add themselves as a voter when their replica id is not
    present in the voter set by sending the AddVoterRPC to the leader.

Reviewers: José Armando García Sancio
[email protected]

@github-actions github-actions bot added triage PRs from the community kraft labels Apr 28, 2025
@github-actions github-actions bot removed the triage PRs from the community label Apr 30, 2025
Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feature @kevin-wu24 . Reviewed src/main. Need to review the tests.

Comment on lines 3323 to 3324
} else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush &&
quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I think we should document why we require both followersAlwaysFlush and autoJoinEnable to be true.

@jsancio
Copy link
Member

jsancio commented May 1, 2025

Feature description: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217391519#KIP853:KRaftControllerMembershipChanges-Controllerautojoining

Add the controller.quorum.auto.join.enable configuration. When enabled with KIP-853 supported, follower controllers who are observers (their replica id + directory id are not in the voter set) will:

  • Automatically remove voter set entries which match their replica id but not directory id.
  • Automatically add themselves as a voter when their replica id is not present in the voter set.

I am going to use this description to generate the commit message. I don't think we should include URLs in the commit description. Ideally, the commit message should explain the changes without having to read other documents.

Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes.

RaftResponse.Inbound responseMetadata,
long currentTimeMs
) {
RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cast to RemoveRaftVoterResponseData yet the method is for handleAddVoterResponse. Do the tests pass for you? How is that possible with this cast?

Copy link
Contributor Author

@kevin-wu24 kevin-wu24 May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is because we are not testing the response handling anywhere. In the unit tests we are only asserting that the add/remove voter request was sent when we expect it.

I think we should do something like with fetch where we complete the request (I assume this testing method executes the handleFetchResponse). I need to see how we implement this for fetch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't KafkaRaftClientAutoJoinTest send add voter and remove voter responses to the replica?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it does, I just looked at the code again. The tests do pass, but this is because this line in the test is sending a remove voter response: https://github.com/apache/kafka/pull/19589/files#diff-7b1538d9f4f7f1e27a444aac55ced6f780e22bfdd8af4462e82d62c24f778c85R94.
Both the implementation and test need to be fixed, thanks for the catch.

private static final int NUMBER_FETCH_TIMEOUTS_IN_ADD_REMOVE_PERIOD = 1;

@Test
public void testAutoRemoveOldVoter() throws Exception {
Copy link
Member

@jsancio jsancio May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a test that fully does a remove followed by an add? E.g.

  1. Start with the local replica not in the voter set but the id in the voter set.
  2. Remove voter is sent and acknowledged.
  3. Next FETCH response send the VOTER_RECORD control batch without the voter in the old voter in the voter set.
  4. Add voter is sent and acknowledged.
  5. Next FETCH response send a VOTER_RECORD control btach with the local replica in the voter set.

Copy link
Contributor Author

@kevin-wu24 kevin-wu24 May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can add one since it acts as like a pseudo-integration test for the feature. I'm not seeing anywhere in KafkaRaftClientReconfigTest or KafkaRaftClientFetchTest that covers step 2. I see a KafkaRaftClientTest#testFollowerReplication but it doesn't add control records in the FETCH response when kraft.version == 1.

Comment on lines 63 to 66
// after sending a remove voter the next request should be a fetch
context.pollUntilRequest();
var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this, I think we should check that remove voter is sent after completeFetch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused. Why would we do this check again? I'm already doing this check from L52-56.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to cover the case where the new voter sent a add/remove voter RPC. The RPC was acknowledge but the log was never updated (fetch responses didn't include the updated voter set). In this case the new voter will send another add/remove voter RPC after "update voter set period timer", right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Since the local replica is still an observer, it should still try to add/remove itself if the log hasn't been updated with fetch.

Comment on lines 97 to 100
// after sending an add voter the next request should be a fetch
context.pollUntilRequest();
var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this, I think we should check that add voter is sent after another completeFetch?

@github-actions github-actions bot added the core Kafka Broker label May 7, 2025
Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the great tests. We should be able to merge this improvement soon.

@@ -178,6 +182,7 @@ public void pollOnce() {
requestThread.doWork();
}

@SuppressWarnings("NPathComplexity")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try using if (...) { return } else if (...) { return } ... and see if that reduces the path complexity so you can remove this suppression.

You can also try using Java's new pattern matching and lambda syntax for switch statements. E.g.

return switch (requestData) {
    case VoterRequestData voterData -> new VoterRequest.Builder(voterData);
    ...
    default -> throw new IllegalArgumentException(...);
}

Copy link
Contributor Author

@kevin-wu24 kevin-wu24 May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I did this, but I got a build error when running ./gradlew jar. It's pretty weird, because we're supposed to be on Java 17 according to gradle, and my IDE said that this kind of instanceof switch matching is supported in Java 17+. Let me try this again and put the actual error message here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I get when running ./gradlew jar:

error: patterns in switch statements are a preview feature and are disabled by default.
...
(use --enable-preview to enable patterns in switch statements)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try using if (...) { return } else if (...) { return } ... and see if that reduces the path complexity so you can remove this suppression.

This worked, thanks!

Copy link
Member

@jsancio jsancio May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I think I have seen this before. I think Java 21 is the oldest release that implemented pattern matching.

)
);
}

private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
if (state.hasFetchTimeoutExpired(currentTimeMs)) {
return maybeSendFetchToAnyBootstrap(currentTimeMs);
} else if (partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
/* Only replicas that are always flushing and are configured to auto join should
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace "are always flushing" with "can become a voter."

0,
epoch,
BufferSupplier.NO_CACHING.get(300),
VoterSetTest.voterSet(Stream.of(leader)).toVotersRecord((short) 0)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing newline.

                    VoterSetTest.voterSet(Stream.of(leader)).toVotersRecord((short) 0)
                ),

0,
epoch,
BufferSupplier.NO_CACHING.get(300),
VoterSetTest.voterSet(Stream.of(leader, newFollowerKey)).toVotersRecord((short) 0)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing newline.

                    VoterSetTest.voterSet(Stream.of(leader, newFollowerKey)).toVotersRecord((short) 0)
                ),

)
);
// poll kraft to update the replica's voter set
context.client.poll();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding this time advancement and showing that the replica sent a fetch request?

        context.advanceTimeAndFetchToUpdateVoterSetTimer(epoch, leader.id());
        context.time.sleep(context.fetchTimeoutMs - 1);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added and cleaning up this test file for readability, since there's a lot of duplicate code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the sleep method as part of the advanceTimeAndCompleteFetch helper, since that is what actually expires the timer. I also added a boolean flag to the method to determine whether sleep is called again.

Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevin-wu24 , can you resolve the conflicts?

@kevin-wu24 kevin-wu24 changed the title KAFKA-19078: Implement automatic controller addition to cluster metadata partition KAFKA-19078: Automatic controller addition to cluster metadata partition Jul 30, 2025
Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes @kevin-wu24 . Partial review.

Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(new HashSet<>(List.of(3000, 3001, 3002)), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Is there a way to get the exact directory id (UUID) and compare against that instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I can check that each replica has the exact metadata dir ID as what is in the TestKitNodes.

quorumConfig.requestTimeoutMs(),
quorum.localReplicaKeyOrThrow(),
localListeners,
!quorumConfig.autoJoin()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this the negative of auto join? Shouldn't it always be false? If KRaft send an "add voter" request, it should always be version 1 and return before committing.

@@ -524,14 +526,16 @@ public static AddRaftVoterRequestData addVoterRequest(
String clusterId,
int timeoutMs,
ReplicaKey voter,
Endpoints listeners
Endpoints listeners,
boolean ackWhenCommitted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the add voter request specific for the kraft implementation. "Ack when committed" should always be false. If that's true then let's remove this parameter and not give the caller the option to set it. In the implementation, the method should always call setAckWhenCommitted(false).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Ack when committed" should always be false

I don't think this is true, since there are callers of this method in KafkaRaftClientReconfigTest that do test sending an AddVoterRequest with ackWhenCommitted == true since it is testing the leader state when receiving this RPC. I agree that KafkaRaftClient should always call this method with false.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. That's fair.

Comment on lines 3449 to 3450
long currentTimeMs,
ReplicaKey replicaKey
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flip the order of these parameters. The kraft module has a pattern of using the last parameter as the current time when needed.

Comment on lines 105 to 106
pollAndDeliverFetchToUpdateVoterSet(context, epoch,
VoterSetTest.voterSet(Stream.of(leader, newVoter)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's fix this indentation. How about:

        pollAndDeliverFetchToUpdateVoterSet(
            context,
            epoch,
            VoterSetTest.voterSet(Stream.of(leader, newVoter))
        );

RaftRequest.Outbound assertSentAddVoterRequest(
ReplicaKey replicaKey,
Endpoints endpoints,
boolean expectedAckWhenCommitted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not give the caller the option to override this. This value should always be false and this method should just check for that explicitly.

BufferSupplier.NO_CACHING.get(300),
newVoterSet.toVotersRecord((short) 0)
),
context.log.endOffset().offset() + 1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that the voter set only has one voter hence one record.

Copy link
Contributor Author

@kevin-wu24 kevin-wu24 Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this assume the voter set only has one voter? Which voter set are you referencing?

testObserverRemovesOldVoterAndAutoJoins has the voter set go from size 2, to size 1, and then back to size 2 by having a follower node complete the whole "auto-join" flow (i.e. remove its old self and and its new self to the voter set).

fetchRequest.destination().id(),
MemoryRecords.withVotersRecord(
context.log.endOffset().offset(),
0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Maybe we can use context.time.milliseconds().

Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @kevin-wu24 . I think we should be able to merge this soon.

@@ -524,14 +526,16 @@ public static AddRaftVoterRequestData addVoterRequest(
String clusterId,
int timeoutMs,
ReplicaKey voter,
Endpoints listeners
Endpoints listeners,
boolean ackWhenCommitted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. That's fair.

)
);
} else {
return maybeSendFetchToBestNode(state, currentTimeMs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This backoff not correct now that observers can send AddVoter and RemoveVoter requests. Take a look how I solved it for pollFollowerAsVoter.

Comment on lines 3326 to 3327
if (partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a shouldSendAddOrRemoveVoterRequest similar to shouldSendUpdateVoterRequest. This would allow you to better document this predicate.

Comment on lines +3326 to +3327
return partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document why we need this predicate. See shouldSendUpdateVoteRequest for an example.

Comment on lines 3338 to 3340
/* Only replicas that can become a voter and are configured to auto join should
* attempt to automatically join the voter set for the configured topic partition.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comment but you can move this comment to shouldSendAddOrRemoveVoterRequest.

return Math.min(
backoffMs,
Math.min(
state.remainingFetchTimeMs(currentTimeMs),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Observer don't need to backoff until the fetch timeout since observer do not read or handle fetch timeouts.

Copy link
Member

@jsancio jsancio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants