From 934cbed5aab11a6f7a73427f3d0985c8b727eb87 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Tue, 22 Apr 2025 16:38:49 +0100 Subject: [PATCH 1/4] KAFKA-18889: Make records in ShareFetchResponse non-nullable --- .../org/apache/kafka/common/requests/ShareFetchResponse.java | 2 +- core/src/main/java/kafka/server/share/ShareFetchUtils.java | 5 +++-- core/src/main/scala/kafka/server/KafkaApis.scala | 2 ++ .../test/java/kafka/server/share/ShareFetchUtilsTest.java | 4 ++-- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 2 +- .../java/org/apache/kafka/server/share/fetch/ShareFetch.java | 4 +++- 6 files changed, 12 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java index 2bab79ead9bc0..50a768ebecb89 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java @@ -209,7 +209,7 @@ public static ShareFetchResponseData.PartitionData partitionResponse(TopicIdPart return partitionResponse(topicIdPartition.topicPartition().partition(), error); } - public static ShareFetchResponseData.PartitionData partitionResponse(int partition, Errors error) { + private static ShareFetchResponseData.PartitionData partitionResponse(int partition, Errors error) { return new ShareFetchResponseData.PartitionData() .setPartitionIndex(partition) .setErrorCode(error.code()) diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java index 3cfab25e6845a..603ae8e048b23 100644 --- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java +++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.coordinator.group.GroupConfigManager; @@ -83,7 +84,7 @@ static Map processFetchR if (fetchPartitionData.error.code() != Errors.NONE.code()) { partitionData - .setRecords(null) + .setRecords(MemoryRecords.EMPTY) .setErrorCode(fetchPartitionData.error.code()) .setErrorMessage(fetchPartitionData.error.message()) .setAcquiredRecords(List.of()); @@ -123,7 +124,7 @@ static Map processFetchR // if we want parallel requests for the same share partition or not. if (shareAcquiredRecords.acquiredRecords().isEmpty()) { partitionData - .setRecords(null) + .setRecords(MemoryRecords.EMPTY) .setAcquiredRecords(List.of()); } else { partitionData diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6a22963ac7d6a..22d783dbcaec2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3116,6 +3116,7 @@ class KafkaApis(val requestChannel: RequestChannel, .setPartitionIndex(partitionIndex) .setErrorCode(Errors.NONE.code) .setAcknowledgeErrorCode(value) + .setRecords(MemoryRecords.EMPTY) topic.partitions.add(fetchPartitionData) } topicPartitionAcknowledgements.remove(topicId) @@ -3131,6 +3132,7 @@ class KafkaApis(val requestChannel: RequestChannel, .setPartitionIndex(partitionIndex) .setErrorCode(Errors.NONE.code) .setAcknowledgeErrorCode(value) + .setRecords(MemoryRecords.EMPTY) topicData.partitions.add(fetchPartitionData) } shareFetchResponse.data.responses.add(topicData) diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java index d5acaef2060b8..7798f96481ef6 100644 --- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -329,7 +329,7 @@ public void testProcessFetchResponseWhenNoRecordsAreAcquired() { assertEquals(1, resultData.size()); assertTrue(resultData.containsKey(tp0)); assertEquals(0, resultData.get(tp0).partitionIndex()); - assertNull(resultData.get(tp0).records()); + assertEquals(MemoryRecords.EMPTY, resultData.get(tp0).records()); assertTrue(resultData.get(tp0).acquiredRecords().isEmpty()); assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode()); @@ -344,7 +344,7 @@ public void testProcessFetchResponseWhenNoRecordsAreAcquired() { assertEquals(1, resultData.size()); assertTrue(resultData.containsKey(tp0)); assertEquals(0, resultData.get(tp0).partitionIndex()); - assertNull(resultData.get(tp0).records()); + assertEquals(MemoryRecords.EMPTY, resultData.get(tp0).records()); assertTrue(resultData.get(tp0).acquiredRecords().isEmpty()); assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode()); diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 02541097d4c32..d78edcc62afd7 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -6199,7 +6199,7 @@ class KafkaApisTest extends Logging { assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).acknowledgeErrorCode) - assertNull(topicResponses.get(0).partitions.get(0).records) + assertEquals(MemoryRecords.EMPTY, topicResponses.get(0).partitions.get(0).records) assertEquals(0, topicResponses.get(0).partitions.get(0).acquiredRecords.toArray().length) } diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java index 8406f9efa91c3..fd5e917569aa7 100644 --- a/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -217,7 +218,8 @@ private synchronized void addErroneousToResponse(Map { brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().mark(); From f807b0556328787497ae7507678801a2df765940 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Tue, 22 Apr 2025 17:21:55 +0100 Subject: [PATCH 2/4] Fixing checkstyle error --- core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java index 7798f96481ef6..3bec497d7a1f2 100644 --- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -61,7 +61,6 @@ import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords; import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; From 48cb4ef2933baafc3d9c388e7c0acb165105bc9f Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Wed, 23 Apr 2025 12:51:24 +0100 Subject: [PATCH 3/4] Making records non-nullable --- .../src/main/resources/common/message/ShareFetchResponse.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/resources/common/message/ShareFetchResponse.json b/clients/src/main/resources/common/message/ShareFetchResponse.json index 697e932cf8580..2014692277488 100644 --- a/clients/src/main/resources/common/message/ShareFetchResponse.json +++ b/clients/src/main/resources/common/message/ShareFetchResponse.json @@ -67,7 +67,7 @@ { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch." } ]}, - { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data." }, + { "name": "Records", "type": "records", "versions": "0+", "about": "The record data." }, { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records." }, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records." }, From 476d2b9028262b02fed1dcb22e2d501d88323f04 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Thu, 24 Apr 2025 13:47:08 +0100 Subject: [PATCH 4/4] Making nullable to version 0 --- .../src/main/resources/common/message/ShareFetchResponse.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/resources/common/message/ShareFetchResponse.json b/clients/src/main/resources/common/message/ShareFetchResponse.json index 2014692277488..28cc13ee939bf 100644 --- a/clients/src/main/resources/common/message/ShareFetchResponse.json +++ b/clients/src/main/resources/common/message/ShareFetchResponse.json @@ -67,7 +67,7 @@ { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch." } ]}, - { "name": "Records", "type": "records", "versions": "0+", "about": "The record data." }, + { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0", "about": "The record data." }, { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records." }, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records." },