From 3d6c7181cd0d961a5f3810996ca53d7ed71fe98e Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Thu, 17 Jul 2025 16:27:40 -0400 Subject: [PATCH 1/6] Add ability for a user to set mqtt's `quiescentTimeout` for forceable shutdowns Mqttv5 updated to also support this feature Fixes: https://github.com/spring-projects/spring-integration/issues/10095 --- ...stractMqttMessageDrivenChannelAdapter.java | 19 +++++++++++ .../MqttPahoMessageDrivenChannelAdapter.java | 3 +- ...Mqttv5PahoMessageDrivenChannelAdapter.java | 5 ++- .../mqtt/BackToBackAdapterTests.java | 32 +++++++++++++++++++ .../integration/mqtt/MqttAdapterTests.java | 7 ++-- .../mqtt/Mqttv5BackToBackTests.java | 18 +++++++++++ 6 files changed, 79 insertions(+), 5 deletions(-) diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java index d1be00b5588..7210d167e0a 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java @@ -50,6 +50,7 @@ * @author Mikhail Polivakha * @author Artem Vozhdayenko * @author Jiri Soucek + * @author Glenn Renfro * * @since 4.0 * @@ -59,6 +60,8 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport implements ApplicationEventPublisherAware, ClientManager.ConnectCallback { + public static final Long QUIESCENT_TIMEOUT = 30_000L; + protected final Lock topicLock = new ReentrantLock(); // NOSONAR private final String url; @@ -73,6 +76,8 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends Mess private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT; + private long quiescentTimeout = QUIESCENT_TIMEOUT; + private boolean manualAcks; private ApplicationEventPublisher applicationEventPublisher; @@ -199,6 +204,20 @@ protected long getDisconnectCompletionTimeout() { return this.disconnectCompletionTimeout; } + /** + * Set the quiescentTimeout timeout when disconnecting. + * Default is 30,000 milliseconds. + * @param quiescentTimeout The timeout. + * @since 7.0.0 + */ + public void setQuiescentTimeout(long quiescentTimeout) { + this.quiescentTimeout = quiescentTimeout; + } + + public long getQuiescentTimeout() { + return this.quiescentTimeout; + } + @Override protected void onInit() { super.onInit(); diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java index 28b5026a952..f56b9a53dc3 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java @@ -58,6 +58,7 @@ * @author Gary Russell * @author Artem Bilan * @author Artem Vozhdayenko + * @author Glenn Renfro * * @since 4.0 * @@ -227,7 +228,7 @@ protected void doStop() { } try { - this.client.disconnectForcibly(getDisconnectCompletionTimeout()); + this.client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout()); if (getConnectionInfo().isAutomaticReconnect()) { MqttUtils.stopClientReconnectCycle(this.client); } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index ec79783f67a..39ea5a1fc27 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -37,6 +37,7 @@ import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; import org.springframework.beans.factory.BeanCreationException; import org.springframework.context.ApplicationEventPublisher; @@ -81,6 +82,7 @@ * @author Lucas Bowler * @author Artem Vozhdayenko * @author Matthias Thoma + * @author Glenn Renfro * * @since 5.5.5 * @@ -296,7 +298,8 @@ protected void doStop() { } if (getClientManager() == null) { - this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout()); + this.mqttClient.disconnectForcibly(QUIESCENT_TIMEOUT, getDisconnectCompletionTimeout(), + MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties()); if (getConnectionInfo().isAutomaticReconnect()) { MqttUtils.stopClientReconnectCycle(this.mqttClient); } diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java index ba30b442ffb..f82ea4cf758 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java @@ -64,6 +64,7 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Glenn Renfro * * @since 4.0 * @@ -73,6 +74,10 @@ @DirtiesContext public class BackToBackAdapterTests implements MosquittoContainerTest { + private static final long QUIESCENT_TIMEOUT = 1; + + private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L; + @TempDir static File folder; @@ -123,6 +128,33 @@ public void testSingleTopic() { assertThat(adapter.getConnectionInfo().getServerURIs()[0]).isEqualTo(MosquittoContainerTest.mqttUrl()); } + @Test + void testSingleTopicWithQuiescentSet() { + MqttPahoMessageHandler adapter = new MqttPahoMessageHandler(MosquittoContainerTest.mqttUrl(), "si-test-out"); + adapter.setDefaultTopic("mqtt-foo"); + adapter.setBeanFactory(mock(BeanFactory.class)); + adapter.afterPropertiesSet(); + adapter.start(); + MqttPahoMessageDrivenChannelAdapter inbound = + new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo"); + QueueChannel outputChannel = new QueueChannel(); + inbound.setOutputChannel(outputChannel); + inbound.setTaskScheduler(taskScheduler); + inbound.setQuiescentTimeout(QUIESCENT_TIMEOUT); + inbound.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT); + inbound.setBeanFactory(mock(BeanFactory.class)); + inbound.afterPropertiesSet(); + inbound.start(); + adapter.handleMessage(new GenericMessage<>("foo")); + Message out = outputChannel.receive(20000); + assertThat(out).isNotNull(); + adapter.stop(); + inbound.stop(); + assertThat(out.getPayload()).isEqualTo("foo"); + assertThat(out.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)).isEqualTo("mqtt-foo"); + assertThat(adapter.getConnectionInfo().getServerURIs()[0]).isEqualTo(MosquittoContainerTest.mqttUrl()); + } + @Test public void testJson() { testJsonCommon("org.springframework"); diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java index 0f3caf231b4..7df7f97b3e1 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java @@ -100,6 +100,7 @@ * @author Gary Russell * @author Artem Bilan * @author Artem Vozhdayenko + * @author Glenn Renfro * * @since 4.0 * @@ -519,7 +520,7 @@ public void testDifferentQos() throws Exception { new DirectFieldAccessor(adapter).setPropertyValue("running", Boolean.TRUE); adapter.stop(); - verify(client).disconnectForcibly(5_000L); + verify(client).disconnectForcibly(30_000, 5_000L); } @Test @@ -589,14 +590,14 @@ private void verifyUnsubscribe(IMqttAsyncClient client) throws Exception { verify(client).connect(any(MqttConnectOptions.class)); verify(client).subscribe(any(String[].class), any(int[].class), any()); verify(client).unsubscribe(any(String[].class)); - verify(client).disconnectForcibly(anyLong()); + verify(client).disconnectForcibly(anyLong(), anyLong()); } private void verifyNotUnsubscribe(IMqttAsyncClient client) throws Exception { verify(client).connect(any(MqttConnectOptions.class)); verify(client).subscribe(any(String[].class), any(int[].class), any()); verify(client, never()).unsubscribe(any(String[].class)); - verify(client).disconnectForcibly(anyLong()); + verify(client).disconnectForcibly(anyLong(), anyLong()); } @Configuration diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java index 3cb16af8594..0eba7b531df 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java @@ -57,6 +57,7 @@ * @author Gary Russell * @author Artem Bilan * @author Mikhail Polivakha + * @author Glenn Renfro * * @since 5.5.5 * @@ -153,6 +154,23 @@ public void testSharedTopicMqttv5Interaction() { assertThat(receive.getPayload()).isEqualTo(testPayload); } + @Test + void testSharedTopicMqttv5InteractionQuiescentTimeout() { + this.mqttv5MessageDrivenChannelAdapter.addTopic("$share/group/testTopicq"); + this.mqttv5MessageDrivenChannelAdapter.setQuiescentTimeout(2000); + this.mqttv5MessageDrivenChannelAdapter.setDisconnectCompletionTimeout(2000); + String testPayload = "shared topic payload"; + this.mqttOutFlowInput.send( + MessageBuilder.withPayload(testPayload) + .setHeader(MqttHeaders.TOPIC, "testTopicq") + .build()); + + Message receive = this.fromMqttChannel.receive(10_000); + + assertThat(receive).isNotNull(); + assertThat(receive.getPayload()).isEqualTo(testPayload); + } + @Configuration @EnableIntegration public static class Config { From 03d3bfda8b0c693d9780246e7366cc4e5ed2fe5d Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Fri, 18 Jul 2025 15:46:56 -0400 Subject: [PATCH 2/6] Move QUIESCENT_TIMEOUT constant to ClientManager Update tests to set setDisconnectCompletionTimeout and setQuiescentTimeout to 1L to reduce test runtime Rebase --- .../integration/mqtt/core/ClientManager.java | 2 + ...stractMqttMessageDrivenChannelAdapter.java | 8 +-- ...Mqttv5PahoMessageDrivenChannelAdapter.java | 2 +- .../mqtt/BackToBackAdapterTests.java | 59 +++++-------------- .../mqtt/Mqttv5BackToBackTests.java | 25 +++----- 5 files changed, 27 insertions(+), 69 deletions(-) diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java index 7b4a6860c9e..2c43e53c33d 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java @@ -39,6 +39,8 @@ public interface ClientManager extends SmartLifecycle, MqttComponent { */ long DEFAULT_COMPLETION_TIMEOUT = 30_000L; + Long QUIESCENT_TIMEOUT = 30_000L; + /** * The default disconnect completion timeout in milliseconds. */ diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java index 7210d167e0a..fe68ad5b16e 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java @@ -60,8 +60,6 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport implements ApplicationEventPublisherAware, ClientManager.ConnectCallback { - public static final Long QUIESCENT_TIMEOUT = 30_000L; - protected final Lock topicLock = new ReentrantLock(); // NOSONAR private final String url; @@ -76,7 +74,7 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends Mess private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT; - private long quiescentTimeout = QUIESCENT_TIMEOUT; + private long quiescentTimeout = ClientManager.QUIESCENT_TIMEOUT; private boolean manualAcks; @@ -206,7 +204,7 @@ protected long getDisconnectCompletionTimeout() { /** * Set the quiescentTimeout timeout when disconnecting. - * Default is 30,000 milliseconds. + * Default is {@link ClientManager#QUIESCENT_TIMEOUT} milliseconds. * @param quiescentTimeout The timeout. * @since 7.0.0 */ @@ -214,7 +212,7 @@ public void setQuiescentTimeout(long quiescentTimeout) { this.quiescentTimeout = quiescentTimeout; } - public long getQuiescentTimeout() { + protected long getQuiescentTimeout() { return this.quiescentTimeout; } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index 39ea5a1fc27..4e03554dbd0 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -298,7 +298,7 @@ protected void doStop() { } if (getClientManager() == null) { - this.mqttClient.disconnectForcibly(QUIESCENT_TIMEOUT, getDisconnectCompletionTimeout(), + this.mqttClient.disconnectForcibly(ClientManager.QUIESCENT_TIMEOUT, getDisconnectCompletionTimeout(), MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties()); if (getConnectionInfo().isAutomaticReconnect()) { MqttUtils.stopClientReconnectCycle(this.mqttClient); diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java index f82ea4cf758..6bf8d3fe95e 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java @@ -113,36 +113,7 @@ public void testSingleTopic() { MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); - inbound.afterPropertiesSet(); - inbound.start(); - adapter.handleMessage(new GenericMessage<>("foo")); - Message out = outputChannel.receive(20000); - assertThat(out).isNotNull(); - adapter.stop(); - inbound.stop(); - assertThat(out.getPayload()).isEqualTo("foo"); - assertThat(out.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)).isEqualTo("mqtt-foo"); - assertThat(adapter.getConnectionInfo().getServerURIs()[0]).isEqualTo(MosquittoContainerTest.mqttUrl()); - } - - @Test - void testSingleTopicWithQuiescentSet() { - MqttPahoMessageHandler adapter = new MqttPahoMessageHandler(MosquittoContainerTest.mqttUrl(), "si-test-out"); - adapter.setDefaultTopic("mqtt-foo"); - adapter.setBeanFactory(mock(BeanFactory.class)); - adapter.afterPropertiesSet(); - adapter.start(); - MqttPahoMessageDrivenChannelAdapter inbound = - new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo"); - QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setQuiescentTimeout(QUIESCENT_TIMEOUT); - inbound.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); adapter.handleMessage(new GenericMessage<>("foo")); @@ -179,9 +150,7 @@ private void testJsonCommon(String... trusted) { MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.setConverter(converter); inbound.afterPropertiesSet(); inbound.start(); @@ -210,9 +179,7 @@ public void testAddRemoveTopic() { MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); inbound.addTopic("mqtt-foo"); @@ -258,9 +225,7 @@ public void testTwoTopics() { new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo", "mqtt-bar"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); adapter.handleMessage(new GenericMessage<>("foo")); @@ -293,9 +258,7 @@ public void testAsync() throws Exception { MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); GenericMessage message = new GenericMessage<>("foo"); @@ -331,9 +294,7 @@ public void testAsyncPersisted() throws Exception { new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo", "mqtt-bar"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); Message message1 = new GenericMessage<>("foo"); @@ -428,6 +389,14 @@ public void onApplicationEvent(MqttSubscribedEvent event) { } + private void initializeInboundAdapter(MqttPahoMessageDrivenChannelAdapter inbound, QueueChannel outputChannel) { + inbound.setOutputChannel(outputChannel); + inbound.setTaskScheduler(taskScheduler); + inbound.setQuiescentTimeout(QUIESCENT_TIMEOUT); + inbound.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT); + inbound.setBeanFactory(mock(BeanFactory.class)); + } + private class EventPublisher implements ApplicationEventPublisher { private volatile MqttMessageDeliveredEvent delivered; diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java index 0eba7b531df..4d634a772fd 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java @@ -66,6 +66,10 @@ @DirtiesContext public class Mqttv5BackToBackTests implements MosquittoContainerTest { + private static final long QUIESCENT_TIMEOUT = 1; + + private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L; + @Autowired @Qualifier("mqttOutFlow.input") private MessageChannel mqttOutFlowInput; @@ -95,7 +99,7 @@ public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicRemoval() { @Test public void testSimpleMqttv5Interaction() { - String testPayload = "foo"; + String testPayload = "datakey"; this.mqttOutFlowInput.send( MessageBuilder.withPayload(testPayload) @@ -154,23 +158,6 @@ public void testSharedTopicMqttv5Interaction() { assertThat(receive.getPayload()).isEqualTo(testPayload); } - @Test - void testSharedTopicMqttv5InteractionQuiescentTimeout() { - this.mqttv5MessageDrivenChannelAdapter.addTopic("$share/group/testTopicq"); - this.mqttv5MessageDrivenChannelAdapter.setQuiescentTimeout(2000); - this.mqttv5MessageDrivenChannelAdapter.setDisconnectCompletionTimeout(2000); - String testPayload = "shared topic payload"; - this.mqttOutFlowInput.send( - MessageBuilder.withPayload(testPayload) - .setHeader(MqttHeaders.TOPIC, "testTopicq") - .build()); - - Message receive = this.fromMqttChannel.receive(10_000); - - assertThat(receive).isNotNull(); - assertThat(receive.getPayload()).isEqualTo(testPayload); - } - @Configuration @EnableIntegration public static class Config { @@ -231,6 +218,8 @@ public IntegrationFlow mqttInFlow() { new Mqttv5PahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "mqttv5SIin", mqttSubscription); messageProducer.setPayloadType(String.class); + messageProducer.setQuiescentTimeout(QUIESCENT_TIMEOUT); + messageProducer.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT); messageProducer.setMessageConverter(mqttStringToBytesConverter()); messageProducer.setManualAcks(true); From 7879938aece4b5492834a93638fab8fe5501c63b Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Fri, 18 Jul 2025 16:08:01 -0400 Subject: [PATCH 3/6] Mqttv5PahoMessageDrivenChannelAdapter.disconnectForcibly needs to use getQuiescentTimeout for quiescent timeout BackToBackAdapterTests initializeInboundAdapter needs to be a static method --- .../mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java | 2 +- .../integration/mqtt/BackToBackAdapterTests.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index 4e03554dbd0..848053c7493 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -298,7 +298,7 @@ protected void doStop() { } if (getClientManager() == null) { - this.mqttClient.disconnectForcibly(ClientManager.QUIESCENT_TIMEOUT, getDisconnectCompletionTimeout(), + this.mqttClient.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout(), MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties()); if (getConnectionInfo().isAutomaticReconnect()) { MqttUtils.stopClientReconnectCycle(this.mqttClient); diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java index 6bf8d3fe95e..2cf37627eea 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java @@ -389,7 +389,7 @@ public void onApplicationEvent(MqttSubscribedEvent event) { } - private void initializeInboundAdapter(MqttPahoMessageDrivenChannelAdapter inbound, QueueChannel outputChannel) { + private static void initializeInboundAdapter(MqttPahoMessageDrivenChannelAdapter inbound, QueueChannel outputChannel) { inbound.setOutputChannel(outputChannel); inbound.setTaskScheduler(taskScheduler); inbound.setQuiescentTimeout(QUIESCENT_TIMEOUT); From 7d165c523af9cabe50aef2617a7274fa876422f6 Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Fri, 18 Jul 2025 18:04:41 -0400 Subject: [PATCH 4/6] Apply quiescentTimeout attribute to AbstractMqttClientManager. Add the quiescentTimeout in the disconnectForcibly in the Mqttv3ClientManager and Mqttv5ClientManager classes Add documentation to what's new doc describing the addition of quiescence timeout --- .../mqtt/core/AbstractMqttClientManager.java | 16 ++++++++++++++++ .../mqtt/core/Mqttv3ClientManager.java | 2 +- .../mqtt/core/Mqttv5ClientManager.java | 4 +++- .../mqtt/ClientManagerBackToBackTests.java | 19 ++++++++++++++++--- .../integration/mqtt/MqttAdapterTests.java | 2 +- .../antora/modules/ROOT/pages/whats-new.adoc | 5 +++++ 6 files changed, 42 insertions(+), 6 deletions(-) diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java index fae12599dab..e3a66b178d7 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java @@ -62,6 +62,8 @@ public abstract class AbstractMqttClientManager implements ClientManager Date: Mon, 21 Jul 2025 08:30:00 -0400 Subject: [PATCH 5/6] Update the whats new MQTT announcement It should be rewritten to state that this is exposing more of the MQTT Client API. --- src/reference/antora/modules/ROOT/pages/whats-new.adoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index a85eac39010..547b4d66578 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -48,4 +48,5 @@ Previously deprecated classes in the `spring-integation-hazelcast` module, such [[x7.0-mqtt-changes]] === MQTT Changes -The `disconnectForcibly` method in `MqttPahoMessageDrivenChannelAdapter`, `Mqttv5PahoMessageDrivenChannelAdapter`, `Mqttv3ClientManager`, and `Mqttv5ClientManager` now supports `getQuiescentTimeout` in addition to the existing `disconnectCompletionTimeout`. +The `disconnectForcibly` method in `MqttPahoMessageDrivenChannelAdapter`, `Mqttv5PahoMessageDrivenChannelAdapter`, `Mqttv3ClientManager`, and `Mqttv5ClientManager` now expose the option to configure `quiescentTimeout`, which is available through the https://eclipse.dev/paho/files/javadoc/org/eclipse/paho/client/mqttv3/IMqttAsyncClient.html[IMqttAsyncClient] client. +See https://docs.spring.io/spring-integration/api/org/springframework/integration/mqtt/core/package-summary.html[MQTT Core Java Docs] for more information. \ No newline at end of file From fb5afec53eb702465252a43c8ed2ca752c469590 Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Mon, 21 Jul 2025 11:00:04 -0400 Subject: [PATCH 6/6] Change what's new notes on mqtt changes to be brief --- src/reference/antora/modules/ROOT/pages/whats-new.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 547b4d66578..4ec95e23c10 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -48,5 +48,5 @@ Previously deprecated classes in the `spring-integation-hazelcast` module, such [[x7.0-mqtt-changes]] === MQTT Changes -The `disconnectForcibly` method in `MqttPahoMessageDrivenChannelAdapter`, `Mqttv5PahoMessageDrivenChannelAdapter`, `Mqttv3ClientManager`, and `Mqttv5ClientManager` now expose the option to configure `quiescentTimeout`, which is available through the https://eclipse.dev/paho/files/javadoc/org/eclipse/paho/client/mqttv3/IMqttAsyncClient.html[IMqttAsyncClient] client. -See https://docs.spring.io/spring-integration/api/org/springframework/integration/mqtt/core/package-summary.html[MQTT Core Java Docs] for more information. \ No newline at end of file +The `AbstractMqttMessageDrivenChannelAdapter` and `ClientManager` implementations now expose a `quiescentTimeout` option which is propagated in their `stop()` method down to the `disconnectForcibly()` API of the MQTT Paho clients. +See xref:mqtt.adoc[] for more information. \ No newline at end of file