Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.eclipse.jetty.websocket.core;

import java.nio.ByteBuffer;

import org.eclipse.jetty.util.Callback;

/**
Expand All @@ -28,6 +30,8 @@ public interface OutgoingFrames
* <p>
* If you are implementing a mutation, you are obliged to handle
* the incoming Callback appropriately.
* <p>
* The {@link Frame}'s payload {@link ByteBuffer} is consumed by this call.
*
* @param frame the frame to eventually write to the network layer.
* @param callback the callback to notify when the frame is written.
Expand All @@ -40,8 +44,10 @@ default void sendFrame(Frame frame, Callback callback, boolean batch)

/**
* Send an {@link OutgoingEntry} containing a {@link Frame} and {@link Callback}.
* <p>
* The {@link Frame}'s payload {@link ByteBuffer} is consumed by this call.
*
* @param entry the frame to eventually write to the network layer.
*/
void sendFrame(OutgoingEntry entry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ public void init(ExtensionConfig config, WebSocketComponents components)

public class FragmentingDemander extends WebSocketDemander
{
private ByteBuffer _payload;

public FragmentingDemander()
{
super(FragmentExtension.this::nextIncomingFrame);
Expand All @@ -112,11 +110,10 @@ protected boolean handle(Frame frame, Callback callback, boolean first)
emitFrame(frame, callback);
return true;
}

_payload = frame.getPayload();
}

int remaining = _payload.remaining();
ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining();
int fragmentSize = (int)Math.min(remaining, maxFrameSize);
byte opCode = (frame.getOpCode() == OpCode.CONTINUATION || !first) ? OpCode.CONTINUATION : frame.getOpCode();
Frame fragment = new Frame(opCode);
Expand All @@ -126,19 +123,13 @@ protected boolean handle(Frame frame, Callback callback, boolean first)
if (finished)
{
// If finished we don't need to fragment, forward original payload.
fragment.setPayload(_payload);
_payload = null;
fragment.setPayload(payload);
}
else
{
// Slice the fragmented payload from the buffer.
int limit = _payload.limit();
int newLimit = _payload.position() + fragmentSize;
_payload.limit(newLimit);
ByteBuffer payloadFragment = _payload.slice();
_payload.limit(limit);
fragment.setPayload(payloadFragment);
_payload.position(newLimit);
fragment.setPayload(payload.slice(payload.position(), fragmentSize));
payload.position(payload.position() + fragmentSize);
if (LOG.isDebugEnabled())
LOG.debug("Fragmented {}->{}", frame, fragment);
}
Expand All @@ -157,12 +148,5 @@ protected boolean handle(Frame frame, Callback callback, boolean first)
emitFrame(fragment, payloadCallback);
return finished;
}

@Override
protected void onCompleteFailure(Throwable cause)
{
super.onCompleteFailure(cause);
_payload = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ protected Action process() throws Throwable
_releasableBuffers.add(masked);
_generator.generatePayload(entry.getFrame(), payload);
}
buffers.add(payload.slice());
buffers.add(payload);
}

// Once we have added another buffer we cannot add to the batch buffer again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void generatePayload(Frame frame, ByteBuffer buffer)
if (frame.isMasked())
maskPayload(buffer, frame);
else
buffer.put(payload.slice());
buffer.put(payload);
BufferUtil.flipToFlush(buffer, pos);
}

Expand Down Expand Up @@ -198,6 +198,7 @@ private void maskPayload(ByteBuffer buffer, Frame frame)
++maskOffset;
}
}
payload.position(end);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ protected boolean onFrame(OutgoingEntry entry, boolean first)
}

// Provide the frames payload as input to the Deflater.
getDeflater().setInput(entry.getFrame().getPayload().slice());
getDeflater().setInput(entry.getFrame().getPayload());
}

boolean finished = deflate(entry, first);
Expand Down Expand Up @@ -404,7 +404,7 @@ protected boolean handle(Frame frame, Callback callback, boolean first)

// Provide the frames payload as input to the Inflater.
_tailBytes = false;
getInflater().setInput(frame.getPayload().slice());
getInflater().setInput(frame.getPayload());
}

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,6 @@ private void flush(boolean fin) throws IOException
// Any flush after the first will be a CONTINUATION frame.
bytesSent += initialBufferSize;
++frameCount;

// Buffer has been sent, but buffer should not have been consumed.
try
{
assert buffer.remaining() == initialBufferSize;
buffer.clear();
}
catch (Throwable t)
{
t.printStackTrace();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public abstract class FragmentingFlusher extends WebSocketFlusher
{
private static final Logger LOG = LoggerFactory.getLogger(FragmentingFlusher.class);
private final Configuration _configuration;
private ByteBuffer _payload;

public FragmentingFlusher(Configuration configuration)
{
Expand All @@ -52,11 +51,10 @@ protected boolean onFrame(OutgoingEntry entry, boolean first)
forwardFrame(entry);
return true;
}

_payload = frame.getPayload().slice();
}

int remaining = _payload.remaining();
ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining();
int fragmentSize = (int)Math.min(remaining, maxFrameSize);
byte opCode = (frame.getOpCode() == OpCode.CONTINUATION || !first) ? OpCode.CONTINUATION : frame.getOpCode();
Frame fragment = new Frame(opCode);
Expand All @@ -66,31 +64,18 @@ protected boolean onFrame(OutgoingEntry entry, boolean first)
// If we don't need to fragment just forward with original payload.
if (finished)
{
fragment.setPayload(_payload);
_payload = null;
fragment.setPayload(payload);
}
else
{
// Slice the fragmented payload from the buffer.
int limit = _payload.limit();
int newLimit = _payload.position() + fragmentSize;
_payload.limit(newLimit);
ByteBuffer payloadFragment = _payload.slice();
_payload.limit(limit);
fragment.setPayload(payloadFragment);
_payload.position(newLimit);
fragment.setPayload(payload.slice(payload.position(), fragmentSize));
payload.position(payload.position() + fragmentSize);
if (LOG.isDebugEnabled())
LOG.debug("Fragmented {}->{}", frame, fragment);
}

forwardFrame(new OutgoingEntry.Builder(entry).frame(fragment).build());
return finished;
}

@Override
protected void onCompleteFailure(Throwable cause)
{
super.onCompleteFailure(cause);
_payload = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -106,7 +107,7 @@ public void testOutgoingAutoFragmentToMaxFrameSize() throws Exception
assertThat(frame.isFin(), is(true));

// Original frame payload should not have been changed.
assertThat(sentFrame.getPayload(), is(message));
assertThat(sentFrame.getPayload().remaining(), is(0));

clientHandler.sendClose();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
Expand Down Expand Up @@ -309,9 +310,12 @@ public void testOutgoingAutoFragmentWithPermessageDeflate() throws Exception
frame = clientHandler.receivedFrames.poll(1, TimeUnit.SECONDS);
}

// Content of the sentPayload was fully consumed.
assertThat(sendPayload.remaining(), equalTo(0));

// We received correct payload in 2 frames.
assertThat(message, is(payload));
assertThat(message, is(sendPayload));
assertThat(sendPayload.remaining(), is(0));
assertThat(numFrames, is(2));

clientHandler.sendClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -72,36 +71,7 @@ public void testSingleFrame() throws Exception

assertThat(frame.getOpCode(), is(OpCode.BINARY));
assertThat(frame.getPayload(), is(message));
assertThat(sendPayload, is(message));

clientHandler.sendClose();
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertNull(clientHandler.getError());
}

@Test
public void testSendSameFrameMultipleTimes() throws Exception
{
TestFrameHandler clientHandler = new TestFrameHandler();
client.connect(clientHandler, server.getUri()).get(5, TimeUnit.SECONDS);
serverHandler.open.await(5, TimeUnit.SECONDS);
clientHandler.getCoreSession().setAutoFragment(false);
serverHandler.getCoreSession().setAutoFragment(false);

int payloadLen = 32 * 1024;
byte[] array = new byte[payloadLen];
new Random().nextBytes(array);
ByteBuffer message = ByteBuffer.wrap(array);

Frame frame = new Frame(OpCode.BINARY, BufferUtil.copy(message));
for (int i = 0; i < 200; i++)
{
clientHandler.sendFrame(frame, Callback.NOOP, false);
Frame recvFrame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
assertThat(recvFrame.getOpCode(), is(OpCode.BINARY));
assertThat(recvFrame.getPayload(), is(message));
assertThat(frame.getPayload(), is(message));
}
assertThat(sendPayload.remaining(), is(0));

clientHandler.sendClose();
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void test() throws Exception
assertNotNull(coreSession);
// Set max frame size to autoFragment the message into multiple frames.
ByteBuffer message = randomBytes(1024);
ByteBuffer messageSlice = message.slice();
coreSession.setMaxFrameSize(64);
coreSession.sendFrame(new Frame(OpCode.BINARY, message).setFin(true), Callback.NOOP, false);

Expand All @@ -108,7 +109,8 @@ public void test() throws Exception

assertThat(serverHandler.binaryMessages.size(), equalTo(1));
ByteBuffer recvMessage = serverHandler.binaryMessages.poll();
assertThat(recvMessage, equalTo(message));
assertThat(message.remaining(), equalTo(0));
assertThat(recvMessage, equalTo(messageSlice));
}

private static ByteBuffer randomBytes(int size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ public static class FrameCapture extends CoreSession.Empty
@Override
public void sendFrame(OutgoingEntry entry)
{
frames.offer(Frame.copy(entry.getFrame()));
Frame frame = entry.getFrame();
frames.add(Frame.copy(frame));
frame.getPayload().position(frame.getPayload().limit());
entry.getCallback().succeeded();
}
}
Expand All @@ -235,9 +237,10 @@ public void sendFrame(OutgoingEntry entry)

if (frame.isFin())
{
messages.offer(activeMessage.takeCompleteString(IllegalArgumentException::new));
messages.add(activeMessage.takeCompleteString(IllegalArgumentException::new));
activeMessage = null;
}
frame.getPayload().position(frame.getPayload().limit());
entry.getCallback().succeeded();
}
}
Expand Down
Loading