Skip to content

Commit 46aea17

Browse files
garyrussellartembilan
authored andcommitted
Fix Consumer Property Overrides
If the consumer properties object contains `default` properties, they were ignored. `Properties.forEach()` does not return default properties. Use `stringPropertyNames()` instead. **cherry-pick to 2.2.x**
1 parent 45ddac7 commit 46aea17

File tree

4 files changed

+19
-11
lines changed

4 files changed

+19
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ protected KafkaConsumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nul
126126
}
127127
boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
128128
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
129-
if (groupId == null && properties == null && !shouldModifyClientId) {
129+
if (groupId == null
130+
&& (properties == null || properties.stringPropertyNames().size() == 0)
131+
&& !shouldModifyClientId) {
130132
return createKafkaConsumer(this.configs);
131133
}
132134
else {
@@ -149,11 +151,11 @@ private KafkaConsumer<K, V> createConsumerWithAdjustedProperties(String groupId,
149151
: modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG)) + clientIdSuffix);
150152
}
151153
if (properties != null) {
152-
properties.forEach((k, v) -> {
153-
if (!k.equals(ConsumerConfig.CLIENT_ID_CONFIG) && !k.equals(ConsumerConfig.GROUP_ID_CONFIG)) {
154-
modifiedConfigs.put((String) k, v);
155-
}
156-
});
154+
properties.stringPropertyNames()
155+
.stream()
156+
.filter(name -> !name.equals(ConsumerConfig.CLIENT_ID_CONFIG)
157+
&& !name.equals(ConsumerConfig.GROUP_ID_CONFIG))
158+
.forEach(name -> modifiedConfigs.put(name, properties.getProperty(name)));
157159
}
158160
return createKafkaConsumer(modifiedConfigs);
159161
}

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ protected void checkTopics() {
282282
if (this.containerProperties.isMissingTopicsFatal() && this.containerProperties.getTopicPattern() == null) {
283283
try (Consumer<K, V> consumer =
284284
this.consumerFactory.createConsumer(this.containerProperties.getGroupId(),
285-
this.containerProperties.getClientId(), null)) {
285+
this.containerProperties.getClientId(), null,
286+
this.containerProperties.getConsumerProperties())) {
286287
if (consumer != null) {
287288
String[] topics = this.containerProperties.getTopics();
288289
if (topics == null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ public void setMissingTopicsFatal(boolean missingTopicsFatal) {
617617
* name(s) in the consumer factory.
618618
* {@code group.id} and {@code client.id} are ignored.
619619
* @return the properties.
620-
* @since 2.1.4
620+
* @since 2.2.4
621621
* @see org.apache.kafka.clients.consumer.ConsumerConfig
622622
* @see #setGroupId(String)
623623
* @see #setClientId(String)
@@ -633,7 +633,7 @@ public Properties getConsumerProperties() {
633633
* name(s) in the consumer factory.
634634
* {@code group.id} and {@code client.id} are ignored.
635635
* @param consumerProperties the properties.
636-
* @since 2.1.4
636+
* @since 2.2.4
637637
* @see org.apache.kafka.clients.consumer.ConsumerConfig
638638
* @see #setGroupId(String)
639639
* @see #setClientId(String)

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,8 +1213,8 @@ public void testDefinedPartitions() throws Exception {
12131213
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
12141214

12151215
@Override
1216-
public Consumer<Integer, String> createConsumer(String groupId, String clientIdPrefix,
1217-
String clientIdSuffix, Properties properties) {
1216+
protected KafkaConsumer<Integer, String> createKafkaConsumer(Map<String, Object> configs) {
1217+
assertThat(configs).containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
12181218
return new KafkaConsumer<Integer, String>(props) {
12191219

12201220
@Override
@@ -1238,6 +1238,10 @@ public ConsumerRecords<Integer, String> poll(Duration timeout) {
12381238
logger.info("defined part: " + message);
12391239
latch1.countDown();
12401240
});
1241+
Properties defaultProperties = new Properties();
1242+
defaultProperties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "42");
1243+
Properties consumerProperties = new Properties(defaultProperties);
1244+
container1Props.setConsumerProperties(consumerProperties);
12411245
CountDownLatch stubbingComplete1 = new CountDownLatch(1);
12421246
KafkaMessageListenerContainer<Integer, String> container1 = spyOnContainer(
12431247
new KafkaMessageListenerContainer<>(cf, container1Props), stubbingComplete1);
@@ -1266,6 +1270,7 @@ public ConsumerRecords<Integer, String> poll(Duration timeout) {
12661270
logger.info("defined part: " + message);
12671271
latch2.countDown();
12681272
});
1273+
container2Props.setConsumerProperties(consumerProperties);
12691274
CountDownLatch stubbingComplete2 = new CountDownLatch(1);
12701275
KafkaMessageListenerContainer<Integer, String> container2 = spyOnContainer(
12711276
new KafkaMessageListenerContainer<>(cf, container2Props), stubbingComplete2);

0 commit comments

Comments
 (0)