88
99package org .opensearch .cache .common .tier ;
1010
11+ import org .apache .logging .log4j .LogManager ;
12+ import org .apache .logging .log4j .Logger ;
1113import org .opensearch .cache .common .policy .TookTimePolicy ;
1214import org .opensearch .common .annotation .ExperimentalApi ;
1315import org .opensearch .common .cache .CacheType ;
3537import java .util .Map ;
3638import java .util .NoSuchElementException ;
3739import java .util .Objects ;
40+ import java .util .concurrent .CompletableFuture ;
41+ import java .util .concurrent .ConcurrentHashMap ;
42+ import java .util .concurrent .ExecutionException ;
3843import java .util .concurrent .atomic .AtomicBoolean ;
3944import java .util .concurrent .locks .ReadWriteLock ;
4045import java .util .concurrent .locks .ReentrantReadWriteLock ;
46+ import java .util .function .BiFunction ;
4147import java .util .function .Function ;
4248import java .util .function .Predicate ;
4349import java .util .function .ToLongBiFunction ;
@@ -61,6 +67,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
6167
6268 // Used to avoid caching stale entries in lower tiers.
6369 private static final List <RemovalReason > SPILLOVER_REMOVAL_REASONS = List .of (RemovalReason .EVICTED , RemovalReason .CAPACITY );
70+ private static final Logger logger = LogManager .getLogger (TieredSpilloverCache .class );
6471
6572 private final ICache <K , V > diskCache ;
6673 private final ICache <K , V > onHeapCache ;
@@ -86,6 +93,12 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
8693 private final Map <ICache <K , V >, TierInfo > caches ;
8794 private final List <Predicate <V >> policies ;
8895
96+ /**
97+ * This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value
98+ * only once.
99+ */
100+ Map <ICacheKey <K >, CompletableFuture <Tuple <ICacheKey <K >, V >>> completableFutureMap = new ConcurrentHashMap <>();
101+
89102 TieredSpilloverCache (Builder <K , V > builder ) {
90103 Objects .requireNonNull (builder .onHeapCacheFactory , "onHeap cache builder can't be null" );
91104 Objects .requireNonNull (builder .diskCacheFactory , "disk cache builder can't be null" );
@@ -190,10 +203,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
190203 // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
191204 // This is needed as there can be many requests for the same key at the same time and we only want to load
192205 // the value once.
193- V value = null ;
194- try (ReleasableLock ignore = writeLock .acquire ()) {
195- value = onHeapCache .computeIfAbsent (key , loader );
196- }
206+ V value = compute (key , loader );
197207 // Handle stats
198208 if (loader .isLoaded ()) {
199209 // The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
@@ -222,6 +232,57 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
222232 return cacheValueTuple .v1 ();
223233 }
224234
235+ private V compute (ICacheKey <K > key , LoadAwareCacheLoader <ICacheKey <K >, V > loader ) throws Exception {
236+ // Only one of the threads will succeed putting a future into map for the same key.
237+ // Rest will fetch existing future and wait on that to complete.
238+ CompletableFuture <Tuple <ICacheKey <K >, V >> future = completableFutureMap .putIfAbsent (key , new CompletableFuture <>());
239+ // Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
240+ // the value. Also before returning value, puts the value in cache.
241+ BiFunction <Tuple <ICacheKey <K >, V >, Throwable , Void > handler = (pair , ex ) -> {
242+ if (pair != null ) {
243+ try (ReleasableLock ignore = writeLock .acquire ()) {
244+ onHeapCache .put (pair .v1 (), pair .v2 ());
245+ } catch (Exception e ) {
246+ // TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
247+ // listeners/stats. Needs better exception handling at underlying layers.For now swallowing
248+ // exception.
249+ logger .warn ("Exception occurred while putting item onto heap cache" , e );
250+ }
251+ } else {
252+ if (ex != null ) {
253+ logger .warn ("Exception occurred while trying to compute the value" , ex );
254+ }
255+ }
256+ completableFutureMap .remove (key ); // Remove key from map as not needed anymore.
257+ return null ;
258+ };
259+ V value = null ;
260+ if (future == null ) {
261+ future = completableFutureMap .get (key );
262+ future .handle (handler );
263+ try {
264+ value = loader .load (key );
265+ } catch (Exception ex ) {
266+ future .completeExceptionally (ex );
267+ throw new ExecutionException (ex );
268+ }
269+ if (value == null ) {
270+ NullPointerException npe = new NullPointerException ("Loader returned a null value" );
271+ future .completeExceptionally (npe );
272+ throw new ExecutionException (npe );
273+ } else {
274+ future .complete (new Tuple <>(key , value ));
275+ }
276+ } else {
277+ try {
278+ value = future .get ().v2 ();
279+ } catch (InterruptedException ex ) {
280+ throw new IllegalStateException (ex );
281+ }
282+ }
283+ return value ;
284+ }
285+
225286 @ Override
226287 public void invalidate (ICacheKey <K > key ) {
227288 // We are trying to invalidate the key from all caches though it would be present in only of them.
@@ -328,12 +389,22 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
328389 ICacheKey <K > key = notification .getKey ();
329390 boolean wasEvicted = SPILLOVER_REMOVAL_REASONS .contains (notification .getRemovalReason ());
330391 boolean countEvictionTowardsTotal = false ; // Don't count this eviction towards the cache's total if it ends up in the disk tier
331- if (caches .get (diskCache ).isEnabled () && wasEvicted && evaluatePolicies (notification .getValue ())) {
392+ boolean exceptionOccurredOnDiskCachePut = false ;
393+ boolean canCacheOnDisk = caches .get (diskCache ).isEnabled () && wasEvicted && evaluatePolicies (notification .getValue ());
394+ if (canCacheOnDisk ) {
332395 try (ReleasableLock ignore = writeLock .acquire ()) {
333396 diskCache .put (key , notification .getValue ()); // spill over to the disk tier and increment its stats
397+ } catch (Exception ex ) {
398+ // TODO: Catch specific exceptions. Needs better exception handling. We are just swallowing exception
399+ // in this case as it shouldn't cause upstream request to fail.
400+ logger .warn ("Exception occurred while putting item to disk cache" , ex );
401+ exceptionOccurredOnDiskCachePut = true ;
334402 }
335- updateStatsOnPut (TIER_DIMENSION_VALUE_DISK , key , notification .getValue ());
336- } else {
403+ if (!exceptionOccurredOnDiskCachePut ) {
404+ updateStatsOnPut (TIER_DIMENSION_VALUE_DISK , key , notification .getValue ());
405+ }
406+ }
407+ if (!canCacheOnDisk || exceptionOccurredOnDiskCachePut ) {
337408 // If the value is not going to the disk cache, send this notification to the TSC's removal listener
338409 // as the value is leaving the TSC entirely
339410 removalListener .onRemoval (notification );
0 commit comments