diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index da759f21addff..25ee020de8562 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -397,68 +397,82 @@ private void uploadBlob( assert ioContext != IOContext.READONCE : "Remote upload will fail with IoContext.READONCE"; long expectedChecksum = calculateChecksumOfChecksum(from, src); long contentLength; - try (IndexInput indexInput = from.openInput(src, ioContext)) { + IndexInput indexInput = from.openInput(src, ioContext); + try { contentLength = indexInput.length(); - } - boolean remoteIntegrityEnabled = false; - if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { - remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported(); - } - lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15); - RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; - if (lowPriorityUpload) { - offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply( - new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position) - ); - } else { - offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply( - new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position) - ); - } - RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( - src, - remoteFileName, - contentLength, - true, - lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL, - offsetRangeInputStreamSupplier, - expectedChecksum, - remoteIntegrityEnabled - ); - ActionListener completionListener = ActionListener.wrap(resp -> { - try { - postUploadRunner.run(); - listener.onResponse(null); - } catch (Exception e) { - logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); - listener.onFailure(e); - } - }, ex -> { - logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex); - IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex); - if (corruptIndexException != null) { - listener.onFailure(corruptIndexException); - return; - } - Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class); - if (throwable != null) { - CorruptFileException corruptFileException = (CorruptFileException) throwable; - listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName())); - return; + boolean remoteIntegrityEnabled = false; + if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { + remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported(); } - listener.onFailure(ex); - }); + lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15); + RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; - completionListener = ActionListener.runBefore(completionListener, () -> { - try { - remoteTransferContainer.close(); - } catch (Exception e) { - logger.warn("Error occurred while closing streams", e); + if (lowPriorityUpload) { + offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply( + new OffsetRangeIndexInputStream(indexInput.clone(), size, position) + ); + } else { + offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply( + new OffsetRangeIndexInputStream(indexInput.clone(), size, position) + ); } - }); + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + src, + remoteFileName, + contentLength, + true, + lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL, + offsetRangeInputStreamSupplier, + expectedChecksum, + remoteIntegrityEnabled + ); + ActionListener completionListener = ActionListener.wrap(resp -> { + try { + postUploadRunner.run(); + listener.onResponse(null); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); + listener.onFailure(e); + } + }, ex -> { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex); + IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex); + if (corruptIndexException != null) { + listener.onFailure(corruptIndexException); + return; + } + Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class); + if (throwable != null) { + CorruptFileException corruptFileException = (CorruptFileException) throwable; + listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName())); + return; + } + listener.onFailure(ex); + }); - WriteContext writeContext = remoteTransferContainer.createWriteContext(); - ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener); + completionListener = ActionListener.runBefore(completionListener, () -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + }); + + completionListener = ActionListener.runAfter(completionListener, () -> { + try { + indexInput.close(); + } catch (IOException e) { + logger.warn("Error occurred while closing index input", e); + } + }); + + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(writeContext, completionListener); + } catch (Exception e) { + logger.warn("Exception while calling asyncBlobUpload, closing IndexInput to avoid leak"); + indexInput.close(); + throw e; + } } private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 47b92a7bca454..af8382e2a3154 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfos; @@ -599,13 +600,16 @@ public IndexInput openBlockInput(String name, long position, long length, IOCont public void copyFrom(Directory from, String src, IOContext context, ActionListener listener, boolean lowPriorityUpload) { try { final String remoteFileName = getNewRemoteSegmentFilename(src); - boolean uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { - try { - postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); - } catch (IOException e) { - throw new RuntimeException("Exception in segment postUpload for file " + src, e); - } - }, listener, lowPriorityUpload); + boolean uploaded = false; + if (src.startsWith(IndexFileNames.SEGMENTS) == false) { + uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { + try { + postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); + } catch (IOException e) { + throw new RuntimeException("Exception in segment postUpload for file " + src, e); + } + }, listener, lowPriorityUpload); + } if (uploaded == false) { copyFrom(from, src, src, context); listener.onResponse(null); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 35372c0f6c0e4..8094f1c91ee13 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -54,6 +54,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.mockito.stubbing.Answer; + import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH; import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; @@ -822,7 +824,7 @@ private Tuple mockIn when(shard.getRemoteStoreSettings()).thenReturn(remoteStoreSettings); if (testUploadTimeout) { when(remoteStoreSettings.getClusterRemoteSegmentTransferTimeout()).thenReturn(TimeValue.timeValueMillis(10)); - doAnswer(invocation -> { + Answer answer = invocation -> { ActionListener actionListener = invocation.getArgument(5); indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> { try { @@ -833,7 +835,11 @@ private Tuple mockIn actionListener.onResponse(null); }); return true; - }).when(remoteDirectory).copyFrom(any(), any(), any(), any(), any(), any(ActionListener.class), any(Boolean.class)); + }; + doAnswer(answer).when(remoteDirectory) + .copyFrom(any(), any(), any(), any(), any(), any(ActionListener.class), any(Boolean.class)); + + doAnswer(answer).when(remoteDirectory).copyFrom(any(), any(), any(), any()); } RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(