Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix warnings from SLF4J on startup when repository-s3 is installed ([#16194](https://github.com/opensearch-project/OpenSearch/pull/16194))
- Fix protobuf-java leak through client library dependencies ([#16254](https://github.com/opensearch-project/OpenSearch/pull/16254))
- Fix multi-search with template doesn't return status code ([#16265](https://github.com/opensearch-project/OpenSearch/pull/16265))
- Fix bug in new cache stats API when closing shards while using TieredSpilloverCache ([#16560](https://github.com/opensearch-project/OpenSearch/pull/16560))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.cache.common.tier;
package org.opensearch.common.cache.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.settings.CacheSettings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.cache.common.tier;
package org.opensearch.common.cache.tier;

import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
Expand Down Expand Up @@ -76,7 +76,7 @@ public void testPluginsAreInstalled() {
.collect(Collectors.toList());
Assert.assertTrue(
pluginInfos.stream()
.anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common.tier.TieredSpilloverCachePlugin"))
.anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.common.cache.tier.TieredSpilloverCachePlugin"))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
* compatible open source license.
*/

package org.opensearch.cache.common.tier;
package org.opensearch.common.cache.tier;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.search.SearchResponse;
Expand Down Expand Up @@ -37,9 +38,10 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_NAME;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP;
import static org.opensearch.common.cache.stats.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_NAME;
import static org.opensearch.common.cache.stats.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK;
import static org.opensearch.common.cache.stats.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP;
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

Expand Down Expand Up @@ -417,6 +419,55 @@ public void testStatsWithMultipleSegments() throws Exception {
assertTrue(diskCacheStat.getEvictions() == 0);
}

public void testClosingShard() throws Exception {
// Closing the shard should totally remove the stats associated with that shard.
internalCluster().startNodes(
1,
Settings.builder()
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments()))
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.SECONDS)
)
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
String index = "index";
Client client = client();
startIndex(client, index);

// First search one time to see how big a single value will be
searchIndex(client, index, 0);
// get total stats
long singleSearchSize = getTotalStats(client).getSizeInBytes();
// Select numbers so we get some values on both heap and disk
int itemsOnHeap = HEAP_CACHE_SIZE / (int) singleSearchSize;
int itemsOnDisk = 1 + randomInt(30); // The first one we search (to get the size) always goes to disk
int expectedEntries = itemsOnHeap + itemsOnDisk;

for (int i = 1; i < expectedEntries; i++) {
// Cause misses
searchIndex(client, index, i);
}
int expectedMisses = itemsOnHeap + itemsOnDisk;

// Cause some hits
int expectedHits = randomIntBetween(itemsOnHeap, expectedEntries); // Select it so some hits come from both tiers
for (int i = 0; i < expectedHits; i++) {
searchIndex(client, index, i);
}

// Check the new stats API values are as expected
assertEquals(
new ImmutableCacheStats(expectedHits, expectedMisses, 0, expectedEntries * singleSearchSize, expectedEntries),
getTotalStats(client)
);

// Closing the index should close the shard
assertAcked(client().admin().indices().delete(new DeleteIndexRequest("index")).get());
assertEquals(new ImmutableCacheStats(0, 0, 0, 0, 0), getTotalStats(client));
}

private void startIndex(Client client, String indexName) throws InterruptedException {
assertAcked(
client.admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@
* GitHub history for details.
*/

package org.opensearch.cache.common.policy;
package org.opensearch.common.cache.policy;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;

import java.util.function.Function;
import java.util.function.Predicate;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
import static org.opensearch.common.cache.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
*/

/** A package for policies controlling what can enter caches. */
package org.opensearch.cache.common.policy;
package org.opensearch.common.cache.policy;
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* compatible open source license.
*/

package org.opensearch.cache.common.tier;
package org.opensearch.common.cache.stats;

import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;
import org.opensearch.common.cache.tier.TieredSpilloverCache;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -43,6 +43,8 @@ public class TieredSpilloverCacheStatsHolder extends DefaultCacheStatsHolder {
/** Dimension value for on-disk cache, like EhcacheDiskCache. */
public static final String TIER_DIMENSION_VALUE_DISK = "disk";

static final List<String> TIER_VALUES = List.of(TIER_DIMENSION_VALUE_ON_HEAP, TIER_DIMENSION_VALUE_DISK);

/**
* Constructor for the stats holder.
* @param originalDimensionNames the original dimension names, not including TIER_DIMENSION_NAME
Expand All @@ -65,7 +67,7 @@ private static List<String> getDimensionNamesWithTier(List<String> dimensionName
/**
* Add tierValue to the end of a copy of the initial dimension values, so they can appropriately be used in this stats holder.
*/
List<String> getDimensionsWithTierValue(List<String> initialDimensions, String tierValue) {
public List<String> getDimensionsWithTierValue(List<String> initialDimensions, String tierValue) {
List<String> result = new ArrayList<>(initialDimensions);
result.add(tierValue);
return result;
Expand Down Expand Up @@ -164,7 +166,31 @@ public void decrementItems(List<String> dimensionValues) {
super.decrementItems(dimensionValues);
}

void setDiskCacheEnabled(boolean diskCacheEnabled) {
public void setDiskCacheEnabled(boolean diskCacheEnabled) {
this.diskCacheEnabled = diskCacheEnabled;
}

@Override
public void removeDimensions(List<String> dimensionValues) {
assert dimensionValues.size() == dimensionNames.size() - 1
: "Must specify a value for every dimension except tier when removing from StatsHolder";
// As we are removing nodes from the tree, obtain the lock
lock.lock();
try {
removeDimensionsHelper(dimensionValues, getStatsRoot(), 0);
} finally {
lock.unlock();
}
}

@Override
protected ImmutableCacheStats removeDimensionsBaseCase(Node node) {
// The base case will be the node whose children represent individual tiers.
// Manually delete this node's children
for (String tierValue : TIER_VALUES) {
node.children.remove(tierValue);
}
// Pass up a snapshot of the original stats to avoid issues when the original is decremented by other fn invocations
return node.getImmutableStats();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
* compatible open source license.
*/

package org.opensearch.cache.common.tier;
package org.opensearch.common.cache.tier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
Expand All @@ -20,7 +19,9 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.policy.TookTimePolicy;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.stats.TieredSpilloverCacheStatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -49,14 +50,14 @@
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_SIZE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP;
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE;
import static org.opensearch.common.cache.settings.CacheSettings.VALID_SEGMENT_COUNT_VALUES;
import static org.opensearch.common.cache.stats.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK;
import static org.opensearch.common.cache.stats.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP;
import static org.opensearch.common.cache.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.common.cache.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_SIZE;
import static org.opensearch.common.cache.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE;
import static org.opensearch.common.cache.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
Expand Down Expand Up @@ -373,14 +374,14 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader

@Override
public void invalidate(ICacheKey<K> key) {
for (Map.Entry<ICache<K, V>, TierInfo> cacheEntry : caches.entrySet()) {
if (key.getDropStatsForDimensions()) {
List<String> dimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, cacheEntry.getValue().tierName);
statsHolder.removeDimensions(dimensionValues);
}
if (key.key != null) {
try (ReleasableLock ignore = writeLock.acquire()) {
cacheEntry.getKey().invalidate(key);
if (key.getDropStatsForDimensions()) {
statsHolder.removeDimensions(key.dimensions);
} else {
for (Map.Entry<ICache<K, V>, TierInfo> cacheEntry : caches.entrySet()) {
if (key.key != null) {
try (ReleasableLock ignore = writeLock.acquire()) {
cacheEntry.getKey().invalidate(key);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.cache.common.tier;
package org.opensearch.common.cache.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
Expand All @@ -20,8 +20,8 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
import static org.opensearch.common.cache.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.common.cache.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* Plugin for TieredSpilloverCache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.cache.common.tier;
package org.opensearch.common.cache.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.settings.Setting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
*/

/** Package related to cache tiers **/
package org.opensearch.cache.common.tier;
package org.opensearch.common.cache.tier;
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
* compatible open source license.
*/

package org.opensearch.cache.common.policy;
package org.opensearch.common.cache.policy;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.settings.ClusterSettings;
Expand All @@ -31,7 +30,7 @@
import java.util.Random;
import java.util.function.Function;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
import static org.opensearch.common.cache.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

public class TookTimePolicyTests extends OpenSearchTestCase {
private final Function<BytesReference, CachedQueryResult.PolicyValues> transformationFunction = (data) -> {
Expand Down
Loading
Loading