Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -795,14 +795,9 @@ public Flusher(Configuration configuration)
}

@Override
protected void forwardFrame(Frame frame, Callback callback, boolean batch)
protected void forwardFrame(OutgoingEntry entry)
{
OutgoingEntry currentEntry = new OutgoingEntry.Builder(getCurrentEntry())
.frame(frame)
.callback(callback)
.batch(batch)
.build();
negotiated.getExtensions().sendFrame(currentEntry);
negotiated.getExtensions().sendFrame(entry);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.eclipse.jetty.websocket.core.OutgoingEntry;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.util.DemandChain;
import org.eclipse.jetty.websocket.core.util.DemandingFlusher;
import org.eclipse.jetty.websocket.core.util.FragmentingFlusher;
import org.eclipse.jetty.websocket.core.util.WebSocketDemander;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,26 +37,21 @@ public class FragmentExtension extends AbstractExtension implements DemandChain
private static final Logger LOG = LoggerFactory.getLogger(FragmentExtension.class);

private final FragmentingFlusher outgoingFlusher;
private final DemandingFlusher incomingFlusher;
private final WebSocketDemander incomingFlusher;
private final Configuration configuration = new Configuration.ConfigurationCustomizer();

public FragmentExtension()
{
outgoingFlusher = new FragmentingFlusher(configuration)
{
@Override
protected void forwardFrame(Frame frame, Callback callback, boolean batch)
protected void forwardFrame(OutgoingEntry entry)
{
OutgoingEntry entry = new OutgoingEntry.Builder(getCurrentEntry())
.frame(frame)
.callback(callback)
.batch(batch)
.build();
nextOutgoingFrame(entry);
}
};

incomingFlusher = new FragmentingDemandingFlusher();
incomingFlusher = new FragmentingDemander();
}

@Override
Expand Down Expand Up @@ -97,50 +92,53 @@ public void init(ExtensionConfig config, WebSocketComponents components)
configuration.setMaxFrameSize(maxLength);
}

public class FragmentingDemandingFlusher extends DemandingFlusher
public class FragmentingDemander extends WebSocketDemander
{
public FragmentingDemandingFlusher()
private ByteBuffer _payload;

public FragmentingDemander()
{
super(FragmentExtension.this::nextIncomingFrame);
}

@Override
protected boolean handle(Frame frame, Callback callback, boolean first)
{
long maxFrameSize = configuration.getMaxFrameSize();
if (first)
{
if (frame.isControlFrame())
if (frame.isControlFrame() || maxFrameSize <= 0 || frame.getPayloadLength() <= maxFrameSize)
{
emitFrame(frame, callback);
return true;
}

_payload = frame.getPayload();
}

ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining();
long maxFrameSize = configuration.getMaxFrameSize();
int remaining = _payload.remaining();
int fragmentSize = (int)Math.min(remaining, maxFrameSize);

boolean continuation = (frame.getOpCode() == OpCode.CONTINUATION) || !first;
Frame fragment = new Frame(continuation ? OpCode.CONTINUATION : frame.getOpCode());
byte opCode = (frame.getOpCode() == OpCode.CONTINUATION || !first) ? OpCode.CONTINUATION : frame.getOpCode();
Frame fragment = new Frame(opCode);
boolean finished = (maxFrameSize <= 0 || remaining <= maxFrameSize);
fragment.setFin(frame.isFin() && finished);

if (finished)
{
// If finished we don't need to fragment, forward original payload.
fragment.setPayload(payload);
fragment.setPayload(_payload);
_payload = null;
}
else
{
// Slice the fragmented payload from the buffer.
int limit = payload.limit();
int newLimit = payload.position() + fragmentSize;
payload.limit(newLimit);
ByteBuffer payloadFragment = payload.slice();
payload.limit(limit);
int limit = _payload.limit();
int newLimit = _payload.position() + fragmentSize;
_payload.limit(newLimit);
ByteBuffer payloadFragment = _payload.slice();
_payload.limit(limit);
fragment.setPayload(payloadFragment);
payload.position(newLimit);
_payload.position(newLimit);
if (LOG.isDebugEnabled())
LOG.debug("Fragmented {}->{}", frame, fragment);
}
Expand All @@ -159,5 +157,12 @@ protected boolean handle(Frame frame, Callback callback, boolean first)
emitFrame(fragment, payloadCallback);
return finished;
}

@Override
protected void onCompleteFailure(Throwable cause)
{
super.onCompleteFailure(cause);
_payload = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected Iterator<FlusherEntry> iterator()
protected void iterate()
{
// We need to acquire the lock before we can iterate over the queue and entries.
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
super.iterate();
}
Expand Down Expand Up @@ -147,7 +147,7 @@ public boolean enqueue(OutgoingEntry outgoingEntry)
List<FlusherEntry> failedEntries = null;
CloseStatus closeStatus = null;

try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
if (!_canEnqueue || _closedCause != null)
{
Expand Down Expand Up @@ -222,7 +222,7 @@ public boolean enqueue(OutgoingEntry outgoingEntry)

public void onClose(Throwable cause)
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
_closedCause = cause == null ? new ClosedChannelException() : cause;
}
Expand All @@ -236,7 +236,7 @@ protected Action process() throws Throwable
LOG.debug("Flushing {}", this);

boolean flush = false;
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
if (_closedCause != null)
throw _closedCause;
Expand Down Expand Up @@ -369,7 +369,7 @@ private RetainableByteBuffer acquireBuffer(int capacity)

private int getQueueSize()
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
return _queue.size();
}
Expand All @@ -378,7 +378,7 @@ private int getQueueSize()
@Override
protected void onSuccess()
{
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
assert _completedEntries.isEmpty();
_completedEntries.addAll(_currentEntries);
Expand All @@ -404,7 +404,7 @@ public void onCompleteFailure(Throwable failure)
if (_batchBuffer != null)
_batchBuffer.clear();
releaseAggregateIfEmpty();
try (AutoLock l = _lock.lock())
try (AutoLock ignored = _lock.lock())
{
// Ensure no more entries can be enqueued.
_canEnqueue = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.util.DemandChain;
import org.eclipse.jetty.websocket.core.util.DemandingFlusher;
import org.eclipse.jetty.websocket.core.util.TransformingFlusher;
import org.eclipse.jetty.websocket.core.util.WebSocketDemander;
import org.eclipse.jetty.websocket.core.util.WebSocketFlusher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,10 +55,9 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem
private static final int DEFAULT_BUF_SIZE = 8 * 1024;

private final OutgoingFlusher outgoingFlusher;
private final IncomingFlusher incomingFlusher;
private final IncomingDemander incomingFlusher;
private DeflaterPool.Entry deflaterHolder;
private InflaterPool.Entry inflaterHolder;
private boolean incomingCompressed;

private ExtensionConfig configRequested;
private ExtensionConfig configNegotiated;
Expand All @@ -70,7 +69,7 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem
public PerMessageDeflateExtension()
{
outgoingFlusher = new OutgoingFlusher();
incomingFlusher = new IncomingFlusher();
incomingFlusher = new IncomingDemander();
}

@Override
Expand Down Expand Up @@ -264,57 +263,37 @@ public void demand()
incomingFlusher.demand();
}

private class OutgoingFlusher extends TransformingFlusher
private class OutgoingFlusher extends WebSocketFlusher
{
private boolean _first;
private Frame _frame;

@Override
protected boolean onFrame(Frame frame, Callback callback, boolean batch)
protected boolean onFrame(OutgoingEntry entry, boolean first)
{
if (frame.isControlFrame())
if (first)
{
OutgoingEntry entry = new OutgoingEntry.Builder(getCurrentEntry())
.frame(frame)
.callback(callback)
.batch(batch)
.build();
nextOutgoingFrame(entry);
return true;
}

_first = true;
_frame = frame;

// Provide the frames payload as input to the Deflater.
getDeflater().setInput(frame.getPayload().slice());
callback.succeeded();
return false;
}
if (entry.getFrame().isControlFrame())
{
nextOutgoingFrame(entry);
return true;
}

@Override
protected boolean transform(Callback callback)
{
boolean finished = deflate(callback);
_first = false;
// Provide the frames payload as input to the Deflater.
getDeflater().setInput(entry.getFrame().getPayload().slice());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need slicing here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do because currently the contract is that we do not consume the frame payload on a sendFrame (see #13290).

}

boolean finished = deflate(entry, first);
if (finished)
{
_frame = null;
getDeflater().setInput(BufferUtil.EMPTY_BUFFER);
}

return finished;
}

private boolean deflate(Callback callback)
private boolean deflate(OutgoingEntry entry, boolean first)
{
// Get a buffer for the deflated payload.
long maxFrameSize = getConfiguration().getMaxFrameSize();
int bufferSize = (maxFrameSize <= 0) ? deflateBufferSize : (int)Math.min(maxFrameSize, deflateBufferSize);
RetainableByteBuffer buffer = getByteBufferPool().acquire(bufferSize, false);
ByteBuffer byteBuffer = buffer.getByteBuffer();
callback = Callback.from(callback, buffer::release);
Callback callback = Callback.from(entry.getCallback(), buffer::release);

// Fill up the buffer with a max length of bufferSize;
boolean finished = false;
Expand Down Expand Up @@ -342,47 +321,48 @@ private boolean deflate(Callback callback)
}
}

Frame frame = entry.getFrame();
ByteBuffer payload = byteBuffer;
if (payload.hasRemaining())
{
// Handle tail bytes generated by SYNC_FLUSH.
if (finished && _frame.isFin() && endsWithTail(payload))
if (finished && frame.isFin() && endsWithTail(payload))
{
payload.limit(payload.limit() - TAIL_BYTES.length);
if (LOG.isDebugEnabled())
LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}", BufferUtil.toDetailString(payload));
}
}
else if (_frame.isFin())
else if (frame.isFin())
{
// Special case: 7.2.3.6. Generating an Empty Fragment Manually
// https://tools.ietf.org/html/rfc7692#section-7.2.3.6
payload = ByteBuffer.wrap(new byte[]{0x00});
}

if (LOG.isDebugEnabled())
LOG.debug("Compressed {}: payload:{}", _frame, payload.remaining());
LOG.debug("Compressed {}: payload:{}", frame, payload.remaining());

Frame chunk = new Frame(_first ? _frame.getOpCode() : OpCode.CONTINUATION);
chunk.setRsv1(_first && _frame.getOpCode() != OpCode.CONTINUATION);
Frame chunk = new Frame(first ? frame.getOpCode() : OpCode.CONTINUATION);
chunk.setRsv1(first && frame.getOpCode() != OpCode.CONTINUATION);
chunk.setPayload(payload);
chunk.setFin(_frame.isFin() && finished);
chunk.setFin(frame.isFin() && finished);

OutgoingEntry entry = new OutgoingEntry.Builder(getCurrentEntry())
nextOutgoingFrame(new OutgoingEntry.Builder(entry)
.frame(chunk)
.callback(callback)
.build();
nextOutgoingFrame(entry);
.build());
return finished;
}
}

private class IncomingFlusher extends DemandingFlusher
private class IncomingDemander extends WebSocketDemander
{
private boolean _tailBytes;
private boolean _incomingCompressed;
private AtomicReference<RetainableByteBuffer> _payloadRef;

public IncomingFlusher()
public IncomingDemander()
{
super(PerMessageDeflateExtension.this::nextIncomingFrame);
}
Expand All @@ -404,7 +384,7 @@ protected boolean handle(Frame frame, Callback callback, boolean first)
{
case OpCode.TEXT:
case OpCode.BINARY:
incomingCompressed = frame.isRsv1();
_incomingCompressed = frame.isRsv1();
break;

case OpCode.CONTINUATION:
Expand All @@ -416,7 +396,7 @@ protected boolean handle(Frame frame, Callback callback, boolean first)
break;
}

if (!incomingCompressed)
if (!_incomingCompressed)
{
emitFrame(frame, callback);
return true;
Expand Down Expand Up @@ -513,6 +493,7 @@ private boolean inflate(Frame frame, Callback callback, boolean first) throws Da
@Override
protected void onCompleteFailure(Throwable cause)
{
super.onCompleteFailure(cause);
releasePayload(_payloadRef);
}

Expand Down
Loading