5252import org .opensearch .common .settings .Settings ;
5353import org .opensearch .common .unit .TimeValue ;
5454import org .opensearch .common .util .concurrent .CountDown ;
55- import org .opensearch .core .common .Strings ;
5655import org .opensearch .core .common .bytes .BytesArray ;
5756import org .opensearch .core .common .bytes .BytesReference ;
5857import org .opensearch .core .common .unit .ByteSizeValue ;
7574import java .util .concurrent .atomic .AtomicInteger ;
7675import java .util .concurrent .atomic .AtomicReference ;
7776
77+ import fixture .gcs .ContentHttpHeadersParser ;
7878import fixture .gcs .FakeOAuth2HttpHandler ;
7979import org .threeten .bp .Duration ;
8080
9292import static org .hamcrest .Matchers .instanceOf ;
9393import static org .hamcrest .Matchers .is ;
9494import static org .hamcrest .Matchers .notNullValue ;
95- import static fixture .gcs .GoogleCloudStorageHttpHandler .getContentRangeEnd ;
96- import static fixture .gcs .GoogleCloudStorageHttpHandler .getContentRangeLimit ;
97- import static fixture .gcs .GoogleCloudStorageHttpHandler .getContentRangeStart ;
9895import static fixture .gcs .GoogleCloudStorageHttpHandler .parseMultipartRequestBody ;
9996
10097@ SuppressForbidden (reason = "use a http server" )
@@ -152,7 +149,6 @@ StorageOptions createStorageOptions(
152149 .setInitialRetryDelay (Duration .ofMillis (10L ))
153150 .setRetryDelayMultiplier (1.0d )
154151 .setMaxRetryDelay (Duration .ofSeconds (1L ))
155- .setJittered (false )
156152 .setInitialRpcTimeout (Duration .ofSeconds (1 ))
157153 .setRpcTimeoutMultiplier (options .getRetrySettings ().getRpcTimeoutMultiplier ())
158154 .setMaxRpcTimeout (Duration .ofSeconds (1 ));
@@ -163,6 +159,7 @@ StorageOptions createStorageOptions(
163159 .setHost (options .getHost ())
164160 .setCredentials (options .getCredentials ())
165161 .setRetrySettings (retrySettingsBuilder .build ())
162+ .setStorageRetryStrategy (new GoogleShouldRetryStorageStrategy ())
166163 .build ();
167164 }
168165 };
@@ -220,7 +217,8 @@ public void testWriteBlobWithRetries() throws Exception {
220217 assertThat (content .isPresent (), is (true ));
221218 assertThat (content .get ().v1 (), equalTo ("write_blob_max_retries" ));
222219 if (Objects .deepEquals (bytes , BytesReference .toBytes (content .get ().v2 ()))) {
223- byte [] response = ("{\" bucket\" :\" bucket\" ,\" name\" :\" " + content .get ().v1 () + "\" }" ).getBytes (UTF_8 );
220+ byte [] response = String .format ("""
221+ {"bucket":"bucket","name":"%s"}""" , content .get ().v1 ()).getBytes (UTF_8 );
224222 exchange .getResponseHeaders ().add ("Content-Type" , "application/json" );
225223 exchange .sendResponseHeaders (RestStatus .OK .getStatus (), response .length );
226224 exchange .getResponseBody ().write (response );
@@ -274,8 +272,7 @@ public void testWriteBlobWithReadTimeouts() {
274272 }
275273
276274 public void testWriteLargeBlob () throws IOException {
277- // See {@link BaseWriteChannel#DEFAULT_CHUNK_SIZE}
278- final int defaultChunkSize = 60 * 256 * 1024 ;
275+ final int defaultChunkSize = Math .toIntExact (ByteSizeValue .parseBytesSizeValue ("16mb" , "aaa" ).getBytes ());
279276 final int nbChunks = randomIntBetween (3 , 5 );
280277 final int lastChunkSize = randomIntBetween (1 , defaultChunkSize - 1 );
281278 final int totalChunks = nbChunks + 1 ;
@@ -295,6 +292,7 @@ public void testWriteLargeBlob() throws IOException {
295292 final AtomicInteger countUploads = new AtomicInteger (nbErrors * totalChunks );
296293 final AtomicBoolean allow410Gone = new AtomicBoolean (randomBoolean ());
297294 final AtomicBoolean allowReadTimeout = new AtomicBoolean (rarely ());
295+ final AtomicInteger bytesReceived = new AtomicInteger ();
298296 final int wrongChunk = randomIntBetween (1 , totalChunks );
299297
300298 final AtomicReference <String > sessionUploadId = new AtomicReference <>(UUIDs .randomBase64UUID ());
@@ -325,7 +323,6 @@ public void testWriteLargeBlob() throws IOException {
325323 assertThat (wrongChunk , greaterThan (0 ));
326324 return ;
327325 }
328-
329326 } else if ("PUT" .equals (exchange .getRequestMethod ())) {
330327 final String uploadId = params .get ("upload_id" );
331328 if (uploadId .equals (sessionUploadId .get ()) == false ) {
@@ -348,29 +345,43 @@ public void testWriteLargeBlob() throws IOException {
348345 // we must reset the counters because the whole object upload will be retried
349346 countInits .set (nbErrors );
350347 countUploads .set (nbErrors * totalChunks );
348+ bytesReceived .set (0 );
351349
352350 exchange .sendResponseHeaders (HttpStatus .SC_GONE , -1 );
353351 return ;
354352 }
355353 }
356354
357- final String range = exchange .getRequestHeaders ().getFirst ("Content-Range" );
358- assertTrue (Strings .hasLength (range ));
355+ final String contentRangeHeaderValue = exchange .getRequestHeaders ().getFirst ("Content-Range" );
356+ final var contentRange = ContentHttpHeadersParser .parseContentRangeHeader (contentRangeHeaderValue );
357+ assertNotNull ("Invalid content range header: " + contentRangeHeaderValue , contentRange );
358+
359+ if (!contentRange .hasRange ()) {
360+ // Content-Range: */... is a status check
361+ // https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
362+ final int receivedSoFar = bytesReceived .get ();
363+ if (receivedSoFar > 0 ) {
364+ exchange .getResponseHeaders ().add ("Range" , String .format ("bytes=0-%s" , receivedSoFar ));
365+ }
366+ exchange .getResponseHeaders ().add ("Content-Length" , "0" );
367+ exchange .sendResponseHeaders (308 /* Resume Incomplete */ , -1 );
368+ return ;
369+ }
359370
360371 if (countUploads .decrementAndGet () % 2 == 0 ) {
361372 assertThat (Math .toIntExact (requestBody .length ()), anyOf (equalTo (defaultChunkSize ), equalTo (lastChunkSize )));
362-
363- final int rangeStart = getContentRangeStart (range );
364- final int rangeEnd = getContentRangeEnd (range );
373+ final int rangeStart = Math .toIntExact (contentRange .start ());
374+ final int rangeEnd = Math .toIntExact (contentRange .end ());
365375 assertThat (rangeEnd + 1 - rangeStart , equalTo (Math .toIntExact (requestBody .length ())));
366376 assertThat (new BytesArray (data , rangeStart , rangeEnd - rangeStart + 1 ), is (requestBody ));
377+ bytesReceived .updateAndGet (existing -> Math .max (existing , rangeEnd ));
367378
368- final Integer limit = getContentRangeLimit ( range );
369- if ( limit != null ) {
379+ if ( contentRange . size () != null ) {
380+ exchange . getResponseHeaders (). add ( "x-goog-stored-content-length" , String . valueOf ( bytesReceived . get () + 1 ));
370381 exchange .sendResponseHeaders (RestStatus .OK .getStatus (), -1 );
371382 return ;
372383 } else {
373- exchange .getResponseHeaders ().add ("Range" , String .format (Locale . ROOT , "bytes=%d/%d " , rangeStart , rangeEnd ));
384+ exchange .getResponseHeaders ().add ("Range" , String .format ("bytes=%s-%s " , rangeStart , rangeEnd ));
374385 exchange .getResponseHeaders ().add ("Content-Length" , "0" );
375386 exchange .sendResponseHeaders (308 /* Resume Incomplete */ , -1 );
376387 return ;
0 commit comments