Skip to content

Commit 7d1e540

Browse files
garyrussellartembilan
authored andcommitted
GH-992: Fix SeekToCurrent zero retries
Fixes #992 The `SeekToCurrentErrorHandler` and `DefaultAfterRollbackProcessor` always retried at least one time, even if `maxAttempts` was 1. **cherry-pick to 2.2.x** * Fix infinite retries * Fix test class name and new Sonar issue.
1 parent 6ac6d24 commit 7d1e540

File tree

3 files changed

+85
-10
lines changed

3 files changed

+85
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.time.temporal.ValueRange;
1920
import java.util.function.BiConsumer;
2021

2122
import org.apache.commons.logging.Log;
@@ -38,6 +39,8 @@ class FailedRecordTracker {
3839

3940
private final int maxFailures;
4041

42+
private final boolean noRetries;
43+
4144
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures, Log logger) {
4245
if (recoverer == null) {
4346
this.recoverer = (r, t) -> logger.error("Max failures (" + maxFailures + ") reached for: " + r, t);
@@ -46,24 +49,34 @@ class FailedRecordTracker {
4649
this.recoverer = recoverer;
4750
}
4851
this.maxFailures = maxFailures;
52+
this.noRetries = ValueRange.of(0, 1).isValidIntValue(maxFailures);
4953
}
5054

5155
boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
56+
if (this.noRetries) {
57+
this.recoverer.accept(record, exception);
58+
return true;
59+
}
5260
FailedRecord failedRecord = this.failures.get();
53-
if (failedRecord == null || !failedRecord.getTopic().equals(record.topic())
54-
|| failedRecord.getPartition() != record.partition() || failedRecord.getOffset() != record.offset()) {
61+
if (this.maxFailures > 0 && (failedRecord == null || newFailure(record, failedRecord))) {
5562
this.failures.set(new FailedRecord(record.topic(), record.partition(), record.offset()));
5663
return false;
5764
}
58-
else {
59-
if (this.maxFailures >= 0 && failedRecord.incrementAndGet() >= this.maxFailures) {
65+
else if (this.maxFailures > 0 && failedRecord.incrementAndGet() >= this.maxFailures) {
6066
this.recoverer.accept(record, exception);
6167
return true;
62-
}
68+
}
69+
else {
6370
return false;
6471
}
6572
}
6673

74+
private boolean newFailure(ConsumerRecord<?, ?> record, FailedRecord failedRecord) {
75+
return !failedRecord.getTopic().equals(record.topic())
76+
|| failedRecord.getPartition() != record.partition()
77+
|| failedRecord.getOffset() != record.offset();
78+
}
79+
6780
void clearThreadState() {
6881
this.failures.remove();
6982
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@
110110
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
111111
extends AbstractMessageListenerContainer<K, V> {
112112

113+
private static final String UNUSED = "unused";
114+
113115
private static final int DEFAULT_ACK_TIME = 5000;
114116

115117
private final AbstractMessageListenerContainer<K, V> container;
@@ -755,7 +757,7 @@ public void run() {
755757
try {
756758
pollAndInvoke();
757759
}
758-
catch (@SuppressWarnings("unused") WakeupException e) {
760+
catch (@SuppressWarnings(UNUSED) WakeupException e) {
759761
// Ignore, we're stopping
760762
}
761763
catch (NoOffsetForPartitionException nofpe) {
@@ -874,7 +876,7 @@ public void wrapUp() {
874876
try {
875877
this.consumer.unsubscribe();
876878
}
877-
catch (@SuppressWarnings("unused") WakeupException e) {
879+
catch (@SuppressWarnings(UNUSED) WakeupException e) {
878880
// No-op. Continue process
879881
}
880882
}
@@ -960,7 +962,7 @@ private void processAck(ConsumerRecord<K, V> record) {
960962
try {
961963
ackImmediate(record);
962964
}
963-
catch (@SuppressWarnings("unused") WakeupException e) {
965+
catch (@SuppressWarnings(UNUSED) WakeupException e) {
964966
// ignore - not polling
965967
}
966968
}
@@ -1100,7 +1102,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
11001102
throw er;
11011103
}
11021104
}
1103-
catch (@SuppressWarnings("unused") InterruptedException e) {
1105+
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
11041106
Thread.currentThread().interrupt();
11051107
}
11061108
return null;
@@ -1583,7 +1585,7 @@ private void commitIfNecessary() {
15831585
this.consumer.commitAsync(commits, this.commitCallback);
15841586
}
15851587
}
1586-
catch (@SuppressWarnings("unused") WakeupException e) {
1588+
catch (@SuppressWarnings(UNUSED) WakeupException e) {
15871589
// ignore - not polling
15881590
this.logger.debug("Woken up during commit");
15891591
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.mock;
21+
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.apache.commons.logging.Log;
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
import org.junit.jupiter.api.Test;
27+
28+
/**
29+
* @author Gary Russell
30+
* @since 2.2.5
31+
*
32+
*/
33+
public class FailedRecordTrackerTests {
34+
35+
@Test
36+
public void testNoRetries() {
37+
AtomicBoolean recovered = new AtomicBoolean();
38+
FailedRecordTracker tracker = new FailedRecordTracker((r, e) -> {
39+
recovered.set(true);
40+
}, 1, mock(Log.class));
41+
ConsumerRecord<?, ?> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
42+
assertThat(tracker.skip(record, new RuntimeException())).isTrue();
43+
assertThat(recovered.get()).isTrue();
44+
}
45+
46+
@Test
47+
public void testThreeRetries() {
48+
AtomicBoolean recovered = new AtomicBoolean();
49+
FailedRecordTracker tracker = new FailedRecordTracker((r, e) -> {
50+
recovered.set(true);
51+
}, 4, mock(Log.class));
52+
ConsumerRecord<?, ?> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
53+
assertThat(tracker.skip(record, new RuntimeException())).isFalse();
54+
assertThat(tracker.skip(record, new RuntimeException())).isFalse();
55+
assertThat(tracker.skip(record, new RuntimeException())).isFalse();
56+
assertThat(tracker.skip(record, new RuntimeException())).isTrue();
57+
assertThat(recovered.get()).isTrue();
58+
}
59+
60+
}

0 commit comments

Comments
 (0)