Skip to content
Draft
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ subprojects {
ext['assertj.version'] = '3.19.0'
ext['netflix.limits.version'] = '0.3.6'
ext['bouncycastle-bcpkix.version'] = '1.68'
ext['aeron.version'] = '1.31.1'

group = "io.rsocket"

Expand Down Expand Up @@ -87,6 +88,10 @@ subprojects {
entry 'jmh-core'
entry 'jmh-generator-annprocess'
}
dependencySet(group: 'io.aeron', version: ext['aeron.version']) {
entry 'aeron-client'
entry 'aeron-driver'
}
}
generatedPomCustomization {
enabled = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class BaseDuplexConnection implements DuplexConnection {
protected UnboundedProcessor sender = new UnboundedProcessor();

public BaseDuplexConnection() {
onClose().doFinally(s -> doOnClose()).subscribe();
onClose().subscribe(null, t -> doOnClose(), this::doOnClose);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public KeepAliveFramesAcceptor start(
KeepAliveSupport keepAliveSupport,
Consumer<ByteBuf> onSendKeepAliveFrame,
Consumer<KeepAlive> onTimeout) {
duplexConnection.onClose().doFinally(s -> keepAliveSupport.stop()).subscribe();
duplexConnection
.onClose()
.subscribe(null, __ -> keepAliveSupport.stop(), keepAliveSupport::stop);
return keepAliveSupport
.onSendKeepAliveFrame(onSendKeepAliveFrame)
.onTimeout(onTimeout)
Expand Down
Loading