16 changes: 0 additions & 16 deletions src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java

This file was deleted.

27 changes: 0 additions & 27 deletions src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java

This file was deleted.
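The deleted sources are not shown in this view. Based only on how they are used elsewhere in this diff (FetchBuilderBase falls back to ConcurrencyLimiter.NOOP_LIMITER, SharedFetch builds new ConcurrencyLimiterImpl(options.getConcurrentFileDownloads()), and SpecificFileFetchBuilder wrapped a Mono with limit(...)), the removed pair presumably looked roughly like the sketch below. This is a hypothetical reconstruction, not the project's actual code; in particular the semaphore-based implementation is an assumption about one plausible approach.

    import java.util.concurrent.Semaphore;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    // Hypothetical reconstruction of the deleted interface, inferred from its call sites.
    public interface ConcurrencyLimiter {
        <T> Mono<T> limit(Mono<T> operation);

        // Pass-through limiter used when no shared limit applies.
        ConcurrencyLimiter NOOP_LIMITER = new ConcurrencyLimiter() {
            @Override
            public <T> Mono<T> limit(Mono<T> operation) {
                return operation;
            }
        };
    }

    // Hypothetical reconstruction of the deleted implementation: a semaphore acquired
    // on subscription and released when the wrapped Mono terminates.
    class ConcurrencyLimiterImpl implements ConcurrencyLimiter {
        private final Semaphore permits;

        ConcurrencyLimiterImpl(int maxConcurrent) {
            this.permits = new Semaphore(maxConcurrent);
        }

        @Override
        public <T> Mono<T> limit(Mono<T> operation) {
            return Mono.using(
                    () -> {
                        permits.acquire();      // block for a permit on a worker thread
                        return permits;
                    },
                    acquired -> operation,      // run the wrapped operation while holding it
                    Semaphore::release          // always return the permit on completion/error/cancel
                )
                .subscribeOn(Schedulers.boundedElastic());
        }
    }

The removed getConcurrencyLimiter() in FetchBuilderBase (next file) only ever chose between a SharedFetch-provided instance and NOOP_LIMITER, so an interface of this shape is all the rest of the code depended on.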

5 changes: 0 additions & 5 deletions src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
@@ -147,11 +147,6 @@ protected URI uri() {
         return state.uri;
     }
 
-    protected ConcurrencyLimiter getConcurrencyLimiter() {
-        return state.sharedFetch != null ? state.sharedFetch.getConcurrencyLimiter()
-            : ConcurrencyLimiter.NOOP_LIMITER;
-    }
-
     public Set<String> getAcceptContentTypes() {
         return state.acceptContentTypes;
     }
9 changes: 1 addition & 8 deletions src/main/java/me/itzg/helpers/http/SharedFetch.java
@@ -33,8 +33,6 @@ public class SharedFetch implements AutoCloseable {
 
     private final HttpClient reactiveClient;
 
-    private final ConcurrencyLimiter concurrencyLimiter;
-
     public SharedFetch(String forCommand, Options options) {
         final String userAgent = String.format("%s/%s/%s (cmd=%s)",
             "itzg",
@@ -76,8 +74,6 @@ public SharedFetch(String forCommand, Options options) {
         );
 
         headers.put("x-fetch-session", fetchSessionId);
-
-        concurrencyLimiter = new ConcurrencyLimiterImpl(options.getConcurrentFileDownloads());
     }
 
     public FetchBuilderBase<?> fetch(URI uri) {
@@ -120,9 +116,6 @@ public static class Options {
         @Default
        private final Duration pendingAcquireTimeout = Duration.ofSeconds(120);
 
-        @Default
-        private final int concurrentFileDownloads = 10;
-
        private final Map<String,String> extraHeaders;
 
        public Options withHeader(String key, String value) {
@@ -131,7 +124,7 @@ public Options withHeader(String key, String value) {
            newHeaders.put(key, value);
 
            return new Options(
-                responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout, concurrentFileDownloads,
+                responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout,
                newHeaders
            );
        }
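For callers, the practical effect of the SharedFetch changes above is that Options no longer carries a concurrentFileDownloads setting. A minimal caller-side sketch, assuming Options is a Lombok @Builder type (as the @Default annotations suggest) and using made-up command name and timeout values:

    import java.time.Duration;
    import me.itzg.helpers.http.SharedFetch;

    class SharedFetchUsageSketch {
        // Hypothetical usage; "example-command" and the timeouts are illustrative only.
        // After this PR, a .concurrentFileDownloads(...) call on the builder no longer exists.
        static void example() {
            SharedFetch.Options options = SharedFetch.Options.builder()
                .responseTimeout(Duration.ofSeconds(30))
                .pendingAcquireTimeout(Duration.ofSeconds(120))
                .build();

            try (SharedFetch sharedFetch = new SharedFetch("example-command", options)) {
                // sharedFetch.fetch(uri)... as before; download parallelism is now governed
                // by the underlying connection pool rather than a shared limiter.
            }
        }
    }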
8 changes: 0 additions & 8 deletions src/main/java/me/itzg/helpers/http/SharedFetchArgs.java
@@ -48,14 +48,6 @@ public void setPendingAcquireTimeout(Duration timeout) {
         optionsBuilder.pendingAcquireTimeout(timeout);
     }
 
-    @Option(names = "--concurrent-file-downloads", defaultValue = "${env:FETCH_CONCURRENT_FILE_DOWNLOADS:-10}",
-        paramLabel = "COUNT",
-        description = "The maximum number of concurrent file downloads. Default: ${DEFAULT-VALUE}"
-    )
-    public void setConcurrentFileDownloads(int concurrentFileDownloads) {
-        optionsBuilder.concurrentFileDownloads(concurrentFileDownloads);
-    }
-
     public Options options() {
         return optionsBuilder.build();
     }
128 changes: 58 additions & 70 deletions src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java
@@ -5,7 +5,6 @@
 import static java.lang.System.currentTimeMillis;
 import static java.util.Objects.requireNonNull;
 
-import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import java.io.IOException;
 import java.net.URI;
@@ -18,8 +17,6 @@
 import me.itzg.helpers.errors.GenericException;
 import me.itzg.helpers.files.ReactiveFileUtils;
 import reactor.core.publisher.Mono;
-import reactor.netty.ByteBufFlux;
-import reactor.netty.http.client.HttpClientResponse;
 
 @Slf4j
 @Accessors(fluent = true)
@@ -68,78 +65,69 @@ public Mono<Path> assemble() {
         final boolean useIfModifiedSince = skipUpToDate && Files.exists(file);
 
         return useReactiveClient(client ->
-            getConcurrencyLimiter().limit(
-                client
-                    .doOnRequest((httpClientRequest, connection) ->
-                        statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file)
-                    )
-                    .headers(headers ->
-                        setupHeaders(headers, useIfModifiedSince)
-                    )
-                    .followRedirect(true)
-                    .doOnRequest(debugLogRequest(log, "file fetch"))
-                    .get()
-                    .uri(uri)
-                    .response((resp, byteBufFlux) ->
-                        processResponse(resp, byteBufFlux, useIfModifiedSince, uri)
-                    )
-                    .last()
-                    .contextWrite(context -> context.put("downloadStart", currentTimeMillis()))
-            )
-        );
-    }
-
-    private Mono<Path> processResponse(HttpClientResponse resp, ByteBufFlux byteBufFlux, boolean useIfModifiedSince, URI uri) {
-        final HttpResponseStatus status = resp.status();
-
-        if (useIfModifiedSince && status == NOT_MODIFIED) {
-            log.debug("The file {} is already up to date", file);
-            statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file);
-            return Mono.just(file);
-        }
-
-        if (notSuccess(resp)) {
-            return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file");
-        }
-
-        if (notExpectedContentType(resp)) {
-            return failedContentTypeMono(resp);
-        }
-
-        return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
-            .flatMap(fileSize -> {
-                statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
-                downloadedHandler.call(uri, file, fileSize);
-                return Mono
-                    .deferContextual(contextView -> {
-                        if (log.isDebugEnabled()) {
-                            final long durationMillis =
-                                currentTimeMillis() - contextView.<Long>get("downloadStart");
-                            log.debug("Download of {} took {} at {}",
-                                uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize)
-                            );
-                        }
-                        return Mono.just(file);
-                    });
-            });
-    }
-
-    private void setupHeaders(HttpHeaders headers, boolean useIfModifiedSince) {
-        if (useIfModifiedSince) {
-            try {
-                final FileTime lastModifiedTime;
-                lastModifiedTime = Files.getLastModifiedTime(file);
-                headers.set(
-                    IF_MODIFIED_SINCE,
-                    httpDateTimeFormatter.format(lastModifiedTime.toInstant())
-                );
-            } catch (IOException e) {
-                throw new GenericException("Unable to get last modified time of " + file, e);
-            }
-        }
-
-        applyHeaders(headers);
-    }
+            client
+                .doOnRequest((httpClientRequest, connection) ->
+                    statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file)
+                )
+                .headers(headers -> {
+                    if (useIfModifiedSince) {
+                        try {
+                            final FileTime lastModifiedTime;
+                            lastModifiedTime = Files.getLastModifiedTime(file);
+                            headers.set(
+                                IF_MODIFIED_SINCE,
+                                httpDateTimeFormatter.format(lastModifiedTime.toInstant())
+                            );
+                        } catch (IOException e) {
+                            throw new GenericException("Unable to get last modified time of " + file, e);
+                        }
+                    }
+
+                    applyHeaders(headers);
+                })
+                .followRedirect(true)
+                .doOnRequest(debugLogRequest(log, "file fetch"))
+                .get()
+                .uri(uri)
+                .response((resp, byteBufFlux) -> {
+                    final HttpResponseStatus status = resp.status();
+
+                    if (useIfModifiedSince && status == NOT_MODIFIED) {
+                        log.debug("The file {} is already up to date", file);
+                        statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file);
+                        return Mono.just(file);
+                    }
+
+                    if (notSuccess(resp)) {
+                        return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file");
+                    }
+
+                    if (notExpectedContentType(resp)) {
+                        return failedContentTypeMono(resp);
+                    }
+
+                    return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
+                        .flatMap(fileSize -> {
+                            statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
+                            downloadedHandler.call(uri, file, fileSize);
+                            return Mono
+                                .deferContextual(contextView -> {
+                                    if (log.isDebugEnabled()) {
+                                        final long durationMillis =
+                                            currentTimeMillis() - contextView.<Long>get("downloadStart");
+                                        log.debug("Download of {} took {} at {}",
+                                            uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize)
+                                        );
+                                    }
+                                    return Mono.just(file);
+                                });
+                        });
+
+                })
+                .last()
+                .contextWrite(context -> context.put("downloadStart", currentTimeMillis()))
+        );
+    }
 
 }
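With the ConcurrencyLimiter removed, nothing in the fetch path above caps how many file downloads run at once beyond the reactor-netty connection pool. One caller-side way to keep an explicit bound is the concurrency argument of Flux#flatMap; this is a hypothetical sketch and not part of this PR, and downloadOne stands in for whatever per-file Mono<Path> the fetch builder produces (for example, the assemble() shown above).

    import java.net.URI;
    import java.nio.file.Path;
    import java.util.Map;
    import java.util.function.BiFunction;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    class BoundedDownloadsSketch {
        // Hypothetical helper: flatMap's concurrency argument caps how many of the
        // per-file Monos are subscribed (i.e. actively downloading) at the same time.
        static Flux<Path> downloadAll(Map<URI, Path> files,
                                      BiFunction<URI, Path, Mono<Path>> downloadOne,
                                      int maxConcurrent) {
            return Flux.fromIterable(files.entrySet())
                .flatMap(entry -> downloadOne.apply(entry.getKey(), entry.getValue()),
                    maxConcurrent);
        }
    }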