Skip to content

Conversation

@lachlan-roberts
Copy link
Contributor

closes #13043

  • rename TransformingFlusher to WebSocketFlusher and update it to use an OutgoingEntry API instead of frame + callback + batch.
  • add method to override to handle onCompleteFailure on WebSocketFlusher
  • WebSocketFlusher now wraps the callback needed to continue processing in the OutgoingEntry itself.
  • Rename DemandingFlusher to WebSocketDemander, and use a lock instead of multiple atomic variables for the implementation.

@lachlan-roberts lachlan-roberts requested review from gregw and sbordet June 18, 2025 04:23
@lachlan-roberts lachlan-roberts self-assigned this Jun 18, 2025
@joakime joakime linked an issue Jun 18, 2025 that may be closed by this pull request
@sbordet sbordet moved this to 👀 In review in Jetty 12.1.0 - (FROZEN) Jun 18, 2025
Copy link
Contributor

@gregw gregw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly looks good, but I think some comments are wrong with their upstream/downstream usage? which way the stream flows needs to be made clear in comments.

Then I'm not sure mixing in "Next" is a good idea. Isn't that just "Downstream"?

Signed-off-by: Lachlan Roberts <[email protected]>
@lachlan-roberts lachlan-roberts requested a review from gregw June 23, 2025 05:25
boolean finished = deflate(callback);
_first = false;
// Provide the frames payload as input to the Deflater.
getDeflater().setInput(entry.getFrame().getPayload().slice());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need slicing here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do because currently the contract is that we do not consume the frame payload on a sendFrame (see #13290).

Comment on lines 164 to 172
if (failure == null)
{
succeeded();
}
else
{
callback.failed(failure);
failFlusher(failure);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels wrong, because if no failure, the whole IteratingCallback is succeeded, but not the callback parameter, and if there is a failure, the callback parameter is failed, but not the IteratingCallback (which is instead aborted).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't succeed the callback parameter because we are not done with the frames payload, but if we have a non-null failure we need to fail it because it wasn't added to the state and so won't be failed later on.

I don't think we want to just fail the iterating callback. It is really a terminal failure so we want to ensure that the Demander is aborted. Looking at ICB.failed there are cases where it just turns into a NOOP, for example if it has already been completed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment to the call to succeeded(), something like:

// Succeed this IteratingCallback, since we are not done with the frame's payload.

* @param first true if this is the first time this entry is being processed.
* @return true to indicate that you have finished transforming this entry.
*/
protected abstract boolean onFrame(OutgoingEntry entry, boolean first);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the entry transformation be asynchronous, so that this should rather be:

Suggested change
protected abstract boolean onFrame(OutgoingEntry entry, boolean first);
protected abstract void onFrame(OutgoingEntry entry, boolean first, Promise<Boolean> promise);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a need for this.

And I don't see how this would work with the Flusher.process implementation.

}
catch (Throwable x)
{
log.warn("Exception while notifying success of entry {}", entry, x);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use LOG.info() like in the other cases of exceptions while notifying listeners.

}
catch (Throwable x)
{
log.warn("Exception while notifying failure of entry {}", entry, x);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOG.info()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug or warn. info is for more expected stuff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug seems a bit light for this error.

Signed-off-by: Lachlan Roberts <[email protected]>
Copy link
Contributor

@sbordet sbordet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just the nit of adding a comment.

@lachlan-roberts lachlan-roberts merged commit 19971c7 into jetty-12.1.x Jun 26, 2025
10 checks passed
@lachlan-roberts lachlan-roberts deleted the jetty-12.1.x-WebSocketFlushers branch June 26, 2025 02:28
@github-project-automation github-project-automation bot moved this from 👀 In review to ✅ Done in Jetty 12.1.0 - (FROZEN) Jun 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

No open projects
Status: ✅ Done

Development

Successfully merging this pull request may close these issues.

Review WebSocket flushers

4 participants