|
42 | 42 | import java.util.stream.Stream; |
43 | 43 |
|
44 | 44 | import static org.opensearch.index.store.remote.utils.FileTypeUtils.BLOCK_FILE_IDENTIFIER; |
45 | | -import static org.opensearch.index.store.remote.utils.FileTypeUtils.isBlockFile; |
46 | 45 | import static org.apache.lucene.index.IndexFileNames.SEGMENTS; |
47 | 46 |
|
48 | 47 | /** |
@@ -106,24 +105,6 @@ protected List<String> listBlockFiles(String fileName) throws IOException { |
106 | 105 | .collect(Collectors.toList()); |
107 | 106 | } |
108 | 107 |
|
109 | | - /** |
110 | | - * Returns a list of names of all block files stored in the local directory for a given set of files, |
111 | | - * including the original file names itself if present. |
112 | | - * |
113 | | - * @param fileNames The set of files to search for, along with its associated block files. |
114 | | - * @return A list of file names, including the original file (if present) and all its block files. |
115 | | - * @throws IOException in case of I/O error while listing files. |
116 | | - */ |
117 | | - protected List<String> listBlockFiles(String[] fileNames) throws IOException { |
118 | | - Set<String> files = Set.of(fileNames); |
119 | | - return Stream.of(listLocalFiles()) |
120 | | - .filter( |
121 | | - file -> files.contains(file) |
122 | | - || (isBlockFile(file) && files.contains(file.substring(0, file.indexOf(BLOCK_FILE_IDENTIFIER)))) |
123 | | - ) |
124 | | - .collect(Collectors.toList()); |
125 | | - } |
126 | | - |
127 | 108 | /** |
128 | 109 | * Returns names of all files stored in this directory in sorted order |
129 | 110 | * Does not include locally stored block files (having _block_ in their names) and files pending deletion |
@@ -269,16 +250,15 @@ public void sync(Collection<String> names) throws IOException { |
269 | 250 | ensureOpen(); |
270 | 251 | logger.trace("Composite Directory[{}]: sync() called {}", this::toString, () -> names); |
271 | 252 | Set<String> remoteFiles = Set.of(getRemoteFiles()); |
272 | | - Set<String> localFiles = Arrays.stream(listLocalFiles()) |
273 | | - .map(file -> isBlockFile(file) ? file.substring(0, file.indexOf(BLOCK_FILE_IDENTIFIER)) : file) |
| 253 | + Set<String> localFilesHavingBlocks = Arrays.stream(listLocalFiles()) |
| 254 | + .filter(FileTypeUtils::isBlockFile) |
| 255 | + .map(file -> file.substring(0, file.indexOf(BLOCK_FILE_IDENTIFIER))) |
274 | 256 | .collect(Collectors.toSet()); |
275 | | - String[] fullFilesToSync = names.stream().filter(name -> remoteFiles.contains(name) == false).toArray(String[]::new); |
276 | | - for (String fullFileToSync : fullFilesToSync) { |
277 | | - if (localFiles.contains(fullFileToSync) == false) throw new NoSuchFileException("Unable to sync file " + fullFileToSync); |
278 | | - } |
279 | | - List<String> filesToSync = listBlockFiles(fullFilesToSync); |
280 | | - logger.trace("Composite Directory[{}]: Synced files : {}", this::toString, () -> filesToSync); |
281 | | - localDirectory.sync(filesToSync); |
| 257 | + Collection<String> fullFilesToSync = names.stream() |
| 258 | + .filter(name -> (remoteFiles.contains(name) == false) && (localFilesHavingBlocks.contains(name) == false)) |
| 259 | + .collect(Collectors.toList()); |
| 260 | + logger.trace("Composite Directory[{}]: Synced files : {}", this::toString, () -> fullFilesToSync); |
| 261 | + localDirectory.sync(fullFilesToSync); |
282 | 262 | } |
283 | 263 |
|
284 | 264 | /** |
|
0 commit comments