Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ public enum AckMode {
/**
* Call {@link Consumer#commitAsync()} immediately for pending acks.
*/
MANUAL_IMMEDIATE
MANUAL_IMMEDIATE,

/**
* Call {@link Consumer#commitSync()} immediately for pending acks.
*/
MANUAL_IMMEDIATE_SYNC

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ private class ListenerConsumer implements SchedulingAwareRunnable {

private final boolean autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();

private final AckMode ackMode = getAckMode();

private Thread consumerThread;

private volatile Collection<TopicPartition> definedPartitions;
Expand All @@ -222,9 +224,11 @@ private class ListenerConsumer implements SchedulingAwareRunnable {

ListenerConsumer(MessageListener<K, V> listener, AcknowledgingMessageListener<K, V> ackListener,
long recentOffset) {
Assert.state(!(getAckMode().equals(AckMode.MANUAL) || getAckMode().equals(AckMode.MANUAL_IMMEDIATE))
Assert.state(!(this.ackMode.equals(AckMode.MANUAL)
|| this.ackMode.equals(AckMode.MANUAL_IMMEDIATE)
|| this.ackMode.equals(AckMode.MANUAL_IMMEDIATE_SYNC))
|| !this.autoCommit,
"Consumer cannot be configured for auto commit for ackMode " + getAckMode());
"Consumer cannot be configured for auto commit for ackMode " + this.ackMode);
Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {

Expand Down Expand Up @@ -272,7 +276,6 @@ public void run() {
if (isRunning() && this.definedPartitions != null) {
initPartitionsIfNeeded();
}
final AckMode ackMode = getAckMode();
while (isRunning()) {
try {
if (this.logger.isTraceEnabled()) {
Expand All @@ -287,14 +290,14 @@ public void run() {
while (iterator.hasNext()) {
final ConsumerRecord<K, V> record = iterator.next();
invokeListener(record);
if (!this.autoCommit && ackMode.equals(AckMode.RECORD)) {
if (!this.autoCommit && this.ackMode.equals(AckMode.RECORD)) {
this.consumer.commitAsync(
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)), this.callback);
}
}
if (!this.autoCommit) {
processCommits(ackMode, records);
processCommits(this.ackMode, records);
}
}
else {
Expand Down Expand Up @@ -337,28 +340,39 @@ private void invokeListener(final ConsumerRecord<K, V> record) {

@Override
public void acknowledge() {
if (getAckMode().equals(AckMode.MANUAL)) {
if (ListenerConsumer.this.ackMode.equals(AckMode.MANUAL)) {
updateManualOffset(record);
}
else if (getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
if (Thread.currentThread().equals(ListenerConsumer.this.consumerThread)) {
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Committing: " + commits);
}
ListenerConsumer.this.consumer.commitAsync(commits, ListenerConsumer.this.callback);
else if (ListenerConsumer.this.ackMode.equals(AckMode.MANUAL_IMMEDIATE)
|| ListenerConsumer.this.ackMode.equals(AckMode.MANUAL_IMMEDIATE_SYNC)) {
ackImmediate(record);
}
else {
throw new IllegalStateException("AckMode must be MANUAL or MANUAL_IMMEDIATE "
+ "for manual acks");
}
}

private void ackImmediate(final ConsumerRecord<K, V> record) {
if (Thread.currentThread().equals(ListenerConsumer.this.consumerThread)) {
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
if (ListenerConsumer.this.logger.isDebugEnabled()) {
ListenerConsumer.this.logger.debug("Committing: " + commits);
}
if (ListenerConsumer.this.ackMode.equals(AckMode.MANUAL_IMMEDIATE)) {
ListenerConsumer.this.consumer.commitAsync(commits,
ListenerConsumer.this.callback);
}
else {
throw new IllegalStateException(
"With MANUAL_IMMEDIATE ack mode, acknowledge() must be invoked on the "
+ "consumer thread");
ListenerConsumer.this.consumer.commitSync(commits);
}
}
else {
throw new IllegalStateException("AckMode must be MANUAL or MANUAL_IMMEDIATE "
+ "for manual acks");
throw new IllegalStateException(
"With " + ListenerConsumer.this.ackMode.name()
+ " ack mode, acknowledge() must be invoked on the consumer thread");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ public class ConcurrentMessageListenerContainerTests {

private static String topic7 = "testTopic7";

private static String topic8 = "testTopic8";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
topic6, topic7);
topic6, topic7, topic8);

@Test
public void testAutoCommit() throws Exception {
Expand Down Expand Up @@ -357,6 +359,48 @@ public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ac
logger.info("Stop MANUAL_IMMEDIATE with Existing");
}

@Test
public void testManualCommitSyncExisting() throws Exception {
logger.info("Start MANUAL_IMMEDIATE_SYNC with Existing");
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic8);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.flush();
Map<String, Object> props = KafkaTestUtils.consumerProps("testManualExistingSync", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, topic8);
final CountDownLatch latch = new CountDownLatch(8);
container.setMessageListener(new AcknowledgingMessageListener<Integer, String>() {

@Override
public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ack) {
logger.info("manualExisting: " + message);
ack.acknowledge();
latch.countDown();
}

});
container.setConcurrency(1);
container.setAckMode(AckMode.MANUAL_IMMEDIATE_SYNC);
container.setBeanName("testManualExisting");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
template.send(0, "fooo");
template.send(2, "barr");
template.send(0, "bazz");
template.send(2, "quxx");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
container.stop();
logger.info("Stop MANUAL_IMMEDIATE_SYNC with Existing");
}

@SuppressWarnings("unchecked")
@Test
Expand Down
5 changes: 4 additions & 1 deletion src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,11 @@ records have been received since the last commit.
after which, the same semantics as `COUNT_TIME` are applied.
- MANUAL_IMMEDIATE - call `commitAsync()`` immediately when the `Acknowledgment.acknowledge()` method is called by the
listener - must be executed on the container's thread.
- MANUAL_IMMEDIATE_SYNC - call `commitSync()`` immediately when the `Acknowledgment.acknowledge()` method is called by
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo with one more extra `.
Will fix on merge.

the listener - must be executed on the container's thread.

NOTE: `MANUAL` and `MANUAL_IMMEDIATE` require the listener to be an `AcknowledgingMessageListener`.
NOTE: `MANUAL`, `MANUAL_IMMEDIATE`, and `MANUAL_IMMEDIATE_SYNC` require the listener to be an
`AcknowledgingMessageListener`.

[source, java]
----
Expand Down
22 changes: 9 additions & 13 deletions src/reference/asciidoc/quick-tour.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public void testAutoCommit() throws Exception {
Thread.sleep(1000); // wait a bit for the container to start
KafkaTemplate<Integer, String> template = createTemplate();
template.setDefaultTopic(topic1);
template.convertAndSend(0, "foo");
template.convertAndSend(2, "bar");
template.convertAndSend(0, "baz");
template.convertAndSend(2, "qux");
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.flush();
assertTrue(latch.await(60, TimeUnit.SECONDS));
container.stop();
Expand Down Expand Up @@ -94,10 +94,8 @@ private Map<String, Object> consumerProps() {
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

Expand All @@ -108,10 +106,8 @@ private Map<String, Object> senderProps() {
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
----
Expand All @@ -131,7 +127,7 @@ private KafkaTemplate<Integer, String> template;
@Test
public void testSimple() throws Exception {
waitListening("foo");
template.convertAndSend("annotated1", 0, "foo");
template.send("annotated1", 0, "foo");
assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
}

Expand Down
10 changes: 5 additions & 5 deletions src/reference/asciidoc/testing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ public class KafkaTemplateTests {
new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.syncConvertAndSend("foo");
template.send("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.syncConvertAndSend(0, 2, "bar");
template.send(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.syncConvertAndSend(TEMPLATE_TOPIC, 0, 2, "baz");
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
Expand All @@ -207,12 +207,12 @@ The above uses the hamcrest matchers; with `AssertJ`, the final part looks like
----
...
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.syncConvertAndSend(0, 2, "bar");
template.send(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("bar"));
template.syncConvertAndSend(TEMPLATE_TOPIC, 0, 2, "baz");
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
Expand Down