Skip to content

Commit 80d482d

Browse files
authored
GH-2641: Fix Exception Change with RetryableTopic
Resolves #2641 Need to traverse the cause tree to find the actual user exception. Also tested with reporter's reproducer. **cherry-pick to 2.9.x**
1 parent 552e13c commit 80d482d

File tree

2 files changed

+35
-4
lines changed

2 files changed

+35
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -200,8 +200,9 @@ private FailedRecord getFailedRecordInstance(ConsumerRecord<?, ?> record, Except
200200
Map<TopicPartition, FailedRecord> map, TopicPartition topicPartition) {
201201

202202
Exception realException = exception;
203-
if (realException instanceof ListenerExecutionFailedException
204-
&& realException.getCause() instanceof Exception) {
203+
while ((realException instanceof ListenerExecutionFailedException
204+
|| realException instanceof TimestampedException)
205+
&& realException.getCause() instanceof Exception) {
205206

206207
realException = (Exception) realException.getCause();
207208
}

spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,7 +24,9 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicReference;
2728

29+
import org.apache.kafka.clients.consumer.Consumer;
2830
import org.apache.kafka.clients.consumer.ConsumerRecord;
2931
import org.apache.kafka.common.TopicPartition;
3032
import org.junit.jupiter.api.Test;
@@ -186,4 +188,32 @@ void exceptionChanges(boolean reset) {
186188
}
187189
}
188190

191+
@Test
192+
void exceptionChangesWithTimestampedException() throws InterruptedException {
193+
FixedBackOff bo1 = new FixedBackOff(0L, 5L);
194+
FailedRecordTracker tracker = new FailedRecordTracker((rec, ex) -> { }, bo1, mock(LogAccessor.class));
195+
AtomicReference<Exception> captured = new AtomicReference<>();
196+
tracker.setBackOffFunction((record, ex) -> {
197+
captured.set(ex);
198+
if (ex instanceof IllegalStateException) {
199+
return bo1;
200+
}
201+
else {
202+
return new FixedBackOff(0L, 0L);
203+
}
204+
});
205+
IllegalStateException ise = new IllegalStateException();
206+
Exception ex = new ListenerExecutionFailedException("", new TimestampedException(
207+
new ListenerExecutionFailedException("", ise)));
208+
ConsumerRecord<?, ?> record = mock(ConsumerRecord.class);
209+
Consumer<?, ?> consumer = mock(Consumer.class);
210+
assertThat(tracker.recovered(record, ex, mock(MessageListenerContainer.class), consumer)).isFalse();
211+
assertThat(captured.get()).isSameAs(ise);
212+
IllegalArgumentException iae = new IllegalArgumentException();
213+
ex = new ListenerExecutionFailedException("", new TimestampedException(
214+
new ListenerExecutionFailedException("", iae)));
215+
assertThat(tracker.recovered(record, ex, mock(MessageListenerContainer.class), consumer)).isTrue();
216+
assertThat(captured.get()).isSameAs(iae);
217+
}
218+
189219
}

0 commit comments

Comments
 (0)