88
99package org .opensearch .cache ;
1010
11+ import com .carrotsearch .randomizedtesting .annotations .ThreadLeakFilters ;
12+
1113import org .opensearch .action .admin .cluster .node .info .NodeInfo ;
1214import org .opensearch .action .admin .cluster .node .info .NodesInfoRequest ;
1315import org .opensearch .action .admin .cluster .node .info .NodesInfoResponse ;
1416import org .opensearch .action .admin .cluster .node .info .PluginsAndModules ;
17+ import org .opensearch .action .admin .indices .cache .clear .ClearIndicesCacheRequest ;
18+ import org .opensearch .action .admin .indices .cache .clear .ClearIndicesCacheResponse ;
19+ import org .opensearch .action .admin .indices .forcemerge .ForceMergeResponse ;
20+ import org .opensearch .action .search .SearchResponse ;
21+ import org .opensearch .action .search .SearchType ;
22+ import org .opensearch .cache .store .disk .EhcacheDiskCache ;
23+ import org .opensearch .cache .store .disk .EhcacheThreadLeakFilter ;
24+ import org .opensearch .client .Client ;
25+ import org .opensearch .cluster .metadata .IndexMetadata ;
26+ import org .opensearch .common .cache .CacheType ;
27+ import org .opensearch .common .cache .settings .CacheSettings ;
28+ import org .opensearch .common .settings .Settings ;
29+ import org .opensearch .common .unit .TimeValue ;
30+ import org .opensearch .common .util .FeatureFlags ;
31+ import org .opensearch .env .NodeEnvironment ;
32+ import org .opensearch .index .cache .request .RequestCacheStats ;
33+ import org .opensearch .index .query .QueryBuilders ;
34+ import org .opensearch .indices .IndicesRequestCache ;
1535import org .opensearch .plugins .Plugin ;
1636import org .opensearch .plugins .PluginInfo ;
37+ import org .opensearch .search .aggregations .bucket .histogram .DateHistogramInterval ;
1738import org .opensearch .test .OpenSearchIntegTestCase ;
39+ import org .opensearch .test .hamcrest .OpenSearchAssertions ;
1840import org .junit .Assert ;
1941
42+ import java .io .IOException ;
43+ import java .time .ZoneId ;
2044import java .util .Arrays ;
2145import java .util .Collection ;
2246import java .util .List ;
47+ import java .util .UUID ;
48+ import java .util .concurrent .TimeUnit ;
2349import java .util .function .Function ;
2450import java .util .stream .Collectors ;
2551import java .util .stream .Stream ;
2652
53+ import static org .opensearch .cache .EhcacheDiskCacheSettings .DEFAULT_CACHE_SIZE_IN_BYTES ;
54+ import static org .opensearch .cache .EhcacheDiskCacheSettings .DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY ;
55+ import static org .opensearch .cache .EhcacheDiskCacheSettings .DISK_LISTENER_MODE_SYNC_KEY ;
56+ import static org .opensearch .cache .EhcacheDiskCacheSettings .DISK_MAX_SIZE_IN_BYTES_KEY ;
57+ import static org .opensearch .cache .EhcacheDiskCacheSettings .DISK_STORAGE_PATH_KEY ;
58+ import static org .opensearch .indices .IndicesService .INDICES_CACHE_CLEAN_INTERVAL_SETTING ;
59+ import static org .opensearch .search .aggregations .AggregationBuilders .dateHistogram ;
60+ import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertAcked ;
61+ import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertNoFailures ;
62+ import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertSearchResponse ;
63+ import static org .hamcrest .Matchers .greaterThan ;
64+
65+ @ OpenSearchIntegTestCase .ClusterScope (numDataNodes = 0 , scope = OpenSearchIntegTestCase .Scope .TEST )
66+ @ ThreadLeakFilters (filters = { EhcacheThreadLeakFilter .class })
2767public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase {
2868
2969 @ Override
3070 protected Collection <Class <? extends Plugin >> nodePlugins () {
3171 return Arrays .asList (EhcacheCachePlugin .class );
3272 }
3373
74+ @ Override
75+ protected Settings featureFlagSettings () {
76+ return Settings .builder ().put (super .featureFlagSettings ()).put (FeatureFlags .PLUGGABLE_CACHE , "true" ).build ();
77+ }
78+
79+ private Settings defaultSettings (long sizeInBytes , TimeValue expirationTime ) {
80+ if (expirationTime == null ) {
81+ expirationTime = TimeValue .MAX_VALUE ;
82+ }
83+ try (NodeEnvironment env = newNodeEnvironment (Settings .EMPTY )) {
84+ return Settings .builder ()
85+ .put (
86+ EhcacheDiskCacheSettings .getSettingListForCacheType (CacheType .INDICES_REQUEST_CACHE )
87+ .get (DISK_STORAGE_PATH_KEY )
88+ .getKey (),
89+ env .nodePaths ()[0 ].indicesPath .toString () + "/" + UUID .randomUUID () + "/request_cache/"
90+ )
91+ .put (
92+ CacheSettings .getConcreteStoreNameSettingForCacheType (CacheType .INDICES_REQUEST_CACHE ).getKey (),
93+ EhcacheDiskCache .EhcacheDiskCacheFactory .EHCACHE_DISK_CACHE_NAME
94+ )
95+ .put (
96+ EhcacheDiskCacheSettings .getSettingListForCacheType (CacheType .INDICES_REQUEST_CACHE )
97+ .get (DISK_LISTENER_MODE_SYNC_KEY )
98+ .getKey (),
99+ true
100+ )
101+ .put (
102+ EhcacheDiskCacheSettings .getSettingListForCacheType (CacheType .INDICES_REQUEST_CACHE )
103+ .get (DISK_MAX_SIZE_IN_BYTES_KEY )
104+ .getKey (),
105+ sizeInBytes
106+ )
107+ .put (
108+ EhcacheDiskCacheSettings .getSettingListForCacheType (CacheType .INDICES_REQUEST_CACHE )
109+ .get (DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY )
110+ .getKey (),
111+ expirationTime
112+ )
113+ .build ();
114+ } catch (IOException e ) {
115+ throw new RuntimeException (e );
116+ }
117+ }
118+
34119 public void testPluginsAreInstalled () {
120+ internalCluster ().startNode (Settings .builder ().put (defaultSettings (DEFAULT_CACHE_SIZE_IN_BYTES , null )).build ());
35121 NodesInfoRequest nodesInfoRequest = new NodesInfoRequest ();
36122 nodesInfoRequest .addMetric (NodesInfoRequest .Metric .PLUGINS .metricName ());
37123 NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase .client ().admin ().cluster ().nodesInfo (nodesInfoRequest ).actionGet ();
@@ -45,4 +131,258 @@ public void testPluginsAreInstalled() {
45131 pluginInfos .stream ().anyMatch (pluginInfo -> pluginInfo .getName ().equals ("org.opensearch.cache.EhcacheCachePlugin" ))
46132 );
47133 }
134+
135+ public void testSanityChecksWithIndicesRequestCache () throws InterruptedException {
136+ internalCluster ().startNode (Settings .builder ().put (defaultSettings (DEFAULT_CACHE_SIZE_IN_BYTES , null )).build ());
137+ Client client = client ();
138+ assertAcked (
139+ client .admin ()
140+ .indices ()
141+ .prepareCreate ("index" )
142+ .setMapping ("f" , "type=date" )
143+ .setSettings (
144+ Settings .builder ()
145+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 )
146+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 )
147+ .put (IndicesRequestCache .INDEX_CACHE_REQUEST_ENABLED_SETTING .getKey (), true )
148+ .build ()
149+ )
150+ .get ()
151+ );
152+ indexRandom (
153+ true ,
154+ client .prepareIndex ("index" ).setSource ("f" , "2014-03-10T00:00:00.000Z" ),
155+ client .prepareIndex ("index" ).setSource ("f" , "2014-05-13T00:00:00.000Z" )
156+ );
157+ ensureSearchable ("index" );
158+
159+ // This is not a random example: serialization with time zones writes shared strings
160+ // which used to not work well with the query cache because of the handles stream output
161+ // see #9500
162+ final SearchResponse r1 = client .prepareSearch ("index" )
163+ .setSize (0 )
164+ .setSearchType (SearchType .QUERY_THEN_FETCH )
165+ .addAggregation (
166+ dateHistogram ("histo" ).field ("f" )
167+ .timeZone (ZoneId .of ("+01:00" ))
168+ .minDocCount (0 )
169+ .dateHistogramInterval (DateHistogramInterval .MONTH )
170+ )
171+ .get ();
172+ assertSearchResponse (r1 );
173+
174+ // The cached is actually used
175+ assertThat (
176+ client .admin ().indices ().prepareStats ("index" ).setRequestCache (true ).get ().getTotal ().getRequestCache ().getMemorySizeInBytes (),
177+ greaterThan (0L )
178+ );
179+ }
180+
181+ public void testInvalidationWithIndicesRequestCache () throws Exception {
182+ internalCluster ().startNode (
183+ Settings .builder ()
184+ .put (defaultSettings (DEFAULT_CACHE_SIZE_IN_BYTES , null ))
185+ .put (INDICES_CACHE_CLEAN_INTERVAL_SETTING .getKey (), new TimeValue (1 ))
186+ .build ()
187+ );
188+ Client client = client ();
189+ assertAcked (
190+ client .admin ()
191+ .indices ()
192+ .prepareCreate ("index" )
193+ .setMapping ("k" , "type=keyword" )
194+ .setSettings (
195+ Settings .builder ()
196+ .put (IndicesRequestCache .INDEX_CACHE_REQUEST_ENABLED_SETTING .getKey (), true )
197+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 )
198+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 )
199+ .put ("index.refresh_interval" , -1 )
200+ )
201+ .get ()
202+ );
203+ int numberOfIndexedItems = randomIntBetween (5 , 10 );
204+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
205+ indexRandom (true , client .prepareIndex ("index" ).setSource ("k" + iterator , "hello" + iterator ));
206+ }
207+ ensureSearchable ("index" );
208+ refreshAndWaitForReplication ();
209+ // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
210+ ForceMergeResponse forceMergeResponse = client .admin ().indices ().prepareForceMerge ("index" ).setFlush (true ).get ();
211+ OpenSearchAssertions .assertAllSuccessful (forceMergeResponse );
212+ long perQuerySizeInCacheInBytes = -1 ;
213+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
214+ SearchResponse resp = client .prepareSearch ("index" )
215+ .setRequestCache (true )
216+ .setQuery (QueryBuilders .termQuery ("k" + iterator , "hello" + iterator ))
217+ .get ();
218+ if (perQuerySizeInCacheInBytes == -1 ) {
219+ RequestCacheStats requestCacheStats = getRequestCacheStats (client , "index" );
220+ perQuerySizeInCacheInBytes = requestCacheStats .getMemorySizeInBytes ();
221+ }
222+ assertSearchResponse (resp );
223+ }
224+ RequestCacheStats requestCacheStats = getRequestCacheStats (client , "index" );
225+ assertEquals (numberOfIndexedItems , requestCacheStats .getMissCount ());
226+ assertEquals (0 , requestCacheStats .getHitCount ());
227+ assertEquals (0 , requestCacheStats .getEvictions ());
228+ assertEquals (perQuerySizeInCacheInBytes * numberOfIndexedItems , requestCacheStats .getMemorySizeInBytes ());
229+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
230+ SearchResponse resp = client .prepareSearch ("index" )
231+ .setRequestCache (true )
232+ .setQuery (QueryBuilders .termQuery ("k" + iterator , "hello" + iterator ))
233+ .get ();
234+ assertSearchResponse (resp );
235+ }
236+ requestCacheStats = getRequestCacheStats (client , "index" );
237+ assertEquals (numberOfIndexedItems , requestCacheStats .getHitCount ());
238+ assertEquals (numberOfIndexedItems , requestCacheStats .getMissCount ());
239+ assertEquals (perQuerySizeInCacheInBytes * numberOfIndexedItems , requestCacheStats .getMemorySizeInBytes ());
240+ assertEquals (0 , requestCacheStats .getEvictions ());
241+ // Explicit refresh would invalidate cache entries.
242+ refreshAndWaitForReplication ();
243+ assertBusy (() -> {
244+ // Explicit refresh should clear up cache entries
245+ assertTrue (getRequestCacheStats (client , "index" ).getMemorySizeInBytes () == 0 );
246+ }, 1 , TimeUnit .SECONDS );
247+ requestCacheStats = getRequestCacheStats (client , "index" );
248+ assertEquals (0 , requestCacheStats .getMemorySizeInBytes ());
249+ // Hits and misses stats shouldn't get cleared up.
250+ assertEquals (numberOfIndexedItems , requestCacheStats .getHitCount ());
251+ assertEquals (numberOfIndexedItems , requestCacheStats .getMissCount ());
252+ }
253+
254+ public void testExplicitCacheClearWithIndicesRequestCache () throws Exception {
255+ internalCluster ().startNode (
256+ Settings .builder ()
257+ .put (defaultSettings (DEFAULT_CACHE_SIZE_IN_BYTES , null ))
258+ .put (INDICES_CACHE_CLEAN_INTERVAL_SETTING .getKey (), new TimeValue (1 ))
259+ .build ()
260+ );
261+ Client client = client ();
262+ assertAcked (
263+ client .admin ()
264+ .indices ()
265+ .prepareCreate ("index" )
266+ .setMapping ("k" , "type=keyword" )
267+ .setSettings (
268+ Settings .builder ()
269+ .put (IndicesRequestCache .INDEX_CACHE_REQUEST_ENABLED_SETTING .getKey (), true )
270+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 )
271+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 )
272+ .put ("index.refresh_interval" , -1 )
273+ )
274+ .get ()
275+ );
276+ int numberOfIndexedItems = randomIntBetween (5 , 10 );
277+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
278+ indexRandom (true , client .prepareIndex ("index" ).setSource ("k" + iterator , "hello" + iterator ));
279+ }
280+ ensureSearchable ("index" );
281+ refreshAndWaitForReplication ();
282+ // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
283+ ForceMergeResponse forceMergeResponse = client .admin ().indices ().prepareForceMerge ("index" ).setFlush (true ).get ();
284+ OpenSearchAssertions .assertAllSuccessful (forceMergeResponse );
285+
286+ long perQuerySizeInCacheInBytes = -1 ;
287+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
288+ SearchResponse resp = client .prepareSearch ("index" )
289+ .setRequestCache (true )
290+ .setQuery (QueryBuilders .termQuery ("k" + iterator , "hello" + iterator ))
291+ .get ();
292+ if (perQuerySizeInCacheInBytes == -1 ) {
293+ RequestCacheStats requestCacheStats = getRequestCacheStats (client , "index" );
294+ perQuerySizeInCacheInBytes = requestCacheStats .getMemorySizeInBytes ();
295+ }
296+ assertSearchResponse (resp );
297+ }
298+ RequestCacheStats requestCacheStats = getRequestCacheStats (client , "index" );
299+ assertEquals (numberOfIndexedItems , requestCacheStats .getMissCount ());
300+ assertEquals (0 , requestCacheStats .getHitCount ());
301+ assertEquals (0 , requestCacheStats .getEvictions ());
302+ assertEquals (perQuerySizeInCacheInBytes * numberOfIndexedItems , requestCacheStats .getMemorySizeInBytes ());
303+
304+ // Explicit clear the cache.
305+ ClearIndicesCacheRequest request = new ClearIndicesCacheRequest ("index" );
306+ ClearIndicesCacheResponse response = client .admin ().indices ().clearCache (request ).get ();
307+ assertNoFailures (response );
308+
309+ assertBusy (() -> {
310+ // All entries should get cleared up.
311+ assertTrue (getRequestCacheStats (client , "index" ).getMemorySizeInBytes () == 0 );
312+ }, 1 , TimeUnit .SECONDS );
313+ }
314+
315+ public void testEvictionsFlowWithExpirationTime () throws Exception {
316+ internalCluster ().startNode (
317+ Settings .builder ()
318+ .put (defaultSettings (DEFAULT_CACHE_SIZE_IN_BYTES , new TimeValue (0 ))) // Immediately evict items after
319+ // access
320+ .put (INDICES_CACHE_CLEAN_INTERVAL_SETTING .getKey (), new TimeValue (1 ))
321+ .build ()
322+ );
323+ Client client = client ();
324+ assertAcked (
325+ client .admin ()
326+ .indices ()
327+ .prepareCreate ("index" )
328+ .setMapping ("k" , "type=keyword" )
329+ .setSettings (
330+ Settings .builder ()
331+ .put (IndicesRequestCache .INDEX_CACHE_REQUEST_ENABLED_SETTING .getKey (), true )
332+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 )
333+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 )
334+ .put ("index.refresh_interval" , -1 )
335+ )
336+ .get ()
337+ );
338+ int numberOfIndexedItems = 2 ;// randomIntBetween(5, 10);
339+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
340+ indexRandom (true , client .prepareIndex ("index" ).setSource ("k" + iterator , "hello" + iterator ));
341+ }
342+ ensureSearchable ("index" );
343+ refreshAndWaitForReplication ();
344+ // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
345+ ForceMergeResponse forceMergeResponse = client .admin ().indices ().prepareForceMerge ("index" ).setFlush (true ).get ();
346+ OpenSearchAssertions .assertAllSuccessful (forceMergeResponse );
347+
348+ long perQuerySizeInCacheInBytes = -1 ;
349+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
350+ SearchResponse resp = client .prepareSearch ("index" )
351+ .setRequestCache (true )
352+ .setQuery (QueryBuilders .termQuery ("k" + iterator , "hello" + iterator ))
353+ .get ();
354+ if (perQuerySizeInCacheInBytes == -1 ) {
355+ RequestCacheStats requestCacheStats = getRequestCacheStats (client , "index" );
356+ perQuerySizeInCacheInBytes = requestCacheStats .getMemorySizeInBytes ();
357+ }
358+ assertSearchResponse (resp );
359+ }
360+ RequestCacheStats requestCacheStats = getRequestCacheStats (client , "index" );
361+ assertEquals (0 , requestCacheStats .getHitCount ());
362+ assertEquals (numberOfIndexedItems , requestCacheStats .getMissCount ());
363+ assertEquals (perQuerySizeInCacheInBytes * numberOfIndexedItems , requestCacheStats .getMemorySizeInBytes ());
364+ assertEquals (0 , requestCacheStats .getEvictions ());
365+
366+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
367+ SearchResponse resp = client .prepareSearch ("index" )
368+ .setRequestCache (true )
369+ .setQuery (QueryBuilders .termQuery ("k" + iterator , "hello" + iterator ))
370+ .get ();
371+ assertSearchResponse (resp );
372+ }
373+ requestCacheStats = getRequestCacheStats (client , "index" );
374+ // Now that we have access the entries, they should expire after 1ms. So lets wait and verify that cache gets
375+ // cleared up.
376+ assertBusy (() -> {
377+ // Explicit refresh should clear up cache entries
378+ assertTrue (getRequestCacheStats (client , "index" ).getMemorySizeInBytes () == 0 );
379+ }, 10 , TimeUnit .MILLISECONDS );
380+ // Validate hit and miss count.
381+ assertEquals (numberOfIndexedItems , requestCacheStats .getHitCount ());
382+ assertEquals (numberOfIndexedItems , requestCacheStats .getMissCount ());
383+ }
384+
385+ private RequestCacheStats getRequestCacheStats (Client client , String indexName ) {
386+ return client .admin ().indices ().prepareStats (indexName ).setRequestCache (true ).get ().getTotal ().getRequestCache ();
387+ }
48388}
0 commit comments