Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void registerSender(TcpSender sender) {

@Override
public void registerSenders(List<TcpSender> sendersToRegister) {
this.theConnection.registerSenders(sendersToRegister);
this.theConnection.registerSenders(sendersToRegister, this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is indeed confusing that interceptor abstraction implements a connection one.
So, we have such a strange code flow...

I wish @garyrussell will come back to us one day to revise this architecture 😄

Nevertheless would you mind, @gigermocas , to double check what we have so far with the TcpConnectionSupport.close():

public void close() {
		for (TcpSender sender : this.senders) {
			sender.removeDeadConnection(this);
		}
...

After this fix it doesn't look like we are going to clean up properly...

I'll debug it locally meanwhile, too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish @garyrussell will come back to us one day to revise this architecture 😄

Yes, this is a bit of a mess; dates back to very early code and has caused headaches since then.

Maybe in 6.0 @artembilan if you want to raise an issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue created: #3515, but it doesn't mean we have to make it in 6.0. Probably one day in the future when it causes more headaches 😉

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch, I was too slow writing this, but these were my observations.

TcpConnectionSupport.close() can be called multiple times and TcpConnectionInterceptorSupport.close() will ultimately call it too. TcpConnectionSupport.closeConnection() (through close()) can call both again.

TcpConnectionSupport.close() with also publish a close event, which must be done once.

Keeping the senders list in TcpConnectionSupport seemed like a better fit looking at the rest of the API (and parity with the other register methods)

I hoped it was cleaner, I don't particularly like it, but I couldn't find an alternative without worrying about breaking the API or polluting the interceptor by re-implementing some behavior from TcpConnectionSupport 😟

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
*
* @author Gary Russell
* @author Artem Bilan
* @author Mário Dias
*
* @since 2.0
*
Expand Down Expand Up @@ -316,9 +317,13 @@ public void registerSender(@Nullable TcpSender senderToRegister) {
* @since 5.4
*/
public void registerSenders(List<TcpSender> sendersToRegister) {
registerSenders(sendersToRegister, this);
}

protected final void registerSenders(List<TcpSender> sendersToRegister, TcpConnection connection) {
this.senders.addAll(sendersToRegister);
for (TcpSender sender : sendersToRegister) {
sender.addNewConnection(this);
sender.addNewConnection(connection);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,12 @@
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.HelloWorldInterceptor;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain;
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory;
Expand All @@ -80,7 +83,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Mario Dias
* @author Mário Dias
*
* @since 2.0
*/
Expand Down Expand Up @@ -1189,6 +1192,38 @@ public void testConnectionException() throws Exception {
}
}

@Test
public void testInterceptedConnection() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0);
ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer();
scf.setSerializer(serializer);
scf.setDeserializer(serializer);
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(scf);
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(scf);
final AtomicReference<TcpConnection> connection = new AtomicReference<>();
scf.setApplicationEventPublisher(event -> {
if (event instanceof TcpConnectionOpenEvent) {
connection.set(handler.getConnections()
.get(((TcpConnectionOpenEvent) event).getConnectionId()));
latch.countDown();
}
});
TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain();
fc.setInterceptor(newInterceptorFactory(scf.getApplicationEventPublisher()));
scf.setInterceptorFactoryChain(fc);
scf.start();
TestingUtilities.waitListening(scf, null);
int port = scf.getPort();
Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
socket.close();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(connection.get()).isInstanceOf(HelloWorldInterceptor.class);
scf.stop();
}

@Test
public void testInterceptedCleanup() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
Expand Down