diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheIT.java new file mode 100644 index 0000000000000..db74f2767b922 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheIT.java @@ -0,0 +1,278 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.filecache; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; +import org.opensearch.transport.client.Client; + +import java.util.concurrent.TimeUnit; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +/** + * Integration tests for File Cache Prune API. + * Validates cache pruning with real data in cluster environment. + * + * @opensearch.internal + */ +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +public class PruneFileCacheIT extends AbstractSnapshotIntegTestCase { + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + protected Settings.Builder randomRepositorySettings() { + final Settings.Builder settings = Settings.builder(); + settings.put("location", randomRepoPath()).put("compress", randomBoolean()); + settings.put(FsRepository.BASE_PATH_SETTING.getKey(), "file_cache_prune_it"); + return settings; + } + + /** + * Tests file cache pruning with real data on single warm node. 
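+ * Restores a snapshot-backed index on a warm node, warms the file cache through
+ * repeated queries, prunes it via the API, and verifies that the reported pruned
+ * bytes correspond to a real reduction in cache usage.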
+ */ + public void testPruneCacheWithRealData() throws Exception { + final String indexName = "test-idx"; + final String restoredIndexName = indexName + "-copy"; + final String repoName = "test-repo"; + final String snapshotName = "test-snap"; + final Client client = client(); + + logger.info("--> Create index with documents on data node"); + internalCluster().ensureAtLeastNumDataNodes(1); + createIndexWithDocsAndEnsureGreen(0, 100, indexName); + + logger.info("--> Create repository and take snapshot"); + createRepositoryWithSettings(null, repoName); + takeSnapshot(client, snapshotName, repoName, indexName); + deleteIndicesAndEnsureGreen(client, indexName); + + logger.info("--> Start warm node and restore as searchable snapshot"); + internalCluster().ensureAtLeastNumWarmNodes(1); + restoreSnapshotAndEnsureGreen(client, snapshotName, repoName); + assertRemoteSnapshotIndexSettings(client, restoredIndexName); + + logger.info("--> Trigger cache population by running multiple queries"); + // Run multiple queries to ensure cache is populated + for (int i = 0; i < 3; i++) { + assertDocCount(restoredIndexName, 100L); + } + + assertBusy(() -> { + long usage = getFileCacheUsage(); + assertTrue("Cache should be populated after index access", usage > 0); + }, 30, TimeUnit.SECONDS); + + long usageBefore = getFileCacheUsage(); + logger.info("--> File cache usage before prune: {} bytes", usageBefore); + assertTrue("File cache should have data before prune", usageBefore > 0); + + PruneFileCacheRequest request = new PruneFileCacheRequest(); + PlainActionFuture future = new PlainActionFuture<>(); + client.execute(PruneFileCacheAction.INSTANCE, request, future); + PruneFileCacheResponse response = future.actionGet(); + + logger.info("--> Prune response: pruned {} bytes from {} nodes", response.getTotalPrunedBytes(), response.getNodes().size()); + + // Verify response first - this is the key assertion + assertNotNull("Response should not be null", response); + assertEquals("Should have 1 successful node", 1, response.getNodes().size()); + assertEquals("Should have no failures", 0, response.failures().size()); + assertTrue("Operation should be successful", response.isCompletelySuccessful()); + assertTrue("Operation should be acknowledged", response.isAcknowledged()); + + // The key assertion: pruned bytes should be > 0 (proves API actually worked) + assertTrue("Should have pruned bytes", response.getTotalPrunedBytes() > 0); + + // Verify cache usage after prune + long usageAfter = getFileCacheUsage(); + logger.info("--> File cache usage after prune: {} bytes", usageAfter); + + // Cache should be reduced (might not be zero if files are still referenced) + assertTrue("Cache usage should be reduced after prune", usageAfter <= usageBefore); + + // The pruned bytes should roughly match the reduction + long actualReduction = usageBefore - usageAfter; + logger.info("--> Actual cache reduction: {} bytes, reported pruned: {} bytes", actualReduction, response.getTotalPrunedBytes()); + + assertDocCount(restoredIndexName, 100L); + } + + /** + * Tests prune API response structure and metrics validation. 
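+ * Checks acknowledgement, node counts, per-node capacity and pruned-byte metrics,
+ * and that the reported total matches the measured reduction in cache usage.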
+ */ + public void testPruneResponseMetrics() throws Exception { + final String indexName = "test-idx"; + final String restoredIndexName = indexName + "-copy"; + final String repoName = "test-repo"; + final String snapshotName = "test-snap"; + final Client client = client(); + + logger.info("--> Setup simple scenario to test API response metrics"); + internalCluster().ensureAtLeastNumDataNodes(1); + createIndexWithDocsAndEnsureGreen(0, 100, indexName); + + createRepositoryWithSettings(null, repoName); + takeSnapshot(client, snapshotName, repoName, indexName); + deleteIndicesAndEnsureGreen(client, indexName); + + internalCluster().ensureAtLeastNumWarmNodes(1); + restoreSnapshotAndEnsureGreen(client, snapshotName, repoName); + assertRemoteSnapshotIndexSettings(client, restoredIndexName); + + logger.info("--> Populate cache and measure before state"); + assertDocCount(restoredIndexName, 100L); + + assertBusy(() -> { + long usage = getFileCacheUsage(); + assertTrue("Cache should be populated", usage > 0); + }, 30, TimeUnit.SECONDS); + + long usageBefore = getFileCacheUsage(); + + PruneFileCacheRequest request = new PruneFileCacheRequest(); + PlainActionFuture future = new PlainActionFuture<>(); + client.execute(PruneFileCacheAction.INSTANCE, request, future); + PruneFileCacheResponse response = future.actionGet(); + + assertNotNull("Response should not be null", response); + assertTrue("Should report acknowledged", response.isAcknowledged()); + assertEquals("Should target 1 warm node", 1, response.getNodes().size()); + assertEquals("Should have 0 failures", 0, response.failures().size()); + assertTrue("Should be successful", response.isCompletelySuccessful()); + + NodePruneFileCacheResponse nodeResponse = response.getNodes().get(0); + assertNotNull("Node response should not be null", nodeResponse); + assertTrue("Node should have cache capacity", nodeResponse.getCacheCapacity() > 0); + assertTrue("Node should report pruned bytes", nodeResponse.getPrunedBytes() >= 0); + + long usageAfter = getFileCacheUsage(); + long expectedPruned = usageBefore - usageAfter; + assertEquals("Response should match actual cache reduction", expectedPruned, response.getTotalPrunedBytes()); + } + + /** + * Creates index with documents and ensures cluster health is green. + */ + private void createIndexWithDocsAndEnsureGreen(int numReplicas, int numDocs, String indexName) throws InterruptedException { + createIndex( + indexName, + Settings.builder() + .put(SETTING_NUMBER_OF_REPLICAS, numReplicas) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey()) + .build() + ); + ensureGreen(); + indexRandomDocs(indexName, numDocs); + ensureGreen(); + } + + /** + * Creates snapshot repository with optional custom settings. + */ + private void createRepositoryWithSettings(Settings.Builder repositorySettings, String repoName) { + if (repositorySettings == null) { + createRepository(repoName, FsRepository.TYPE); + } else { + createRepository(repoName, FsRepository.TYPE, repositorySettings); + } + } + + /** + * Creates snapshot and validates success. + */ + private void takeSnapshot(Client client, String snapshotName, String repoName, String... 
indices) { + final var response = client.admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .setIndices(indices) + .get(); + + assertThat(response.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(response.getSnapshotInfo().successfulShards(), equalTo(response.getSnapshotInfo().totalShards())); + } + + /** + * Deletes indices and ensures cluster health is green. + */ + private void deleteIndicesAndEnsureGreen(Client client, String... indices) { + assertTrue(client.admin().indices().prepareDelete(indices).get().isAcknowledged()); + ensureGreen(); + } + + /** + * Restores snapshot as searchable snapshot and ensures cluster health is green. + */ + private void restoreSnapshotAndEnsureGreen(Client client, String snapshotName, String repoName) { + client.admin() + .cluster() + .prepareRestoreSnapshot(repoName, snapshotName) + .setRenamePattern("(.+)") + .setRenameReplacement("$1-copy") + .setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT) + .setWaitForCompletion(true) + .execute() + .actionGet(); + ensureGreen(); + } + + /** + * Validates that indices are configured as remote snapshot type. + */ + private void assertRemoteSnapshotIndexSettings(Client client, String... indexNames) { + var settingsResponse = client.admin().indices().prepareGetSettings(indexNames).execute().actionGet(); + + assertEquals(indexNames.length, settingsResponse.getIndexToSettings().keySet().size()); + + for (String indexName : indexNames) { + assertEquals( + IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey(), + settingsResponse.getSetting(indexName, IndexModule.INDEX_STORE_TYPE_SETTING.getKey()) + ); + } + } + + /** + * Returns total file cache usage across all warm nodes in bytes. + */ + private long getFileCacheUsage() { + NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet(); + + long totalUsage = 0L; + for (NodeStats stats : response.getNodes()) { + if (stats.getNode().isWarmNode()) { + AggregateFileCacheStats fcStats = stats.getFileCacheStats(); + if (fcStats != null) { + totalUsage += fcStats.getUsed().getBytes(); + } + } + } + return totalUsage; + } +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 12fbabf341c41..c4a78c288c26e 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -46,6 +46,8 @@ import org.opensearch.action.admin.cluster.decommission.awareness.get.TransportGetDecommissionStateAction; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction; import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction; +import org.opensearch.action.admin.cluster.filecache.PruneFileCacheAction; +import org.opensearch.action.admin.cluster.filecache.TransportPruneFileCacheAction; import org.opensearch.action.admin.cluster.health.ClusterHealthAction; import org.opensearch.action.admin.cluster.health.TransportClusterHealthAction; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; @@ -378,6 +380,7 @@ import org.opensearch.rest.action.admin.cluster.RestNodesStatsAction; import org.opensearch.rest.action.admin.cluster.RestNodesUsageAction; import org.opensearch.rest.action.admin.cluster.RestPendingClusterTasksAction; +import org.opensearch.rest.action.admin.cluster.RestPruneCacheAction; import 
org.opensearch.rest.action.admin.cluster.RestPutRepositoryAction; import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction; import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction; @@ -661,6 +664,7 @@ public void reg actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class); actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class); actions.register(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class); + actions.register(PruneFileCacheAction.INSTANCE, TransportPruneFileCacheAction.class); actions.register(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class); actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class); actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class); @@ -870,6 +874,8 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestClusterRerouteAction(settingsFilter)); registerHandler.accept(new RestClusterSearchShardsAction()); registerHandler.accept(new RestPendingClusterTasksAction()); + // FileCache API + registerHandler.accept(new RestPruneCacheAction()); registerHandler.accept(new RestPutRepositoryAction()); registerHandler.accept(new RestGetRepositoriesAction(settingsFilter)); registerHandler.accept(new RestDeleteRepositoryAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/filecache/NodePruneFileCacheResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/NodePruneFileCacheResponse.java new file mode 100644 index 0000000000000..bc62d0e54ddde --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/NodePruneFileCacheResponse.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.filecache; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Response for pruning remote file cache operation from a single node. + * Contains essential metrics about the cache operation including freed bytes + * and cache capacity. 
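+ * Rendered as an XContent fragment containing node identity fields plus
+ * {@code pruned_bytes} and {@code cache_capacity}.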
+ * + * @opensearch.internal + */ +public class NodePruneFileCacheResponse extends BaseNodeResponse implements ToXContentFragment { + + private final long prunedBytes; + private final long cacheCapacity; + + public NodePruneFileCacheResponse(StreamInput in) throws IOException { + super(in); + this.prunedBytes = in.readLong(); + this.cacheCapacity = in.readLong(); + } + + public NodePruneFileCacheResponse(DiscoveryNode node, long prunedBytes, long cacheCapacity) { + super(node); + this.prunedBytes = prunedBytes; + this.cacheCapacity = cacheCapacity; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(prunedBytes); + out.writeLong(cacheCapacity); + } + + public long getPrunedBytes() { + return prunedBytes; + } + + public long getCacheCapacity() { + return cacheCapacity; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if ((o instanceof NodePruneFileCacheResponse) == false) return false; + if (!super.equals(o)) return false; + NodePruneFileCacheResponse that = (NodePruneFileCacheResponse) o; + return prunedBytes == that.prunedBytes && cacheCapacity == that.cacheCapacity; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), prunedBytes, cacheCapacity); + } + + @Override + public String toString() { + return "NodePruneFileCacheResponse{" + + "node=" + + getNode().getId() + + ", prunedBytes=" + + prunedBytes + + ", cacheCapacity=" + + cacheCapacity + + '}'; + } + + /** + * Serializes this node response to XContent format. + * Outputs node identification fields and cache operation metrics. + * + * @param builder the XContent builder + * @param params serialization parameters + * @return the XContent builder for method chaining + * @throws IOException if an I/O error occurs during serialization + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("name", getNode().getName()); + builder.field("transport_address", getNode().getAddress().toString()); + builder.field("host", getNode().getHostName()); + builder.field("ip", getNode().getHostAddress()); + builder.field("pruned_bytes", prunedBytes); + builder.field("cache_capacity", cacheCapacity); + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheAction.java new file mode 100644 index 0000000000000..0ae1196333420 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.action.admin.cluster.filecache;
+
+import org.opensearch.action.ActionType;
+
+/**
+ * Action type for pruning the file cache.
+ *
+ * @opensearch.internal
+ */
+public class PruneFileCacheAction extends ActionType<PruneFileCacheResponse> {
+
+    public static final PruneFileCacheAction INSTANCE = new PruneFileCacheAction();
+    public static final String NAME = "cluster:admin/filecache/prune";
+
+    private PruneFileCacheAction() {
+        super(NAME, PruneFileCacheResponse::new);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheRequest.java
new file mode 100644
index 0000000000000..7e6be591cabcb
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheRequest.java
@@ -0,0 +1,47 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.admin.cluster.filecache;
+
+import org.opensearch.action.ActionRequestValidationException;
+import org.opensearch.action.support.nodes.BaseNodesRequest;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * Request for pruning remote file cache across multiple nodes.
+ * Supports node targeting for efficient cache management.
+ *
+ * @opensearch.internal
+ */
+public class PruneFileCacheRequest extends BaseNodesRequest<PruneFileCacheRequest> {
+
+    public PruneFileCacheRequest() {
+        super((String[]) null);
+    }
+
+    public PruneFileCacheRequest(String... nodesIds) {
+        super(nodesIds);
+    }
+
+    public PruneFileCacheRequest(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+    }
+
+    @Override
+    public ActionRequestValidationException validate() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheResponse.java
new file mode 100644
index 0000000000000..9696882332ab7
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheResponse.java
@@ -0,0 +1,155 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.admin.cluster.filecache;
+
+import org.opensearch.action.FailedNodeException;
+import org.opensearch.action.support.nodes.BaseNodesResponse;
+import org.opensearch.cluster.ClusterName;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.xcontent.ToXContentObject;
+import org.opensearch.core.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response for pruning remote file cache across multiple nodes.
+ * Aggregates individual node responses and provides comprehensive operational visibility
+ * including per-node metrics, cluster-wide summaries, and failure tracking.
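+ * The XContent form contains an {@code acknowledged} flag, a cluster-wide
+ * {@code summary} object, a per-node {@code nodes} object, and a
+ * {@code failures} array when any node fails.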
+ *
+ * @opensearch.internal
+ */
+public class PruneFileCacheResponse extends BaseNodesResponse<NodePruneFileCacheResponse> implements ToXContentObject {
+
+    /**
+     * Constructor for stream input deserialization
+     *
+     * @param in the stream input
+     * @throws IOException if an I/O exception occurs
+     */
+    public PruneFileCacheResponse(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    /**
+     * Constructor for multi-node response aggregation
+     *
+     * @param clusterName the cluster name
+     * @param nodes the successful node responses
+     * @param failures the failed node responses
+     */
+    public PruneFileCacheResponse(ClusterName clusterName, List<NodePruneFileCacheResponse> nodes, List<FailedNodeException> failures) {
+        super(clusterName, nodes, failures);
+    }
+
+    @Override
+    protected List<NodePruneFileCacheResponse> readNodesFrom(StreamInput in) throws IOException {
+        return in.readList(NodePruneFileCacheResponse::new);
+    }
+
+    @Override
+    protected void writeNodesTo(StreamOutput out, List<NodePruneFileCacheResponse> nodes) throws IOException {
+        out.writeList(nodes);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+
+        builder.field("acknowledged", true);
+
+        long totalPrunedBytes = getTotalPrunedBytes();
+        builder.field("total_pruned_bytes", totalPrunedBytes);
+
+        long totalCacheCapacity = 0;
+
+        builder.startObject("summary");
+        builder.field("total_nodes_targeted", getNodes().size() + failures().size());
+        builder.field("successful_nodes", getNodes().size());
+        builder.field("failed_nodes", failures().size());
+        for (NodePruneFileCacheResponse nodeResponse : getNodes()) {
+            if (nodeResponse != null) {
+                totalCacheCapacity += nodeResponse.getCacheCapacity();
+            }
+        }
+
+        builder.field("total_cache_capacity", totalCacheCapacity);
+        builder.endObject();
+
+        builder.startObject("nodes");
+        for (NodePruneFileCacheResponse nodeResponse : getNodes()) {
+            if (nodeResponse != null && nodeResponse.getNode() != null) {
+                builder.startObject(nodeResponse.getNode().getId());
+                nodeResponse.toXContent(builder, params);
+                builder.endObject();
+            }
+        }
+        builder.endObject();
+
+        if (!failures().isEmpty()) {
+            builder.startArray("failures");
+            for (FailedNodeException failure : failures()) {
+                builder.startObject();
+                builder.field("node_id", failure.nodeId());
+                builder.field("reason", failure.getDetailedMessage());
+                builder.field("caused_by", failure.getCause() != null ? failure.getCause().getClass().getSimpleName() : null);
+                builder.endObject();
+            }
+            builder.endArray();
+        }
+
+        builder.endObject();
+        return builder;
+    }
+
+    /**
+     * Calculate total bytes pruned across all successful nodes
+     *
+     * @return total bytes freed by all successful prune operations
+     */
+    public long getTotalPrunedBytes() {
+        return getNodes().stream().filter(Objects::nonNull).mapToLong(NodePruneFileCacheResponse::getPrunedBytes).sum();
+    }
+
+    /**
+     * Check if the operation was partially successful (some nodes succeeded, some failed)
+     *
+     * @return true if some nodes succeeded and some failed
+     */
+    // VisibleForTesting
+    public boolean isPartiallySuccessful() {
+        return getNodes().isEmpty() == false && failures().isEmpty() == false;
+    }
+
+    /**
+     * Check if the operation was completely successful (all targeted nodes succeeded)
+     *
+     * @return true if all targeted nodes succeeded
+     */
+    // VisibleForTesting
+    public boolean isCompletelySuccessful() {
+        return getNodes().isEmpty() == false && failures().isEmpty();
+    }
+
+    /**
+     * @return whether the operation was acknowledged (always true for multi-node responses)
+     */
+    public boolean isAcknowledged() {
+        return true;
+    }
+
+    /**
+     * @return total bytes freed across all nodes
+     */
+    public long getPrunedBytes() {
+        return getTotalPrunedBytes();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/filecache/TransportPruneFileCacheAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/TransportPruneFileCacheAction.java
new file mode 100644
index 0000000000000..42519afdfa984
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/TransportPruneFileCacheAction.java
@@ -0,0 +1,154 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.admin.cluster.filecache;
+
+import org.opensearch.action.FailedNodeException;
+import org.opensearch.action.support.ActionFilters;
+import org.opensearch.action.support.nodes.TransportNodesAction;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.Nullable;
+import org.opensearch.common.inject.Inject;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportRequest;
+import org.opensearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Transport action for pruning remote file cache across multiple nodes.
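+ * Resolves the request to warm nodes only and invokes {@link FileCache#prune()}
+ * on each targeted node; nodes without a file cache report zero pruned bytes.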
+ *
+ * @opensearch.internal
+ */
+public class TransportPruneFileCacheAction extends TransportNodesAction<
+    PruneFileCacheRequest,
+    PruneFileCacheResponse,
+    TransportPruneFileCacheAction.NodeRequest,
+    NodePruneFileCacheResponse> {
+
+    private final FileCache fileCache;
+
+    @Inject
+    public TransportPruneFileCacheAction(
+        ThreadPool threadPool,
+        ClusterService clusterService,
+        TransportService transportService,
+        ActionFilters actionFilters,
+        @Nullable FileCache fileCache
+    ) {
+        super(
+            PruneFileCacheAction.NAME,
+            threadPool,
+            clusterService,
+            transportService,
+            actionFilters,
+            PruneFileCacheRequest::new,
+            NodeRequest::new,
+            ThreadPool.Names.MANAGEMENT,
+            NodePruneFileCacheResponse.class
+        );
+        this.fileCache = fileCache;
+    }
+
+    @Override
+    protected void resolveRequest(PruneFileCacheRequest request, ClusterState clusterState) {
+        assert request.concreteNodes() == null : "request concreteNodes shouldn't be set";
+
+        List<DiscoveryNode> allWarmNodes = new ArrayList<>(clusterState.nodes().getWarmNodes().values());
+
+        List<DiscoveryNode> warmNodes;
+        if (request.nodesIds() != null && request.nodesIds().length > 0) {
+            String[] resolvedNodeIds = clusterState.nodes().resolveNodes(request.nodesIds());
+            Set<String> requestedIds = Set.of(resolvedNodeIds);
+
+            warmNodes = allWarmNodes.stream().filter(node -> requestedIds.contains(node.getId())).collect(Collectors.toList());
+
+            if (warmNodes.isEmpty()) {
+                throw new IllegalArgumentException(
+                    "No warm nodes found matching the specified criteria. " + "FileCache operations can only target warm nodes."
+                );
+            }
+        } else {
+            warmNodes = allWarmNodes;
+        }
+
+        request.setConcreteNodes(warmNodes.toArray(new DiscoveryNode[warmNodes.size()]));
+    }
+
+    @Override
+    protected PruneFileCacheResponse newResponse(
+        PruneFileCacheRequest request,
+        List<NodePruneFileCacheResponse> responses,
+        List<FailedNodeException> failures
+    ) {
+        return new PruneFileCacheResponse(clusterService.getClusterName(), responses, failures);
+    }
+
+    @Override
+    protected NodeRequest newNodeRequest(PruneFileCacheRequest request) {
+        return new NodeRequest(request);
+    }
+
+    @Override
+    protected NodePruneFileCacheResponse newNodeResponse(StreamInput in) throws IOException {
+        return new NodePruneFileCacheResponse(in);
+    }
+
+    @Override
+    protected NodePruneFileCacheResponse nodeOperation(NodeRequest nodeRequest) {
+        if (fileCache == null) {
+            return new NodePruneFileCacheResponse(transportService.getLocalNode(), 0, 0);
+        }
+
+        try {
+            long capacity = fileCache.capacity();
+            long prunedBytes = fileCache.prune();
+
+            return new NodePruneFileCacheResponse(transportService.getLocalNode(), prunedBytes, capacity);
+
+        } catch (Exception e) {
+            throw new RuntimeException("FileCache prune operation failed on node " + transportService.getLocalNode().getId(), e);
+        }
+    }
+
+    /**
+     * Node-level request for cache pruning operation.
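+     * Wraps the original {@link PruneFileCacheRequest} so it can be serialized
+     * to each targeted node over the transport layer.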
+ */ + public static class NodeRequest extends TransportRequest { + private PruneFileCacheRequest request; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + request = new PruneFileCacheRequest(in); + } + + public NodeRequest(PruneFileCacheRequest request) { + this.request = Objects.requireNonNull(request, "PruneFileCacheRequest cannot be null"); + } + + public PruneFileCacheRequest getRequest() { + return request; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/filecache/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/package-info.java new file mode 100644 index 0000000000000..75a43a991e806 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/filecache/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * File cache management actions for cluster administration. + */ +package org.opensearch.action.admin.cluster.filecache; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ae8299ee7ccb5..f81db57250e56 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1595,6 +1595,11 @@ protected Node(final Environment initialEnvironment, Collection clas b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory); b.bind(IndicesService.class).toInstance(indicesService); b.bind(RemoteStoreStatsTrackerFactory.class).toInstance(remoteStoreStatsTrackerFactory); + if (fileCache != null) { + b.bind(FileCache.class).toInstance(fileCache); + } else { + b.bind(FileCache.class).toProvider(Providers.of(null)); + } b.bind(AliasValidator.class).toInstance(aliasValidator); b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService); b.bind(AwarenessReplicaBalance.class).toInstance(awarenessReplicaBalance); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestPruneCacheAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestPruneCacheAction.java new file mode 100644 index 0000000000000..b52ef228e8491 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestPruneCacheAction.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.opensearch.action.admin.cluster.filecache.PruneFileCacheAction; +import org.opensearch.action.admin.cluster.filecache.PruneFileCacheRequest; +import org.opensearch.core.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestRequest.Method; +import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.transport.client.node.NodeClient; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static java.util.Collections.singletonList; + +/** + * REST action to manually trigger FileCache prune operation across multiple nodes. 
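+ * Exposed at {@code POST /_filecache/prune} with optional {@code nodes},
+ * {@code node}, and {@code timeout} parameters.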
+ * Supports node targeting for efficient cache management. + * + * @opensearch.api + */ +public class RestPruneCacheAction extends BaseRestHandler { + + @Override + public List routes() { + return singletonList(new Route(Method.POST, "/_filecache/prune")); + } + + @Override + public String getName() { + return "prune_filecache_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String[] nodeIds = parseNodeTargeting(request); + + PruneFileCacheRequest pruneFileCacheRequest = new PruneFileCacheRequest(nodeIds); + pruneFileCacheRequest.timeout(request.paramAsTime("timeout", pruneFileCacheRequest.timeout())); + + return channel -> client.execute(PruneFileCacheAction.INSTANCE, pruneFileCacheRequest, new RestToXContentListener<>(channel)); + } + + private String[] parseNodeTargeting(RestRequest request) { + String nodes = request.param("nodes"); + String node = request.param("node"); + + if (nodes != null && !nodes.trim().isEmpty()) { + String[] parsed = Strings.splitStringByCommaToArray(nodes); + return Arrays.stream(parsed).filter(s -> s != null && !s.trim().isEmpty()).map(String::trim).toArray(String[]::new); + } else if (node != null && !node.trim().isEmpty()) { + return new String[] { node.trim() }; + } + + return null; + } + + @Override + public boolean canTripCircuitBreaker() { + return false; + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheRequestResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheRequestResponseTests.java new file mode 100644 index 0000000000000..4b71771e29f1d --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/filecache/PruneFileCacheRequestResponseTests.java @@ -0,0 +1,220 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.filecache; + +import org.opensearch.Version; +import org.opensearch.action.FailedNodeException; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Tests for {@link PruneFileCacheRequest} and {@link PruneFileCacheResponse} with enhanced multi-node architecture. + * Covers node targeting and rich response aggregation. + */ +public class PruneFileCacheRequestResponseTests extends OpenSearchTestCase { + + private DiscoveryNode createRealNode(String nodeId, String nodeName) { + return new DiscoveryNode( + nodeName, + nodeId, + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.emptySet(), + Version.CURRENT + ); + } + + private PruneFileCacheResponse createMultiNodeResponse(long... 
prunedBytesPerNode) { + ClusterName clusterName = new ClusterName("test-cluster"); + + List nodeResponses = Arrays.asList(); + if (prunedBytesPerNode.length > 0) { + nodeResponses = new java.util.ArrayList<>(); + for (int i = 0; i < prunedBytesPerNode.length; i++) { + DiscoveryNode node = createRealNode("node" + i, "test-node-" + i); + nodeResponses.add(new NodePruneFileCacheResponse(node, prunedBytesPerNode[i], 10737418240L)); + } + } + + List failures = Collections.emptyList(); + + return new PruneFileCacheResponse(clusterName, nodeResponses, failures); + } + + public void testPruneCacheRequestSerialization() throws IOException { + PruneFileCacheRequest originalRequest = new PruneFileCacheRequest("node1", "node2"); + + BytesStreamOutput out = new BytesStreamOutput(); + originalRequest.writeTo(out); + + StreamInput in = out.bytes().streamInput(); + PruneFileCacheRequest deserializedRequest = new PruneFileCacheRequest(in); + + assertArrayEquals(originalRequest.nodesIds(), deserializedRequest.nodesIds()); + } + + public void testPruneCacheRequestDefaultParameters() throws IOException { + PruneFileCacheRequest originalRequest = new PruneFileCacheRequest(); + + BytesStreamOutput out = new BytesStreamOutput(); + originalRequest.writeTo(out); + + StreamInput in = out.bytes().streamInput(); + PruneFileCacheRequest deserializedRequest = new PruneFileCacheRequest(in); + + assertEquals(originalRequest.timeout(), deserializedRequest.timeout()); + } + + public void testPruneCacheResponseMultiNodeSerialization() throws IOException { + PruneFileCacheResponse originalResponse = createMultiNodeResponse(1048576L, 2097152L); + + BytesStreamOutput out = new BytesStreamOutput(); + originalResponse.writeTo(out); + + StreamInput in = out.bytes().streamInput(); + PruneFileCacheResponse deserializedResponse = new PruneFileCacheResponse(in); + assertEquals("Node count should match", 2, deserializedResponse.getNodes().size()); + assertEquals("Total pruned bytes should match", 3145728L, deserializedResponse.getTotalPrunedBytes()); + assertEquals("getPrunedBytes should work", 3145728L, deserializedResponse.getPrunedBytes()); + assertTrue("Should be acknowledged", deserializedResponse.isAcknowledged()); + assertTrue("Should be completely successful", deserializedResponse.isCompletelySuccessful()); + assertFalse("Should not be partially successful", deserializedResponse.isPartiallySuccessful()); + } + + public void testPruneCacheResponseWithFailures() { + ClusterName clusterName = new ClusterName("test-cluster"); + List successfulNodes = Arrays.asList( + new NodePruneFileCacheResponse(createRealNode("node1", "successful-node"), 1048576L, 10737418240L) + ); + List failures = Arrays.asList( + new FailedNodeException("node2", "Cache operation failed", new RuntimeException("Test failure")) + ); + + PruneFileCacheResponse response = new PruneFileCacheResponse(clusterName, successfulNodes, failures); + + assertEquals("Should have one successful node", 1, response.getNodes().size()); + assertEquals("Should have one failure", 1, response.failures().size()); + assertTrue("Should be partially successful", response.isPartiallySuccessful()); + assertFalse("Should not be completely successful", response.isCompletelySuccessful()); + } + + public void testPruneCacheResponseEnhancedJSON() throws IOException { + PruneFileCacheResponse response = createMultiNodeResponse(1048576L, 2097152L); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + response.toXContent(builder, ToXContent.EMPTY_PARAMS); + + String jsonString = 
builder.toString(); + assertTrue("Should contain acknowledged field", jsonString.contains("\"acknowledged\":true")); + assertTrue("Should contain total_pruned_bytes", jsonString.contains("\"total_pruned_bytes\":3145728")); + assertTrue("Should contain summary section", jsonString.contains("\"summary\"")); + assertTrue("Should contain nodes section", jsonString.contains("\"nodes\"")); + assertTrue("Should contain successful_nodes count", jsonString.contains("\"successful_nodes\":2")); + assertTrue("Should contain failed_nodes count", jsonString.contains("\"failed_nodes\":0")); + } + + public void testPruneCacheResponseJSONWithFailures() throws IOException { + ClusterName clusterName = new ClusterName("test-cluster"); + List successfulNodes = Arrays.asList( + new NodePruneFileCacheResponse(createRealNode("node1", "good-node"), 1048576L, 10737418240L) + ); + List failures = Arrays.asList( + new FailedNodeException("node2", "Test failure message", new RuntimeException("Root cause")) + ); + + PruneFileCacheResponse response = new PruneFileCacheResponse(clusterName, successfulNodes, failures); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + response.toXContent(builder, ToXContent.EMPTY_PARAMS); + + String jsonString = builder.toString(); + + assertTrue("Should contain failures array", jsonString.contains("\"failures\"")); + assertTrue("Should contain failed node ID", jsonString.contains("\"node_id\":\"node2\"")); + assertTrue("Should contain failure reason", jsonString.contains("\"reason\"")); + assertTrue("Should contain caused_by", jsonString.contains("\"caused_by\":\"RuntimeException\"")); + } + + public void testPruneCacheResponseScenarios() { + PruneFileCacheResponse zeroResponse = createMultiNodeResponse(0L, 0L); + assertEquals("Zero bytes should sum correctly", 0L, zeroResponse.getTotalPrunedBytes()); + + PruneFileCacheResponse mixedResponse = createMultiNodeResponse(1048576L, 0L, 2097152L); + assertEquals("Mixed results should sum correctly", 3145728L, mixedResponse.getTotalPrunedBytes()); + + PruneFileCacheResponse largeResponse = createMultiNodeResponse(Long.MAX_VALUE / 2, Long.MAX_VALUE / 2); + assertEquals("Large values should sum correctly", Long.MAX_VALUE - 1, largeResponse.getTotalPrunedBytes()); + } + + public void testPruneCacheRequestValidation() { + PruneFileCacheRequest request = new PruneFileCacheRequest(); + assertNull("Validation should be null", request.validate()); + + PruneFileCacheRequest targetedRequest = new PruneFileCacheRequest("node1", "node2"); + assertNull("Targeted request validation should be null", targetedRequest.validate()); + } + + public void testPruneCacheRequestNodeTargeting() { + PruneFileCacheRequest defaultRequest = new PruneFileCacheRequest(); + assertNull("Default request should have null nodeIds", defaultRequest.nodesIds()); + + PruneFileCacheRequest singleNodeRequest = new PruneFileCacheRequest("node1"); + assertArrayEquals("Single node should be set", new String[] { "node1" }, singleNodeRequest.nodesIds()); + + PruneFileCacheRequest multiNodeRequest = new PruneFileCacheRequest("node1", "node2", "node3"); + assertArrayEquals("Multiple nodes should be set", new String[] { "node1", "node2", "node3" }, multiNodeRequest.nodesIds()); + + PruneFileCacheRequest emptyRequest = new PruneFileCacheRequest(new String[0]); + assertArrayEquals("Empty array should be preserved", new String[0], emptyRequest.nodesIds()); + } + + public void testNodePruneCacheResponseSerialization() throws IOException { + DiscoveryNode realNode = 
createRealNode("test-node", "test-node-name"); + NodePruneFileCacheResponse originalResponse = new NodePruneFileCacheResponse(realNode, 1048576L, 10737418240L); + + BytesStreamOutput out = new BytesStreamOutput(); + originalResponse.writeTo(out); + + StreamInput in = out.bytes().streamInput(); + NodePruneFileCacheResponse deserializedResponse = new NodePruneFileCacheResponse(in); + assertEquals("Pruned bytes should match", originalResponse.getPrunedBytes(), deserializedResponse.getPrunedBytes()); + assertEquals("Cache capacity should match", originalResponse.getCacheCapacity(), deserializedResponse.getCacheCapacity()); + } + + public void testResponseAggregationEdgeCases() { + PruneFileCacheResponse emptyResponse = new PruneFileCacheResponse( + new ClusterName("test"), + Collections.emptyList(), + Collections.emptyList() + ); + assertEquals("Empty response total should be 0", 0L, emptyResponse.getTotalPrunedBytes()); + assertFalse("Empty response should not be completely successful (no nodes processed)", emptyResponse.isCompletelySuccessful()); + assertFalse("Empty response should not be partially successful (no nodes processed)", emptyResponse.isPartiallySuccessful()); + + PruneFileCacheResponse allFailureResponse = new PruneFileCacheResponse( + new ClusterName("test"), + Collections.emptyList(), + Arrays.asList(new FailedNodeException("node1", "Test failure", new RuntimeException())) + ); + assertEquals("All-failure response total should be 0", 0L, allFailureResponse.getTotalPrunedBytes()); + assertFalse("All-failure response should not be successful", allFailureResponse.isCompletelySuccessful()); + assertFalse("All-failure response should not be partially successful", allFailureResponse.isPartiallySuccessful()); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/filecache/TransportPruneFileCacheActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/filecache/TransportPruneFileCacheActionTests.java new file mode 100644 index 0000000000000..04493d3f351bb --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/filecache/TransportPruneFileCacheActionTests.java @@ -0,0 +1,267 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.cluster.filecache; + +import org.opensearch.Version; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import static org.opensearch.test.ClusterServiceUtils.createClusterService; +import static org.opensearch.test.ClusterServiceUtils.setState; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link TransportPruneFileCacheAction} using TransportNodesAction pattern. + * Tests enhanced multi-node architecture and warm node intelligence. + */ +public class TransportPruneFileCacheActionTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + private ClusterService clusterService; + private CapturingTransport transport; + private TransportService transportService; + private ActionFilters actionFilters; + private FileCache fileCache; + private TransportPruneFileCacheAction action; + + @Before + public void setUp() throws Exception { + super.setUp(); + + threadPool = new TestThreadPool("test"); + transport = new CapturingTransport(); + clusterService = createClusterService(threadPool); + transportService = transport.createTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> clusterService.localNode(), + null, + Collections.emptySet(), + null + ); + transportService.start(); + transportService.acceptIncomingRequests(); + + actionFilters = new ActionFilters(Collections.emptySet()); + fileCache = mock(FileCache.class); + + setupClusterWithWarmNodes(); + + action = new TransportPruneFileCacheAction(threadPool, clusterService, transportService, actionFilters, fileCache); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + transport.close(); + clusterService.close(); + } + + private void setupClusterWithWarmNodes() { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + + DiscoveryNode warmNode1 = new DiscoveryNode( + "warm-node-1", + "warm-node-1", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Set.of(DiscoveryNodeRole.WARM_ROLE), + Version.CURRENT + ); + DiscoveryNode warmNode2 = new DiscoveryNode( + "warm-node-2", + "warm-node-2", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Set.of(DiscoveryNodeRole.WARM_ROLE), + Version.CURRENT + ); + + DiscoveryNode dataNode1 = new DiscoveryNode( + "data-node-1", + "data-node-1", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Set.of(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + + nodesBuilder.add(warmNode1).add(warmNode2).add(dataNode1); + nodesBuilder.localNodeId(warmNode1.getId()); + nodesBuilder.clusterManagerNodeId(warmNode1.getId()); + + 
ClusterState clusterState = ClusterState.builder(clusterService.getClusterName()).nodes(nodesBuilder).build(); + + setState(clusterService, clusterState); + } + + public void testRequestPreparationAndTargeting() { + PruneFileCacheRequest request = new PruneFileCacheRequest(); + + TransportPruneFileCacheAction.NodeRequest nodeRequest = action.newNodeRequest(request); + assertNotNull("Node request should not be null", nodeRequest); + assertEquals("Node request should wrap original request", request, nodeRequest.getRequest()); + } + + public void testNodeOperation() { + when(fileCache.capacity()).thenReturn(10737418240L); + when(fileCache.prune()).thenReturn(1048576L); + + PruneFileCacheRequest globalRequest = new PruneFileCacheRequest(); + TransportPruneFileCacheAction.NodeRequest nodeRequest = new TransportPruneFileCacheAction.NodeRequest(globalRequest); + + NodePruneFileCacheResponse response = action.nodeOperation(nodeRequest); + + assertNotNull("Response should not be null", response); + assertEquals("Pruned bytes should match", 1048576L, response.getPrunedBytes()); + assertEquals("Capacity should match", 10737418240L, response.getCacheCapacity()); + + verify(fileCache).prune(); + verify(fileCache).capacity(); + } + + public void testNullFileCache() { + + CapturingTransport nullTransport = new CapturingTransport(); + TransportService nullTransportService = nullTransport.createTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> clusterService.localNode(), + null, + Collections.emptySet(), + null + ); + nullTransportService.start(); + nullTransportService.acceptIncomingRequests(); + + try { + TransportPruneFileCacheAction nullCacheAction = new TransportPruneFileCacheAction( + threadPool, + clusterService, + nullTransportService, + actionFilters, + null + ); + + PruneFileCacheRequest globalRequest = new PruneFileCacheRequest(); + TransportPruneFileCacheAction.NodeRequest nodeRequest = new TransportPruneFileCacheAction.NodeRequest(globalRequest); + + NodePruneFileCacheResponse response = nullCacheAction.nodeOperation(nodeRequest); + + assertEquals("Pruned bytes should be 0 for null cache", 0L, response.getPrunedBytes()); + assertEquals("Capacity should be 0 for null cache", 0L, response.getCacheCapacity()); + } finally { + nullTransportService.close(); + nullTransport.close(); + } + } + + public void testWarmNodeResolution() { + + PruneFileCacheRequest defaultRequest = new PruneFileCacheRequest(); + + try { + action.resolveRequest(defaultRequest, clusterService.state()); + + assertEquals("Should resolve to 2 warm nodes", 2, defaultRequest.concreteNodes().length); + assertTrue( + "Should include warm-node-1", + Arrays.stream(defaultRequest.concreteNodes()).anyMatch(node -> "warm-node-1".equals(node.getId())) + ); + assertTrue( + "Should include warm-node-2", + Arrays.stream(defaultRequest.concreteNodes()).anyMatch(node -> "warm-node-2".equals(node.getId())) + ); + } catch (IllegalArgumentException e) { + fail("Default request should not fail: " + e.getMessage()); + } + } + + public void testSpecificNodeTargeting() { + + PruneFileCacheRequest specificRequest = new PruneFileCacheRequest("warm-node-1"); + + action.resolveRequest(specificRequest, clusterService.state()); + + assertEquals("Should resolve to 1 warm node", 1, specificRequest.concreteNodes().length); + assertEquals("Should target warm-node-1", "warm-node-1", specificRequest.concreteNodes()[0].getId()); + } + + public void testInvalidNodeTargeting() { + + 
PruneFileCacheRequest invalidRequest = new PruneFileCacheRequest("data-node-1"); + + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> action.resolveRequest(invalidRequest, clusterService.state()) + ); + + assertTrue( + "Error message should mention warm nodes", + exception.getMessage().contains("FileCache operations can only target warm nodes") + ); + } + + public void testFileCacheException() { + when(fileCache.prune()).thenThrow(new RuntimeException("Cache corruption")); + + PruneFileCacheRequest globalRequest = new PruneFileCacheRequest(); + TransportPruneFileCacheAction.NodeRequest nodeRequest = new TransportPruneFileCacheAction.NodeRequest(globalRequest); + + RuntimeException exception = expectThrows( + RuntimeException.class, + () -> action.nodeOperation(nodeRequest) + ); + + assertTrue("Exception should contain node ID", exception.getMessage().contains("node")); + assertTrue("Exception should mention failure", exception.getMessage().contains("failed")); + } + + public void testResponseAggregation() { + PruneFileCacheRequest request = new PruneFileCacheRequest(); + + List responses = Arrays.asList( + new NodePruneFileCacheResponse(clusterService.state().nodes().get("warm-node-1"), 1048576L, 10737418240L), + new NodePruneFileCacheResponse(clusterService.state().nodes().get("warm-node-2"), 2097152L, 10737418240L) + ); + + List failures = Collections.emptyList(); + + PruneFileCacheResponse aggregatedResponse = action.newResponse(request, responses, failures); + + assertNotNull("Aggregated response should not be null", aggregatedResponse); + assertEquals("Should have 2 successful nodes", 2, aggregatedResponse.getNodes().size()); + assertEquals("Should have no failures", 0, aggregatedResponse.failures().size()); + assertEquals("Total pruned bytes should be sum", 3145728L, aggregatedResponse.getTotalPrunedBytes()); + assertTrue("Should be completely successful", aggregatedResponse.isCompletelySuccessful()); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestPruneCacheActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestPruneCacheActionTests.java new file mode 100644 index 0000000000000..fa48956f3de10 --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestPruneCacheActionTests.java @@ -0,0 +1,319 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.rest.action.admin.cluster; + +import org.opensearch.action.admin.cluster.filecache.NodePruneFileCacheResponse; +import org.opensearch.action.admin.cluster.filecache.PruneFileCacheAction; +import org.opensearch.action.admin.cluster.filecache.PruneFileCacheRequest; +import org.opensearch.action.admin.cluster.filecache.PruneFileCacheResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the enhanced {@link RestPruneCacheAction} class. + * Covers route registration, parameter parsing, and multi-node response handling. + */ +public class RestPruneCacheActionTests extends RestActionTestCase { + + private RestPruneCacheAction action; + + @Before + public void setUpAction() { + action = new RestPruneCacheAction(); + controller().registerHandler(action); + } + + private PruneFileCacheResponse createMockMultiNodeResponse() { + DiscoveryNode mockNode = mock(DiscoveryNode.class); + when(mockNode.getId()).thenReturn("node1"); + when(mockNode.getName()).thenReturn("test-node"); + when(mockNode.getHostName()).thenReturn("localhost"); + when(mockNode.getHostAddress()).thenReturn("127.0.0.1"); + when(mockNode.getAddress()).thenReturn(buildNewFakeTransportAddress()); + + List nodeResponses = Arrays.asList(new NodePruneFileCacheResponse(mockNode, 1024L, 10737418240L)); + + return new PruneFileCacheResponse(new ClusterName("test-cluster"), nodeResponses, Collections.emptyList()); + } + + public void testRoutes() { + assertEquals(1, action.routes().size()); + assertEquals(RestRequest.Method.POST, action.routes().get(0).getMethod()); + assertEquals("/_filecache/prune", action.routes().get(0).getPath()); + } + + /** + * Verifies that the action has the correct registered name. + */ + public void testGetName() { + assertEquals("prune_filecache_action", action.getName()); + } + + /** + * Tests basic request preparation without parameters (all defaults). + */ + public void testPrepareRequestDefaults() throws Exception { + verifyingClient.setExecuteVerifier((actionType, request) -> { + assertEquals(PruneFileCacheAction.INSTANCE, actionType); + assertThat(request, instanceOf(PruneFileCacheRequest.class)); + + PruneFileCacheRequest pruneFileCacheRequest = (PruneFileCacheRequest) request; + assertNull("Default should target all nodes", pruneFileCacheRequest.nodesIds()); + + return createMockMultiNodeResponse(); + }); + + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath("/_filecache/prune") + .build(); + + dispatchRequest(request); + } + + /** + * Tests enhanced parameter handling. 
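+ * Sends {@code nodes=node1,node2} with {@code timeout=30s} and verifies that
+ * both reach the transport request.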
+    /**
+     * Tests combined node targeting and timeout parameter handling.
+     */
+    public void testEnhancedParameterHandling() throws Exception {
+        verifyingClient.setExecuteVerifier((actionType, request) -> {
+            assertEquals(PruneFileCacheAction.INSTANCE, actionType);
+            assertThat(request, instanceOf(PruneFileCacheRequest.class));
+
+            PruneFileCacheRequest pruneFileCacheRequest = (PruneFileCacheRequest) request;
+            assertArrayEquals("Node targeting should work", new String[] { "node1", "node2" }, pruneFileCacheRequest.nodesIds());
+            assertEquals("Timeout should be set", 30000, pruneFileCacheRequest.timeout().getMillis());
+
+            return createMockMultiNodeResponse();
+        });
+
+        Map<String, String> params = new HashMap<>();
+        params.put("nodes", "node1,node2");
+        params.put("timeout", "30s");
+
+        RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
+            .withPath("/_filecache/prune")
+            .withParams(params)
+            .build();
+
+        dispatchRequest(request);
+    }
+
+    /**
+     * Tests single node parameter handling.
+     */
+    public void testSingleNodeParameter() throws Exception {
+        verifyingClient.setExecuteVerifier((actionType, request) -> {
+            assertEquals(PruneFileCacheAction.INSTANCE, actionType);
+            assertThat(request, instanceOf(PruneFileCacheRequest.class));
+
+            PruneFileCacheRequest pruneFileCacheRequest = (PruneFileCacheRequest) request;
+            assertArrayEquals("Single node should be targeted", new String[] { "node1" }, pruneFileCacheRequest.nodesIds());
+
+            return createMockMultiNodeResponse();
+        });
+
+        Map<String, String> params = new HashMap<>();
+        params.put("node", "node1");
+
+        RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
+            .withPath("/_filecache/prune")
+            .withParams(params)
+            .build();
+
+        dispatchRequest(request);
+    }
+
+    /**
+     * Tests that the {@code nodes} parameter takes priority over the {@code node} parameter.
+     */
+    public void testParameterPriority() throws Exception {
+        verifyingClient.setExecuteVerifier((actionType, request) -> {
+            assertEquals(PruneFileCacheAction.INSTANCE, actionType);
+            assertThat(request, instanceOf(PruneFileCacheRequest.class));
+
+            PruneFileCacheRequest pruneFileCacheRequest = (PruneFileCacheRequest) request;
+            assertArrayEquals("Nodes parameter should take priority", new String[] { "node1", "node2" }, pruneFileCacheRequest.nodesIds());
+
+            return createMockMultiNodeResponse();
+        });
+
+        Map<String, String> params = new HashMap<>();
+        params.put("nodes", "node1,node2");
+        params.put("node", "node3");
+
+        RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
+            .withPath("/_filecache/prune")
+            .withParams(params)
+            .build();
+
+        dispatchRequest(request);
+    }
+
+    /**
+     * Tests warm node targeting syntax.
+     */
+    public void testWarmNodeTargetingSyntax() throws Exception {
+        verifyingClient.setExecuteVerifier((actionType, request) -> {
+            assertEquals(PruneFileCacheAction.INSTANCE, actionType);
+            assertThat(request, instanceOf(PruneFileCacheRequest.class));
+
+            PruneFileCacheRequest pruneFileCacheRequest = (PruneFileCacheRequest) request;
+            assertArrayEquals("Should target warm nodes", new String[] { "warm:true" }, pruneFileCacheRequest.nodesIds());
+
+            return createMockMultiNodeResponse();
+        });
+
+        Map<String, String> params = new HashMap<>();
+        params.put("nodes", "warm:true");
+
+        RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
+            .withPath("/_filecache/prune")
+            .withParams(params)
+            .build();
+
+        dispatchRequest(request);
+    }
+
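Note (illustration, not part of the diff): the "warm:true" value above is forwarded verbatim as a node selector, so expanding it to concrete node IDs is presumably left to standard node resolution on the transport side rather than the REST layer. A one-line sketch of that assumption (the clusterState variable is hypothetical):

    // Assumption: DiscoveryNodes#resolveNodes understands role-style selectors
    // such as "warm:true" and expands them to the IDs of the matching nodes.
    String[] warmNodeIds = clusterState.nodes().resolveNodes("warm:true");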
+    /**
+     * Tests parameter cleaning and validation.
+     */
+    public void testParameterCleaning() throws Exception {
+        verifyingClient.setExecuteVerifier((actionType, request) -> {
+            assertEquals(PruneFileCacheAction.INSTANCE, actionType);
+            assertThat(request, instanceOf(PruneFileCacheRequest.class));
+
+            PruneFileCacheRequest pruneFileCacheRequest = (PruneFileCacheRequest) request;
+            // Should trim whitespace and filter out empty entries
+            assertArrayEquals("Should clean parameters", new String[] { "node1", "node2" }, pruneFileCacheRequest.nodesIds());
+
+            return createMockMultiNodeResponse();
+        });
+
+        Map<String, String> params = new HashMap<>();
+        params.put("nodes", " node1 , node2 , , ");
+
+        RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
+            .withPath("/_filecache/prune")
+            .withParams(params)
+            .build();
+
+        dispatchRequest(request);
+    }
+
+    /**
+     * Tests that the action correctly reports it cannot trip the circuit breaker.
+     */
+    public void testCanTripCircuitBreaker() {
+        assertFalse("Prune cache action should not trip circuit breaker", action.canTripCircuitBreaker());
+    }
+
+    /**
+     * Tests node targeting combined with a longer timeout value.
+     */
+    public void testComprehensiveParameterCombinations() throws Exception {
+        verifyingClient.setExecuteVerifier((actionType, request) -> {
+            assertEquals(PruneFileCacheAction.INSTANCE, actionType);
+            assertThat(request, instanceOf(PruneFileCacheRequest.class));
+
+            PruneFileCacheRequest pruneFileCacheRequest = (PruneFileCacheRequest) request;
+            assertArrayEquals(
+                "Node targeting should work",
+                new String[] { "warm-node-1", "warm-node-2" },
+                pruneFileCacheRequest.nodesIds()
+            );
+
+            return createMockMultiNodeResponse();
+        });
+
+        Map<String, String> params = new HashMap<>();
+        params.put("nodes", "warm-node-1,warm-node-2");
+        params.put("timeout", "5m");
+
+        RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
+            .withPath("/_filecache/prune")
+            .withParams(params)
+            .build();
+
+        dispatchRequest(request);
+    }
+
+    /**
+     * Tests empty parameter handling edge cases.
+     */
+    public void testEmptyParameterHandling() throws Exception {
+        verifyingClient.setExecuteVerifier((actionType, request) -> {
+            assertEquals(PruneFileCacheAction.INSTANCE, actionType);
+            assertThat(request, instanceOf(PruneFileCacheRequest.class));
+
+            PruneFileCacheRequest pruneFileCacheRequest = (PruneFileCacheRequest) request;
+            assertNull("Empty nodes parameter should result in null", pruneFileCacheRequest.nodesIds());
+
+            return createMockMultiNodeResponse();
+        });
+
+        // Both selector parameters are empty
+        Map<String, String> params = new HashMap<>();
+        params.put("nodes", "");
+        params.put("node", "");
+
+        RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
+            .withPath("/_filecache/prune")
+            .withParams(params)
+            .build();
+
+        dispatchRequest(request);
+    }
+
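Note (illustration, not part of the diff): testParameterPriority, testParameterCleaning, and testEmptyParameterHandling together pin down a parsing contract: "nodes" wins over "node", entries are trimmed, empty entries are dropped, and no usable selector means null (target all eligible nodes). A hypothetical helper consistent with that contract, since the handler's actual parsing is not shown here:

    // Hypothetical helper matching the behavior asserted by the REST tests above.
    private static String[] parseNodeIds(RestRequest restRequest) {
        String raw = restRequest.param("nodes");          // "nodes" takes priority
        if (raw == null || raw.trim().isEmpty()) {
            raw = restRequest.param("node");              // fall back to "node"
        }
        if (raw == null || raw.trim().isEmpty()) {
            return null;                                  // null => all eligible nodes
        }
        String[] cleaned = Arrays.stream(raw.split(","))
            .map(String::trim)                            // " node1 " -> "node1"
            .filter(s -> s.isEmpty() == false)            // drop empty entries
            .toArray(String[]::new);
        return cleaned.length == 0 ? null : cleaned;
    }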
+    /**
+     * Tests large node list parameter handling.
+     */
+    public void testLargeNodeListHandling() throws Exception {
+        verifyingClient.setExecuteVerifier((actionType, request) -> {
+            assertEquals(PruneFileCacheAction.INSTANCE, actionType);
+            assertThat(request, instanceOf(PruneFileCacheRequest.class));
+
+            PruneFileCacheRequest pruneFileCacheRequest = (PruneFileCacheRequest) request;
+            assertEquals("Should handle large node list", 10, pruneFileCacheRequest.nodesIds().length);
+
+            return createMockMultiNodeResponse();
+        });
+
+        // Build a comma-separated list of ten node IDs: node0,node1,...,node9
+        StringBuilder largeNodeList = new StringBuilder();
+        for (int i = 0; i < 10; i++) {
+            if (i > 0) {
+                largeNodeList.append(",");
+            }
+            largeNodeList.append("node").append(i);
+        }
+
+        Map<String, String> params = new HashMap<>();
+        params.put("nodes", largeNodeList.toString());
+
+        RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
+            .withPath("/_filecache/prune")
+            .withParams(params)
+            .build();
+
+        dispatchRequest(request);
+    }
+}