Skip to content

Commit bbc5003

Browse files
garyrussellartembilan
authored andcommitted
GH-1778: Properly Apply Client Post Processors
Resolves #1778 `Consumer`/`Producer` post processors are `Function`s and the result should be used; it was previously discarded. **cherry-pick to 2.6.x, 2.5.x**
1 parent 06483a0 commit bbc5003

File tree

4 files changed

+25
-14
lines changed

4 files changed

+25
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
350350
}
351351
}
352352
for (ConsumerPostProcessor<K, V> pp : this.postProcessors) {
353-
pp.apply(kafkaConsumer);
353+
kafkaConsumer = pp.apply(kafkaConsumer);
354354
}
355355
return kafkaConsumer;
356356
}

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -739,9 +739,11 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
739739
}
740740

741741
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
742-
KafkaProducer<K, V> kafkaProducer =
742+
Producer<K, V> kafkaProducer =
743743
new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
744-
this.postProcessors.forEach(pp -> pp.apply(kafkaProducer));
744+
for (ProducerPostProcessor<K, V> pp : this.postProcessors) {
745+
kafkaProducer = pp.apply(kafkaProducer);
746+
}
745747
return kafkaProducer;
746748
}
747749

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import java.util.Properties;
3030
import java.util.concurrent.CountDownLatch;
3131
import java.util.concurrent.TimeUnit;
32-
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicReference;
3333
import java.util.stream.Collectors;
3434
import java.util.stream.Stream;
3535

@@ -41,6 +41,7 @@
4141
import org.apache.kafka.common.serialization.StringDeserializer;
4242
import org.junit.jupiter.api.Test;
4343

44+
import org.springframework.aop.framework.ProxyFactory;
4445
import org.springframework.aop.support.AopUtils;
4546
import org.springframework.beans.factory.annotation.Autowired;
4647
import org.springframework.context.annotation.Configuration;
@@ -337,10 +338,14 @@ public void testNestedTxProducerIsCached() throws Exception {
337338
KafkaTemplate<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
338339
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
339340
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
340-
AtomicBoolean ppCalled = new AtomicBoolean();
341+
AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
341342
cf.addPostProcessor(consumer -> {
342-
ppCalled.set(true);
343-
return consumer;
343+
ProxyFactory prox = new ProxyFactory();
344+
prox.setTarget(consumer);
345+
@SuppressWarnings("unchecked")
346+
Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
347+
wrapped.set(proxy);
348+
return proxy;
344349
});
345350
ContainerProperties containerProps = new ContainerProperties("txCache1");
346351
CountDownLatch latch = new CountDownLatch(1);
@@ -362,13 +367,13 @@ public void testNestedTxProducerIsCached() throws Exception {
362367
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
363368
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
364369
assertThat(pfTx.getCache()).hasSize(1);
370+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get());
365371
}
366372
finally {
367373
container.stop();
368374
pf.destroy();
369375
pfTx.destroy();
370376
}
371-
assertThat(ppCalled.get()).isTrue();
372377
}
373378

374379
@SuppressWarnings("unchecked")

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,7 +38,6 @@
3838
import java.util.UUID;
3939
import java.util.concurrent.CountDownLatch;
4040
import java.util.concurrent.TimeUnit;
41-
import java.util.concurrent.atomic.AtomicBoolean;
4241
import java.util.concurrent.atomic.AtomicInteger;
4342
import java.util.concurrent.atomic.AtomicReference;
4443
import java.util.function.Supplier;
@@ -64,6 +63,7 @@
6463
import org.junit.jupiter.api.BeforeAll;
6564
import org.junit.jupiter.api.Test;
6665

66+
import org.springframework.aop.framework.ProxyFactory;
6767
import org.springframework.kafka.KafkaException;
6868
import org.springframework.kafka.support.Acknowledgment;
6969
import org.springframework.kafka.support.CompositeProducerListener;
@@ -120,10 +120,14 @@ public static void tearDown() {
120120
void testTemplate() {
121121
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
122122
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
123-
AtomicBoolean ppCalled = new AtomicBoolean();
123+
AtomicReference<Producer<Integer, String>> wrapped = new AtomicReference<>();
124124
pf.addPostProcessor(prod -> {
125-
ppCalled.set(true);
126-
return prod;
125+
ProxyFactory prox = new ProxyFactory();
126+
prox.setTarget(prod);
127+
@SuppressWarnings("unchecked")
128+
Producer<Integer, String> proxy = (Producer<Integer, String>) prox.getProxy();
129+
wrapped.set(proxy);
130+
return proxy;
127131
});
128132
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
129133

@@ -166,8 +170,8 @@ void testTemplate() {
166170
List<PartitionInfo> partitions = template.partitionsFor(INT_KEY_TOPIC);
167171
assertThat(partitions).isNotNull();
168172
assertThat(partitions).hasSize(2);
173+
assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get());
169174
pf.destroy();
170-
assertThat(ppCalled.get()).isTrue();
171175
}
172176

173177
@Test

0 commit comments

Comments
 (0)