88
99package org .opensearch .gateway .remote ;
1010
11+ import org .opensearch .action .admin .cluster .node .stats .NodesStatsRequest ;
12+ import org .opensearch .action .admin .cluster .node .stats .NodesStatsResponse ;
1113import org .opensearch .action .admin .cluster .settings .ClusterUpdateSettingsResponse ;
14+ import org .opensearch .cluster .coordination .PersistedStateStats ;
15+ import org .opensearch .cluster .routing .IndexRoutingTable ;
1216import org .opensearch .common .blobstore .BlobPath ;
1317import org .opensearch .common .settings .Settings ;
18+ import org .opensearch .discovery .DiscoveryStats ;
19+ import org .opensearch .gateway .remote .model .RemoteRoutingTableBlobStore ;
20+ import org .opensearch .index .remote .RemoteStoreEnums ;
21+ import org .opensearch .index .remote .RemoteStorePathStrategy ;
1422import org .opensearch .remotestore .RemoteStoreBaseIntegTestCase ;
1523import org .opensearch .repositories .RepositoriesService ;
1624import org .opensearch .repositories .blobstore .BlobStoreRepository ;
1725import org .opensearch .test .OpenSearchIntegTestCase ;
1826import org .junit .Before ;
1927
2028import java .nio .charset .StandardCharsets ;
29+ import java .nio .file .Path ;
30+ import java .util .ArrayList ;
2131import java .util .Base64 ;
32+ import java .util .List ;
2233import java .util .Map ;
2334import java .util .concurrent .TimeUnit ;
35+ import java .util .concurrent .atomic .AtomicLong ;
2436
37+ import static org .opensearch .common .util .FeatureFlags .REMOTE_PUBLICATION_EXPERIMENTAL ;
2538import static org .opensearch .gateway .remote .RemoteClusterStateCleanupManager .CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT ;
2639import static org .opensearch .gateway .remote .RemoteClusterStateCleanupManager .REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING ;
2740import static org .opensearch .gateway .remote .RemoteClusterStateCleanupManager .RETAINED_MANIFESTS ;
2841import static org .opensearch .gateway .remote .RemoteClusterStateCleanupManager .SKIP_CLEANUP_STATE_CHANGES ;
2942import static org .opensearch .gateway .remote .RemoteClusterStateService .REMOTE_CLUSTER_STATE_ENABLED_SETTING ;
43+ import static org .opensearch .gateway .remote .routingtable .RemoteIndexRoutingTable .INDEX_ROUTING_TABLE ;
3044import static org .opensearch .indices .IndicesService .CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING ;
45+ import static org .opensearch .node .remotestore .RemoteStoreNodeAttribute .REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY ;
3146
3247@ OpenSearchIntegTestCase .ClusterScope (scope = OpenSearchIntegTestCase .Scope .TEST , numDataNodes = 0 )
3348public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {
3449
3550 private static final String INDEX_NAME = "test-index" ;
51+ private final RemoteStoreEnums .PathType pathType = RemoteStoreEnums .PathType .HASHED_PREFIX ;
3652
3753 @ Before
3854 public void setup () {
@@ -52,6 +68,11 @@ private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int
5268 return indexStats ;
5369 }
5470
71+ private void initialTestSetup (int shardCount , int replicaCount , int dataNodeCount , int clusterManagerNodeCount , Settings settings ) {
72+ prepareCluster (clusterManagerNodeCount , dataNodeCount , INDEX_NAME , replicaCount , shardCount , settings );
73+ ensureGreen (INDEX_NAME );
74+ }
75+
5576 public void testRemoteCleanupTaskUpdated () {
5677 int shardCount = randomIntBetween (1 , 2 );
5778 int replicaCount = 1 ;
@@ -144,6 +165,102 @@ public void testRemoteCleanupDeleteStale() throws Exception {
144165 assertTrue (response .isAcknowledged ());
145166 }
146167
168+ public void testRemoteCleanupDeleteStaleIndexRoutingFiles () throws Exception {
169+ clusterSettingsSuppliedByTest = true ;
170+ Path segmentRepoPath = randomRepoPath ();
171+ Path translogRepoPath = randomRepoPath ();
172+ Path remoteRoutingTableRepoPath = randomRepoPath ();
173+ Settings .Builder settingsBuilder = Settings .builder ();
174+ settingsBuilder .put (
175+ buildRemoteStoreNodeAttributes (
176+ REPOSITORY_NAME ,
177+ segmentRepoPath ,
178+ REPOSITORY_2_NAME ,
179+ translogRepoPath ,
180+ REMOTE_ROUTING_TABLE_REPO ,
181+ remoteRoutingTableRepoPath ,
182+ false
183+ )
184+ );
185+ settingsBuilder .put (
186+ RemoteRoutingTableBlobStore .REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING .getKey (),
187+ RemoteStoreEnums .PathType .HASHED_PREFIX .toString ()
188+ )
189+ .put ("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY , REMOTE_ROUTING_TABLE_REPO )
190+ .put (REMOTE_PUBLICATION_EXPERIMENTAL , true );
191+
192+ int shardCount = randomIntBetween (1 , 2 );
193+ int replicaCount = 1 ;
194+ int dataNodeCount = shardCount * (replicaCount + 1 );
195+ int clusterManagerNodeCount = 1 ;
196+ initialTestSetup (shardCount , replicaCount , dataNodeCount , clusterManagerNodeCount , settingsBuilder .build ());
197+
198+ // update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
199+ // to repository, if manifest files are less than that it means clean up has run
200+ updateClusterStateNTimes (RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1 );
201+
202+ RepositoriesService repositoriesService = internalCluster ().getClusterManagerNodeInstance (RepositoriesService .class );
203+ BlobStoreRepository repository = (BlobStoreRepository ) repositoriesService .repository (REPOSITORY_NAME );
204+ BlobPath baseMetadataPath = getBaseMetadataPath (repository );
205+
206+ BlobStoreRepository routingTableRepository = (BlobStoreRepository ) repositoriesService .repository (REMOTE_ROUTING_TABLE_REPO );
207+ List <IndexRoutingTable > indexRoutingTables = new ArrayList <>(getClusterState ().routingTable ().indicesRouting ().values ());
208+ BlobPath indexRoutingPath = getIndexRoutingPath (baseMetadataPath , indexRoutingTables .get (0 ).getIndex ().getUUID ());
209+ assertBusy (() -> {
210+ // There would be >=3 files as shards will transition from UNASSIGNED -> INIT -> STARTED state
211+ assertTrue (routingTableRepository .blobStore ().blobContainer (indexRoutingPath ).listBlobs ().size () >= 3 );
212+ });
213+
214+ RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster ().getClusterManagerNodeInstance (
215+ RemoteClusterStateCleanupManager .class
216+ );
217+
218+ // set cleanup interval to 100 ms to make the test faster
219+ ClusterUpdateSettingsResponse response = client ().admin ()
220+ .cluster ()
221+ .prepareUpdateSettings ()
222+ .setPersistentSettings (Settings .builder ().put (REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING .getKey (), "100ms" ))
223+ .get ();
224+
225+ assertTrue (response .isAcknowledged ());
226+ assertBusy (() -> assertEquals (100 , remoteClusterStateCleanupManager .getStaleFileDeletionTask ().getInterval ().getMillis ()));
227+
228+ String clusterManagerNode = internalCluster ().getClusterManagerName ();
229+ NodesStatsResponse nodesStatsResponse = client ().admin ()
230+ .cluster ()
231+ .prepareNodesStats (clusterManagerNode )
232+ .addMetric (NodesStatsRequest .Metric .DISCOVERY .metricName ())
233+ .get ();
234+ verifyIndexRoutingFilesDeletion (routingTableRepository , indexRoutingPath , nodesStatsResponse );
235+
236+ // disable the clean up to avoid race condition during shutdown
237+ response = client ().admin ()
238+ .cluster ()
239+ .prepareUpdateSettings ()
240+ .setPersistentSettings (Settings .builder ().put (REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING .getKey (), "-1" ))
241+ .get ();
242+ assertTrue (response .isAcknowledged ());
243+ }
244+
245+ private void verifyIndexRoutingFilesDeletion (
246+ BlobStoreRepository routingTableRepository ,
247+ BlobPath indexRoutingPath ,
248+ NodesStatsResponse nodesStatsResponse
249+ ) throws Exception {
250+ assertBusy (() -> { assertEquals (1 , routingTableRepository .blobStore ().blobContainer (indexRoutingPath ).listBlobs ().size ()); });
251+
252+ // Verify index routing files delete stats
253+ DiscoveryStats discoveryStats = nodesStatsResponse .getNodes ().get (0 ).getDiscoveryStats ();
254+ assertNotNull (discoveryStats .getClusterStateStats ());
255+ for (PersistedStateStats persistedStateStats : discoveryStats .getClusterStateStats ().getPersistenceStats ()) {
256+ Map <String , AtomicLong > extendedFields = persistedStateStats .getExtendedFields ();
257+ assertTrue (extendedFields .containsKey (RemotePersistenceStats .INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT ));
258+ long cleanupAttemptFailedCount = extendedFields .get (RemotePersistenceStats .INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT )
259+ .get ();
260+ assertEquals (0 , cleanupAttemptFailedCount );
261+ }
262+ }
263+
147264 private void updateClusterStateNTimes (int n ) {
148265 int newReplicaCount = randomIntBetween (0 , 3 );
149266 for (int i = n ; i > 0 ; i --) {
@@ -155,4 +272,22 @@ private void updateClusterStateNTimes(int n) {
155272 assertTrue (response .isAcknowledged ());
156273 }
157274 }
275+
276+ private BlobPath getBaseMetadataPath (BlobStoreRepository repository ) {
277+ return repository .basePath ()
278+ .add (
279+ Base64 .getUrlEncoder ()
280+ .withoutPadding ()
281+ .encodeToString (getClusterState ().getClusterName ().value ().getBytes (StandardCharsets .UTF_8 ))
282+ )
283+ .add ("cluster-state" )
284+ .add (getClusterState ().metadata ().clusterUUID ());
285+ }
286+
287+ private BlobPath getIndexRoutingPath (BlobPath baseMetadataPath , String indexUUID ) {
288+ return pathType .path (
289+ RemoteStorePathStrategy .PathInput .builder ().basePath (baseMetadataPath .add (INDEX_ROUTING_TABLE )).indexUUID (indexUUID ).build (),
290+ RemoteStoreEnums .PathHashAlgorithm .FNV_1A_BASE64
291+ );
292+ }
158293}
0 commit comments