Skip to content

Commit bbdf0ea

Browse files
committed
Polishing.
Refine exception propagation by collecting exceptions as list instead of using an atomic reference. Also, introduce exception aggregator for easier exception surfacing. [resolves #678][#677]
1 parent 88aaa3c commit bbdf0ea

File tree

5 files changed

+56
-32
lines changed

5 files changed

+56
-32
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,9 @@ Logging facilities:
607607

608608
* Driver Logging (`io.r2dbc.postgresql`)
609609
* Query Logging (`io.r2dbc.postgresql.QUERY` on `DEBUG` level)
610+
* Connection Context (`io.r2dbc.postgresql.client.ConnectionContext`)
611+
* `DEBUG` level enables connection and process identifiers in log messages and exceptions (`[cid: 0x1][pid: 109]`)
612+
* `TRACE` level enables socket information (remote and local addresses) to the connection context (`[cid: 0x1][pid: 109][id: 0x79dfc4d4, L:/127.0.0.1:49391 - R:localhost/127.0.0.1:49366]`)
610613
* Parameters' values Logging (`io.r2dbc.postgresql.PARAM` on `DEBUG` level)
611614
* Transport Logging (`io.r2dbc.postgresql.client`)
612615
* `DEBUG` enables `Message` exchange logging

src/main/java/io/r2dbc/postgresql/MultiHostConnectionStrategy.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
import java.util.List;
3636
import java.util.Map;
3737
import java.util.concurrent.ConcurrentHashMap;
38-
import java.util.concurrent.atomic.AtomicReference;
38+
import java.util.concurrent.CopyOnWriteArrayList;
39+
import java.util.function.Consumer;
3940
import java.util.function.Predicate;
4041

4142
import static io.r2dbc.postgresql.MultiHostConnectionStrategy.TargetServerType.ANY;
@@ -73,7 +74,7 @@ public final class MultiHostConnectionStrategy implements ConnectionStrategy {
7374

7475
@Override
7576
public Mono<Client> connect() {
76-
return Mono.defer(() -> connect(this.multiHostConfiguration.getTargetServerType()));
77+
return connect(this.multiHostConfiguration.getTargetServerType());
7778
}
7879

7980
@Override
@@ -83,40 +84,37 @@ public String toString() {
8384
}
8485

8586
public Mono<Client> connect(TargetServerType targetServerType) {
86-
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
87+
List<Throwable> errors = new CopyOnWriteArrayList<>();
8788

88-
return Mono.defer(() -> attemptConnection(targetServerType))
89+
return attemptConnection(targetServerType, errors::add)
8990
.onErrorResume(e -> {
90-
if (!exceptionRef.compareAndSet(null, e)) {
91-
exceptionRef.get().addSuppressed(e);
92-
}
91+
errors.add(e);
9392
return Mono.empty();
9493
})
95-
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY ? attemptConnection(PRIMARY) : Mono.empty()))
94+
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY ? attemptConnection(PRIMARY, errors::add) : Mono.empty()))
9695
.switchIfEmpty(Mono.error(() -> {
97-
Throwable error = exceptionRef.get();
98-
if (error == null) {
99-
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type '%s'", targetServerType), null);
96+
if (errors.isEmpty()) {
97+
return new ExceptionAggregator(String.format("No server matches target type '%s'", targetServerType), null);
10098
} else {
101-
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("Cannot connect to a host of %s", this.addresses), error);
99+
RuntimeException exception = new ExceptionAggregator(null, errors.size() == 1 ?
100+
errors.get(0) : null);
101+
102+
if (errors.size() > 1) {
103+
errors.forEach(exception::addSuppressed);
104+
}
105+
return exception;
102106
}
103107
}));
104108
}
105109

106-
private Mono<Client> attemptConnection(TargetServerType targetServerType) {
107-
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
110+
private Mono<Client> attemptConnection(TargetServerType targetServerType, Consumer<Throwable> errorHandler) {
108111
return getCandidates(targetServerType).concatMap(candidate -> this.attemptConnection(targetServerType, candidate)
109112
.onErrorResume(e -> {
110-
if (!exceptionRef.compareAndSet(null, e)) {
111-
exceptionRef.get().addSuppressed(e);
112-
}
113+
errorHandler.accept(e);
113114
this.statusMap.put(candidate, HostConnectOutcome.fail(candidate));
114115
return Mono.empty();
115116
}))
116-
.next()
117-
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
118-
? Mono.error(exceptionRef.get())
119-
: Mono.empty()));
117+
.next();
120118
}
121119

122120
private Mono<Client> attemptConnection(TargetServerType targetServerType, SocketAddress candidate) {
@@ -229,6 +227,13 @@ public interface HostSelector {
229227

230228
}
231229

230+
static class ExceptionAggregator extends RuntimeException {
231+
232+
public ExceptionAggregator(@Nullable String message, @Nullable Throwable cause) {
233+
super(message, cause);
234+
}
235+
}
236+
232237
private static class HostConnectOutcome {
233238

234239
static final Clock DEFAULT_CLOCK = Clock.systemDefaultZone();

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,16 @@ private Mono<PostgresqlConnection> closeWithError(Client client, Throwable throw
181181

182182
private Throwable cannotConnect(Throwable throwable, ConnectionStrategy strategy) {
183183

184+
// Rewrite ExceptionAggregator but preserve suppressed exceptions to enrich the exception context
185+
if (throwable instanceof MultiHostConnectionStrategy.ExceptionAggregator) {
186+
String message = throwable.getMessage() != null ? String.format("Cannot connect to %s: %s", strategy, throwable.getMessage()) : String.format("Cannot connect to %s", strategy);
187+
PostgresConnectionException exception = new PostgresConnectionException(message, throwable.getCause());
188+
for (Throwable t : throwable.getSuppressed()) {
189+
exception.addSuppressed(t);
190+
}
191+
return exception;
192+
}
193+
184194
if (throwable instanceof R2dbcException) {
185195
return throwable;
186196
}
@@ -226,6 +236,7 @@ static class PostgresConnectionException extends R2dbcNonTransientResourceExcept
226236

227237
private final ErrorDetails errorDetails;
228238

239+
229240
public PostgresConnectionException(String reason, @Nullable Throwable cause) {
230241
super(reason, CONNECTION_DOES_NOT_EXIST, 0, null, cause);
231242
this.errorDetails = ErrorDetails.fromCodeAndMessage(CONNECTION_DOES_NOT_EXIST, reason);

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,6 @@ public final class ReactorNettyClient implements Client {
100100

101101
private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();
102102

103-
private static final Supplier<PostgresConnectionClosedException> UNEXPECTED = () -> new PostgresConnectionClosedException("Connection unexpectedly closed");
104-
105-
private static final Supplier<PostgresConnectionClosedException> EXPECTED = () -> new PostgresConnectionClosedException("Connection closed");
106-
107103
private final ByteBufAllocator byteBufAllocator;
108104

109105
private final ConnectionSettings settings;
@@ -112,6 +108,10 @@ public final class ReactorNettyClient implements Client {
112108

113109
private final Scheduler scheduler;
114110

111+
private final Supplier<PostgresConnectionClosedException> unexpected;
112+
113+
private final Supplier<PostgresConnectionClosedException> expected;
114+
115115
private ConnectionContext context;
116116

117117
private final Sinks.Many<Publisher<FrontendMessage>> requestSink = Sinks.many().unicast().onBackpressureBuffer();
@@ -149,12 +149,16 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) {
149149
Assert.requireNonNull(connection, "Connection must not be null");
150150
this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null");
151151

152+
152153
connection.addHandlerLast(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
153154
connection.addHandlerLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
154155
this.connection = connection;
155156
this.byteBufAllocator = connection.outbound().alloc();
156157

157158
ConnectionContext connectionContext = new ConnectionContext().withChannelId(connection.channel().toString());
159+
this.unexpected = () -> new PostgresConnectionClosedException(this.context.getMessage("Connection unexpectedly closed"));
160+
this.expected = () -> new PostgresConnectionClosedException(this.context.getMessage("Connection closed"));
161+
158162
SslHandler sslHandler = this.connection.channel().pipeline().get(SslHandler.class);
159163

160164
if (sslHandler == null) {
@@ -211,7 +215,7 @@ public Mono<Void> close() {
211215

212216
this.notificationProcessor.tryEmitComplete();
213217

214-
drainError(EXPECTED);
218+
drainError(expected);
215219

216220
boolean connected = isConnected();
217221
if (this.isClosed.compareAndSet(false, true)) {
@@ -536,9 +540,9 @@ private static String toString(List<Field> fields) {
536540

537541
private void handleClose() {
538542
if (this.isClosed.compareAndSet(false, true)) {
539-
drainError(UNEXPECTED);
543+
drainError(unexpected);
540544
} else {
541-
drainError(EXPECTED);
545+
drainError(expected);
542546
}
543547
}
544548

@@ -548,7 +552,7 @@ private void handleConnectionError(Throwable error) {
548552
drainError(() -> this.messageSubscriber.createClientClosedException(error));
549553
}
550554

551-
drainError(() -> new PostgresConnectionException(error));
555+
drainError(() -> new PostgresConnectionException(this.context, error));
552556
}
553557

554558
private void drainError(Supplier<? extends Throwable> supplier) {
@@ -606,8 +610,8 @@ static class PostgresConnectionException extends R2dbcNonTransientResourceExcept
606610

607611
private final static ErrorDetails ERROR_DETAILS = ErrorDetails.fromCodeAndMessage(CONNECTION_FAILURE, "An I/O error occurred while sending to the backend or receiving from the backend");
608612

609-
public PostgresConnectionException(Throwable cause) {
610-
super(ERROR_DETAILS.getMessage(), ERROR_DETAILS.getCode(), 0, null, cause);
613+
public PostgresConnectionException(ConnectionContext context, Throwable cause) {
614+
super(context.getMessage(ERROR_DETAILS.getMessage()), ERROR_DETAILS.getCode(), 0, null, cause);
611615
}
612616

613617
@Override
@@ -789,7 +793,7 @@ PostgresConnectionClosedException createClientClosedException() {
789793
}
790794

791795
PostgresConnectionClosedException createClientClosedException(@Nullable Throwable cause) {
792-
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed", cause);
796+
return new PostgresConnectionClosedException(ReactorNettyClient.this.context.getMessage("Cannot exchange messages because the connection is closed"), cause);
793797
}
794798

795799
/**

src/test/resources/logback-test.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
</appender>
2525

2626
<logger name="io.r2dbc.postgresql" level="INFO"/>
27+
<logger name="io.r2dbc.postgresql.client.ConnectionContext" level="INFO"/>
2728
<logger name="org.testcontainers" level="INFO"/>
2829
<logger name="reactor.netty" level="WARN"/>
2930
<logger name="stream" level="INFO"/>

0 commit comments

Comments
 (0)