3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -29,10 +29,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Upgrade crypto kms plugin dependencies for AWS SDK v2.x. ([#18268](https://github.com/opensearch-project/OpenSearch/pull/18268))
- Add support for `matched_fields` with the unified highlighter ([#18164](https://github.com/opensearch-project/OpenSearch/issues/18164))
- [repository-s3] Add support for SSE-KMS and S3 bucket owner verification ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312))
- Added File Cache Stats - Involves Block level as well as full file level stats ([#17538](https://github.com/opensearch-project/OpenSearch/issues/17479))

### Changed
- Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269)))

### Dependencies
- Update Apache Lucene from 10.1.0 to 10.2.1 ([#17961](https://github.com/opensearch-project/OpenSearch/pull/17961))
- Bump `com.google.code.gson:gson` from 2.12.1 to 2.13.1 ([#17923](https://github.com/opensearch-project/OpenSearch/pull/17923), [#18266](https://github.com/opensearch-project/OpenSearch/pull/18266))
@@ -51,7 +51,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.plugins.ActionPlugin;
@@ -193,7 +193,7 @@ public void testClusterInfoServiceCollectsInformation() {
assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
}

final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
final Map<String, AggregateFileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is empty on non warm nodes", nodeFileCacheStats.size(), Matchers.equalTo(0));

@@ -227,12 +227,12 @@ public void testClusterInfoServiceCollectsFileCacheInformation() {
infoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
ClusterInfo info = infoService.refresh();
assertNotNull("info should not be null", info);
final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
final Map<String, AggregateFileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is enabled on both warm nodes", nodeFileCacheStats.size(), Matchers.equalTo(2));

for (FileCacheStats fileCacheStats : nodeFileCacheStats.values()) {
assertThat("file cache is non empty", fileCacheStats.getTotal().getBytes(), greaterThan(0L));
for (AggregateFileCacheStats aggregateFileCacheStats : nodeFileCacheStats.values()) {
assertThat("file cache is non empty", aggregateFileCacheStats.getTotal().getBytes(), greaterThan(0L));
}
}

@@ -12,6 +12,9 @@

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
@@ -27,6 +30,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.indices.IndicesService;
@@ -36,7 +40,9 @@

import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -49,6 +55,7 @@
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final String INDEX_NAME_2 = "test-idx-2";
protected static final int NUM_DOCS_IN_BULK = 1000;

/*
@@ -172,4 +179,82 @@ public void testWritableWarmBasic() throws Exception {
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
fileCache.prune();
}

public void testFullFileAndFileCacheStats() throws ExecutionException, InterruptedException {

InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode();
internalTestCluster.startDataAndWarmNodes(1);

Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();

assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(settings).get());

// Verify from the cluster settings if the data locality is partial
GetIndexResponse getIndexResponse = client().admin()
.indices()
.getIndex(new GetIndexRequest().indices(INDEX_NAME_2).includeDefaults(true))
.get();

Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME_2);
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));

// Ingesting docs before force merge
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME_2);

// ensuring cluster is green
ensureGreen();

SearchResponse searchResponse = client().prepareSearch(INDEX_NAME_2).setQuery(QueryBuilders.matchAllQuery()).get();
// Asserting that search returns same number of docs as ingested
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);

// Ingesting docs again before force merge
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME_2);

FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();

// TODO: Make these validations more robust when SwitchableIndexInput is implemented.

NodesStatsResponse nodesStatsResponse = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();

AggregateFileCacheStats fileCacheStats = nodesStatsResponse.getNodes()
.stream()
.filter(n -> n.getNode().isDataNode())
.toList()
.getFirst()
.getFileCacheStats();

if (Objects.isNull(fileCacheStats)) {
fail("File Cache Stats should not be null");
}

// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
// leaks
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME_2)).get());
fileCache.prune();

NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
int nonEmptyFileCacheNodes = 0;
for (NodeStats stats : response.getNodes()) {
AggregateFileCacheStats fcStats = stats.getFileCacheStats();
if (Objects.isNull(fcStats) == false) {
if (isFileCacheEmpty(fcStats) == false) {
nonEmptyFileCacheNodes++;
}
}
}
assertEquals(0, nonEmptyFileCacheNodes);

}

private boolean isFileCacheEmpty(AggregateFileCacheStats stats) {
return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L;
}
}
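A note for readers following the new test: the assertions above rely only on the accessors this PR wires through NodeStats. Below is a minimal, hedged sketch — assuming the same integration-test client() helper and only the methods visible in this diff (getFileCacheStats(), getTotal(), getUsed(), getActive(), isWarmNode()) — of pulling the per-node AggregateFileCacheStats and checking them on a mixed cluster.

// Sketch only: assumes the test-cluster client() helper and the accessors shown in this diff.
NodesStatsResponse statsResponse = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
for (NodeStats nodeStats : statsResponse.getNodes()) {
    AggregateFileCacheStats fc = nodeStats.getFileCacheStats();
    if (fc == null) {
        continue; // nodes without a file cache report no stats
    }
    if (nodeStats.getNode().isWarmNode()) {
        // Warm nodes are expected to expose a sized cache (and, after indexing, used/active bytes).
        assertThat(fc.getTotal().getBytes(), greaterThan(0L));
    } else {
        // Non-warm nodes should report an empty cache.
        assertEquals(0L, fc.getUsed().getBytes());
        assertEquals(0L, fc.getActive().getBytes());
    }
}

This is the same split the ClusterInfoService test above asserts: warm nodes carry a sized file cache, non-warm nodes report it empty.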
@@ -44,7 +44,7 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.node.Node;
import org.opensearch.repositories.fs.FsRepository;
@@ -711,7 +711,7 @@ private void assertIndexDirectoryDoesNotExist(String... indexNames) {
private void assertAllNodesFileCacheEmpty() {
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
for (NodeStats stats : response.getNodes()) {
FileCacheStats fcstats = stats.getFileCacheStats();
AggregateFileCacheStats fcstats = stats.getFileCacheStats();
if (fcstats != null) {
assertTrue(isFileCacheEmpty(fcstats));
}
@@ -722,7 +722,7 @@ private void assertNodesFileCacheNonEmpty(int numNodes) {
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
int nonEmptyFileCacheNodes = 0;
for (NodeStats stats : response.getNodes()) {
FileCacheStats fcStats = stats.getFileCacheStats();
AggregateFileCacheStats fcStats = stats.getFileCacheStats();
if (stats.getNode().isWarmNode()) {
if (!isFileCacheEmpty(fcStats)) {
nonEmptyFileCacheNodes++;
@@ -735,7 +735,7 @@ private void assertNodesFileCacheNonEmpty(int numNodes) {
assertEquals(numNodes, nonEmptyFileCacheNodes);
}

private boolean isFileCacheEmpty(FileCacheStats stats) {
private boolean isFileCacheEmpty(AggregateFileCacheStats stats) {
return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L;
}

@@ -50,7 +50,7 @@
import org.opensearch.index.SegmentReplicationRejectionStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.ingest.IngestStats;
import org.opensearch.monitor.fs.FsInfo;
@@ -143,7 +143,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
private WeightedRoutingStats weightedRoutingStats;

@Nullable
private FileCacheStats fileCacheStats;
private AggregateFileCacheStats fileCacheStats;

@Nullable
private TaskCancellationStats taskCancellationStats;
@@ -208,7 +208,7 @@ public NodeStats(StreamInput in) throws IOException {
weightedRoutingStats = null;
}
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
fileCacheStats = in.readOptionalWriteable(FileCacheStats::new);
fileCacheStats = in.readOptionalWriteable(AggregateFileCacheStats::new);
} else {
fileCacheStats = null;
}
@@ -277,7 +277,7 @@ public NodeStats(
@Nullable SearchBackpressureStats searchBackpressureStats,
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats,
@Nullable AggregateFileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@@ -444,7 +444,7 @@ public WeightedRoutingStats getWeightedRoutingStats() {
return weightedRoutingStats;
}

public FileCacheStats getFileCacheStats() {
public AggregateFileCacheStats getFileCacheStats() {
return fileCacheStats;
}

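The constructor hunk above keeps the existing 2.7.0 wire gate and only swaps the reader to AggregateFileCacheStats::new. The matching write path is not part of the visible hunks; a hedged sketch of the usual counterpart, since readOptionalWriteable conventionally pairs with writeOptionalWriteable under the same version check:

// Sketch of the write-side counterpart (not shown in the hunks above).
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    // ... other node-level stats are written here ...
    if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
        // Null-safe: writeOptionalWriteable emits a presence flag, so the @Nullable fileCacheStats round-trips cleanly.
        out.writeOptionalWriteable(fileCacheStats);
    }
}

Whether AggregateFileCacheStats keeps the old FileCacheStats wire shape for mixed-version clusters is not visible in this diff.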
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
@@ -43,7 +43,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;

import java.io.IOException;
import java.util.Collections;
@@ -68,7 +68,7 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
public static final ClusterInfo EMPTY = new ClusterInfo();
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;
final Map<String, AggregateFileCacheStats> nodeFileCacheStats;
private long avgTotalBytes;
private long avgFreeByte;

@@ -92,7 +92,7 @@ public ClusterInfo(
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats
final Map<String, AggregateFileCacheStats> nodeFileCacheStats
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
@@ -117,7 +117,7 @@ public ClusterInfo(StreamInput in) throws IOException {
this.routingToDataPath = Collections.unmodifiableMap(routingMap);
this.reservedSpace = Collections.unmodifiableMap(reservedSpaceMap);
if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
this.nodeFileCacheStats = in.readMap(StreamInput::readString, FileCacheStats::new);
this.nodeFileCacheStats = in.readMap(StreamInput::readString, AggregateFileCacheStats::new);
} else {
this.nodeFileCacheStats = Map.of();
}
@@ -242,7 +242,7 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
/**
* Returns a node id to file cache stats mapping for the nodes that have search roles assigned to it.
*/
public Map<String, FileCacheStats> getNodeFileCacheStats() {
public Map<String, AggregateFileCacheStats> getNodeFileCacheStats() {
return Collections.unmodifiableMap(this.nodeFileCacheStats);
}

@@ -58,7 +58,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ReceiveTimeoutTransportException;
@@ -112,7 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt

private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
private volatile Map<String, AggregateFileCacheStats> nodeFileCacheStats;
private volatile IndicesStatsSummary indicesStatsSummary;
// null if this node is not currently the cluster-manager
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
@@ -43,8 +43,8 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.index.store.remote.filecache.FileCacheStats;

import java.util.List;
import java.util.stream.Collectors;
@@ -280,7 +280,7 @@ private long calculateTotalAddressableSpace(RoutingNode node, RoutingAllocation
ClusterInfo clusterInfo = allocation.clusterInfo();
// TODO: Change the default value to 5 instead of 0
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final AggregateFileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
return (long) dataToFileCacheSizeRatio * nodeCacheSize;
}
@@ -309,7 +309,7 @@ private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation)
}

// Fail open if there are no file cache stats available
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final AggregateFileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
if (fileCacheStats == null) {
if (logger.isTraceEnabled()) {
logger.trace("unable to get file cache stats for node [{}], allowing allocation", node.nodeId());
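In plain terms, the decider hunks above treat a node's addressable remote capacity as dataToFileCacheSizeRatio × file-cache size, and fail open (no constraint) when a node reports no AggregateFileCacheStats. A standalone sketch of that arithmetic, with illustrative names only:

// Illustrative sketch of the addressable-space rule above; method and parameter names are hypothetical.
static long addressableBytes(double dataToFileCacheSizeRatio, AggregateFileCacheStats fileCacheStats) {
    // No stats -> zero addressable bytes here; the earlyTerminate path above separately allows allocation (fail open).
    final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
    return (long) dataToFileCacheSizeRatio * nodeCacheSize;
}

Once the default ratio moves from 0 to 5 (per the TODO above), a node backed by a 100 GB file cache would be treated as addressing up to roughly 500 GB of remote data.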
@@ -111,7 +111,7 @@ private void resetHelper(Node current) {

@Override
public long count() {
// Include this here so caches don't have to create an entire CacheStats object to run count().
// Include this here so caches don't have to create an entire AggregateRefCountedCacheStats object to run count().
return statsRoot.getEntries();
}

@@ -20,7 +20,7 @@
import java.util.Objects;

/**
* An immutable snapshot of CacheStats.
* An immutable snapshot of AggregateRefCountedCacheStats.
*
* @opensearch.experimental
*/