Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,25 @@ public static <IN, OUT> OUT handleOperationException(
) {
ApiError apiError = ApiError.fromThrowable(exception);

switch (apiError.error()) {
case UNKNOWN_SERVER_ERROR:
return switch (apiError.error()) {
case UNKNOWN_SERVER_ERROR -> {
log.error("Operation {} with {} hit an unexpected exception: {}.",
operationName, operationInput, exception.getMessage(), exception);
return handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);

case NETWORK_EXCEPTION:
operationName, operationInput, exception.getMessage(), exception);
yield handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
}
case NETWORK_EXCEPTION ->
// When committing offsets transactionally, we now verify the transaction with the
// transaction coordinator. Verification can fail with `NETWORK_EXCEPTION`, a
// retriable error which older clients may not expect and retry correctly. We
// translate the error to `COORDINATOR_LOAD_IN_PROGRESS` because it causes clients
// to retry the request without an unnecessary coordinator lookup.
return handler.apply(Errors.COORDINATOR_LOAD_IN_PROGRESS, null);

case UNKNOWN_TOPIC_OR_PARTITION:
case NOT_ENOUGH_REPLICAS:
case REQUEST_TIMED_OUT:
return handler.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);

case NOT_LEADER_OR_FOLLOWER:
case KAFKA_STORAGE_ERROR:
return handler.apply(Errors.NOT_COORDINATOR, null);

case MESSAGE_TOO_LARGE:
case RECORD_LIST_TOO_LARGE:
case INVALID_FETCH_SIZE:
return handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);

default:
return handler.apply(apiError.error(), apiError.message());
}
handler.apply(Errors.COORDINATOR_LOAD_IN_PROGRESS, null);
case UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, REQUEST_TIMED_OUT ->
handler.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);
case NOT_LEADER_OR_FOLLOWER, KAFKA_STORAGE_ERROR -> handler.apply(Errors.NOT_COORDINATOR, null);
case MESSAGE_TOO_LARGE, RECORD_LIST_TOO_LARGE, INVALID_FETCH_SIZE ->
handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
default -> handler.apply(apiError.error(), apiError.message());
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ CoordinatorResult<Void, T> execute() {
}
}

public static class ExecutorResult<T> {
public final String key;
public final CoordinatorResult<Void, T> result;

public record ExecutorResult<T>(String key, CoordinatorResult<Void, T> result) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public ExecutorResult(
String key,
CoordinatorResult<Void, T> result
Expand All @@ -61,24 +58,6 @@ public ExecutorResult(
this.result = Objects.requireNonNull(result);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ExecutorResult<?> that = (ExecutorResult<?>) o;

if (!Objects.equals(key, that.key)) return false;
return Objects.equals(result, that.result);
}

@Override
public int hashCode() {
int result = key.hashCode();
result = 31 * result + this.result.hashCode();
return result;
}

@Override
public String toString() {
return "ExecutorResult(" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* A simple Coordinator implementation that stores the records into a set.
*/
public class MockCoordinatorShard implements CoordinatorShard<String> {
static record RecordAndMetadata(
record RecordAndMetadata(
long offset,
long producerId,
short producerEpoch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

Expand All @@ -36,54 +35,13 @@ public class MockCoordinatorTimer<T, U> implements CoordinatorTimer<T, U> {
/**
* Represents a scheduled timeout.
*/
public static class ScheduledTimeout<T, U> {
public final String key;
public final long deadlineMs;
public final TimeoutOperation<T, U> operation;

public ScheduledTimeout(
String key,
long deadlineMs,
TimeoutOperation<T, U> operation
) {
this.key = key;
this.deadlineMs = deadlineMs;
this.operation = operation;
}
public record ScheduledTimeout<T, U>(String key, long deadlineMs, TimeoutOperation<T, U> operation) {
}

/**
* Represents an expired timeout.
*/
public static class ExpiredTimeout<T, U> {
public final String key;
public final CoordinatorResult<T, U> result;

public ExpiredTimeout(
String key,
CoordinatorResult<T, U> result
) {
this.key = key;
this.result = result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ExpiredTimeout<?, ?> that = (ExpiredTimeout<?, ?>) o;

if (!Objects.equals(key, that.key)) return false;
return Objects.equals(result, that.result);
}

@Override
public int hashCode() {
int result1 = key != null ? key.hashCode() : 0;
result1 = 31 * result1 + (result != null ? result.hashCode() : 0);
return result1;
}
public record ExpiredTimeout<T, U>(String key, CoordinatorResult<T, U> result) {
}

private final Time time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ protected List<ConsumerGroupPartitionAssignor> consumerGroupAssignors(
}
} else if (object instanceof Class<?> klass) {
Object o = Utils.newInstance((Class<?>) klass);
if (!ConsumerGroupPartitionAssignor.class.isInstance(o)) {
if (!(o instanceof ConsumerGroupPartitionAssignor)) {
throw new KafkaException(klass + " is not an instance of " + ConsumerGroupPartitionAssignor.class.getName());
}
assignor = (ConsumerGroupPartitionAssignor) o;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public static CoordinatorRecord newConsumerGroupRegularExpressionRecord(
String regex,
ResolvedRegularExpression resolvedRegularExpression
) {
List<String> topics = new ArrayList<>(resolvedRegularExpression.topics);
List<String> topics = new ArrayList<>(resolvedRegularExpression.topics());
Collections.sort(topics);

return CoordinatorRecord.record(
Expand All @@ -337,8 +337,8 @@ public static CoordinatorRecord newConsumerGroupRegularExpressionRecord(
new ApiMessageAndVersion(
new ConsumerGroupRegularExpressionValue()
.setTopics(topics)
.setVersion(resolvedRegularExpression.version)
.setTimestamp(resolvedRegularExpression.timestamp),
.setVersion(resolvedRegularExpression.version())
.setTimestamp(resolvedRegularExpression.timestamp()),
(short) 0
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,7 @@ public CompletableFuture<AlterShareGroupOffsetsResponseData> alterShareGroupOffs
"share-group-offsets-alter",
request,
exception,
(error, message) -> AlterShareGroupOffsetsRequest.getErrorResponseData(error, message),
AlterShareGroupOffsetsRequest::getErrorResponseData,
log
));
}
Expand Down Expand Up @@ -1891,7 +1891,7 @@ public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOf
"initiate-delete-share-group-offsets",
groupId,
exception,
(error, message) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error, message),
DeleteShareGroupOffsetsRequest::getErrorDeleteResponseData,
log
));
}
Expand Down Expand Up @@ -2332,27 +2332,23 @@ private OffsetFetchResponseData.OffsetFetchResponseGroup handleOffsetFetchExcept
) {
ApiError apiError = ApiError.fromThrowable(exception);

switch (apiError.error()) {
case UNKNOWN_TOPIC_OR_PARTITION:
case NOT_ENOUGH_REPLICAS:
case REQUEST_TIMED_OUT:
// Remap REQUEST_TIMED_OUT to NOT_COORDINATOR, since consumers on versions prior
// to 3.9 do not expect the error and won't retry the request. NOT_COORDINATOR
// additionally triggers coordinator re-lookup, which is necessary if the client is
// talking to a zombie coordinator.
//
// While handleOperationException does remap UNKNOWN_TOPIC_OR_PARTITION,
// NOT_ENOUGH_REPLICAS and REQUEST_TIMED_OUT to COORDINATOR_NOT_AVAILABLE,
// COORDINATOR_NOT_AVAILABLE is also not handled by consumers on versions prior to
// 3.9.
return OffsetFetchResponse.groupError(
request,
Errors.NOT_COORDINATOR,
context.requestVersion()
);

default:
return handleOperationException(
return switch (apiError.error()) {
case UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, REQUEST_TIMED_OUT ->
// Remap REQUEST_TIMED_OUT to NOT_COORDINATOR, since consumers on versions prior
// to 3.9 do not expect the error and won't retry the request. NOT_COORDINATOR
// additionally triggers coordinator re-lookup, which is necessary if the client is
// talking to a zombie coordinator.
//
// While handleOperationException does remap UNKNOWN_TOPIC_OR_PARTITION,
// NOT_ENOUGH_REPLICAS and REQUEST_TIMED_OUT to COORDINATOR_NOT_AVAILABLE,
// COORDINATOR_NOT_AVAILABLE is also not handled by consumers on versions prior to
// 3.9.
OffsetFetchResponse.groupError(
request,
Errors.NOT_COORDINATOR,
context.requestVersion()
);
default -> handleOperationException(
operationName,
request,
exception,
Expand All @@ -2362,8 +2358,8 @@ private OffsetFetchResponseData.OffsetFetchResponseGroup handleOffsetFetchExcept
context.requestVersion()
),
log
);
}
);
};
}

private static void requireNonNull(Object obj, String msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3347,14 +3347,14 @@ private CoordinatorResult<Void, CoordinatorRecord> handleRegularExpressionsResul
.resolvedRegularExpression(regex)
.orElse(ResolvedRegularExpression.EMPTY);

if (!oldResolvedRegularExpression.topics.equals(newResolvedRegularExpression.topics)) {
if (!oldResolvedRegularExpression.topics().equals(newResolvedRegularExpression.topics())) {
bumpGroupEpoch = true;

oldResolvedRegularExpression.topics.forEach(topicName ->
oldResolvedRegularExpression.topics().forEach(topicName ->
subscribedTopicNames.compute(topicName, SubscriptionCount::decRegexCount)
);

newResolvedRegularExpression.topics.forEach(topicName ->
newResolvedRegularExpression.topics().forEach(topicName ->
subscribedTopicNames.compute(topicName, SubscriptionCount::incRegexCount)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,12 @@

import java.util.function.Function;

public class OffsetExpirationConditionImpl implements OffsetExpirationCondition {

/**
* Given an offset and metadata, obtain the base timestamp that should be used
* as the start of the offsets retention period.
*/
private final Function<OffsetAndMetadata, Long> baseTimestamp;

public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> baseTimestamp) {
this.baseTimestamp = baseTimestamp;
}
/**
* @param baseTimestamp Given an offset and metadata, obtain the base timestamp that should be used
* as the start of the offsets retention period.
*/
public record OffsetExpirationConditionImpl(
Function<OffsetAndMetadata, Long> baseTimestamp) implements OffsetExpirationCondition {

/**
* Determine whether an offset is expired. Older versions have an expire timestamp per partition. If this
Expand All @@ -39,7 +34,6 @@ public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> baseTimes
* @param offset The offset and metadata.
* @param currentTimestampMs The current timestamp.
* @param offsetsRetentionMs The offsets retention in milliseconds.
*
* @return Whether the given offset is expired or not.
*/
@Override
Expand All @@ -52,11 +46,4 @@ public boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs
return currentTimestampMs - baseTimestamp.apply(offset) >= offsetsRetentionMs;
}
}

/**
* @return The base timestamp.
*/
public Function<OffsetAndMetadata, Long> baseTimestamp() {
return this.baseTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public OffsetMetadataManager build() {

/**
* Tracks open transactions (producer ids) by group id, topic name and partition id.
* It is the responsiblity of the caller to update {@link #pendingTransactionalOffsets}.
* It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}.
*/
private class OpenTransactions {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,12 @@ public static ShareGroupAutoOffsetResetStrategy fromString(String offsetStrategy
AutoOffsetResetStrategy baseStrategy = AutoOffsetResetStrategy.fromString(offsetStrategy);
AutoOffsetResetStrategy.StrategyType baseType = baseStrategy.type();

StrategyType shareGroupType;
switch (baseType) {
case EARLIEST:
shareGroupType = StrategyType.EARLIEST;
break;
case LATEST:
shareGroupType = StrategyType.LATEST;
break;
case BY_DURATION:
shareGroupType = StrategyType.BY_DURATION;
break;
default:
throw new IllegalArgumentException("Unsupported strategy for ShareGroup: " + baseType);
}
StrategyType shareGroupType = switch (baseType) {
case EARLIEST -> StrategyType.EARLIEST;
case LATEST -> StrategyType.LATEST;
case BY_DURATION -> StrategyType.BY_DURATION;
default -> throw new IllegalArgumentException("Unsupported strategy for ShareGroup: " + baseType);
};

return new ShareGroupAutoOffsetResetStrategy(baseStrategy, shareGroupType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ private int balanceTopic(
// First, choose a member from the most loaded range to reassign a partition from.

// Loop until we find a member that has partitions to give up.
int mostLoadedMemberIndex = -1;
int mostLoadedMemberIndex;
while (true) {
mostLoadedMemberIndex = memberAssignmentBalancer.nextMostLoadedMember();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,6 @@ private void assignRemainingPartitions() {
}
}

private static class MemberWithRemainingQuota {
final String memberId;
final int remainingQuota;

MemberWithRemainingQuota(
String memberId,
int remainingQuota
) {
this.memberId = memberId;
this.remainingQuota = remainingQuota;
}
private record MemberWithRemainingQuota(String memberId, int remainingQuota) {
}
}
Loading