Skip to content

Commit 831521a

Browse files
committed
Fix new Sonar issues
1 parent f20f0d6 commit 831521a

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
6464
*/
6565
public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100; // late phase
6666

67+
private static final int DEFAULT_TOPIC_CHECK_TIMEOUT = 30;
68+
6769
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR
6870

6971
protected final ConsumerFactory<K, V> consumerFactory; // NOSONAR (final)
@@ -85,6 +87,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
8587
private AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor =
8688
new DefaultAfterRollbackProcessor<>();
8789

90+
private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT;
91+
8892
private volatile boolean running = false;
8993

9094
private volatile boolean paused;
@@ -265,6 +269,16 @@ public String getListenerId() {
265269
return this.beanName; // the container factory sets the bean name to the id attribute
266270
}
267271

272+
/**
273+
* How long to wait for {@link AdminClient#describeTopics(Collection)} result
274+
* futures to complete.
275+
* @param topicCheckTimeout the timeout in seconds; default 30.
276+
* @since 2.3
277+
*/
278+
public void setTopicCheckTimeout(int topicCheckTimeout) {
279+
this.topicCheckTimeout = topicCheckTimeout;
280+
}
281+
268282
@Override
269283
public void setupMessageListener(Object messageListener) {
270284
this.containerProperties.setMessageListener(messageListener);
@@ -304,7 +318,7 @@ protected void checkTopics() {
304318
.stream()
305319
.filter(entry -> {
306320
try {
307-
entry.getValue().get(30, TimeUnit.SECONDS);
321+
entry.getValue().get(this.topicCheckTimeout, TimeUnit.SECONDS);
308322
return false;
309323
}
310324
catch (@SuppressWarnings("unused") Exception e) {

spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import org.apache.kafka.clients.consumer.Consumer;
3131
import org.apache.kafka.clients.consumer.ConsumerRecord;
32-
import org.apache.kafka.clients.consumer.ConsumerRecords;
3332
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3433
import org.apache.kafka.common.TopicPartition;
3534
import org.apache.kafka.common.header.Header;
@@ -58,24 +57,26 @@ public class AggregatingReplyingKafkaTemplate<K, V, R>
5857
implements BatchConsumerAwareMessageListener<K, Collection<ConsumerRecord<K, R>>> {
5958

6059
/**
61-
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
60+
* Pseudo topic name for the "outer" {@link ConsumerRecord} that has the aggregated
6261
* results in its value after a normal release by the release strategy.
6362
*/
6463
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
6564

6665
/**
67-
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
66+
* Pseudo topic name for the "outer" {@link ConsumerRecord} that has the aggregated
6867
* results in its value after a timeout.
6968
*/
7069
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
7170

71+
private static final int DEFAULT_COMMIT_TIMEOUT = 30;
72+
7273
private final Map<CorrelationKey, Set<RecordHolder<K, R>>> pending = new HashMap<>();
7374

7475
private final Map<TopicPartition, Long> offsets = new HashMap<>();
7576

7677
private final Predicate<Collection<ConsumerRecord<K, R>>> releaseStrategy;
7778

78-
private Duration commitTimeout = Duration.ofSeconds(30);
79+
private Duration commitTimeout = Duration.ofSeconds(DEFAULT_COMMIT_TIMEOUT);
7980

8081
private boolean returnPartialOnTimeout;
8182

@@ -111,7 +112,7 @@ public void setCommitTimeout(Duration commitTimeout) {
111112
* Set to true to return a partial result when a request times out.
112113
* @param returnPartialOnTimeout true to return a partial result.
113114
*/
114-
public void setReturnPartialOnTimeout(boolean returnPartialOnTimeout) {
115+
public synchronized void setReturnPartialOnTimeout(boolean returnPartialOnTimeout) {
115116
this.returnPartialOnTimeout = returnPartialOnTimeout;
116117
}
117118

0 commit comments

Comments
 (0)