Skip to content
Merged
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
files="AbstractResponse.java"/>

<suppress checks="MethodLength"
files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java|KafkaConsumer.java"/>
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/>

<suppress checks="ParameterNumber"
files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -25,6 +26,10 @@
import java.util.Set;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;

/**
* A cooperative version of the {@link AbstractStickyAssignor AbstractStickyAssignor}. This follows the same (sticky)
Expand All @@ -43,6 +48,13 @@
*/
public class CooperativeStickyAssignor extends AbstractStickyAssignor {

// these schemas are used for preserving useful metadata for the assignment, such as the last stable generation
private static final String GENERATION_KEY_NAME = "generation";
private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new Schema(
new Field(GENERATION_KEY_NAME, Type.INT32));

private int generation = DEFAULT_GENERATION; // consumer group generation

@Override
public String name() {
return "cooperative-sticky";
Expand All @@ -53,9 +65,37 @@ public List<RebalanceProtocol> supportedProtocols() {
return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER);
}

@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
this.generation = metadata.generationId();
}

@Override
public ByteBuffer subscriptionUserData(Set<String> topics) {
Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0);

struct.set(GENERATION_KEY_NAME, generation);
ByteBuffer buffer = ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct));
COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct);
buffer.flip();
return buffer;
}

@Override
protected MemberData memberData(Subscription subscription) {
return new MemberData(subscription.ownedPartitions(), Optional.empty());
ByteBuffer buffer = subscription.userData();
Optional<Integer> encodedGeneration;
if (buffer == null) {
encodedGeneration = Optional.empty();
} else {
try {
Struct struct = COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer);
encodedGeneration = Optional.of(struct.getInt(GENERATION_KEY_NAME));
} catch (Exception e) {
encodedGeneration = Optional.of(DEFAULT_GENERATION);
}
}
return new MemberData(subscription.ownedPartitions(), encodedGeneration);
}

@Override
Expand Down
Loading