From 6a769830313b397274a02caab55dbc02aec38f73 Mon Sep 17 00:00:00 2001 From: Shreyansh Ray Date: Mon, 30 Jun 2025 19:57:55 +0530 Subject: [PATCH 1/2] Fix sync method to account for block files in Composite Directory Signed-off-by: Shreyansh Ray --- .../index/store/CompositeDirectory.java | 30 +++++++++++++++++-- .../index/store/CompositeDirectoryTests.java | 6 ++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java index 99eb1db04b296..7046fca966c7b 100644 --- a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java @@ -42,6 +42,7 @@ import java.util.stream.Stream; import static org.opensearch.index.store.remote.utils.FileTypeUtils.BLOCK_FILE_IDENTIFIER; +import static org.opensearch.index.store.remote.utils.FileTypeUtils.isBlockFile; import static org.apache.lucene.index.IndexFileNames.SEGMENTS; /** @@ -105,6 +106,24 @@ protected List listBlockFiles(String fileName) throws IOException { .collect(Collectors.toList()); } + /** + * Returns a list of names of all block files stored in the local directory for a given set of files, + * including the original file names itself if present. + * + * @param fileNames The set of files to search for, along with its associated block files. + * @return A list of file names, including the original file (if present) and all its block files. + * @throws IOException in case of I/O error while listing files. + */ + protected List listBlockFiles(String[] fileNames) throws IOException { + Set files = Set.of(fileNames); + return Stream.of(listLocalFiles()) + .filter( + file -> files.contains(file) + || (isBlockFile(file) && files.contains(file.substring(0, file.indexOf(BLOCK_FILE_IDENTIFIER)))) + ) + .collect(Collectors.toList()); + } + /** * Returns names of all files stored in this directory in sorted order * Does not include locally stored block files (having _block_ in their names) and files pending deletion @@ -249,8 +268,15 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti public void sync(Collection names) throws IOException { ensureOpen(); logger.trace("Composite Directory[{}]: sync() called {}", this::toString, () -> names); - Collection remoteFiles = Arrays.asList(getRemoteFiles()); - Collection filesToSync = names.stream().filter(name -> remoteFiles.contains(name) == false).collect(Collectors.toList()); + Set remoteFiles = Set.of(getRemoteFiles()); + Set localFiles = Arrays.stream(listLocalFiles()) + .map(file -> isBlockFile(file) ? file.substring(0, file.indexOf(BLOCK_FILE_IDENTIFIER)) : file) + .collect(Collectors.toSet()); + String[] fullFilesToSync = names.stream().filter(name -> remoteFiles.contains(name) == false).toArray(String[]::new); + for (String fullFileToSync : fullFilesToSync) { + if (localFiles.contains(fullFileToSync) == false) throw new NoSuchFileException("Unable to sync file " + fullFileToSync); + } + List filesToSync = listBlockFiles(fullFilesToSync); logger.trace("Composite Directory[{}]: Synced files : {}", this::toString, () -> filesToSync); localDirectory.sync(filesToSync); } diff --git a/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java index 531bca97df662..7ee00712a236d 100644 --- a/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java @@ -132,6 +132,12 @@ public void testSync() throws IOException { // All the files in the below list are present either locally or on remote, so sync should work as expected Collection names = List.of("_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1"); compositeDirectory.sync(names); + // Deleting file _1.cfe and then adding its blocks locally so that full file is not present but block files are present in local + // State of _1.cfe file after these operations - not present in remote, full file not present locally but blocks present in local + compositeDirectory.deleteFile("_1.cfe"); + addFilesToDirectory(new String[] { "_1.cfe_block_0", "_1.cfe_block_2" }); + // Sync should work as expected since blocks are present in local + compositeDirectory.sync(List.of("_1.cfe")); // Below list contains a non-existent file, hence will throw an error Collection names1 = List.of("_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1", "non_existent_file"); assertThrows(NoSuchFileException.class, () -> compositeDirectory.sync(names1)); From f5d972871dfbf6d6d7bb6e0e8a6ea54dc7166bfd Mon Sep 17 00:00:00 2001 From: Shreyansh Ray Date: Mon, 14 Jul 2025 14:52:42 +0530 Subject: [PATCH 2/2] Skip sync call for block file Signed-off-by: Shreyansh Ray --- .../index/store/CompositeDirectory.java | 36 +++++-------------- 1 file changed, 8 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java index 7046fca966c7b..99e70dd215b5b 100644 --- a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java @@ -42,7 +42,6 @@ import java.util.stream.Stream; import static org.opensearch.index.store.remote.utils.FileTypeUtils.BLOCK_FILE_IDENTIFIER; -import static org.opensearch.index.store.remote.utils.FileTypeUtils.isBlockFile; import static org.apache.lucene.index.IndexFileNames.SEGMENTS; /** @@ -106,24 +105,6 @@ protected List listBlockFiles(String fileName) throws IOException { .collect(Collectors.toList()); } - /** - * Returns a list of names of all block files stored in the local directory for a given set of files, - * including the original file names itself if present. - * - * @param fileNames The set of files to search for, along with its associated block files. - * @return A list of file names, including the original file (if present) and all its block files. - * @throws IOException in case of I/O error while listing files. - */ - protected List listBlockFiles(String[] fileNames) throws IOException { - Set files = Set.of(fileNames); - return Stream.of(listLocalFiles()) - .filter( - file -> files.contains(file) - || (isBlockFile(file) && files.contains(file.substring(0, file.indexOf(BLOCK_FILE_IDENTIFIER)))) - ) - .collect(Collectors.toList()); - } - /** * Returns names of all files stored in this directory in sorted order * Does not include locally stored block files (having _block_ in their names) and files pending deletion @@ -269,16 +250,15 @@ public void sync(Collection names) throws IOException { ensureOpen(); logger.trace("Composite Directory[{}]: sync() called {}", this::toString, () -> names); Set remoteFiles = Set.of(getRemoteFiles()); - Set localFiles = Arrays.stream(listLocalFiles()) - .map(file -> isBlockFile(file) ? file.substring(0, file.indexOf(BLOCK_FILE_IDENTIFIER)) : file) + Set localFilesHavingBlocks = Arrays.stream(listLocalFiles()) + .filter(FileTypeUtils::isBlockFile) + .map(file -> file.substring(0, file.indexOf(BLOCK_FILE_IDENTIFIER))) .collect(Collectors.toSet()); - String[] fullFilesToSync = names.stream().filter(name -> remoteFiles.contains(name) == false).toArray(String[]::new); - for (String fullFileToSync : fullFilesToSync) { - if (localFiles.contains(fullFileToSync) == false) throw new NoSuchFileException("Unable to sync file " + fullFileToSync); - } - List filesToSync = listBlockFiles(fullFilesToSync); - logger.trace("Composite Directory[{}]: Synced files : {}", this::toString, () -> filesToSync); - localDirectory.sync(filesToSync); + Collection fullFilesToSync = names.stream() + .filter(name -> (remoteFiles.contains(name) == false) && (localFilesHavingBlocks.contains(name) == false)) + .collect(Collectors.toList()); + logger.trace("Composite Directory[{}]: Synced files : {}", this::toString, () -> fullFilesToSync); + localDirectory.sync(fullFilesToSync); } /**