Skip to content

Commit d2ba6b3

Browse files
kkewweiandrross
authored andcommitted
Add a dynamic setting to change skip_cache_factor and min_frequency for querycache (opensearch-project#18351)
* fix changelog conflicts Signed-off-by: kkewwei <[email protected]> * Add a dynamic setting to change skip_cache_factor and min_frequency for querycache Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]> * change the setting name Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]> * add volatile to variable Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]> --------- Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]> Signed-off-by: Andrew Ross <[email protected]> Co-authored-by: Andrew Ross <[email protected]>
1 parent 932a5e0 commit d2ba6b3

File tree

7 files changed

+246
-5
lines changed

7 files changed

+246
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
- The dynamic mapping parameter supports false_allow_templates ([#19065](https://github.com/opensearch-project/OpenSearch/pull/19065))
1111
- Add a toBuilder method in EngineConfig to support easy modification of configs([#19054](https://github.com/opensearch-project/OpenSearch/pull/19054))
1212
- Add StoreFactory plugin interface for custom Store implementations([#19091](https://github.com/opensearch-project/OpenSearch/pull/19091))
13+
- Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18351](https://github.com/opensearch-project/OpenSearch/issues/18351))
1314

1415
### Changed
1516
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,10 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
616616
logger.debug("completed calling listeners of cluster state for version {}", newClusterState.version());
617617
}
618618

619+
public ClusterSettings clusterSettings() {
620+
return clusterSettings;
621+
}
622+
619623
protected void connectToNodesAndWait(ClusterState newClusterState) {
620624
// can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch.
621625
final CountDownLatch countDownLatch = new CountDownLatch(1);

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,9 @@ public void apply(Settings value, Settings current, Settings previous) {
299299
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
300300
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
301301
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
302+
IndicesQueryCache.INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR,
303+
IndicesQueryCache.INDICES_QUERY_CACHE_MIN_FREQUENCY,
304+
IndicesQueryCache.INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY,
302305
IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING,
303306
IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING,
304307
IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING,

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.apache.lucene.search.QueryCachingPolicy;
4949
import org.apache.lucene.search.ReferenceManager;
5050
import org.apache.lucene.search.Sort;
51-
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
5251
import org.apache.lucene.store.AlreadyClosedException;
5352
import org.apache.lucene.store.BufferedChecksumIndexInput;
5453
import org.apache.lucene.store.ChecksumIndexInput;
@@ -192,6 +191,7 @@
192191
import org.opensearch.index.warmer.ShardIndexWarmerService;
193192
import org.opensearch.index.warmer.WarmerStats;
194193
import org.opensearch.indices.IndexingMemoryController;
194+
import org.opensearch.indices.IndicesQueryCache;
195195
import org.opensearch.indices.IndicesService;
196196
import org.opensearch.indices.RemoteStoreSettings;
197197
import org.opensearch.indices.cluster.IndicesClusterStateService;
@@ -510,7 +510,7 @@ public boolean shouldCache(Query query) {
510510
}
511511
};
512512
} else {
513-
cachingPolicy = new UsageTrackingQueryCachingPolicy();
513+
cachingPolicy = new IndicesQueryCache.OpenseachUsageTrackingQueryCachingPolicy(clusterApplierService.clusterSettings());
514514
}
515515
indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool);
516516
if (indexSettings.isDerivedSourceEnabled()) {
@@ -5755,4 +5755,5 @@ public void stopRefreshTask() {
57555755
public AsyncShardRefreshTask getRefreshTask() {
57565756
return refreshTask;
57575757
}
5758+
57585759
}

server/src/main/java/org/opensearch/indices/IndicesQueryCache.java

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,20 @@
3535
import org.apache.logging.log4j.LogManager;
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.lucene.index.LeafReaderContext;
38+
import org.apache.lucene.search.BooleanQuery;
39+
import org.apache.lucene.search.DisjunctionMaxQuery;
3840
import org.apache.lucene.search.Explanation;
3941
import org.apache.lucene.search.LRUQueryCache;
42+
import org.apache.lucene.search.MultiTermQuery;
4043
import org.apache.lucene.search.Query;
4144
import org.apache.lucene.search.QueryCache;
4245
import org.apache.lucene.search.QueryCachingPolicy;
4346
import org.apache.lucene.search.ScorerSupplier;
47+
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
4448
import org.apache.lucene.search.Weight;
4549
import org.opensearch.common.annotation.PublicApi;
4650
import org.opensearch.common.lucene.ShardCoreKeyMap;
51+
import org.opensearch.common.settings.ClusterSettings;
4752
import org.opensearch.common.settings.Setting;
4853
import org.opensearch.common.settings.Setting.Property;
4954
import org.opensearch.common.settings.Settings;
@@ -91,6 +96,33 @@ public class IndicesQueryCache implements QueryCache, Closeable {
9196
Property.NodeScope
9297
);
9398

99+
// dynamic change the skipCacheFactor for query_cache
100+
public static final Setting<Float> INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR = Setting.floatSetting(
101+
"indices.queries.cache.skip_cache_factor",
102+
10,
103+
1,
104+
Property.NodeScope,
105+
Property.Dynamic
106+
);
107+
108+
// dynamic change the min frequency cache threshold for query
109+
public static final Setting<Integer> INDICES_QUERY_CACHE_MIN_FREQUENCY = Setting.intSetting(
110+
"indices.queries.cache.min_frequency",
111+
5,
112+
1,
113+
Property.NodeScope,
114+
Property.Dynamic
115+
);
116+
117+
// dynamic change the min frequency cache threshold for costly query
118+
public static final Setting<Integer> INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY = Setting.intSetting(
119+
"indices.queries.cache.costly_min_frequency",
120+
2,
121+
1,
122+
Property.NodeScope,
123+
Property.Dynamic
124+
);
125+
94126
private final LRUQueryCache cache;
95127
private final ShardCoreKeyMap shardKeyMap = new ShardCoreKeyMap();
96128
private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
@@ -101,18 +133,40 @@ public class IndicesQueryCache implements QueryCache, Closeable {
101133
// See onDocIdSetEviction for more info
102134
private final Map<Object, StatsAndCount> stats2 = Collections.synchronizedMap(new IdentityHashMap<>());
103135

136+
// Compatible for public api
104137
public IndicesQueryCache(Settings settings) {
138+
this(settings, null);
139+
}
140+
141+
public IndicesQueryCache(Settings settings, ClusterSettings clusterSettings) {
105142
final ByteSizeValue size = INDICES_CACHE_QUERY_SIZE_SETTING.get(settings);
106143
final int count = INDICES_CACHE_QUERY_COUNT_SETTING.get(settings);
107-
logger.debug("using [node] query cache with size [{}] max filter count [{}]", size, count);
144+
float skipCacheFactor = INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR.get(settings);
145+
logger.debug("using [node] query cache with size [{}] max filter count [{}] skipCacheFactor [{}]", size, count, skipCacheFactor);
108146
if (INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.get(settings)) {
109147
cache = new OpenSearchLRUQueryCache(count, size.getBytes(), context -> true, 1f);
110148
} else {
111149
cache = new OpenSearchLRUQueryCache(count, size.getBytes());
150+
cache.setSkipCacheFactor(skipCacheFactor);
151+
if (clusterSettings != null) {
152+
clusterSettings.addSettingsUpdateConsumer(INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR, this::setSkipCacheFactor);
153+
} else {
154+
logger.warn("clusterSettings is null, so {} is not dynamic", INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR.getKey());
155+
}
112156
}
113157
sharedRamBytesUsed = 0;
114158
}
115159

160+
public void setSkipCacheFactor(float skipCacheFactor) {
161+
logger.debug(
162+
"set cluster settings {} {} -> {}",
163+
INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR.getKey(),
164+
this.cache.getSkipCacheFactor(),
165+
skipCacheFactor
166+
);
167+
cache.setSkipCacheFactor(skipCacheFactor);
168+
}
169+
116170
/** Get usage statistics for the given shard. */
117171
public QueryCacheStats getStats(ShardId shard) {
118172
final Map<ShardId, QueryCacheStats> stats = new HashMap<>();
@@ -393,4 +447,63 @@ protected void onMiss(Object readerCoreKey, Query filter) {
393447
shardStats.missCount += 1;
394448
}
395449
}
450+
451+
/**
452+
* Custom caching policy for Opensearch.
453+
*/
454+
public static class OpenseachUsageTrackingQueryCachingPolicy extends UsageTrackingQueryCachingPolicy {
455+
private volatile int minFrequency;
456+
private volatile int minFrequencyForCostly;
457+
458+
public OpenseachUsageTrackingQueryCachingPolicy(ClusterSettings clusterSettings) {
459+
minFrequency = clusterSettings.get(INDICES_QUERY_CACHE_MIN_FREQUENCY);
460+
minFrequencyForCostly = clusterSettings.get(INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY);
461+
clusterSettings.addSettingsUpdateConsumer(INDICES_QUERY_CACHE_MIN_FREQUENCY, this::setMinFrequency);
462+
clusterSettings.addSettingsUpdateConsumer(INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY, this::setMinFrequencyForCostly);
463+
}
464+
465+
@Override
466+
protected int minFrequencyToCache(Query query) {
467+
if (isCostly(query)) {
468+
return minFrequencyForCostly;
469+
}
470+
int minFrequency = this.minFrequency;
471+
if (query instanceof BooleanQuery || query instanceof DisjunctionMaxQuery) {
472+
--minFrequency;
473+
}
474+
475+
return Math.max(1, minFrequency);
476+
}
477+
478+
/**
479+
* Same to Lucene's UsageTrackingQueryCachingPolicy.isCostly, it's not public in Lucene.
480+
* Given that lucene doesn't give the desired extensibility at this point.
481+
* Also, we can extend it if needed.
482+
*/
483+
private boolean isCostly(Query query) {
484+
return query instanceof MultiTermQuery
485+
|| query.getClass().getSimpleName().equals("MultiTermQueryConstantScoreBlendedWrapper")
486+
|| query.getClass().getSimpleName().equals("MultiTermQueryConstantScoreWrapper")
487+
|| isPointQuery(query);
488+
}
489+
490+
// Same to Lucene's UsageTrackingQueryCachingPolicy.isPointQuery
491+
private boolean isPointQuery(Query query) {
492+
for (Class<?> clazz = query.getClass(); clazz != Query.class; clazz = clazz.getSuperclass()) {
493+
final String simpleName = clazz.getSimpleName();
494+
if (simpleName.startsWith("Point") && simpleName.endsWith("Query")) {
495+
return true;
496+
}
497+
}
498+
return false;
499+
}
500+
501+
public void setMinFrequency(int minFrequency) {
502+
this.minFrequency = minFrequency;
503+
}
504+
505+
public void setMinFrequencyForCostly(int minFrequencyForCostly) {
506+
this.minFrequencyForCostly = minFrequencyForCostly;
507+
}
508+
}
396509
}

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ public IndicesService(
476476
}
477477
return Optional.of(new IndexShardCacheEntity(indexService.getShardOrNull(shardId.id())));
478478
}), cacheService, threadPool, clusterService, nodeEnv);
479-
this.indicesQueryCache = new IndicesQueryCache(settings);
479+
this.indicesQueryCache = new IndicesQueryCache(settings, clusterService.getClusterSettings());
480480
this.mapperRegistry = mapperRegistry;
481481
this.namedWriteableRegistry = namedWriteableRegistry;
482482
indexingMemoryController = new IndexingMemoryController(
@@ -1205,7 +1205,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
12051205
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
12061206
});
12071207
closeables.add(indicesFieldDataCache);
1208-
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
1208+
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings, clusterService.getClusterSettings());
12091209
closeables.add(indicesQueryCache);
12101210
// this will also fail if some plugin fails etc. which is nice since we can verify that early
12111211
final IndexService service = createIndexService(

server/src/test/java/org/opensearch/indices/IndicesQueryCacheTests.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,31 +33,49 @@
3333
package org.opensearch.indices;
3434

3535
import org.apache.lucene.document.Document;
36+
import org.apache.lucene.document.IntPoint;
3637
import org.apache.lucene.index.DirectoryReader;
3738
import org.apache.lucene.index.IndexWriter;
39+
import org.apache.lucene.index.IndexWriterConfig;
3840
import org.apache.lucene.index.LeafReaderContext;
41+
import org.apache.lucene.index.Term;
42+
import org.apache.lucene.search.BooleanClause;
43+
import org.apache.lucene.search.BooleanQuery;
3944
import org.apache.lucene.search.ConstantScoreScorer;
4045
import org.apache.lucene.search.ConstantScoreWeight;
4146
import org.apache.lucene.search.DocIdSetIterator;
4247
import org.apache.lucene.search.Explanation;
4348
import org.apache.lucene.search.IndexSearcher;
4449
import org.apache.lucene.search.MatchAllDocsQuery;
50+
import org.apache.lucene.search.MultiTermQuery;
4551
import org.apache.lucene.search.Query;
4652
import org.apache.lucene.search.QueryCachingPolicy;
4753
import org.apache.lucene.search.QueryVisitor;
4854
import org.apache.lucene.search.ScoreMode;
4955
import org.apache.lucene.search.Scorer;
5056
import org.apache.lucene.search.ScorerSupplier;
57+
import org.apache.lucene.search.TermInSetQuery;
58+
import org.apache.lucene.search.TermQuery;
59+
import org.apache.lucene.search.TotalHitCountCollector;
5160
import org.apache.lucene.search.Weight;
5261
import org.apache.lucene.store.Directory;
62+
import org.apache.lucene.util.BytesRef;
5363
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
64+
import org.opensearch.common.settings.ClusterSettings;
5465
import org.opensearch.common.settings.Settings;
5566
import org.opensearch.common.util.io.IOUtils;
5667
import org.opensearch.core.index.shard.ShardId;
5768
import org.opensearch.index.cache.query.QueryCacheStats;
69+
import org.opensearch.index.mapper.KeywordFieldMapper;
5870
import org.opensearch.test.OpenSearchTestCase;
5971

6072
import java.io.IOException;
73+
import java.util.ArrayList;
74+
import java.util.List;
75+
76+
import static org.opensearch.indices.IndicesQueryCache.OpenseachUsageTrackingQueryCachingPolicy;
77+
import static org.apache.lucene.search.MultiTermQuery.CONSTANT_SCORE_BLENDED_REWRITE;
78+
import static org.apache.lucene.search.MultiTermQuery.CONSTANT_SCORE_REWRITE;
6179

6280
public class IndicesQueryCacheTests extends OpenSearchTestCase {
6381

@@ -492,4 +510,105 @@ public void testDelegatesCount() throws Exception {
492510
cache.onClose(shard);
493511
cache.close();
494512
}
513+
514+
public void testDynamicChangeSettings() throws IOException {
515+
Directory dir = newDirectory();
516+
IndexWriterConfig conf = newIndexWriterConfig();
517+
conf.setMaxBufferedDocs(100000);
518+
IndexWriter w = new IndexWriter(dir, conf);
519+
// lucene will not cache segment whose docs is smaller than 10,000
520+
for (int i = 0; i < 10001; i++) {
521+
Document document = new Document();
522+
document.add(new IntPoint("age", i));
523+
final BytesRef binaryValue = new BytesRef(String.valueOf(i));
524+
document.add(new KeywordFieldMapper.KeywordField("name", binaryValue, KeywordFieldMapper.Defaults.FIELD_TYPE));
525+
w.addDocument(document);
526+
}
527+
DirectoryReader r = DirectoryReader.open(w);
528+
w.close();
529+
ShardId shard = new ShardId("index", "_na_", 0);
530+
r = OpenSearchDirectoryReader.wrap(r, shard);
531+
532+
IndexSearcher searcher = new IndexSearcher(r);
533+
Settings settings = Settings.builder().build();
534+
OpenseachUsageTrackingQueryCachingPolicy queryCachingPolicy = new OpenseachUsageTrackingQueryCachingPolicy(
535+
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
536+
);
537+
538+
searcher.setQueryCachingPolicy(queryCachingPolicy);
539+
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
540+
IndicesQueryCache cache = new IndicesQueryCache(settings, clusterSettings);
541+
searcher.setQueryCache(cache);
542+
final TotalHitCountCollector collector = new TotalHitCountCollector();
543+
544+
// test changing queryCacheMinFrequency
545+
{
546+
BooleanQuery.Builder booleanQuery = new BooleanQuery.Builder();
547+
booleanQuery.add(IntPoint.newRangeQuery("age", 1, 9999), BooleanClause.Occur.FILTER);
548+
booleanQuery.add(new TermQuery(new Term("name", "1")), BooleanClause.Occur.FILTER);
549+
searcher.search(booleanQuery.build(), collector);
550+
QueryCacheStats stats = cache.getStats(shard);
551+
assertEquals(0L, stats.getCacheSize());
552+
assertEquals(0L, stats.getCacheCount());
553+
assertEquals(0L, stats.getHitCount());
554+
assertEquals(6L, stats.getMissCount());
555+
556+
queryCachingPolicy.setMinFrequency(2);
557+
searcher.search(booleanQuery.build(), collector);
558+
stats = cache.getStats(shard);
559+
// ensure result is cached
560+
assertEquals(1L, stats.getCacheSize());
561+
assertEquals(1L, stats.getCacheCount());
562+
}
563+
564+
// test changing skipCacheFactor
565+
{
566+
// make sure the range query can be cached, because queryCacheMinFrequency is 2
567+
queryCachingPolicy.setMinFrequency(4);
568+
BooleanQuery.Builder booleanQuery = new BooleanQuery.Builder();
569+
booleanQuery.add(IntPoint.newRangeQuery("age", 2, 9999), BooleanClause.Occur.FILTER);
570+
booleanQuery.add(new TermQuery(new Term("name", "3")), BooleanClause.Occur.MUST);
571+
cache.setSkipCacheFactor(20000);
572+
searcher.search(booleanQuery.build(), collector);
573+
searcher.search(booleanQuery.build(), collector);
574+
QueryCacheStats stats = cache.getStats(shard);
575+
// only range can be cached, assert the range query has been cached because of skipCacheFactor
576+
assertEquals(2L, stats.getCacheSize());
577+
assertEquals(2L, stats.getCacheCount());
578+
assertEquals(0L, stats.getHitCount());
579+
}
580+
IOUtils.close(r, dir);
581+
cache.onClose(shard);
582+
cache.close(); // this triggers some assertions
583+
}
584+
585+
public void testCostlyMinFrequencyToCache() throws IOException {
586+
Settings settings = Settings.builder().build();
587+
OpenseachUsageTrackingQueryCachingPolicy queryCachingPolicy = new OpenseachUsageTrackingQueryCachingPolicy(
588+
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
589+
);
590+
int minFrequencyForCostly = 3;
591+
queryCachingPolicy.setMinFrequencyForCostly(minFrequencyForCostly);
592+
593+
// test MultiTermQuery
594+
List<BytesRef> terms = new ArrayList<>();
595+
terms.add(new BytesRef("foo"));
596+
TermInSetQuery termInSetQuery = new TermInSetQuery(MultiTermQuery.DOC_VALUES_REWRITE, "field", terms);
597+
assertEquals(minFrequencyForCostly, queryCachingPolicy.minFrequencyToCache(termInSetQuery));
598+
599+
// test MultiTermQueryConstantScoreBlendedWrapper
600+
Query query = CONSTANT_SCORE_BLENDED_REWRITE.rewrite(null, termInSetQuery);
601+
assertEquals(minFrequencyForCostly, queryCachingPolicy.minFrequencyToCache(query));
602+
603+
// test MultiTermQueryConstantScoreWrapper
604+
query = CONSTANT_SCORE_REWRITE.rewrite(null, termInSetQuery);
605+
assertEquals(minFrequencyForCostly, queryCachingPolicy.minFrequencyToCache(query));
606+
607+
// test TermInSetQuery
608+
query = CONSTANT_SCORE_REWRITE.rewrite(null, termInSetQuery);
609+
assertEquals(minFrequencyForCostly, queryCachingPolicy.minFrequencyToCache(query));
610+
611+
query = IntPoint.newRangeQuery("age", 2, 9999);
612+
assertEquals(minFrequencyForCostly, queryCachingPolicy.minFrequencyToCache(query));
613+
}
495614
}

0 commit comments

Comments
 (0)