diff --git a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java deleted file mode 100644 index cd0fee98..00000000 --- a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java +++ /dev/null @@ -1,16 +0,0 @@ -package me.itzg.helpers.http; - -import reactor.core.publisher.Mono; - -@FunctionalInterface -public interface ConcurrencyLimiter { - - ConcurrencyLimiter NOOP_LIMITER = new ConcurrencyLimiter() { - @Override - public Mono limit(Mono source) { - return source; - } - }; - - Mono limit(Mono source); -} diff --git a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java deleted file mode 100644 index ea44ff4b..00000000 --- a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java +++ /dev/null @@ -1,27 +0,0 @@ -package me.itzg.helpers.http; - -import java.util.concurrent.Semaphore; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -public class ConcurrencyLimiterImpl implements ConcurrencyLimiter { - - private final Semaphore semaphore; - - public ConcurrencyLimiterImpl(int concurrency) { - this.semaphore = new Semaphore(concurrency); - } - - @Override - public Mono limit(Mono source) { - return Mono.using( - () -> { - semaphore.acquire(); - return Boolean.TRUE; - }, - r -> source, - r -> semaphore.release() - ) - .subscribeOn(Schedulers.boundedElastic()); - } -} diff --git a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java index 71039a6c..c6906b29 100644 --- a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java +++ b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java @@ -147,11 +147,6 @@ protected URI uri() { return state.uri; } - protected ConcurrencyLimiter getConcurrencyLimiter() { - return state.sharedFetch != null ? state.sharedFetch.getConcurrencyLimiter() - : ConcurrencyLimiter.NOOP_LIMITER; - } - public Set getAcceptContentTypes() { return state.acceptContentTypes; } diff --git a/src/main/java/me/itzg/helpers/http/SharedFetch.java b/src/main/java/me/itzg/helpers/http/SharedFetch.java index 6d1ae723..72c36c93 100644 --- a/src/main/java/me/itzg/helpers/http/SharedFetch.java +++ b/src/main/java/me/itzg/helpers/http/SharedFetch.java @@ -33,8 +33,6 @@ public class SharedFetch implements AutoCloseable { private final HttpClient reactiveClient; - private final ConcurrencyLimiter concurrencyLimiter; - public SharedFetch(String forCommand, Options options) { final String userAgent = String.format("%s/%s/%s (cmd=%s)", "itzg", @@ -76,8 +74,6 @@ public SharedFetch(String forCommand, Options options) { ); headers.put("x-fetch-session", fetchSessionId); - - concurrencyLimiter = new ConcurrencyLimiterImpl(options.getConcurrentFileDownloads()); } public FetchBuilderBase fetch(URI uri) { @@ -120,9 +116,6 @@ public static class Options { @Default private final Duration pendingAcquireTimeout = Duration.ofSeconds(120); - @Default - private final int concurrentFileDownloads = 10; - private final Map extraHeaders; public Options withHeader(String key, String value) { @@ -131,7 +124,7 @@ public Options withHeader(String key, String value) { newHeaders.put(key, value); return new Options( - responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout, concurrentFileDownloads, + responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout, newHeaders ); } diff --git a/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java b/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java index e64461b3..6ca4d75b 100644 --- a/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java +++ b/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java @@ -48,14 +48,6 @@ public void setPendingAcquireTimeout(Duration timeout) { optionsBuilder.pendingAcquireTimeout(timeout); } - @Option(names = "--concurrent-file-downloads", defaultValue = "${env:FETCH_CONCURRENT_FILE_DOWNLOADS:-10}", - paramLabel = "COUNT", - description = "The maximum number of concurrent file downloads. Default: ${DEFAULT-VALUE}" - ) - public void setConcurrentFileDownloads(int concurrentFileDownloads) { - optionsBuilder.concurrentFileDownloads(concurrentFileDownloads); - } - public Options options() { return optionsBuilder.build(); } diff --git a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java index 58ce3e5e..000aa411 100644 --- a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java +++ b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java @@ -5,7 +5,6 @@ import static java.lang.System.currentTimeMillis; import static java.util.Objects.requireNonNull; -import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponseStatus; import java.io.IOException; import java.net.URI; @@ -18,8 +17,6 @@ import me.itzg.helpers.errors.GenericException; import me.itzg.helpers.files.ReactiveFileUtils; import reactor.core.publisher.Mono; -import reactor.netty.ByteBufFlux; -import reactor.netty.http.client.HttpClientResponse; @Slf4j @Accessors(fluent = true) @@ -68,78 +65,69 @@ public Mono assemble() { final boolean useIfModifiedSince = skipUpToDate && Files.exists(file); return useReactiveClient(client -> - getConcurrencyLimiter().limit( - client - .doOnRequest((httpClientRequest, connection) -> - statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file) - ) - .headers(headers -> - setupHeaders(headers, useIfModifiedSince) - ) - .followRedirect(true) - .doOnRequest(debugLogRequest(log, "file fetch")) - .get() - .uri(uri) - .response((resp, byteBufFlux) -> - processResponse(resp, byteBufFlux, useIfModifiedSince, uri) - ) - .last() - .contextWrite(context -> context.put("downloadStart", currentTimeMillis())) - ) - ); - } - - private Mono processResponse(HttpClientResponse resp, ByteBufFlux byteBufFlux, boolean useIfModifiedSince, URI uri) { - final HttpResponseStatus status = resp.status(); - - if (useIfModifiedSince && status == NOT_MODIFIED) { - log.debug("The file {} is already up to date", file); - statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file); - return Mono.just(file); - } - - if (notSuccess(resp)) { - return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file"); - } - - if (notExpectedContentType(resp)) { - return failedContentTypeMono(resp); - } - - return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file) - .flatMap(fileSize -> { - statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file); - downloadedHandler.call(uri, file, fileSize); - return Mono - .deferContextual(contextView -> { - if (log.isDebugEnabled()) { - final long durationMillis = - currentTimeMillis() - contextView.get("downloadStart"); - log.debug("Download of {} took {} at {}", - uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize) + client + .doOnRequest((httpClientRequest, connection) -> + statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file) + ) + .headers(headers -> { + if (useIfModifiedSince) { + try { + final FileTime lastModifiedTime; + lastModifiedTime = Files.getLastModifiedTime(file); + headers.set( + IF_MODIFIED_SINCE, + httpDateTimeFormatter.format(lastModifiedTime.toInstant()) ); + } catch (IOException e) { + throw new GenericException("Unable to get last modified time of " + file, e); } - return Mono.just(file); - }); - }); - } - private void setupHeaders(HttpHeaders headers, boolean useIfModifiedSince) { - if (useIfModifiedSince) { - try { - final FileTime lastModifiedTime; - lastModifiedTime = Files.getLastModifiedTime(file); - headers.set( - IF_MODIFIED_SINCE, - httpDateTimeFormatter.format(lastModifiedTime.toInstant()) - ); - } catch (IOException e) { - throw new GenericException("Unable to get last modified time of " + file, e); - } + } - } + applyHeaders(headers); + }) + .followRedirect(true) + .doOnRequest(debugLogRequest(log, "file fetch")) + .get() + .uri(uri) + .response((resp, byteBufFlux) -> { + final HttpResponseStatus status = resp.status(); - applyHeaders(headers); + if (useIfModifiedSince && status == NOT_MODIFIED) { + log.debug("The file {} is already up to date", file); + statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file); + return Mono.just(file); + } + + if (notSuccess(resp)) { + return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file"); + } + + if (notExpectedContentType(resp)) { + return failedContentTypeMono(resp); + } + + return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file) + .flatMap(fileSize -> { + statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file); + downloadedHandler.call(uri, file, fileSize); + return Mono + .deferContextual(contextView -> { + if (log.isDebugEnabled()) { + final long durationMillis = + currentTimeMillis() - contextView.get("downloadStart"); + log.debug("Download of {} took {} at {}", + uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize) + ); + } + return Mono.just(file); + }); + }); + + }) + .last() + .contextWrite(context -> context.put("downloadStart", currentTimeMillis())) + ); } }