Skip to content

Conversation

arvi18
Copy link

@arvi18 arvi18 commented Aug 11, 2025

No description provided.

apalan60 and others added 30 commits April 21, 2025 15:35
…9521)

This patch addresses issue apache#19516 and corrects a typo in
`ApiKeyVersionsProvider`: when `toVersion` exceeds  `latestVersion`, the
`IllegalArgumentException` message was erroneously formatted with
`fromVersion`. The format argument has been updated to use `toVersion`
so that the error message reports the correct value.

Reviewers: Ken Huang <[email protected]>, PoAn Yang
 <[email protected]>, Jhen-Yung Hsu <[email protected]>, Chia-Ping
 Tsai <[email protected]>
The check for `scheduler.pendingTaskSize()` may fail if the thread pool
is too slow to consume the runnable objects

Reviewers: Ken Huang <[email protected]>, PoAn Yang
 <[email protected]>, Chia-Ping Tsai <[email protected]>
…#17099)

Two sets of tests are added:
1. KafkaProducerTest
- when send success, both record.headers() and onAcknowledgement headers
are read only
- when send failure, record.headers() is writable as before and
onAcknowledgement headers is read only
2. ProducerInterceptorsTest
- make both old and new onAcknowledgement method are called successfully

Reviewers: Lianet Magrans <[email protected]>, Omnia Ibrahim
<[email protected]>, Matthias J. Sax <[email protected]>,
Andrew Schofield <[email protected]>, Chia-Ping Tsai
<[email protected]>
…pache#19437)

This PR adds the support for remote storage fetch for share groups.

There is a limitation in remote storage fetch for consumer groups that
we can only perform remote fetch for a single topic partition in a fetch
request. Since, the logic of share fetch requests is largely based on
how consumer groups work, we are following similar logic in implementing 
remote storage fetch. However, this problem should be addressed as 
part of KAFKA-19133 which should help us perform fetch for multiple 
remote fetch topic partition in a single share fetch request.

Reviewers: Jun Rao <[email protected]>
The release script was pushing the RC tag off of a temporary branch that
was never merged back into the release branch. This meant that our RC
and release tags were detached from the rest of the repository.

This patch changes the release script to merge the RC tag back into the
release branch and pushes both the tag and the branch.

Reviewers: Luke Chen <[email protected]>
This PR removes the unstable API flag for the KIP-932 RPCs.

The 4 RPCs which were exposed for the early access release in AK 4.0 are
stabilised at v1. This is because the RPCs have evolved over time and AK
4.0 clients are not compatible with AK 4.1 brokers. By stabilising at
v1, the API version checks prevent incompatible communication and
server-side exceptions when trying to parse the requests from the older
clients.

Reviewers: Apoorv Mittal <[email protected]>
…19500)

Currently the share session cache is desgined like the fetch session
cache. If the cache is full and a new share session is trying to get get
initialized, then the sessions which haven't been touched for more than
2minutes are evicted. This wouldn't be right for share sessions as the
members also hold locks on the acquired records, and session eviction
would mean theose locks will need to be dropped and the corresponding
records re-delivered. This PR removes the time based eviction logic for
share sessions.

Refer: [KAFKA-19159](https://issues.apache.org/jira/browse/KAFKA-19159)

Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai <[email protected]>
Small improvements to share consumer javadoc.

Reviewers: Apoorv Mittal <[email protected]>
Updated the Kafka Streams documentation to include metrics for tasks,
process nodes, and threads that were missing. I was unable to find
metrics such as stream-state-metrics, client-metrics,
state-store-metrics, and record-cache-metrics in the codebase, so they
are not included in this update.

Reviewers: Bill Bejeck <[email protected]>
…ache#19416)

This change implements upgrading the kraft version from 0 to 1 in existing clusters.
Previously, clusters were formatted with either version 0 or version 1, and could not
be moved between them.

The kraft version for the cluster metadata partition is recorded using the
KRaftVersion control record. If there is no KRaftVersion control record
the default kraft version is 0.

The kraft version is upgraded using the UpdateFeatures RPC. These RPCs
are handled by the QuorumController and FeatureControlManager. This
change adds special handling in the FeatureControlManager so that
upgrades to the kraft.version are directed to
RaftClient#upgradeKRaftVersion.

To allow the FeatureControlManager to call
RaftClient#upgradeKRaftVersion is a non-blocking fashion, the kraft
version upgrade uses optimistic locking. The call to
RaftClient#upgradeKRaftVersion does validations of the version change.
If the validations succeeds, it generates the necessary control records
and adds them to the BatchAccumulator.

Before the kraft version can be upgraded to version 1, all of the
brokers and controllers in the cluster need to support kraft version 1.
The check that all brokers support kraft version 1 is done by the
FeatureControlManager. The check that all of the controllers support
kraft version is done by KafkaRaftClient and LeaderState.

When the kraft version is 0, the kraft leader starts by assuming that
all voters do not support kraft version 1. The leader discovers which
voters support kraft version 1 through the UpdateRaftVoter RPC. The
KRaft leader handles UpdateRaftVoter RPCs by storing the updated
information in-memory until the kraft version is upgraded to version 1.
This state is stored in LeaderState and contains the latest directory
id, endpoints and supported kraft version for each voter.

Only when the KRaft leader has received an UpdateRaftVoter RPC from all
of the voters will it allow the upgrade from kraft.version 0 to 1.

Reviewers: Alyssa Huang <[email protected]>, Colin P. McCabe <[email protected]>
This patch extends the OffsetCommit API to support topic ids. From
version 10 of the API, topic ids must be used. Originally, we wanted to
support both using topic ids and topic names from version 10 but it
turns out that it makes everything more complicated. Hence we propose to
only support topic ids from version 10. Clients which only support using
topic names can either lookup the topic ids using the Metadata API or
stay on using an earlier version.

The patch only contains the server side changes and it keeps the version
10 as unstable for now. We will mark the version as stable when the
client side changes are merged in.

Reviewers: Lianet Magrans <[email protected]>, PoAn Yang <[email protected]>
…a result of change in assignor algorithm (apache#19541)

The system test `ShareConsumerTest.test_share_multiple_partitions`
started failing because of the recent change in the SimpleAssignor
algorithm. The tests assumed that if a share group is subscribed to a
topic, then every share consumers part of the group will be assigned all
partitions of the topic. But that does not happen now, and partitions
are split between the share consumers in certain cases, in which some
partitions are only assigned to a subset of share consumers. This change
removes that assumption

Reviewers: PoAn Yang <[email protected]>, Andrew Schofield <[email protected]>
…ionCache (apache#19505)

This PR removes the group.share.max.groups config. This config was used
to calculate the maximum size of share session cache. But with the new
config group.share.max.share.sessions in place with exactly this
purpose, the ShareSessionCache initialization has also been passed the
new config.

Refer: [KAFKA-19156](https://issues.apache.org/jira/browse/KAFKA-19156)

Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
…ache#19443)

* There could be scenarios where share partition records in
`__share_group_state` internal topic are not updated for a while
implying these partitions are basically cold.
* In this situation, the presence of these holds back the
pruner from keeping the topic clean and of manageable size.
* To remedy the situation, we have added a periodic
`setupSnapshotColdPartitions` in `ShareCoordinatorService` which does a
writeAll operation on the associated shards in the coordinator and
forces snapshot creation for any cold partitions. In this way the pruner
can continue.
This job has been added as a timer task.
* A new internal config
`share.coordinator.cold.partition.snapshot.interval.ms` has been
introduced to set the period of the job.
* Any failures are logged and ignored.
* New tests have been added to verify the feature.

Reviewers: PoAn Yang <[email protected]>, Andrew Schofield <[email protected]>
Improves a variable name and handling of an Optional.

Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>, PoAn Yang <[email protected]>
…pache#19440)

Introduces a concrete subclass of `KafkaThread` named `SenderThread`.
The poisoning of the TransactionManager on invalid state changes is
determined by looking at the type of the current thread.

Reviewers: Chia-Ping Tsai <[email protected]>
…pache#19457)

- Construct `AsyncKafkaConsumer` constructor and verify that the
`RequestManagers.supplier()` contains Streams-specific data structures.
- Verify that `RequestManagers` constructs the Streams request managers
correctly
- Test `StreamsGroupHeartbeatManager#resetPollTimer()`
- Test `StreamsOnTasksRevokedCallbackCompletedEvent`,
`StreamsOnTasksAssignedCallbackCompletedEvent`, and
`StreamsOnAllTasksLostCallbackCompletedEvent` in
`ApplicationEventProcessor`
- Test `DefaultStreamsRebalanceListener`
- Test `StreamThread`.
  - Test `handleStreamsRebalanceData`.
  - Test `StreamsRebalanceData`.

Reviewers: Lucas Brutschy <[email protected]>, Bill Bejeck <[email protected]>
Signed-off-by: PoAn Yang <[email protected]>
…he#19547)

Change the log messages which used to warn that KIP-932 was an Early
Access feature to say that it is now a Preview feature. This will make
the broker logs far less noisy when share groups are enabled.

Reviewers: Apoorv Mittal <[email protected]>
The generated response data classes take Readable as input to parse the
Response. However, the associated response objects take ByteBuffer as
input and thus convert them to Readable using `new ByteBufferAccessor`
call.

This PR changes the parse method of all the response classes to take the
Readable interface instead so that no such conversion is needed.

To support parsing the ApiVersionsResponse twice for different version
this change adds the "slice" method to the Readable interface.

Reviewers: José Armando García Sancio <[email protected]>, Truc Nguyen
<[[email protected]](mailto:[email protected])>, Aadithya
Chandra <[[email protected]](mailto:[email protected])>
…#19549)

The heartbeat logic for share groups is tricky when the set of
topic-partitions eligible for assignment changes. We have observed epoch
mismatches when brokers are restarted, which should not be possible.
Improving the logging so we can see the previous member epoch and tally
this with the logged state.

Reviewers: Apoorv Mittal <[email protected]>, Sushant Mahajan <[email protected]>
…19536)

This PR marks the records as non-nullable for ShareFetch.

This PR is as per the changes for Fetch:
apache#18726 and some work for ShareFetch
was done here: apache#19167. I tested with
marking `records` as non-nullable in ShareFetch, which required
additional handling. The same has been fixed in current PR.

Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
 <[email protected]>, TengYao Chi <[email protected]>, PoAn Yang
 <[email protected]>
…tProducerId (KIP-939) (apache#19429)

This is part of the client side changes required to enable 2PC for
KIP-939

**Producer Config:**
transaction.two.phase.commit.enable The default would be ‘false’.  If
set to ‘true’, the broker is informed that the client is participating
in two phase commit protocol and transactions that this client starts
never expire.

**Overloaded InitProducerId method**
If the value is 'true' then the corresponding field is set in the
InitProducerIdRequest

Reviewers: Justine Olshan <[email protected]>, Artem Livshits
 <[email protected]>
This patch does a few code changes:
* It cleans up the GroupCoordinatorService;
* It moves the helper methods to validate request to Utils;
* It moves the helper methods to create the assignment for the
ConsumerGroupHeartbeatResponse and the ShareGroupHeartbeatResponse from
the GroupMetadataManager to the respective classes.

Reviewers: Chia-Ping Tsai <[email protected]>, Jeff Kim <[email protected]>
…rvers (apache#19545)

Old bootstrap.metadata files cause problems with server that include
KAFKA-18601. When the server tries to read the bootstrap.checkpoint
file, it will fail if the metadata.version is older than 3.3-IV3
(feature level 7). This causes problems when these clusters are
upgraded.

This PR makes it possible to represent older MVs in BootstrapMetadata
objects without causing an exception. An exception is thrown only if we
attempt to access the BootstrapMetadata. This ensures that only the code
path in which we start with an empty metadata log checks that the
metadata version is 7 or newer.

Reviewers: José Armando García Sancio <[email protected]>, Ismael Juma
 <[email protected]>, PoAn Yang <[email protected]>, Liu Zeyu
 <[email protected]>, Alyssa Huang <[email protected]>
Replace names like a, b, c, ... with meaningful names in
AsyncKafkaConsumerTest.

Follow-up:
apache#19457 (comment)

Signed-off-by: PoAn Yang <[email protected]>

Reviewers: Bill Bejeck <[email protected]>, Ken Huang <[email protected]>
aliehsaeedii and others added 7 commits April 24, 2025 21:23
…pache#19450)

Kafka Streams calls `prepareCommit()` in `Taskmanager#closeTaskDirty()`.
However, the dirty task must not get committed and therefore,
prepare-commit tasks such as getting offsets should not be needed as
well. The only thing needed before closing a task dirty is flushing.
Therefore, separating `flush` and `prepareCommit` could be a good fix.

Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
…ache#19548)

If a streams, share or consumer group is described, all group IDs sent
to all shards of the group coordinator. This change fixes it. It tested
in the unit tests, since it's somewhat inconvenient to test the passed
read operation lambda.

Reviewers: David Jacot <[email protected]>, Andrew Schofield
<[email protected]>
apache#19552)

This PR just resolves an NPE when a topic assigned in a share group is
deleted. The NPE is caused by code which uses the current metadata image
to convert from a topic ID to the topic name. For a deleted topic, there
is no longer any entry in the image. A future PR will properly handle
the topic deletion.

Reviewers: Apoorv Mittal <[email protected]>, PoAn Yang <[email protected]>
If the streams rebalance protocol is enabled in
StreamsUncaughtExceptionHandlerIntegrationTest, the streams application
does not shut down correctly upon error.

There are two causes for this. First, sometimes, the SHUTDOWN_APPLICATION
code only sent with the leave heartbeat, but that is not handled broker
side. Second, the SHUTDOWN_APPLICATION code wasn't properly handled
client-side at all.

Reviewers: Bruno Cadonna <[email protected]>, Bill Bejeck
 <[email protected]>, PoAn Yang <[email protected]>
…upMetadataValue (apache#19504)

* Add MetadataHash field to ConsumerGroupMetadataValue,
ShareGroupMetadataValue, and StreamGroupMetadataValue.
* Add metadataHash field to
GroupCoordinatorRecordHelpers#newConsumerGroupEpochRecord,
GroupCoordinatorRecordHelpers#newShareGroupEpochRecord, and
StreamsCoordinatorRecordHelpers#newStreamsGroupEpochRecord.
* Add deprecated message to ConsumerGroupPartitionMetadataKey and
ConsumerGroupPartitionMetadataValue.
* ShareGroupPartitionMetadataKey / ShareGroupPartitionMetadataValue /
StreamGroupPartitionMetadataKey / StreamGroupPartitionMetadataValue will
be removed in next PR.

Reviewers: Lucas Brutschy <[email protected]>, David Jacot <[email protected]>

---------

Signed-off-by: PoAn Yang <[email protected]>
…hangs

The call to `backingStore.get()` (called by connector task threads through
`OffsetStorageReaderImpl.offsets()`) can block for long time waiting for data flush to complete
(`KafkaProducer.flush()`).

This change moves that call outside the synchronized clause that holds `offsetReadFutures`,
so that if `backingStore.get()` hangs then it does not keep `offsetReadFutures` locked.
The access to `closed` flag (`closed.get()`) is kept inside the synchronize clause to avoid race
condition with `close()`.

This is important because `OffsetStorageReaderImpl.close()` needs to lock `offsetReadFutures` as
well in order to cancel the futures.
Since the herder thread calls `OffsetStorageReaderImpl.close()` when attempting to stops a task,
before this change this was resulting in the herder thread hanging indefinetely waiting for
`backingStore.get()` to complete.
Copy link

coderabbitai bot commented Aug 11, 2025

Important

Review skipped

More than 25% of the files skipped due to max files limit. The review is being skipped to prevent a low-quality review.

193 files out of 300 files are above the max files limit of 100. Please upgrade to Pro plan to get higher limits.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-17871-avoid-blocking-herder-thread

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@arvi18
Copy link
Author

arvi18 commented Aug 11, 2025

/refacto-test

Copy link

refacto-test bot commented Aug 11, 2025

🤖 Refacto.AI: PR Summary & Reviewer Guide

🎯 1. The Goal (Why was this PR raised?)

This PR aims to fix an issue (KAFKA-17871) where the Connect herder thread could be blocked when producer flushing operations hang. The implementation involves modifying the offset storage reader to prevent thread blocking and updating various components across Kafka Connect, Raft, and client libraries.

🗺️ 2. The Changes (What was implemented?)

File Name Implementation Details Goal / Requirement Addressed
raft/src/main/java/org/apache/kafka/raft/LeaderState.java (Modified) Significant refactoring to support KRaft version upgrades, including adding methods for upgrading KRaft versions and handling voter information. Support KRaft version upgrades
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java (Modified) Added tests for KRaft version upgrades and voter information handling. Test KRaft version upgrade functionality
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (Modified) Refactored to use thread type checking instead of a ThreadLocal variable for determining state poisoning behavior. Avoid thread blocking
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (Modified) Updated tests to match the new TransactionManager implementation. Test thread handling changes
raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java (Modified) Updated to handle KRaft version upgrades and voter information. Support KRaft version upgrades
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java (Modified) Added functionality to track transformation metadata including versions. Improve transformation tracking
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java (Modified) Added support for KRaft version upgrades. Support KRaft version upgrades
metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java (Modified) Updated tests for KRaft version upgrade functionality. Test KRaft version upgrade functionality
docs/ops.html (Modified) Added documentation for new metrics related to Kafka Streams. Documentation updates
metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java (Added) New interface for accessing and upgrading KRaft versions. Support KRaft version upgrades
metadata/src/main/java/org/apache/kafka/controller/RaftClientKRaftVersionAccessor.java (Added) Implementation of KRaftVersionAccessor for Raft clients. Support KRaft version upgrades
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java (Modified) Added version information to connector state. Improve version tracking
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java (Added) New class to track metadata about task plugins including versions. Improve version tracking
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (Modified) Refactored utility methods and improved error handling. Code organization and error handling
core/src/main/scala/kafka/log/LogManager.scala (Modified) Removed duplicate code and moved functionality to Java implementation. Code organization
core/src/main/scala/kafka/server/KafkaApis.scala (Modified) Updated OffsetCommit handling to support topic IDs. Support topic IDs in offset commits
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (Modified) Added tests for header handling and two-phase commit. Test producer functionality
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java (Modified) Removed test for share session expiration. Test cleanup
core/src/test/scala/unit/kafka/log/LogManagerTest.scala (Modified) Removed duplicate code and updated to use Java implementation. Code organization
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (Modified) Added support for two-phase commit and improved header handling. Improve transaction handling
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java (Modified) Added metrics for transformations and predicates. Improve metrics
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java (Modified) Added support for plugin metadata and version tracking. Improve version tracking
clients/src/main/resources/common/message/OffsetCommitRequest.json (Modified) Added support for topic IDs. Support topic IDs in offset commits
clients/src/main/resources/common/message/OffsetCommitResponse.json (Modified) Added support for topic IDs. Support topic IDs in offset commits
clients/src/main/resources/common/message/ShareAcknowledgeRequest.json (Modified) Updated version information. API version updates
clients/src/main/resources/common/message/ShareAcknowledgeResponse.json (Modified) Updated version information. API version updates
clients/src/main/resources/common/message/ShareFetchRequest.json (Modified) Updated version information. API version updates
clients/src/main/resources/common/message/ShareFetchResponse.json (Modified) Updated version information. API version updates
clients/src/main/resources/common/message/ShareGroupDescribeRequest.json (Modified) Updated version information. API version updates
clients/src/main/resources/common/message/ShareGroupDescribeResponse.json (Modified) Updated version information. API version updates
clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json (Modified) Updated version information. API version updates
clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json (Modified) Updated version information. API version updates
clients/src/main/resources/common/message/ReadShareGroupStateResponse.json (Modified) Updated field descriptions and added error code. API documentation improvements
clients/src/main/resources/common/message/WriteShareGroupStateRequest.json (Modified) Updated field descriptions and removed unstable flag. API documentation improvements
raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java (Added) New class to track request sending results. Improve request handling
clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java (Modified) Simplified builder constructor. API cleanup
clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java (Modified) Updated parsing to use Readable interface. API modernization
clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsResponse.java (Modified) Updated parsing to use Readable interface. API modernization
clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java (Modified) Simplified builder constructor. API cleanup
clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java (Modified) Updated parsing to use Readable interface. API modernization
clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java (Modified) Updated parsing to use Readable interface. API modernization
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateRequest.java (Modified) Simplified builder constructor. API cleanup
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateResponse.java (Modified) Updated parsing to use Readable interface. API modernization
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java (Modified) Simplified builder constructor. API cleanup
clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java (Modified) Updated parsing to use Readable interface. API modernization
clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateRequest.java (Modified) Simplified builder constructor. API cleanup
clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateResponse.java (Modified) Updated parsing to use Readable interface. API modernization
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java (Modified) Added tests for stream rebalance data. Test consumer functionality
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java (Modified) Updated tests for epoch records. Test coordinator functionality
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java (Modified) Moved utility methods from GroupMetadataManager. Code organization
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (Modified) Updated tests for OffsetCommit with topic IDs. Test topic ID support
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java (Added) New tests for offset storage reader to verify it doesn't block threads. Test offset storage reader non-blocking behavior
raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java (Added) New class for coordinating KRaft version upgrades. Support KRaft version upgrades
raft/src/main/java/org/apache/kafka/raft/FollowerState.java (Modified) Added tracking for leader updates. Improve leader update tracking
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java (Modified) Updated parsing to use Readable interface. API modernization
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java (Modified) Added support for topic IDs. Support topic IDs in offset commits
core/src/main/java/kafka/server/share/DelayedShareFetch.java (Modified) Added support for remote storage fetches. Improve fetch handling
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java (Modified) Updated tests for OffsetCommit with topic IDs. Test topic ID support

🤔 3. Key Areas for Human Review

Area of Concern: Connect Offset Storage Reader Non-Blocking Behavior

  • File: connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java
  • Why: This is a new test file specifically addressing the core issue of the PR - preventing the herder thread from being blocked when producer flushing hangs.
  • Testing Instruction: Verify that the test cases properly simulate hanging scenarios and that the offset storage reader correctly handles these cases without blocking the herder thread.

Area of Concern: Transaction Manager Thread Handling

  • File: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (Lines 143-283)
  • Why: The code changes how thread identification is handled, replacing ThreadLocal with direct thread type checking, which is critical to the core issue of avoiding thread blocking.
  • Testing Instruction: Test transaction operations with concurrent threads to ensure the new thread identification logic correctly identifies Sender threads versus application threads.

Area of Concern: KRaft Version Upgrade Coordination

  • File: raft/src/main/java/org/apache/kafka/raft/LeaderState.java (Lines 680-712)
  • Why: This file contains complex logic for upgrading KRaft versions with multiple edge cases and error handling paths.
  • Testing Instruction: Test KRaft version upgrades with different configurations and verify that the upgrade process correctly handles validation, error cases, and successful upgrades.

Area of Concern: Remote Storage Fetch Handling

  • File: core/src/main/java/kafka/server/share/DelayedShareFetch.java (Lines 582-883)
  • Why: This file has significant new code for handling remote storage fetches with complex error handling and state management.
  • Testing Instruction: Test scenarios where remote storage fetches are in progress when other operations occur, and verify that error handling works correctly and doesn't block threads.

Human reviewer: Please focus particularly on the OffsetStorageReaderTest.java file as it directly addresses the core issue mentioned in the PR title about avoiding herder thread blocking.

Copy link

refacto-test bot commented Aug 11, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

refacto-test bot commented Aug 11, 2025

📌 Files Processed
  • raft/src/main/java/org/apache/kafka/raft/LeaderState.java
  • raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
  • raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
  • metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
  • metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
  • docs/ops.html
  • metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java
  • metadata/src/main/java/org/apache/kafka/controller/RaftClientKRaftVersionAccessor.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java
  • clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
  • clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
  • clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
  • core/src/main/java/kafka/server/share/DelayedShareFetch.java
  • raft/src/main/java/org/apache/kafka/raft/FollowerState.java
  • clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
  • clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java
  • raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java
  • core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
  • raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
  • core/src/main/scala/kafka/log/LogManager.scala
  • core/src/main/scala/kafka/server/KafkaApis.scala
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  • core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
  • core/src/test/scala/unit/kafka/log/LogManagerTest.scala
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
  • clients/src/main/resources/common/message/OffsetCommitRequest.json
  • clients/src/main/resources/common/message/OffsetCommitResponse.json
  • clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
  • clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
  • clients/src/main/resources/common/message/ShareFetchRequest.json
  • clients/src/main/resources/common/message/ShareFetchResponse.json
  • clients/src/main/resources/common/message/ShareGroupDescribeRequest.json
  • clients/src/main/resources/common/message/ShareGroupDescribeResponse.json
  • clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json
  • clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json
  • clients/src/main/resources/common/message/ReadShareGroupStateResponse.json
  • clients/src/main/resources/common/message/WriteShareGroupStateRequest.json
  • raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateResponse.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
  • core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
  • core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
  • metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
  • metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
  • raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeUserScramCredentialsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
  • clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
  • metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
  • raft/src/main/java/org/apache/kafka/raft/VoterSet.java
  • core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
  • raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
  • core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
  • core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
  • core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
  • core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
  • raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
  • release/git.py
  • group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataValue.json
  • checkstyle/import-control-storage.xml
  • checkstyle/suppressions.xml
  • clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
  • clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/BrokerHeartbeatResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/EnvelopeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/VoteResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
  • clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java
  • core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
  • server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
  • server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java
  • build.gradle
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
  • core/src/main/java/kafka/server/share/SharePartitionManager.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
  • raft/src/main/java/org/apache/kafka/raft/RaftClient.java
  • clients/src/main/resources/common/message/AlterShareGroupOffsetsRequest.json
  • clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json
  • clients/src/main/resources/common/message/DeleteShareGroupOffsetsRequest.json
  • clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
  • clients/src/main/resources/common/message/DeleteShareGroupStateRequest.json
  • clients/src/main/resources/common/message/InitializeShareGroupStateRequest.json
  • clients/src/main/resources/common/message/ReadShareGroupStateRequest.json
  • clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json
  • clients/src/main/resources/common/message/WriteShareGroupStateResponse.json
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
  • clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
  • clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
  • clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
  • clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
  • clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
  • core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  • core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
  • core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
  • core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
  • core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
  • core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
  • metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  • metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
  • metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
  • raft/src/main/java/org/apache/kafka/raft/QuorumState.java
  • server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsProvider.java
  • .github/scripts/junit.py
  • release/release.py
  • group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataKey.json
  • group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
  • group-coordinator/src/main/resources/common/message/ShareGroupMetadataValue.json
  • group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
  • clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
  • clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
  • clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
  • core/src/main/java/kafka/server/share/ShareFetchUtils.java
  • core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
  • core/src/main/scala/kafka/raft/RaftManager.scala
  • core/src/main/scala/kafka/server/BrokerServer.scala
  • core/src/main/scala/kafka/server/KafkaConfig.scala
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
  • core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
  • core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
  • core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
  • core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
  • core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
  • core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
  • core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
  • core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
  • metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java

Copy link

refacto-test bot commented Aug 11, 2025

No review comments were generated during this review iteration of the PR.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @arvi18, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces significant enhancements and stabilizations across Kafka's client, server, and Connect components. My primary focus was on preventing a potential hang in the Connect herder thread during producer flushing, which greatly improves the system's reliability. I've also rolled out a new two-phase commit feature for producers, enabling more robust external transaction management. Furthermore, the ShareConsumer and Streams rebalance protocols have matured from 'early access' to 'preview features,' indicating their readiness for broader adoption and testing. This involved extensive client API refactoring for better buffer handling and internal consistency, along with improvements to KRaft versioning and upgrade mechanisms.

Highlights

  • Producer Transactional 2PC: I've introduced a new two-phase commit (2PC) feature for transactional producers. This allows transactions to remain prepared and not automatically aborted even if the producer restarts, which is critical for external transaction managers. This functionality is controlled by a new transaction.two.phase.commit.enable configuration.
  • ShareConsumer and Streams Protocol Stabilization: The ShareConsumer and Streams rebalance protocols, previously in early access, have been stabilized. Their documentation now reflects their status as 'preview features,' indicating they are ready for broader testing, though still not fully recommended for production environments.
  • Kafka Connect Herder Thread Hang Fix: I've implemented a fix to prevent the Kafka Connect herder thread from blocking indefinitely if a producer flushing operation hangs. This improves the overall stability and resilience of Kafka Connect deployments.
  • Client API Refactoring: I've refactored significant portions of the Kafka client's request and response handling to use Readable interfaces and ByteBufferAccessor. This modernizes the internal API for buffer management, leading to more efficient and flexible data processing.
  • Producer Interceptor Headers: Producer interceptors now receive Headers objects in their onAcknowledgement callbacks. This provides richer context for custom interceptor logic, allowing for more sophisticated monitoring and manipulation of record metadata.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant set of changes. The primary goal is to prevent the herder thread from blocking when a producer flush hangs, which is addressed by moving a potentially blocking call out of a synchronized block in OffsetStorageReaderImpl.

Beyond this core fix, the PR includes several major enhancements and refactorings:

  • Transactional Improvements: Adds support for two-phase commit (2PC) transactions, including a new initTransactions(boolean keepPreparedTxn) method and associated configuration.
  • API Refinements: Many request/response classes are updated to use a Readable interface instead of ByteBuffer for better abstraction. Several APIs are also updated to support topic IDs in addition to topic names.
  • Share Group Stabilization: The share group feature is moved from "early access" to "preview", with related API and documentation updates.
  • KRaft Versioning: Logic for upgrading the KRaft version is introduced, including new control records and state management in the leader.
  • Code Cleanup: Logic from the Scala LogManager object has been moved to a new Java LogManager class. There are also various code simplifications and renames for clarity.
  • Observability: New metrics have been added for Connect tasks and plugins, improving monitoring capabilities.

My review focuses on consistency and maintainability. I've suggested a couple of minor refactorings in AsyncKafkaConsumer to align with similar changes made elsewhere in the file. Overall, the changes are well-structured and improve the codebase.

@visz11
Copy link
Collaborator

visz11 commented Aug 22, 2025

/refacto-test

Copy link

refacto-test bot commented Aug 22, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

refacto-test bot commented Aug 22, 2025

Multi-Domain Review: KRaft Upgrade

👍 Well Done
KRaft Version Upgrade

Comprehensive implementation of KRaft version upgrade with backward compatibility.

Thread Safety Improvements

Replaced ThreadLocal with thread-aware state determination for better reliability.

📌 Files Processed
  • raft/src/main/java/org/apache/kafka/raft/LeaderState.java
  • raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
  • raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
  • metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
  • metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
  • docs/ops.html
  • metadata/src/main/java/org/apache/kafka/controller/KRaftVersionAccessor.java
  • metadata/src/main/java/org/apache/kafka/controller/RaftClientKRaftVersionAccessor.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskPluginsMetadata.java
  • clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenResponse.java
  • clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
  • clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
  • core/src/main/java/kafka/server/share/DelayedShareFetch.java
  • raft/src/main/java/org/apache/kafka/raft/FollowerState.java
  • clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
  • clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageReaderTest.java
  • raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java
  • core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
  • raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
  • core/src/main/scala/kafka/log/LogManager.scala
  • core/src/main/scala/kafka/server/KafkaApis.scala
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  • core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
  • core/src/test/scala/unit/kafka/log/LogManagerTest.scala
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
  • clients/src/main/resources/common/message/OffsetCommitRequest.json
  • clients/src/main/resources/common/message/OffsetCommitResponse.json
  • clients/src/main/resources/common/message/ShareAcknowledgeRequest.json
  • clients/src/main/resources/common/message/ShareAcknowledgeResponse.json
  • clients/src/main/resources/common/message/ShareFetchRequest.json
  • clients/src/main/resources/common/message/ShareFetchResponse.json
  • clients/src/main/resources/common/message/ShareGroupDescribeRequest.json
  • clients/src/main/resources/common/message/ShareGroupDescribeResponse.json
  • clients/src/main/resources/common/message/ShareGroupHeartbeatRequest.json
  • clients/src/main/resources/common/message/ShareGroupHeartbeatResponse.json
  • clients/src/main/resources/common/message/ReadShareGroupStateResponse.json
  • clients/src/main/resources/common/message/WriteShareGroupStateRequest.json
  • raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupStateResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/InitializeShareGroupStateResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateResponse.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
  • core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
  • core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
  • metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
  • metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
  • raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeUserScramCredentialsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
  • clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
  • metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
  • raft/src/main/java/org/apache/kafka/raft/VoterSet.java
  • core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
  • raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/RestartPlanTest.java
  • core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
  • core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
  • core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
  • core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
  • raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
  • release/git.py
  • group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataValue.json
  • checkstyle/import-control-storage.xml
  • checkstyle/suppressions.xml
  • clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
  • clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/BrokerHeartbeatResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeClusterResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/EnvelopeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ListPartitionReassignmentsResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatRequest.java
  • clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/VoteResponse.java
  • clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
  • clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java
  • core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
  • server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
  • server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java
  • build.gradle
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
  • core/src/main/java/kafka/server/share/SharePartitionManager.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
  • raft/src/main/java/org/apache/kafka/raft/RaftClient.java
  • clients/src/main/resources/common/message/AlterShareGroupOffsetsRequest.json
  • clients/src/main/resources/common/message/AlterShareGroupOffsetsResponse.json
  • clients/src/main/resources/common/message/DeleteShareGroupOffsetsRequest.json
  • clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
  • clients/src/main/resources/common/message/DeleteShareGroupStateRequest.json
  • clients/src/main/resources/common/message/InitializeShareGroupStateRequest.json
  • clients/src/main/resources/common/message/ReadShareGroupStateRequest.json
  • clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json
  • clients/src/main/resources/common/message/WriteShareGroupStateResponse.json
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
  • clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
  • clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
  • clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
  • clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
  • clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
  • core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  • core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
  • core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
  • core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
  • core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
  • core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
  • metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  • metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
  • metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
  • raft/src/main/java/org/apache/kafka/raft/QuorumState.java
  • server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
  • clients/src/test/java/org/apache/kafka/common/utils/annotation/ApiKeyVersionsProvider.java
  • .github/scripts/junit.py
  • release/release.py
  • group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataKey.json
  • group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
  • group-coordinator/src/main/resources/common/message/ShareGroupMetadataValue.json
  • group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
  • clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
  • clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
  • clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
  • clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
  • core/src/main/java/kafka/server/share/ShareFetchUtils.java
  • core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
  • core/src/main/scala/kafka/raft/RaftManager.scala
  • core/src/main/scala/kafka/server/BrokerServer.scala
  • core/src/main/scala/kafka/server/KafkaConfig.scala
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
  • clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerDelegateCreator.java
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
  • connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
  • core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
  • clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
  • connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
  • core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
  • core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
  • core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
  • core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
  • core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
  • core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
  • core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
  • core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
  • metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
📝 Additional Comments
core/src/main/java/kafka/server/share/DelayedShareFetch.java (1)
Complex Method

The completeRemoteStorageShareFetchRequest method is overly complex with over 90 lines and multiple responsibilities. This makes it difficult to understand, test, and maintain, increasing the risk of bugs.

    private void completeRemoteStorageShareFetchRequest() {
        LinkedHashMap<TopicIdPartition, Long> acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
        try {
            List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
            int readableBytes = processRemoteFetchResult(shareFetchPartitionData);
            
            // If remote fetch bytes < shareFetch.fetchParams().maxBytes, then try for a local read
            if (readableBytes < shareFetch.fetchParams().maxBytes) {
                acquiredNonRemoteFetchTopicPartitionData = acquireAndProcessLocalPartitions(readableBytes, shareFetchPartitionData);
            }
            
            completeShareFetchRequest(shareFetchPartitionData, acquiredNonRemoteFetchTopicPartitionData);
        } catch (InterruptedException | ExecutionException e) {
            log.error("Exception occurred in completing remote fetch {} for delayed share fetch request", remoteFetchOpt.get(), e);
            handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
        } catch (Exception e) {
            log.error("Unexpected error in processing delayed share fetch request", e);
            handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
        } finally {
            Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet());
            topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitionData.keySet());
            releasePartitionLocksAndAddToActionQueue(topicIdPartitions);
        }
    }

Standards:

  • Single Responsibility Principle
  • Method Complexity
raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java (1)
Error Handling Improvement

The code returns REQUEST_TIMED_OUT error when in-memory voters are empty, but the comment indicates this is a timing issue with the kraft version. Using a timeout error for a state condition is misleading and could cause clients to retry unnecessarily.

                /* This can happen if the remote voter sends an update voter request before the
                 * updated kraft version has been written to the log
                 */
                log.info("Received update voter request before in-memory voters were initialized");
                return CompletableFuture.completedFuture(
                    RaftUtil.updateVoterResponse(
                        Errors.NOT_CONTROLLER,  // More appropriate for state not ready condition
                        requestListenerName,
                        new LeaderAndEpoch(
                            localId,
                            leaderState.epoch()
                        ),
                        leaderState.leaderEndpoints()
                    )
                );

Standards:

  • Error Code Semantics
  • Client-Server Communication
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (1)
Inconsistent Naming

The method name 'shouldPoisonStateOnInvalidTransition' doesn't follow the standard boolean method naming convention (typically starting with 'is', 'has', or 'should'). This inconsistency makes the code less readable and maintainable.

    protected boolean shouldPoisonStateOnInvalidTransition() {
        return Thread.currentThread() instanceof Sender.SenderThread;
    }

Standards:

  • Naming Conventions
  • Code Readability
raft/src/main/java/org/apache/kafka/raft/LeaderState.java (2)
Redundant Computation

The code streams through all voter keys to check if any have empty directory IDs, which is inefficient for large voter sets. This operation could be cached or optimized to avoid repeated streaming operations.

        } else {
            boolean anyEmptyDirectoryId = inMemoryVoters
                .voters()
                .voterKeys()
                .stream()
                .anyMatch(voterKey -> voterKey.directoryId().isEmpty());
                
            if (anyEmptyDirectoryId) {

Standards:

  • Stream Operation Efficiency
  • Performance Optimization
Misleading Comment

The comment contains a typo ('kraft.verion' instead of 'kraft.version') and doesn't clearly explain the implementation constraint. This could lead to confusion when future developers add kraft.version 2 support.

        // Currently only upgrade to kraft.version 1 is supported; this implementation
        // needs to be updated when kraft.version 2 is added to handle multiple upgrade paths

Standards:

  • Code Documentation
  • Comment Clarity

int fetchTimeoutMs,
LogContext logContext,
KafkaRaftMetrics kafkaRaftMetrics
) {
this.localReplicaKey = localReplicaKey;
if (localVoterNode.voterKey().directoryId().isEmpty()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Null Check

The code assumes localVoterNode is non-null but doesn't validate this before accessing it. This could lead to NullPointerException if the constructor is called with a null localVoterNode parameter.

Suggested change
if (localVoterNode.voterKey().directoryId().isEmpty()) {
if (localVoterNode == null) {
throw new IllegalArgumentException("Local voter node cannot be null");
} else if (localVoterNode.voterKey().directoryId().isEmpty()) {
Standards
  • Defensive Programming
  • Null Safety


/**
* Throws an exception if a task for remote storage fetch could not be scheduled successfully else updates remoteFetchOpt.
* @param topicPartitionRemoteFetchInfo - The remote storage fetch information.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remote Fetch Cleanup

The remoteFetchTask is created but not properly cleaned up if the request is cancelled. This could lead to resource leaks if the task completes after cancellation.

Suggested change
* @param topicPartitionRemoteFetchInfo - The remote storage fetch information.
private void cancelRemoteFetchTask() {
boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false);
if (!cancelled && !remoteFetchOpt.get().remoteFetchTask().isDone()) {
log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}",
remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone());
}
}
Standards
  • Resource Management
  • Error Handling

Comment on lines +808 to +810
} catch (InterruptedException | ExecutionException e) {
log.error("Exception occurred in completing remote fetch {} for delayed share fetch request {}", remoteFetchOpt.get(), e);
handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect Exception Handling

InterruptedException should restore the interrupted status of the thread rather than being swallowed. This violates the interruption contract and could lead to unresponsive threads.

Suggested change
} catch (InterruptedException | ExecutionException e) {
log.error("Exception occurred in completing remote fetch {} for delayed share fetch request {}", remoteFetchOpt.get(), e);
handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
} catch (InterruptedException e) {
log.error("InterruptedException occurred in completing remote fetch {} for delayed share fetch request", remoteFetchOpt.get(), e);
Thread.currentThread().interrupt(); // Restore interrupted status
handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
} catch (ExecutionException e) {
log.error("ExecutionException occurred in completing remote fetch {} for delayed share fetch request", remoteFetchOpt.get(), e);
handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e);
Standards
  • Thread Interruption Contract
  • Exception Handling

Comment on lines +244 to +248
if (topicId == null) {
throw new IllegalArgumentException("TopicId cannot be null.");
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant Code

The throwIfTopicNameIsNull and throwIfTopicIdIsNull methods are nearly identical and violate the DRY principle. This creates maintenance overhead and increases the chance of inconsistencies.

Suggested change
if (topicId == null) {
throw new IllegalArgumentException("TopicId cannot be null.");
}
}
}
private static <T> void throwIfNull(T value, String name) {
if (value == null) {
throw new IllegalArgumentException(name + " cannot be null.");
}
}
private void throwIfTopicIdIsNull(Uuid topicId) {
throwIfNull(topicId, "TopicId");
}
private void throwIfTopicNameIsNull(String topicName) {
throwIfNull(topicName, "TopicName");
}
Standards
  • DRY Principle
  • Code Reuse

Comment on lines +516 to +523
if (pendingVersion.isPresent()) {
if (pendingVersion.get().kraftVersion().equals(newVersion)) {
// The version match; upgrade is a noop
return false;
} else {
throw new InvalidUpdateVersionException(
String.format(
"Invalid concurrent upgrade of %s from version %s to %s",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inefficient Error Handling

The code logs the error and then throws an exception with similar information, causing redundant string formatting operations. This is inefficient and creates duplicate error information.

Suggested change
if (pendingVersion.isPresent()) {
if (pendingVersion.get().kraftVersion().equals(newVersion)) {
// The version match; upgrade is a noop
return false;
} else {
throw new InvalidUpdateVersionException(
String.format(
"Invalid concurrent upgrade of %s from version %s to %s",
String errorMsg = String.format(
"Invalid upgrade of %s to %s because not all of the voters support it",
KRaftVersion.FEATURE_NAME,
newVersion
);
log.info("{}: {}", errorMsg, inMemoryVoters.voters());
throw new InvalidUpdateVersionException(errorMsg);
Standards
  • Efficient Error Handling
  • Performance Optimization

Comment on lines +290 to +291
return ApiError.fromThrowable(e);
} catch (IllegalArgumentException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unchecked Exception Exposure

The code directly exposes exception message details to the client, which could potentially leak sensitive information. Exception messages might contain implementation details that shouldn't be exposed externally.

Suggested change
return ApiError.fromThrowable(e);
} catch (IllegalArgumentException e) {
} catch (IllegalArgumentException e) {
log.debug("Invalid kraft version update request: {}", e.getMessage());
return invalidUpdateVersion(featureName, newVersion,
"Cannot update to the specified version due to validation failure");
}
Standards
  • CWE-209
  • Exception Information Exposure

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.