Skip to content

Commit 3ffeec9

Browse files
committed
Merge pull request #1123 from violetagg/violetagg-flush-last-element
2 parents fffea06 + d219054 commit 3ffeec9

File tree

6 files changed

+249
-140
lines changed

6 files changed

+249
-140
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,59 +26,47 @@
2626
import org.springframework.core.io.buffer.DataBufferFactory;
2727

2828
/**
29-
* Abstract base class for listener-based server responses, i.e. Servlet 3.1 and Undertow.
29+
* Abstract base class for listener-based server responses, e.g. Servlet 3.1
30+
* and Undertow.
31+
*
3032
* @author Arjen Poutsma
3133
* @since 5.0
3234
*/
3335
public abstract class AbstractListenerServerHttpResponse extends AbstractServerHttpResponse {
3436

3537
private final AtomicBoolean writeCalled = new AtomicBoolean();
3638

39+
3740
public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory) {
3841
super(dataBufferFactory);
3942
}
4043

44+
4145
@Override
4246
protected final Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
43-
if (this.writeCalled.compareAndSet(false, true)) {
44-
Processor<DataBuffer, Void> bodyProcessor = createBodyProcessor();
45-
return Mono.from(subscriber -> {
46-
body.subscribe(bodyProcessor);
47-
bodyProcessor.subscribe(subscriber);
48-
});
49-
50-
} else {
51-
return Mono.error(new IllegalStateException(
52-
"writeWith() or writeAndFlushWith() has already been called"));
53-
}
47+
return writeAndFlushWithInternal(Mono.just(body));
5448
}
5549

5650
@Override
5751
protected final Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> body) {
5852
if (this.writeCalled.compareAndSet(false, true)) {
59-
Processor<Publisher<DataBuffer>, Void> bodyProcessor =
60-
createBodyFlushProcessor();
53+
Processor<Publisher<DataBuffer>, Void> bodyProcessor = createBodyFlushProcessor();
6154
return Mono.from(subscriber -> {
6255
body.subscribe(bodyProcessor);
6356
bodyProcessor.subscribe(subscriber);
6457
});
65-
} else {
58+
}
59+
else {
6660
return Mono.error(new IllegalStateException(
6761
"writeWith() or writeAndFlushWith() has already been called"));
6862
}
6963
}
7064

71-
/**
72-
* Abstract template method to create a {@code Processor<DataBuffer, Void>} that
73-
* will write the response body to the underlying output. Called from
74-
* {@link #writeWithInternal(Publisher)}.
75-
*/
76-
protected abstract Processor<DataBuffer, Void> createBodyProcessor();
77-
7865
/**
7966
* Abstract template method to create a {@code Processor<Publisher<DataBuffer>, Void>}
8067
* that will write the response body with flushes to the underlying output. Called from
8168
* {@link #writeAndFlushWithInternal(Publisher)}.
8269
*/
8370
protected abstract Processor<Publisher<DataBuffer>, Void> createBodyFlushProcessor();
71+
8472
}

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java

Lines changed: 69 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -35,26 +35,25 @@
3535
* Servlet 3.1 and Undertow support.
3636
*
3737
* @author Arjen Poutsma
38+
* @author Violeta Georgieva
3839
* @since 5.0
3940
* @see ServletServerHttpRequest
4041
* @see UndertowHttpHandlerAdapter
4142
* @see ServerHttpResponse#writeAndFlushWith(Publisher)
4243
*/
43-
abstract class AbstractResponseBodyFlushProcessor
44-
implements Processor<Publisher<DataBuffer>, Void> {
44+
abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher<DataBuffer>, Void> {
4545

4646
protected final Log logger = LogFactory.getLog(getClass());
4747

48-
private final ResponseBodyWriteResultPublisher publisherDelegate =
49-
new ResponseBodyWriteResultPublisher();
48+
private final ResponseBodyWriteResultPublisher resultPublisher = new ResponseBodyWriteResultPublisher();
5049

51-
private final AtomicReference<State> state =
52-
new AtomicReference<>(State.UNSUBSCRIBED);
50+
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
5351

5452
private volatile boolean subscriberCompleted;
5553

5654
private Subscription subscription;
5755

56+
5857
// Subscriber
5958

6059
@Override
@@ -89,13 +88,15 @@ public final void onComplete() {
8988
this.state.get().onComplete(this);
9089
}
9190

91+
9292
// Publisher
9393

9494
@Override
9595
public final void subscribe(Subscriber<? super Void> subscriber) {
96-
this.publisherDelegate.subscribe(subscriber);
96+
this.resultPublisher.subscribe(subscriber);
9797
}
9898

99+
99100
/**
100101
* Creates a new processor for subscribing to a body chunk.
101102
*/
@@ -106,6 +107,11 @@ public final void subscribe(Subscriber<? super Void> subscriber) {
106107
*/
107108
protected abstract void flush() throws IOException;
108109

110+
111+
private boolean changeState(State oldState, State newState) {
112+
return this.state.compareAndSet(oldState, newState);
113+
}
114+
109115
private void writeComplete() {
110116
if (logger.isTraceEnabled()) {
111117
logger.trace(this.state + " writeComplete");
@@ -114,104 +120,117 @@ private void writeComplete() {
114120

115121
}
116122

117-
private boolean changeState(State oldState, State newState) {
118-
return this.state.compareAndSet(oldState, newState);
123+
private void cancel() {
124+
this.subscription.cancel();
119125
}
120126

127+
121128
private enum State {
129+
122130
UNSUBSCRIBED {
131+
123132
@Override
124-
public void onSubscribe(AbstractResponseBodyFlushProcessor processor,
125-
Subscription subscription) {
133+
public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) {
126134
Objects.requireNonNull(subscription, "Subscription cannot be null");
127-
if (processor.changeState(this, SUBSCRIBED)) {
135+
if (processor.changeState(this, REQUESTED)) {
128136
processor.subscription = subscription;
129137
subscription.request(1);
130138
}
131139
else {
132140
super.onSubscribe(processor, subscription);
133141
}
134142
}
135-
}, SUBSCRIBED {
143+
},
144+
REQUESTED {
145+
136146
@Override
137-
public void onNext(AbstractResponseBodyFlushProcessor processor,
138-
Publisher<DataBuffer> chunk) {
139-
Processor<DataBuffer, Void> chunkProcessor =
140-
processor.createBodyProcessor();
141-
chunk.subscribe(chunkProcessor);
142-
chunkProcessor.subscribe(new WriteSubscriber(processor));
147+
public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<DataBuffer> chunk) {
148+
if (processor.changeState(this, RECEIVED)) {
149+
Processor<DataBuffer, Void> chunkProcessor = processor.createBodyProcessor();
150+
chunk.subscribe(chunkProcessor);
151+
chunkProcessor.subscribe(new WriteSubscriber(processor));
152+
}
143153
}
144154

145155
@Override
146-
void onComplete(AbstractResponseBodyFlushProcessor processor) {
147-
processor.subscriberCompleted = true;
156+
public void onComplete(AbstractResponseBodyFlushProcessor processor) {
157+
if (processor.changeState(this, COMPLETED)) {
158+
processor.resultPublisher.publishComplete();
159+
}
148160
}
161+
},
162+
RECEIVED {
149163

150164
@Override
151165
public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
166+
try {
167+
processor.flush();
168+
}
169+
catch (IOException ex) {
170+
processor.cancel();
171+
processor.onError(ex);
172+
}
173+
152174
if (processor.subscriberCompleted) {
153175
if (processor.changeState(this, COMPLETED)) {
154-
processor.publisherDelegate.publishComplete();
176+
processor.resultPublisher.publishComplete();
155177
}
156178
}
157179
else {
158-
try {
159-
processor.flush();
160-
}
161-
catch (IOException ex) {
162-
processor.onError(ex);
180+
if (processor.changeState(this, REQUESTED)) {
181+
processor.subscription.request(1);
163182
}
164-
processor.subscription.request(1);
165183
}
166184
}
167-
}, COMPLETED {
168-
@Override
169-
public void onNext(AbstractResponseBodyFlushProcessor processor,
170-
Publisher<DataBuffer> publisher) {
171-
// ignore
172185

186+
@Override
187+
public void onComplete(AbstractResponseBodyFlushProcessor processor) {
188+
processor.subscriberCompleted = true;
173189
}
190+
},
191+
COMPLETED {
174192

175193
@Override
176-
void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) {
194+
public void onNext(AbstractResponseBodyFlushProcessor processor,
195+
Publisher<DataBuffer> publisher) {
177196
// ignore
197+
178198
}
179199

180200
@Override
181-
void onComplete(AbstractResponseBodyFlushProcessor processor) {
201+
public void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) {
182202
// ignore
183203
}
184204

185205
@Override
186-
public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
206+
public void onComplete(AbstractResponseBodyFlushProcessor processor) {
187207
// ignore
188208
}
189209
};
190210

191-
public void onSubscribe(AbstractResponseBodyFlushProcessor processor,
192-
Subscription subscription) {
211+
public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) {
193212
subscription.cancel();
194213
}
195214

196-
public void onNext(AbstractResponseBodyFlushProcessor processor,
197-
Publisher<DataBuffer> publisher) {
215+
public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<DataBuffer> publisher) {
198216
throw new IllegalStateException(toString());
199217
}
200218

201-
void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) {
219+
public void onError(AbstractResponseBodyFlushProcessor processor, Throwable ex) {
202220
if (processor.changeState(this, COMPLETED)) {
203-
processor.publisherDelegate.publishError(t);
221+
processor.resultPublisher.publishError(ex);
204222
}
205223
}
206224

207-
void onComplete(AbstractResponseBodyFlushProcessor processor) {
225+
public void onComplete(AbstractResponseBodyFlushProcessor processor) {
208226
throw new IllegalStateException(toString());
209227
}
210228

211229
public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
212-
throw new IllegalStateException(toString());
230+
// ignore
213231
}
214232

233+
215234
private static class WriteSubscriber implements Subscriber<Void> {
216235

217236
private final AbstractResponseBodyFlushProcessor processor;
@@ -221,22 +240,23 @@ public WriteSubscriber(AbstractResponseBodyFlushProcessor processor) {
221240
}
222241

223242
@Override
224-
public void onSubscribe(Subscription s) {
225-
s.request(Long.MAX_VALUE);
243+
public void onSubscribe(Subscription subscription) {
244+
subscription.request(Long.MAX_VALUE);
226245
}
227246

228247
@Override
229248
public void onNext(Void aVoid) {
230249
}
231250

232251
@Override
233-
public void onError(Throwable t) {
234-
processor.onError(t);
252+
public void onError(Throwable ex) {
253+
this.processor.cancel();
254+
this.processor.onError(ex);
235255
}
236256

237257
@Override
238258
public void onComplete() {
239-
processor.writeComplete();
259+
this.processor.writeComplete();
240260
}
241261
}
242262
}

0 commit comments

Comments
 (0)