4040import org .opensearch .index .store .remote .metadata .RemoteSegmentMetadata ;
4141import org .opensearch .index .store .remote .metadata .RemoteSegmentMetadataHandler ;
4242import org .opensearch .indices .replication .checkpoint .ReplicationCheckpoint ;
43+ import org .opensearch .node .remotestore .RemoteStorePinnedTimestampService ;
4344import org .opensearch .threadpool .ThreadPool ;
4445
4546import java .io .FileNotFoundException ;
@@ -91,6 +92,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
9192
9293 private final RemoteStoreLockManager mdLockManager ;
9394
95+ private final Map <Long , String > metadataFilePinnedTimestampMap ;
96+
9497 private final ThreadPool threadPool ;
9598
9699 /**
@@ -132,6 +135,7 @@ public RemoteSegmentStoreDirectory(
132135 this .remoteMetadataDirectory = remoteMetadataDirectory ;
133136 this .mdLockManager = mdLockManager ;
134137 this .threadPool = threadPool ;
138+ this .metadataFilePinnedTimestampMap = new HashMap <>();
135139 this .logger = Loggers .getLogger (getClass (), shardId );
136140 init ();
137141 }
@@ -176,6 +180,42 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c
176180 return remoteSegmentMetadata ;
177181 }
178182
183+ /**
184+ * Initializes the remote segment metadata to a specific timestamp.
185+ *
186+ * @param timestamp The timestamp to initialize the remote segment metadata to.
187+ * @return The RemoteSegmentMetadata object corresponding to the specified timestamp, or null if no metadata file is found for that timestamp.
188+ * @throws IOException If an I/O error occurs while reading the metadata file.
189+ */
190+ public RemoteSegmentMetadata initializeToSpecificTimestamp (long timestamp ) throws IOException {
191+ List <String > metadataFiles = remoteMetadataDirectory .listFilesByPrefixInLexicographicOrder (
192+ MetadataFilenameUtils .METADATA_PREFIX ,
193+ Integer .MAX_VALUE
194+ );
195+ Set <String > lockedMetadataFiles = RemoteStoreUtils .getPinnedTimestampLockedFiles (
196+ metadataFiles ,
197+ Set .of (timestamp ),
198+ MetadataFilenameUtils ::getTimestamp ,
199+ MetadataFilenameUtils ::getNodeIdByPrimaryTermAndGen
200+ );
201+ if (lockedMetadataFiles .isEmpty ()) {
202+ return null ;
203+ }
204+ if (lockedMetadataFiles .size () > 1 ) {
205+ throw new IOException (
206+ "Expected exactly one metadata file matching timestamp: " + timestamp + " but got " + lockedMetadataFiles
207+ );
208+ }
209+ String metadataFile = lockedMetadataFiles .iterator ().next ();
210+ RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile (metadataFile );
211+ if (remoteSegmentMetadata != null ) {
212+ this .segmentsUploadedToRemoteStore = new ConcurrentHashMap <>(remoteSegmentMetadata .getMetadata ());
213+ } else {
214+ this .segmentsUploadedToRemoteStore = new ConcurrentHashMap <>();
215+ }
216+ return remoteSegmentMetadata ;
217+ }
218+
179219 /**
180220 * Read the latest metadata file to get the list of segments uploaded to the remote segment store.
181221 * We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit.
@@ -324,7 +364,8 @@ public static String getMetadataFilename(
324364 long translogGeneration ,
325365 long uploadCounter ,
326366 int metadataVersion ,
327- String nodeId
367+ String nodeId ,
368+ long creationTimestamp
328369 ) {
329370 return String .join (
330371 SEPARATOR ,
@@ -334,11 +375,30 @@ public static String getMetadataFilename(
334375 RemoteStoreUtils .invertLong (translogGeneration ),
335376 RemoteStoreUtils .invertLong (uploadCounter ),
336377 String .valueOf (Objects .hash (nodeId )),
337- RemoteStoreUtils .invertLong (System . currentTimeMillis () ),
378+ RemoteStoreUtils .invertLong (creationTimestamp ),
338379 String .valueOf (metadataVersion )
339380 );
340381 }
341382
383+ public static String getMetadataFilename (
384+ long primaryTerm ,
385+ long generation ,
386+ long translogGeneration ,
387+ long uploadCounter ,
388+ int metadataVersion ,
389+ String nodeId
390+ ) {
391+ return getMetadataFilename (
392+ primaryTerm ,
393+ generation ,
394+ translogGeneration ,
395+ uploadCounter ,
396+ metadataVersion ,
397+ nodeId ,
398+ System .currentTimeMillis ()
399+ );
400+ }
401+
342402 // Visible for testing
343403 static long getPrimaryTerm (String [] filenameTokens ) {
344404 return RemoteStoreUtils .invertLong (filenameTokens [1 ]);
@@ -793,6 +853,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
793853 );
794854 return ;
795855 }
856+
796857 List <String > sortedMetadataFileList = remoteMetadataDirectory .listFilesByPrefixInLexicographicOrder (
797858 MetadataFilenameUtils .METADATA_PREFIX ,
798859 Integer .MAX_VALUE
@@ -806,16 +867,44 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
806867 return ;
807868 }
808869
809- List <String > metadataFilesEligibleToDelete = new ArrayList <>(
810- sortedMetadataFileList .subList (lastNMetadataFilesToKeep , sortedMetadataFileList .size ())
870+ // Check last fetch status of pinned timestamps. If stale, return.
871+ if (RemoteStoreUtils .isPinnedTimestampStateStale ()) {
872+ logger .warn ("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale" );
873+ return ;
874+ }
875+
876+ Tuple <Long , Set <Long >> pinnedTimestampsState = RemoteStorePinnedTimestampService .getPinnedTimestamps ();
877+
878+ Set <String > implicitLockedFiles = RemoteStoreUtils .getPinnedTimestampLockedFiles (
879+ sortedMetadataFileList ,
880+ pinnedTimestampsState .v2 (),
881+ metadataFilePinnedTimestampMap ,
882+ MetadataFilenameUtils ::getTimestamp ,
883+ MetadataFilenameUtils ::getNodeIdByPrimaryTermAndGen
811884 );
812- Set <String > allLockFiles ;
885+ final Set <String > allLockFiles = new HashSet <>(implicitLockedFiles );
886+
813887 try {
814- allLockFiles = ((RemoteStoreMetadataLockManager ) mdLockManager ).fetchLockedMetadataFiles (MetadataFilenameUtils .METADATA_PREFIX );
888+ allLockFiles .addAll (
889+ ((RemoteStoreMetadataLockManager ) mdLockManager ).fetchLockedMetadataFiles (MetadataFilenameUtils .METADATA_PREFIX )
890+ );
815891 } catch (Exception e ) {
816892 logger .error ("Exception while fetching segment metadata lock files, skipping deleteStaleSegments" , e );
817893 return ;
818894 }
895+
896+ List <String > metadataFilesEligibleToDelete = new ArrayList <>(
897+ sortedMetadataFileList .subList (lastNMetadataFilesToKeep , sortedMetadataFileList .size ())
898+ );
899+
900+ // Along with last N files, we need to keep files since last successful run of scheduler
901+ long lastSuccessfulFetchOfPinnedTimestamps = pinnedTimestampsState .v1 ();
902+ metadataFilesEligibleToDelete = RemoteStoreUtils .filterOutMetadataFilesBasedOnAge (
903+ metadataFilesEligibleToDelete ,
904+ MetadataFilenameUtils ::getTimestamp ,
905+ lastSuccessfulFetchOfPinnedTimestamps
906+ );
907+
819908 List <String > metadataFilesToBeDeleted = metadataFilesEligibleToDelete .stream ()
820909 .filter (metadataFile -> allLockFiles .contains (metadataFile ) == false )
821910 .collect (Collectors .toList ());
0 commit comments