@@ -209,7 +209,7 @@ public static ShareFetchResponseData.PartitionData partitionResponse(TopicIdPart
         return partitionResponse(topicIdPartition.topicPartition().partition(), error);
     }
 
-    public static ShareFetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
+    private static ShareFetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
         return new ShareFetchResponseData.PartitionData()
             .setPartitionIndex(partition)
             .setErrorCode(error.code())
@@ -67,7 +67,7 @@
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The latest known leader epoch." }
       ]},
-      { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data." },
+      { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0", "about": "The record data." },
       { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields": [
         { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records." },
         { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records." },
core/src/main/java/kafka/server/share/ShareFetchUtils.java (5 changes: 3 additions & 2 deletions)
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
@@ -83,7 +84,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
 
         if (fetchPartitionData.error.code() != Errors.NONE.code()) {
             partitionData
-                .setRecords(null)
+                .setRecords(MemoryRecords.EMPTY)
                 .setErrorCode(fetchPartitionData.error.code())
                 .setErrorMessage(fetchPartitionData.error.message())
                 .setAcquiredRecords(List.of());
@@ -123,7 +124,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
             // if we want parallel requests for the same share partition or not.
             if (shareAcquiredRecords.acquiredRecords().isEmpty()) {
                 partitionData
-                    .setRecords(null)
+                    .setRecords(MemoryRecords.EMPTY)
                     .setAcquiredRecords(List.of());
             } else {
                 partitionData
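As a reading aid, here is a minimal sketch of the invariant this file now maintains. It is not part of the PR: the class name is invented for illustration, but the Kafka types and setters are the ones visible in the hunk above. When no records are acquired, or an error is returned, the partition response now carries MemoryRecords.EMPTY rather than a null records field.

```java
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;

import java.util.List;

public class EmptyShareFetchPartitionSketch {
    public static void main(String[] args) {
        // Build a partition response the way the patched ShareFetchUtils does when
        // no records are acquired: an empty records field instead of null.
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
            .setPartitionIndex(0)
            .setErrorCode(Errors.NONE.code())
            .setRecords(MemoryRecords.EMPTY)
            .setAcquiredRecords(List.of());

        // Callers no longer need a null check; the empty instance simply reports zero bytes.
        System.out.println("records is null? " + (partitionData.records() == null)); // false
        System.out.println("record bytes: " + partitionData.records().sizeInBytes()); // 0
    }
}
```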
core/src/main/scala/kafka/server/KafkaApis.scala (2 changes: 2 additions & 0 deletions)
@@ -3116,6 +3116,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               .setPartitionIndex(partitionIndex)
               .setErrorCode(Errors.NONE.code)
               .setAcknowledgeErrorCode(value)
+              .setRecords(MemoryRecords.EMPTY)
             topic.partitions.add(fetchPartitionData)
           }
           topicPartitionAcknowledgements.remove(topicId)
@@ -3131,6 +3132,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               .setPartitionIndex(partitionIndex)
               .setErrorCode(Errors.NONE.code)
               .setAcknowledgeErrorCode(value)
+              .setRecords(MemoryRecords.EMPTY)
             topicData.partitions.add(fetchPartitionData)
           }
           shareFetchResponse.data.responses.add(topicData)
@@ -61,7 +61,6 @@
 import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
 import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -329,7 +328,7 @@ public void testProcessFetchResponseWhenNoRecordsAreAcquired() {
         assertEquals(1, resultData.size());
         assertTrue(resultData.containsKey(tp0));
         assertEquals(0, resultData.get(tp0).partitionIndex());
-        assertNull(resultData.get(tp0).records());
+        assertEquals(MemoryRecords.EMPTY, resultData.get(tp0).records());
         assertTrue(resultData.get(tp0).acquiredRecords().isEmpty());
         assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode());
 
@@ -344,7 +343,7 @@ public void testProcessFetchResponseWhenNoRecordsAreAcquired() {
         assertEquals(1, resultData.size());
         assertTrue(resultData.containsKey(tp0));
         assertEquals(0, resultData.get(tp0).partitionIndex());
-        assertNull(resultData.get(tp0).records());
+        assertEquals(MemoryRecords.EMPTY, resultData.get(tp0).records());
         assertTrue(resultData.get(tp0).acquiredRecords().isEmpty());
         assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode());
 
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (2 changes: 1 addition & 1 deletion)
@@ -6199,7 +6199,7 @@ class KafkaApisTest extends Logging {
     assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex)
     assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode)
     assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).acknowledgeErrorCode)
-    assertNull(topicResponses.get(0).partitions.get(0).records)
+    assertEquals(MemoryRecords.EMPTY, topicResponses.get(0).partitions.get(0).records)
     assertEquals(0, topicResponses.get(0).partitions.get(0).acquiredRecords.toArray().length)
   }
 
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.server.storage.log.FetchParams;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
@@ -217,7 +218,8 @@ private synchronized void addErroneousToResponse(Map<TopicIdPartition, Partition
             response.put(topicIdPartition, new PartitionData()
                 .setPartitionIndex(topicIdPartition.partition())
                 .setErrorCode(Errors.forException(throwable).code())
-                .setErrorMessage(throwable.getMessage()));
+                .setErrorMessage(throwable.getMessage())
+                .setRecords(MemoryRecords.EMPTY));
         });
         erroneousTopics.forEach(topic -> {
             brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().mark();
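In the same spirit, a hedged sketch of the error path in the last hunk, again outside the PR: the wrapper class name and the example exception are illustrative, while the setters and Errors.forException call mirror the code above. Erroneous partitions now also carry an empty records field, so downstream handling can iterate records without a null check.

```java
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

public class ErroneousPartitionSketch {
    public static void main(String[] args) {
        Throwable throwable = new UnknownTopicOrPartitionException("no such partition");

        // Mirrors the shape built in addErroneousToResponse above: error code, message,
        // and an explicitly empty records field.
        PartitionData partitionData = new PartitionData()
            .setPartitionIndex(0)
            .setErrorCode(Errors.forException(throwable).code())
            .setErrorMessage(throwable.getMessage())
            .setRecords(MemoryRecords.EMPTY);

        // MemoryRecords.EMPTY contains no batches, so this loop body never runs.
        for (RecordBatch batch : MemoryRecords.EMPTY.batches()) {
            System.out.println("unexpected batch at offset " + batch.baseOffset());
        }
        System.out.println("error: " + partitionData.errorMessage());
    }
}
```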