Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private void sendURL(Context ctx, String urlPath, URL path) {
addCachedEntry(ctx, urlPath, fis);
return;
}
ctx.write(fis);
ctx.rangedWrite(fis);
} catch (final Exception e) {
throw404(ctx.exchange());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private void sendFile(Context ctx, HttpExchange jdkExchange, String urlPath, Fil
addCachedEntry(ctx, urlPath, fis);
return;
}
ctx.write(fis);
ctx.rangedWrite(fis);
} catch (FileNotFoundException e) {
throw404(jdkExchange);
}
Expand Down
12 changes: 12 additions & 0 deletions avaje-jex/src/main/java/io/avaje/jex/DJexConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ final class DJexConfig implements JexConfig {
private final CompressionConfig compression = new CompressionConfig();
private int bufferInitial = 256;
private long bufferMax = 4096L;
private int rangeChunkSize = 131_072;
private HttpServerProvider serverProvider;

@Override
Expand Down Expand Up @@ -199,4 +200,15 @@ public JexConfig serverProvider(HttpServerProvider serverProvider) {
this.serverProvider = serverProvider;
return this;
}

@Override
public int rangeChunkSize() {
return rangeChunkSize;
}

@Override
public JexConfig rangeChunkSize(int rangeChunkSize) {
this.rangeChunkSize = rangeChunkSize;
return this;
}
}
12 changes: 11 additions & 1 deletion avaje-jex/src/main/java/io/avaje/jex/JexConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ public interface JexConfig {
*/
JexConfig port(int port);

/** The configured rangeChunk size */
int rangeChunkSize();

/**
* Set the chunk size on range requests, set to a high number to reduce the amount of range
* requests (especially for video streaming)
*
* @param rangeChunkSize chunk size on range requests
*/
JexConfig rangeChunkSize(int rangeChunkSize);

/**
* Registers a template renderer for a specific file extension.
*
Expand Down Expand Up @@ -185,5 +196,4 @@ public interface JexConfig {
* default value is used
*/
JexConfig socketBacklog(int backlog);

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private void decideCompression(int length) throws IOException {
if (!compressionDecided) {
boolean compressionAllowed =
compressedStream == null
&& ctx.responseHeader(Constants.CONTENT_RANGE) == null
&& compression.allowsForCompression(ctx.responseHeader(Constants.CONTENT_TYPE));

if (compressionAllowed && length >= minSizeForCompression) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ final class BufferedOutStream extends OutputStream {
this.context = context;
this.max = max;
this.buffer = new ByteArrayOutputStream(initial);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if this should be initialised at all.


// if content length is set, skip buffer
if (context.responseHeader(Constants.CONTENT_LENGTH) != null) {
count = max + 1;
}
}

@Override
Expand Down Expand Up @@ -52,7 +57,9 @@ public void write(byte[] b, int off, int len) throws IOException {
/** Use responseLength 0 and chunked response. */
private void initialiseChunked() throws IOException {
final HttpExchange exchange = context.exchange();
exchange.sendResponseHeaders(context.statusCode(), 0);
// if a manual content-length is set, honor that instead of chunking
String length = context.responseHeader(Constants.CONTENT_LENGTH);
exchange.sendResponseHeaders(context.statusCode(), length == null ? 0 : Long.parseLong(length));
stream = exchange.getResponseBody();
// empty the existing buffer
buffer.writeTo(stream);
Expand Down
5 changes: 5 additions & 0 deletions avaje-jex/src/main/java/io/avaje/jex/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ private Constants() {}
public static final String TEXT_PLAIN_UTF8 = "text/plain;charset=utf-8";
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_X_JSON_STREAM = "application/x-json-stream";

// range
public static final String ACCEPT_RANGES = "Accept-ranges";
public static final String RANGE = "Range";
public static final String CONTENT_RANGE = "Content-range";
}
5 changes: 5 additions & 0 deletions avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -536,4 +536,9 @@ public void write(String content) {
public JsonService jsonService() {
return mgr.jsonService();
}

@Override
public void rangedWrite(InputStream inputStream, long totalBytes) {
mgr.writeRange(this, inputStream, totalBytes);
}
}
86 changes: 86 additions & 0 deletions avaje-jex/src/main/java/io/avaje/jex/core/RangeWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.avaje.jex.core;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

import io.avaje.jex.http.Context;
import io.avaje.jex.http.HttpStatus;

class RangeWriter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be final


private static final int DEFAULT_BUFFER_SIZE = 16384;

static void write(Context ctx, InputStream inputStream, long totalBytes, long chunkSize) {

ctx.header(Constants.ACCEPT_RANGES, "bytes");
final String rangeHeader = ctx.header(Constants.RANGE);
if (rangeHeader == null) {
ctx.contentLength(totalBytes).write(inputStream);
return;
}
final List<String> requestedRange =
Arrays.stream(rangeHeader.split("=")[1].split("-")).filter(s -> !s.isEmpty()).toList();
final long from = Long.parseLong(requestedRange.get(0));
final long to;

final boolean audioOrVideo = isAudioOrVideo(ctx.responseHeader(Constants.CONTENT_TYPE));

if (!audioOrVideo || from + chunkSize > totalBytes) {
to = totalBytes - 1; // chunk bigger than file, write all
} else if (requestedRange.size() == 2) {
to = Long.parseLong(requestedRange.get(1)); // chunk smaller than file, to/from specified
} else {
to = from + chunkSize - 1;
}

long contentLength;
if (audioOrVideo) {
contentLength = Math.min(to - from + 1, totalBytes);
} else {
contentLength = totalBytes - from;
}

final HttpStatus status;
if (audioOrVideo) {
status = HttpStatus.PARTIAL_CONTENT_206;
} else {
status = HttpStatus.OK_200;
}

ctx.status(status);
ctx.header(Constants.ACCEPT_RANGES, "bytes");
ctx.header(Constants.CONTENT_RANGE, "bytes " + from + "-" + to + "/" + totalBytes);
ctx.contentLength(contentLength);
try (var os = ctx.outputStream()) {
write(os, inputStream, from, to);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private static void write(OutputStream outputStream, InputStream inputStream, long from, long to)
throws IOException {
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
long toSkip = from;
while (toSkip > 0) {
toSkip -= inputStream.skip(toSkip);
}
long bytesLeft = to - from + 1;
while (bytesLeft > 0) {
int read = inputStream.read(buffer, 0, (int) Math.min(DEFAULT_BUFFER_SIZE, bytesLeft));
if (read == -1) {
break; // End of stream reached unexpectedly
}
outputStream.write(buffer, 0, read);
bytesLeft -= read;
}
}

private static boolean isAudioOrVideo(String contentType) {
return contentType.startsWith("audio/") || contentType.startsWith("video/");
}
}
12 changes: 10 additions & 2 deletions avaje-jex/src/main/java/io/avaje/jex/core/ServiceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ final class ServiceManager {
private final String scheme;
private final int bufferInitial;
private final long bufferMax;
private final int rangeChunks;

static ServiceManager create(Jex jex) {
return new Builder(jex).build();
Expand All @@ -49,14 +50,16 @@ static ServiceManager create(Jex jex) {
TemplateManager templateManager,
String scheme,
long bufferMax,
int bufferInitial) {
int bufferInitial,
int rangeChunks) {
this.compressionConfig = compressionConfig;
this.jsonService = jsonService;
this.exceptionHandler = manager;
this.templateManager = templateManager;
this.scheme = scheme;
this.bufferInitial = bufferInitial;
this.bufferMax = bufferMax;
this.rangeChunks = rangeChunks;
}

OutputStream createOutputStream(JdkContext jdkContext) {
Expand Down Expand Up @@ -97,6 +100,10 @@ <E> void toJsonStream(Iterator<E> iterator, OutputStream os) {
}
}

void writeRange(Context ctx, InputStream is, long totalBytes) {
RangeWriter.write(ctx, is, totalBytes, rangeChunks);
}

void maybeClose(Object iterator) {
if (iterator instanceof AutoCloseable closeable) {
try {
Expand Down Expand Up @@ -177,7 +184,8 @@ ServiceManager build() {
initTemplateMgr(),
jex.config().scheme(),
jex.config().maxStreamBufferSize(),
jex.config().initialStreamBufferSize());
jex.config().initialStreamBufferSize(),
jex.config().rangeChunkSize());
}

JsonService initJsonService() {
Expand Down
27 changes: 27 additions & 0 deletions avaje-jex/src/main/java/io/avaje/jex/http/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.time.Duration;
Expand Down Expand Up @@ -110,6 +112,11 @@ default <T> T bodyStreamAsClass(Class<T> beanType) {
/** Return the request content length. */
long contentLength();

/** Manually set the response content length. */
default Context contentLength(long length) {
return header(Constants.CONTENT_LENGTH, String.valueOf(length));
}

/** Return the request content type. */
String contentType();

Expand Down Expand Up @@ -192,6 +199,26 @@ default String fullUrl() {
return uri.charAt(0) != '/' ? uri : scheme() + "://" + host() + uri;
}

/**
* Reads HTTP Range headers and determines which part of the provided InputStream to write back.
*
* @param inputStream data to write
* @param totalBytes total size of the data
*/
void rangedWrite(InputStream inputStream, long totalBytes);

/**
* Writes input stream to {@link #rangedWrite(InputStream, long)} with currently available data
* via {@link InputStream#available}
*/
default void rangedWrite(InputStream inputStream) {
try {
rangedWrite(inputStream, inputStream.available());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
* Return the request header value by name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,23 @@ void testNoCompression() {
assertThat(res.statusCode()).isEqualTo(200);
assertThat(res.headers().firstValue(Constants.CONTENT_ENCODING)).isEmpty();
}

@Test
void testCompressionRange() throws IOException {
var res =
pair.request()
.header(Constants.ACCEPT_ENCODING, "deflate, gzip;q=1.0, *;q=0.5")
.path("compress")
.GET()
.asInputStream();
assertThat(res.statusCode()).isEqualTo(200);
assertThat(res.headers().firstValue(Constants.CONTENT_ENCODING)).contains("gzip");

var expected = CompressionTest.class.getResourceAsStream("/64KB.json").readAllBytes();

final var gzipInputStream = new GZIPInputStream(res.body());
var decompressed = gzipInputStream.readAllBytes();
gzipInputStream.close();
assertThat(decompressed).isEqualTo(expected);
}
}
57 changes: 57 additions & 0 deletions avaje-jex/src/test/java/io/avaje/jex/http/LargeSeekableInput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.avaje.jex.http;

import java.io.InputStream;

class LargeSeekableInput extends InputStream {
private final long prefixSize;
private final long contentSize;
private long alreadyRead = 0L;

public LargeSeekableInput(long prefixSize, long contentSize) {
this.prefixSize = prefixSize;
this.contentSize = contentSize;
}

private long remaining() {
return prefixSize + contentSize - alreadyRead;
}

@Override
public int available() {
long rem = remaining();
if (rem >= 0 && rem <= Integer.MAX_VALUE) {
return (int) rem;
} else {
return Integer.MAX_VALUE;
}
}

@Override
public long skip(long toSkip) {
if (toSkip <= 0) {
return 0;
} else {
long rem = remaining();
if (rem >= 0 && rem <= toSkip) {
alreadyRead += rem;
return rem;
} else {
alreadyRead += toSkip;
return toSkip;
}
}
}

@Override
public int read() {
if (remaining() == 0L) {
return -1;
} else if (alreadyRead < prefixSize) {
alreadyRead++;
return ' ';
} else {
alreadyRead++;
return 'J';
}
}
}
Loading