Skip to content

Commit 8359201

Browse files
committed
Improve write completion handling in ChannelSendOperator
Avoid re-using the WriteBarrier as the Subscription to both the completionSubscriber and the writeSubscriber. Instead DownstreamBridge is now called WriteCompletionBarrier and is both a Subscriber and Subscription, i.e. handles all signals to and from the completion Subscriber. This frees the request method implementation in WriteBarrier to assume it can only be called by the writeSubscriber and hence it's safe to pass on request signals to the write source outside the synchronized block.
1 parent 6329ccb commit 8359201

File tree

1 file changed

+73
-24
lines changed

1 file changed

+73
-24
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,30 @@ public Object scanUnsafe(Attr key) {
7373

7474
@Override
7575
public void subscribe(CoreSubscriber<? super Void> actual) {
76-
this.source.subscribe(new WriteWithBarrier(actual));
76+
this.source.subscribe(new WriteBarrier(actual));
7777
}
7878

7979

80+
/**
81+
* A barrier between the write source and the write subscriber (i.e. the
82+
* HTTP server adapter) that pre-fetches and waits for the first signal
83+
* before deciding whether to hook in to the write subscriber.
84+
*
85+
* <p>Acts as:
86+
* <ul>
87+
* <li>Subscriber to the write source.
88+
* <li>Subscription to the write subscriber.
89+
* <li>Publisher ot the write subscriber.
90+
* </ul>
91+
*
92+
* <p>Also uses {@link WriteCompletionBarrier} for delegating signals to
93+
* and from the write completion subscriber.
94+
*/
8095
@SuppressWarnings("deprecation")
81-
private final class WriteWithBarrier
82-
implements Publisher<T>, CoreSubscriber<T>, Subscription {
96+
private final class WriteBarrier implements CoreSubscriber<T>, Subscription, Publisher<T> {
8397

84-
/* Downstream write completion subscriber */
85-
private final CoreSubscriber<? super Void> completionSubscriber;
98+
/* Bridges signals to and from the completionSubscriber */
99+
private final WriteCompletionBarrier writeCompletionBarrier;
86100

87101
/* Upstream write source subscription */
88102
@Nullable
@@ -109,13 +123,13 @@ private final class WriteWithBarrier
109123
/** Cached 1st/2nd signal before readyToWrite */
110124
private boolean completed = false;
111125

112-
/** The actual writeSubscriber vs the downstream completion subscriber */
126+
/** The actual writeSubscriber from the HTTP server adapter */
113127
@Nullable
114128
private Subscriber<? super T> writeSubscriber;
115129

116130

117-
WriteWithBarrier(CoreSubscriber<? super Void> completionSubscriber) {
118-
this.completionSubscriber = completionSubscriber;
131+
WriteBarrier(CoreSubscriber<? super Void> completionSubscriber) {
132+
this.writeCompletionBarrier = new WriteCompletionBarrier(completionSubscriber, this);
119133
}
120134

121135

@@ -125,7 +139,7 @@ private final class WriteWithBarrier
125139
public final void onSubscribe(Subscription s) {
126140
if (Operators.validate(this.subscription, s)) {
127141
this.subscription = s;
128-
this.completionSubscriber.onSubscribe(this);
142+
this.writeCompletionBarrier.connect();
129143
s.request(1);
130144
}
131145
}
@@ -144,13 +158,13 @@ public final void onNext(T item) {
144158
else if (this.beforeFirstEmission) {
145159
this.item = item;
146160
this.beforeFirstEmission = false;
147-
writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber));
161+
writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
148162
}
149163
else {
150164
if (this.subscription != null) {
151165
this.subscription.cancel();
152166
}
153-
this.completionSubscriber.onError(new IllegalStateException("Unexpected item."));
167+
this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item."));
154168
}
155169
}
156170
}
@@ -172,7 +186,7 @@ public final void onError(Throwable ex) {
172186
}
173187
else if (this.beforeFirstEmission) {
174188
this.beforeFirstEmission = false;
175-
this.completionSubscriber.onError(ex);
189+
this.writeCompletionBarrier.onError(ex);
176190
}
177191
else {
178192
this.error = ex;
@@ -193,7 +207,7 @@ public final void onComplete() {
193207
else if (this.beforeFirstEmission) {
194208
this.completed = true;
195209
this.beforeFirstEmission = false;
196-
writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber));
210+
writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
197211
}
198212
else {
199213
this.completed = true;
@@ -203,11 +217,11 @@ else if (this.beforeFirstEmission) {
203217

204218
@Override
205219
public Context currentContext() {
206-
return this.completionSubscriber.currentContext();
220+
return this.writeCompletionBarrier.currentContext();
207221
}
208222

209223

210-
// Subscription methods (we're the subscription to completion~ and writeSubscriber)..
224+
// Subscription methods (we're the Subscription to the writeSubscriber)..
211225

212226
@Override
213227
public void request(long n) {
@@ -229,9 +243,9 @@ public void request(long n) {
229243
if (n == 0) {
230244
return;
231245
}
232-
s.request(n);
233246
}
234247
}
248+
s.request(n);
235249
}
236250

237251
private boolean emitCachedSignals() {
@@ -259,7 +273,7 @@ public void cancel() {
259273
}
260274

261275

262-
// Publisher<T> methods (we're the Publisher to the write subscriber)...
276+
// Publisher<T> methods (we're the Publisher to the writeSubscriber)..
263277

264278
@Override
265279
public void subscribe(Subscriber<? super T> writeSubscriber) {
@@ -278,14 +292,38 @@ public void subscribe(Subscriber<? super T> writeSubscriber) {
278292
}
279293

280294

281-
private class DownstreamBridge implements CoreSubscriber<Void> {
295+
/**
296+
* We need an extra barrier between the WriteBarrier and the actual
297+
* completion subscriber.
298+
*
299+
* <p>The actual completionSubscriber is subscribed immediately to the
300+
* WriteBarrier initially. Later after the first signal is received, we need
301+
* this wrapper to subscribe again, this time to the write function.
302+
*/
303+
private class WriteCompletionBarrier implements CoreSubscriber<Void>, Subscription {
304+
305+
/* Downstream write completion subscriber */
306+
private final CoreSubscriber<? super Void> completionSubscriber;
282307

283-
private final CoreSubscriber<? super Void> downstream;
308+
private final WriteBarrier writeBarrier;
284309

285-
public DownstreamBridge(CoreSubscriber<? super Void> downstream) {
286-
this.downstream = downstream;
310+
311+
public WriteCompletionBarrier(CoreSubscriber<? super Void> subscriber, WriteBarrier writeBarrier) {
312+
this.completionSubscriber = subscriber;
313+
this.writeBarrier = writeBarrier;
314+
}
315+
316+
317+
/**
318+
* Connect the underlying completion subscriber to this barrier in order
319+
* to track cancel signals and pass them on to the write barrier.
320+
*/
321+
public void connect() {
322+
this.completionSubscriber.onSubscribe(this);
287323
}
288324

325+
// Subscriber methods (we're the subscriber to the write function)..
326+
289327
@Override
290328
public void onSubscribe(Subscription subscription) {
291329
subscription.request(Long.MAX_VALUE);
@@ -297,17 +335,28 @@ public void onNext(Void aVoid) {
297335

298336
@Override
299337
public void onError(Throwable ex) {
300-
this.downstream.onError(ex);
338+
this.completionSubscriber.onError(ex);
301339
}
302340

303341
@Override
304342
public void onComplete() {
305-
this.downstream.onComplete();
343+
this.completionSubscriber.onComplete();
306344
}
307345

308346
@Override
309347
public Context currentContext() {
310-
return this.downstream.currentContext();
348+
return this.completionSubscriber.currentContext();
349+
}
350+
351+
352+
@Override
353+
public void request(long n) {
354+
// Ignore: we don't produce data
355+
}
356+
357+
@Override
358+
public void cancel() {
359+
this.writeBarrier.cancel();
311360
}
312361
}
313362

0 commit comments

Comments
 (0)