@@ -223,6 +223,8 @@ static class AggregateSubscriber extends BaseSubscriber<String> {
223223 */
224224 private ResponseInfo responseInfo ;
225225
226+ volatile boolean hasRequestedDemand = false ;
227+
226228 /**
227229 * Creates a new JsonLineSubscriber that will emit parsed JSON-RPC messages.
228230 * @param sink the {@link FluxSink} to emit parsed {@link ResponseEvent} objects
@@ -236,7 +238,13 @@ public AggregateSubscriber(ResponseInfo responseInfo, FluxSink<ResponseEvent> si
236238
237239 @ Override
238240 protected void hookOnSubscribe (Subscription subscription ) {
239- sink .onRequest (subscription ::request );
241+
242+ sink .onRequest (n -> {
243+ if (!hasRequestedDemand ) {
244+ subscription .request (Long .MAX_VALUE );
245+ }
246+ hasRequestedDemand = true ;
247+ });
240248
241249 // Register disposal callback to cancel subscription when Flux is disposed
242250 sink .onDispose (subscription ::cancel );
@@ -249,8 +257,11 @@ protected void hookOnNext(String line) {
249257
250258 @ Override
251259 protected void hookOnComplete () {
252- String data = this .eventBuilder .toString ();
253- this .sink .next (new AggregateResponseEvent (responseInfo , data ));
260+
261+ if (hasRequestedDemand ) {
262+ String data = this .eventBuilder .toString ();
263+ this .sink .next (new AggregateResponseEvent (responseInfo , data ));
264+ }
254265
255266 this .sink .complete ();
256267 }
@@ -271,6 +282,8 @@ static class BodilessResponseLineSubscriber extends BaseSubscriber<String> {
271282
272283 private final ResponseInfo responseInfo ;
273284
285+ volatile boolean hasRequestedDemand = false ;
286+
274287 public BodilessResponseLineSubscriber (ResponseInfo responseInfo , FluxSink <ResponseEvent > sink ) {
275288 this .sink = sink ;
276289 this .responseInfo = responseInfo ;
@@ -280,7 +293,10 @@ public BodilessResponseLineSubscriber(ResponseInfo responseInfo, FluxSink<Respon
280293 protected void hookOnSubscribe (Subscription subscription ) {
281294
282295 sink .onRequest (n -> {
283- subscription .request (n );
296+ if (!hasRequestedDemand ) {
297+ subscription .request (Long .MAX_VALUE );
298+ }
299+ hasRequestedDemand = true ;
284300 });
285301
286302 // Register disposal callback to cancel subscription when Flux is disposed
@@ -291,11 +307,13 @@ protected void hookOnSubscribe(Subscription subscription) {
291307
292308 @ Override
293309 protected void hookOnComplete () {
294- // emit dummy event to be able to inspect the response info
295- // this is a shortcut allowing for a more streamlined processing using
296- // operator composition instead of having to deal with the CompletableFuture
297- // along the Subscriber for inspecting the result
298- this .sink .next (new DummyEvent (responseInfo ));
310+ if (hasRequestedDemand ) {
311+ // emit dummy event to be able to inspect the response info
312+ // this is a shortcut allowing for a more streamlined processing using
313+ // operator composition instead of having to deal with the
314+ // CompletableFuture along the Subscriber for inspecting the result
315+ this .sink .next (new DummyEvent (responseInfo ));
316+ }
299317 this .sink .complete ();
300318 }
301319
0 commit comments