Skip to content

Commit 5e23d25

Browse files
authored
http: limit concurrent file downloads (#548)
1 parent cfdcaa2 commit 5e23d25

File tree

6 files changed

+134
-59
lines changed

6 files changed

+134
-59
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package me.itzg.helpers.http;
2+
3+
import reactor.core.publisher.Mono;
4+
5+
@FunctionalInterface
6+
public interface ConcurrencyLimiter {
7+
8+
ConcurrencyLimiter NOOP_LIMITER = new ConcurrencyLimiter() {
9+
@Override
10+
public <T> Mono<T> limit(Mono<T> source) {
11+
return source;
12+
}
13+
};
14+
15+
<T> Mono<T> limit(Mono<T> source);
16+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package me.itzg.helpers.http;
2+
3+
import java.util.concurrent.Semaphore;
4+
import reactor.core.publisher.Mono;
5+
import reactor.core.scheduler.Schedulers;
6+
7+
public class ConcurrencyLimiterImpl implements ConcurrencyLimiter {
8+
9+
private final Semaphore semaphore;
10+
11+
public ConcurrencyLimiterImpl(int concurrency) {
12+
this.semaphore = new Semaphore(concurrency);
13+
}
14+
15+
@Override
16+
public <T> Mono<T> limit(Mono<T> source) {
17+
return Mono.using(
18+
() -> {
19+
semaphore.acquire();
20+
return Boolean.TRUE;
21+
},
22+
r -> source,
23+
r -> semaphore.release()
24+
)
25+
.subscribeOn(Schedulers.boundedElastic());
26+
}
27+
}

src/main/java/me/itzg/helpers/http/FetchBuilderBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ protected URI uri() {
142142
return state.uri;
143143
}
144144

145+
protected ConcurrencyLimiter getConcurrencyLimiter() {
146+
return state.sharedFetch != null ? state.sharedFetch.getConcurrencyLimiter()
147+
: ConcurrencyLimiter.NOOP_LIMITER;
148+
}
149+
145150
/**
 * Returns the {@code acceptContentTypes} set held in this builder's state.
 *
 * @return the same set instance stored in the state (no copy is made)
 */
public Set<String> getAcceptContentTypes() {
146151
return state.acceptContentTypes;
147152
}

src/main/java/me/itzg/helpers/http/SharedFetch.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class SharedFetch implements AutoCloseable {
3333

3434
private final HttpClient reactiveClient;
3535

36+
private final ConcurrencyLimiter concurrencyLimiter;
37+
3638
public SharedFetch(String forCommand, Options options) {
3739
final String userAgent = String.format("%s/%s/%s (cmd=%s)",
3840
"itzg",
@@ -74,6 +76,8 @@ public SharedFetch(String forCommand, Options options) {
7476
);
7577

7678
headers.put("x-fetch-session", fetchSessionId);
79+
80+
concurrencyLimiter = new ConcurrencyLimiterImpl(options.getConcurrentFileDownloads());
7781
}
7882

7983
public FetchBuilderBase<?> fetch(URI uri) {
@@ -116,6 +120,9 @@ public static class Options {
116120
@Default
117121
private final Duration pendingAcquireTimeout = Duration.ofSeconds(120);
118122

123+
@Default
124+
private final int concurrentFileDownloads = 10;
125+
119126
private final Map<String,String> extraHeaders;
120127

121128
public Options withHeader(String key, String value) {
@@ -124,7 +131,7 @@ public Options withHeader(String key, String value) {
124131
newHeaders.put(key, value);
125132

126133
return new Options(
127-
responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout,
134+
responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout, concurrentFileDownloads,
128135
newHeaders
129136
);
130137
}

src/main/java/me/itzg/helpers/http/SharedFetchArgs.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ public void setPendingAcquireTimeout(Duration timeout) {
4848
optionsBuilder.pendingAcquireTimeout(timeout);
4949
}
5050

51+
@Option(names = "--concurrent-file-downloads", defaultValue = "${env:FETCH_CONCURRENT_FILE_DOWNLOADS:-10}",
52+
paramLabel = "COUNT",
53+
description = "The maximum number of concurrent file downloads. Default: ${DEFAULT-VALUE}"
54+
)
55+
public void setConcurrentFileDownloads(int concurrentFileDownloads) {
56+
optionsBuilder.concurrentFileDownloads(concurrentFileDownloads);
57+
}
58+
5159
/**
 * Builds the options from the values collected by the option setters of this class.
 *
 * @return the result of {@code optionsBuilder.build()}
 */
public Options options() {
5260
return optionsBuilder.build();
5361
}

src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java

Lines changed: 70 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static java.lang.System.currentTimeMillis;
66
import static java.util.Objects.requireNonNull;
77

8+
import io.netty.handler.codec.http.HttpHeaders;
89
import io.netty.handler.codec.http.HttpResponseStatus;
910
import java.io.IOException;
1011
import java.net.URI;
@@ -17,6 +18,8 @@
1718
import me.itzg.helpers.errors.GenericException;
1819
import me.itzg.helpers.files.ReactiveFileUtils;
1920
import reactor.core.publisher.Mono;
21+
import reactor.netty.ByteBufFlux;
22+
import reactor.netty.http.client.HttpClientResponse;
2023

2124
@Slf4j
2225
@Accessors(fluent = true)
@@ -65,69 +68,78 @@ public Mono<Path> assemble() {
6568
final boolean useIfModifiedSince = skipUpToDate && Files.exists(file);
6669

6770
return useReactiveClient(client ->
68-
client
69-
.doOnRequest((httpClientRequest, connection) ->
70-
statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file)
71-
)
72-
.headers(headers -> {
73-
if (useIfModifiedSince) {
74-
try {
75-
final FileTime lastModifiedTime;
76-
lastModifiedTime = Files.getLastModifiedTime(file);
77-
headers.set(
78-
IF_MODIFIED_SINCE,
79-
httpDateTimeFormatter.format(lastModifiedTime.toInstant())
71+
getConcurrencyLimiter().limit(
72+
client
73+
.doOnRequest((httpClientRequest, connection) ->
74+
statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file)
75+
)
76+
.headers(headers ->
77+
setupHeaders(headers, useIfModifiedSince)
78+
)
79+
.followRedirect(true)
80+
.doOnRequest(debugLogRequest(log, "file fetch"))
81+
.get()
82+
.uri(uri)
83+
.response((resp, byteBufFlux) ->
84+
processResponse(resp, byteBufFlux, useIfModifiedSince, uri)
85+
)
86+
.last()
87+
.contextWrite(context -> context.put("downloadStart", currentTimeMillis()))
88+
)
89+
);
90+
}
91+
92+
private Mono<Path> processResponse(HttpClientResponse resp, ByteBufFlux byteBufFlux, boolean useIfModifiedSince, URI uri) {
93+
final HttpResponseStatus status = resp.status();
94+
95+
if (useIfModifiedSince && status == NOT_MODIFIED) {
96+
log.debug("The file {} is already up to date", file);
97+
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file);
98+
return Mono.just(file);
99+
}
100+
101+
if (notSuccess(resp)) {
102+
return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file");
103+
}
104+
105+
if (notExpectedContentType(resp)) {
106+
return failedContentTypeMono(resp);
107+
}
108+
109+
return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
110+
.flatMap(fileSize -> {
111+
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
112+
downloadedHandler.call(uri, file, fileSize);
113+
return Mono
114+
.deferContextual(contextView -> {
115+
if (log.isDebugEnabled()) {
116+
final long durationMillis =
117+
currentTimeMillis() - contextView.<Long>get("downloadStart");
118+
log.debug("Download of {} took {} at {}",
119+
uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize)
80120
);
81-
} catch (IOException e) {
82-
throw new GenericException("Unable to get last modified time of " + file, e);
83121
}
122+
return Mono.just(file);
123+
});
124+
});
125+
}
84126

85-
}
127+
private void setupHeaders(HttpHeaders headers, boolean useIfModifiedSince) {
128+
if (useIfModifiedSince) {
129+
try {
130+
final FileTime lastModifiedTime;
131+
lastModifiedTime = Files.getLastModifiedTime(file);
132+
headers.set(
133+
IF_MODIFIED_SINCE,
134+
httpDateTimeFormatter.format(lastModifiedTime.toInstant())
135+
);
136+
} catch (IOException e) {
137+
throw new GenericException("Unable to get last modified time of " + file, e);
138+
}
86139

87-
applyHeaders(headers);
88-
})
89-
.followRedirect(true)
90-
.doOnRequest(debugLogRequest(log, "file fetch"))
91-
.get()
92-
.uri(uri)
93-
.response((resp, byteBufFlux) -> {
94-
final HttpResponseStatus status = resp.status();
140+
}
95141

96-
if (useIfModifiedSince && status == NOT_MODIFIED) {
97-
log.debug("The file {} is already up to date", file);
98-
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file);
99-
return Mono.just(file);
100-
}
101-
102-
if (notSuccess(resp)) {
103-
return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file");
104-
}
105-
106-
if (notExpectedContentType(resp)) {
107-
return failedContentTypeMono(resp);
108-
}
109-
110-
return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
111-
.flatMap(fileSize -> {
112-
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
113-
downloadedHandler.call(uri, file, fileSize);
114-
return Mono
115-
.deferContextual(contextView -> {
116-
if (log.isDebugEnabled()) {
117-
final long durationMillis =
118-
currentTimeMillis() - contextView.<Long>get("downloadStart");
119-
log.debug("Download of {} took {} at {}",
120-
uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize)
121-
);
122-
}
123-
return Mono.just(file);
124-
});
125-
});
126-
127-
})
128-
.last()
129-
.contextWrite(context -> context.put("downloadStart", currentTimeMillis()))
130-
);
142+
applyHeaders(headers);
131143
}
132144

133145
}

0 commit comments

Comments
 (0)