Skip to content

Commit 6b1657e

Browse files
committed
Migrate GCS repository plugin to use GCS SDK v 2.x
Signed-off-by: Andrey Pleskach <[email protected]>
1 parent 6cc1606 commit 6b1657e

File tree

8 files changed

+301
-89
lines changed

8 files changed

+301
-89
lines changed

buildSrc/src/main/groovy/org/opensearch/gradle/test/TestWithSslPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public void apply(Project project) {
7777
File keystoreDir = new File(project.getBuildDir(), "keystore/test/ssl");
7878
File nodeKeystore = new File(keystoreDir, "test-node.jks");
7979
File clientKeyStore = new File(keystoreDir, "test-client.jks");
80+
@SuppressWarnings("unchecked")
8081
NamedDomainObjectContainer<OpenSearchCluster> clusters = (NamedDomainObjectContainer<OpenSearchCluster>) project.getExtensions()
8182
.getByName(TestClustersPlugin.EXTENSION_NAME);
8283
clusters.all(c -> {

plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,18 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long
300300
@SuppressForbidden(reason = "channel is based on a socket")
301301
@Override
302302
public int write(final ByteBuffer src) throws IOException {
303-
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
303+
try {
304+
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
305+
} catch (final IOException ioe) {
306+
final StorageException storageException = (StorageException) ExceptionsHelper.unwrap(
307+
ioe,
308+
StorageException.class
309+
);
310+
if (storageException != null) {
311+
throw storageException;
312+
}
313+
throw ioe;
314+
}
304315
}
305316

306317
@Override

plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ StorageOptions createStorageOptions(
216216
mapBuilder.put("user-agent", clientSettings.getApplicationName());
217217
}
218218
return mapBuilder.immutableMap();
219-
});
219+
})
220+
.setStorageRetryStrategy(new GoogleShouldRetryStorageStrategy());
220221
if (Strings.hasLength(clientSettings.getHost())) {
221222
storageOptionsBuilder.setHost(clientSettings.getHost());
222223
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.gcs;
10+
11+
import com.google.api.gax.retrying.ResultRetryAlgorithm;
12+
import com.google.api.gax.retrying.TimedAttemptSettings;
13+
import com.google.cloud.BaseService;
14+
import com.google.cloud.storage.StorageRetryStrategy;
15+
import org.opensearch.ExceptionsHelper;
16+
17+
import java.net.SocketException;
18+
import java.net.UnknownHostException;
19+
import java.util.concurrent.CancellationException;
20+
21+
import static java.util.Objects.nonNull;
22+
23+
public class GoogleShouldRetryStorageStrategy implements StorageRetryStrategy {
24+
25+
private final DelagateResultRetryAlgorithm<?> idempotentHandler = new DelagateResultRetryAlgorithm<>(BaseService.EXCEPTION_HANDLER);
26+
27+
private final DelagateResultRetryAlgorithm<?> nonIdempotentHandler = new DelagateResultRetryAlgorithm<>(BaseService.EXCEPTION_HANDLER);
28+
29+
private static final class DelagateResultRetryAlgorithm<T> implements ResultRetryAlgorithm<T> {
30+
31+
private final ResultRetryAlgorithm<T> resultRetryAlgorithm;
32+
33+
private DelagateResultRetryAlgorithm(ResultRetryAlgorithm<T> resultRetryAlgorithm) {
34+
this.resultRetryAlgorithm = resultRetryAlgorithm;
35+
}
36+
37+
@Override
38+
public TimedAttemptSettings createNextAttempt(Throwable prevThrowable, T prevResponse, TimedAttemptSettings prevSettings) {
39+
return resultRetryAlgorithm.createNextAttempt(prevThrowable, prevResponse, prevSettings);
40+
}
41+
42+
@Override
43+
public boolean shouldRetry(Throwable prevThrowable, T prevResponse) throws CancellationException {
44+
if (nonNull(ExceptionsHelper.unwrap(prevThrowable, UnknownHostException.class))) {
45+
return true;
46+
}
47+
if (nonNull(ExceptionsHelper.unwrap(prevThrowable, SocketException.class))) {
48+
return true;
49+
}
50+
return resultRetryAlgorithm.shouldRetry(prevThrowable, prevResponse);
51+
}
52+
};
53+
54+
@Override
55+
public ResultRetryAlgorithm<?> getIdempotentHandler() {
56+
return idempotentHandler;
57+
}
58+
59+
@Override
60+
public ResultRetryAlgorithm<?> getNonidempotentHandler() {
61+
return nonIdempotentHandler;
62+
}
63+
}

plugins/repository-gcs/src/test/java/org/opensearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.opensearch.common.settings.Settings;
5353
import org.opensearch.common.unit.TimeValue;
5454
import org.opensearch.common.util.concurrent.CountDown;
55-
import org.opensearch.core.common.Strings;
5655
import org.opensearch.core.common.bytes.BytesArray;
5756
import org.opensearch.core.common.bytes.BytesReference;
5857
import org.opensearch.core.common.unit.ByteSizeValue;
@@ -75,6 +74,7 @@
7574
import java.util.concurrent.atomic.AtomicInteger;
7675
import java.util.concurrent.atomic.AtomicReference;
7776

77+
import fixture.gcs.ContentHttpHeadersParser;
7878
import fixture.gcs.FakeOAuth2HttpHandler;
7979
import org.threeten.bp.Duration;
8080

@@ -92,9 +92,6 @@
9292
import static org.hamcrest.Matchers.instanceOf;
9393
import static org.hamcrest.Matchers.is;
9494
import static org.hamcrest.Matchers.notNullValue;
95-
import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeEnd;
96-
import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeLimit;
97-
import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeStart;
9895
import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
9996

10097
@SuppressForbidden(reason = "use a http server")
@@ -152,7 +149,6 @@ StorageOptions createStorageOptions(
152149
.setInitialRetryDelay(Duration.ofMillis(10L))
153150
.setRetryDelayMultiplier(1.0d)
154151
.setMaxRetryDelay(Duration.ofSeconds(1L))
155-
.setJittered(false)
156152
.setInitialRpcTimeout(Duration.ofSeconds(1))
157153
.setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier())
158154
.setMaxRpcTimeout(Duration.ofSeconds(1));
@@ -163,6 +159,7 @@ StorageOptions createStorageOptions(
163159
.setHost(options.getHost())
164160
.setCredentials(options.getCredentials())
165161
.setRetrySettings(retrySettingsBuilder.build())
162+
.setStorageRetryStrategy(new GoogleShouldRetryStorageStrategy())
166163
.build();
167164
}
168165
};
@@ -220,7 +217,8 @@ public void testWriteBlobWithRetries() throws Exception {
220217
assertThat(content.isPresent(), is(true));
221218
assertThat(content.get().v1(), equalTo("write_blob_max_retries"));
222219
if (Objects.deepEquals(bytes, BytesReference.toBytes(content.get().v2()))) {
223-
byte[] response = ("{\"bucket\":\"bucket\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8);
220+
byte[] response = String.format("""
221+
{"bucket":"bucket","name":"%s"}""", content.get().v1()).getBytes(UTF_8);
224222
exchange.getResponseHeaders().add("Content-Type", "application/json");
225223
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
226224
exchange.getResponseBody().write(response);
@@ -274,8 +272,7 @@ public void testWriteBlobWithReadTimeouts() {
274272
}
275273

276274
public void testWriteLargeBlob() throws IOException {
277-
// See {@link BaseWriteChannel#DEFAULT_CHUNK_SIZE}
278-
final int defaultChunkSize = 60 * 256 * 1024;
275+
final int defaultChunkSize = Math.toIntExact(ByteSizeValue.parseBytesSizeValue("16mb", "aaa").getBytes());
279276
final int nbChunks = randomIntBetween(3, 5);
280277
final int lastChunkSize = randomIntBetween(1, defaultChunkSize - 1);
281278
final int totalChunks = nbChunks + 1;
@@ -295,6 +292,7 @@ public void testWriteLargeBlob() throws IOException {
295292
final AtomicInteger countUploads = new AtomicInteger(nbErrors * totalChunks);
296293
final AtomicBoolean allow410Gone = new AtomicBoolean(randomBoolean());
297294
final AtomicBoolean allowReadTimeout = new AtomicBoolean(rarely());
295+
final AtomicInteger bytesReceived = new AtomicInteger();
298296
final int wrongChunk = randomIntBetween(1, totalChunks);
299297

300298
final AtomicReference<String> sessionUploadId = new AtomicReference<>(UUIDs.randomBase64UUID());
@@ -325,7 +323,6 @@ public void testWriteLargeBlob() throws IOException {
325323
assertThat(wrongChunk, greaterThan(0));
326324
return;
327325
}
328-
329326
} else if ("PUT".equals(exchange.getRequestMethod())) {
330327
final String uploadId = params.get("upload_id");
331328
if (uploadId.equals(sessionUploadId.get()) == false) {
@@ -348,29 +345,43 @@ public void testWriteLargeBlob() throws IOException {
348345
// we must reset the counters because the whole object upload will be retried
349346
countInits.set(nbErrors);
350347
countUploads.set(nbErrors * totalChunks);
348+
bytesReceived.set(0);
351349

352350
exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1);
353351
return;
354352
}
355353
}
356354

357-
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
358-
assertTrue(Strings.hasLength(range));
355+
final String contentRangeHeaderValue = exchange.getRequestHeaders().getFirst("Content-Range");
356+
final var contentRange = ContentHttpHeadersParser.parseContentRangeHeader(contentRangeHeaderValue);
357+
assertNotNull("Invalid content range header: " + contentRangeHeaderValue, contentRange);
358+
359+
if (!contentRange.hasRange()) {
360+
// Content-Range: */... is a status check
361+
// https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
362+
final int receivedSoFar = bytesReceived.get();
363+
if (receivedSoFar > 0) {
364+
exchange.getResponseHeaders().add("Range", String.format("bytes=0-%s", receivedSoFar));
365+
}
366+
exchange.getResponseHeaders().add("Content-Length", "0");
367+
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
368+
return;
369+
}
359370

360371
if (countUploads.decrementAndGet() % 2 == 0) {
361372
assertThat(Math.toIntExact(requestBody.length()), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize)));
362-
363-
final int rangeStart = getContentRangeStart(range);
364-
final int rangeEnd = getContentRangeEnd(range);
373+
final int rangeStart = Math.toIntExact(contentRange.start());
374+
final int rangeEnd = Math.toIntExact(contentRange.end());
365375
assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(requestBody.length())));
366376
assertThat(new BytesArray(data, rangeStart, rangeEnd - rangeStart + 1), is(requestBody));
377+
bytesReceived.updateAndGet(existing -> Math.max(existing, rangeEnd));
367378

368-
final Integer limit = getContentRangeLimit(range);
369-
if (limit != null) {
379+
if (contentRange.size() != null) {
380+
exchange.getResponseHeaders().add("x-goog-stored-content-length", String.valueOf(bytesReceived.get() + 1));
370381
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
371382
return;
372383
} else {
373-
exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", rangeStart, rangeEnd));
384+
exchange.getResponseHeaders().add("Range", String.format("bytes=%s-%s", rangeStart, rangeEnd));
374385
exchange.getResponseHeaders().add("Content-Length", "0");
375386
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
376387
return;
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package fixture.gcs;
10+
11+
import java.util.regex.Matcher;
12+
import java.util.regex.Pattern;
13+
14+
public class ContentHttpHeadersParser {
15+
16+
private static final Pattern PATTERN_CONTENT_RANGE = Pattern.compile("bytes=([0-9]+)-([0-9]+)");
17+
private static final Pattern PATTERN_CONTENT_RANGE_BYTES = Pattern.compile("bytes (?:(\\d+)-(\\d+)|\\*)/(?:(\\d+)|\\*)");
18+
19+
20+
public static Range parseRangeHeader(String rangeHeaderValue) {
21+
final Matcher matcher = PATTERN_CONTENT_RANGE.matcher(rangeHeaderValue);
22+
if (matcher.matches()) {
23+
try {
24+
return new Range(Integer.parseInt(matcher.group(1)), Integer.parseInt(matcher.group(2)));
25+
} catch (NumberFormatException e) {
26+
return null;
27+
}
28+
}
29+
return null;
30+
}
31+
32+
public record Range(int start, int end) {
33+
34+
public String header() {
35+
return String.format("bytes=%s-%s", start, end);
36+
}
37+
38+
}
39+
40+
public static ContentRange parseContentRangeHeader(String contentRangeHeaderValue) {
41+
final Matcher matcher = PATTERN_CONTENT_RANGE_BYTES.matcher(contentRangeHeaderValue);
42+
if (matcher.matches()) {
43+
try {
44+
if (matcher.groupCount() == 3) {
45+
final Integer start = parseIntegerValue(matcher.group(1));
46+
final Integer end = parseIntegerValue(matcher.group(2));
47+
final Integer size = parseIntegerValue(matcher.group(3));
48+
return new ContentRange(start, end, size);
49+
}
50+
} catch (NumberFormatException e) {
51+
return null;
52+
}
53+
}
54+
return null;
55+
}
56+
57+
private static Integer parseIntegerValue(String value) {
58+
return value == null ? null : Integer.parseInt(value);
59+
}
60+
61+
public record ContentRange(Integer start, Integer end, Integer size) {
62+
63+
public ContentRange {
64+
assert (start == null) == (end == null) : "Must have either start and end or neither";
65+
}
66+
67+
public boolean hasRange() {
68+
return start != null && end != null;
69+
}
70+
71+
public boolean hasSize() {
72+
return size != null;
73+
}
74+
75+
public String headerString() {
76+
final String rangeString = hasRange() ? start + "-" + end : "*";
77+
final String sizeString = hasSize() ? String.valueOf(size) : "*";
78+
return "bytes " + rangeString + "/" + sizeString;
79+
}
80+
}
81+
82+
}

test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,12 @@ public class FakeOAuth2HttpHandler implements HttpHandler {
4848
@Override
4949
public void handle(final HttpExchange exchange) throws IOException {
5050
try {
51-
while (exchange.getRequestBody().read(BUFFER) >= 0) ;
52-
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
51+
while (exchange.getRequestBody().read(BUFFER) >= 0) {
52+
}
53+
byte[] response = """
54+
{"access_token":"foo","token_type":"Bearer","expires_in":3600}""".getBytes(UTF_8);
5355
exchange.getResponseHeaders().add("Content-Type", "application/json");
56+
exchange.getResponseHeaders().add("Metadata-Flavor", "Google");
5457
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
5558
exchange.getResponseBody().write(response);
5659
} finally {

0 commit comments

Comments
 (0)