Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Security Manager Replacement] Enhance Java Agent to intercept Runtime::halt ([#17757](https://github.com/opensearch-project/OpenSearch/pull/17757))
- Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741))
- Implement fixed interval refresh task scheduling ([#17777](https://github.com/opensearch-project/OpenSearch/pull/17777))
- [Tiered caching] Create a single cache manager for all the disk caches. ([#17513](https://github.com/opensearch-project/OpenSearch/pull/17513))
- Add GRPC DocumentService and Bulk endpoint ([#17727](https://github.com/opensearch-project/OpenSearch/pull/17727))

### Changed
@@ -213,7 +213,7 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
.setSegmentCount(1) // We don't need to make underlying caches multi-segmented
.setStatsTrackingEnabled(false)
.setMaxSizeInBytes(diskCacheSizeInBytes)
.setStoragePath(builder.cacheConfig.getStoragePath() + "/" + segmentNumber)
.setStoragePath(builder.cacheConfig.getStoragePath())
.setCacheAlias("tiered_disk_cache#" + segmentNumber)
.build(),
builder.cacheType,
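With a single disk cache manager per cache type, every TieredSpilloverCacheSegment now reuses the same storage path and is told apart only by its cache alias ("tiered_disk_cache#" + segmentNumber), instead of getting its own sub-directory. A minimal, self-contained sketch of that layout (hypothetical class, method, and path names, not code from this PR):

import java.util.stream.IntStream;

class SegmentAliasLayoutSketch {

    // Hypothetical helper: print the storage path / alias pair each segment would use.
    static void printSegmentLayout(String sharedStoragePath, int segmentCount) {
        IntStream.range(0, segmentCount).forEach(segmentNumber ->
            // Before: sharedStoragePath + "/" + segmentNumber gave each segment its own directory.
            // After: all segments share sharedStoragePath; only the alias differs.
            System.out.println(sharedStoragePath + " -> tiered_disk_cache#" + segmentNumber)
        );
    }

    public static void main(String[] args) {
        printSegmentLayout("/tmp/opensearch/disk_cache", 4);
    }
}

Keeping one directory per cache type is what lets a single shared cache manager own all segments and clean the data up exactly once, which the constructor and close() changes below rely on.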
@@ -12,6 +12,7 @@
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.Map;
@@ -36,17 +37,24 @@ public class EhcacheDiskCacheSettings {

public static final Setting.AffixSetting<Integer> DISK_WRITE_MINIMUM_THREADS_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".min_threads",
(key) -> Setting.intSetting(key, 2, 1, 5, NodeScope)
(key) -> Setting.intSetting(key, 2, 1, ThreadPool.searchThreadPoolSize(Runtime.getRuntime().availableProcessors()), NodeScope)
);

/**
* Ehcache disk write maximum threads for its pool
* Ehcache disk write maximum threads for its pool.
*
* Setting pattern: {cache_type}.ehcache_disk.max_threads
*/
public static final Setting.AffixSetting<Integer> DISK_WRITE_MAXIMUM_THREADS_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".max_threads",
(key) -> Setting.intSetting(key, 2, 1, 20, NodeScope)
(key) -> Setting.intSetting(
key,
ThreadPool.searchThreadPoolSize(Runtime.getRuntime().availableProcessors()),
1,
Runtime.getRuntime().availableProcessors() * 100, // The maximum one can configure via this setting is 100 times
// the number of CPU cores. Ideally this won't be required, but it is available in case someone needs it.
NodeScope
)
);

/**
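These affix settings resolve per cache type following the {cache_type}.ehcache_disk.min_threads / max_threads pattern. Under the new bounds, min_threads keeps its default of 2 but is capped at the search thread pool size, while max_threads now defaults to the search thread pool size and is capped at 100 times the available processors. A standalone sketch of how those defaults work out on a node; the searchThreadPoolSize() formula below is an assumed stand-in for ThreadPool.searchThreadPoolSize, not taken from this diff:

class EhcacheDiskThreadDefaultsSketch {

    // Assumption: the search pool is sized at roughly 3/2 * allocated processors + 1.
    static int assumedSearchThreadPoolSize(int allocatedProcessors) {
        return ((allocatedProcessors * 3) / 2) + 1;
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        int searchPool = assumedSearchThreadPoolSize(processors);
        // min_threads: default 2, allowed range [1, searchPool]
        System.out.println("min_threads default=2, max allowed=" + searchPool);
        // max_threads: default searchPool, allowed range [1, processors * 100]
        System.out.println("max_threads default=" + searchPool + ", max allowed=" + (processors * 100));
    }
}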
@@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.cache.EhcacheDiskCacheSettings;
import org.opensearch.common.SuppressForbidden;
@@ -153,16 +152,19 @@
if (this.storagePath == null || this.storagePath.isBlank()) {
throw new IllegalArgumentException("Storage path shouldn't be null or empty");
}
// Delete all the previous disk cache related files/data. We don't persist data between process restart for
// now which is why need to do this. Clean up in case there was a non graceful restart and we had older disk
// cache data still lying around.
Path ehcacheDirectory = Paths.get(this.storagePath);
if (Files.exists(ehcacheDirectory)) {
try {
logger.info("Found older disk cache data lying around during initialization under path: {}", this.storagePath);
IOUtils.rm(ehcacheDirectory);
} catch (IOException e) {
throw new OpenSearchException(String.format(CACHE_DATA_CLEANUP_DURING_INITIALIZATION_EXCEPTION, this.storagePath), e);
// Delete all the previous disk cache related files/data only if the cache manager doesn't exist yet, as we
// can create multiple caches via a single cache manager for a cache type. We don't persist data between
// process restarts for now, which is why we need to do this. Clean up in case there was a non-graceful
// restart and older disk cache data was still lying around.
if (!EhcacheDiskCacheManager.doesCacheManagerExist(cacheType)) {
Path ehcacheDirectory = Paths.get(this.storagePath);
if (Files.exists(ehcacheDirectory)) {
try {
logger.info("Found older disk cache data lying around during initialization under path: {}", this.storagePath);
IOUtils.rm(ehcacheDirectory);
} catch (IOException e) {
throw new OpenSearchException(String.format(CACHE_DATA_CLEANUP_DURING_INITIALIZATION_EXCEPTION, this.storagePath), e);
}
}
}
if (builder.threadPoolAlias == null || builder.threadPoolAlias.isBlank()) {
@@ -173,7 +175,7 @@
this.settings = Objects.requireNonNull(builder.getSettings(), "Settings objects shouldn't be null");
this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null");
this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null");
this.cacheManager = buildCacheManager();
this.cacheManager = EhcacheDiskCacheManager.getCacheManager(cacheType, this.storagePath, settings, this.threadPoolAlias);
Objects.requireNonNull(builder.getRemovalListener(), "Removal listener can't be null");
this.removalListener = builder.getRemovalListener();
Objects.requireNonNull(builder.getWeigher(), "Weigher can't be null");
@@ -189,73 +191,54 @@
}
}

// Package private for testing
PersistentCacheManager getCacheManager() {
return this.cacheManager;
}

@SuppressWarnings({ "rawtypes", "removal" })
private Cache<ICacheKey, ByteArrayWrapper> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
// Creating the cache requires permissions specified in plugin-security.policy
return AccessController.doPrivileged((PrivilegedAction<Cache<ICacheKey, ByteArrayWrapper>>) () -> {
try {
int segmentCount = (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_SEGMENT_KEY)
.get(settings);
if (builder.getNumberOfSegments() > 0) {
segmentCount = builder.getNumberOfSegments();
int segmentCount = (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings);
if (builder.getNumberOfSegments() > 0) {
segmentCount = builder.getNumberOfSegments();
}
CacheConfigurationBuilder<ICacheKey, ByteArrayWrapper> cacheConfigurationBuilder = CacheConfigurationBuilder
.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}
return this.cacheManager.createCache(
this.diskCacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
ICacheKey.class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
.get(DISK_WRITE_CONCURRENCY_KEY)
.get(settings),
segmentCount
)
)
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer())
// We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
// before V hits ehcache.
);
} catch (IllegalArgumentException ex) {
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
throw ex;
} catch (IllegalStateException ex) {
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
throw ex;
}
});

@Override
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(
ICacheKey key,
Supplier<? extends ByteArrayWrapper> oldValue,
ByteArrayWrapper newValue
) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_WRITE_CONCURRENCY_KEY).get(settings),
segmentCount
)
)
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer()); // We pass ByteArrayWrapperSerializer as ehcache's value serializer. If
// V is an interface, and we pass its
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
// before V hits ehcache.

return EhcacheDiskCacheManager.createCache(cacheType, this.diskCacheAlias, cacheConfigurationBuilder);
}

private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<K, V> builder) {
@@ -470,21 +453,7 @@
@Override
@SuppressForbidden(reason = "Ehcache uses File.io")
public void close() {
try {
cacheManager.close();
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception occurred while trying to close ehcache manager"), e);
}
// Delete all the disk cache related files/data in case it is present
Path ehcacheDirectory = Paths.get(this.storagePath);
if (Files.exists(ehcacheDirectory)) {
try {
IOUtils.rm(ehcacheDirectory);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete ehcache disk cache data under path: {}", this.storagePath));
}
}

EhcacheDiskCacheManager.closeCache(cacheType, diskCacheAlias, storagePath);
}

/**
@@ -597,16 +566,24 @@
* Wrapper over ICacheKeySerializer which is compatible with ehcache's serializer requirements.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer<ICacheKey> {
public class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer<ICacheKey> {
private ICacheKeySerializer<K> serializer;

/**
* Constructor for key serializer
* @param internalKeySerializer serializer for internal key
*/
public KeySerializerWrapper(Serializer<K, byte[]> internalKeySerializer) {
this.serializer = new ICacheKeySerializer<>(internalKeySerializer);
}

// This constructor must be present, but does not have to work as we are not actually persisting the disk
// cache after a restart.
// See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
/**
* This constructor must be present, but does not have to work as we are not actually persisting the disk
* cache after a restart. See https://www.ehcache.org/documentation/3.0/serializers-copiers
* .html#persistent-vs-transient-caches
* @param classLoader
* @param persistenceContext
*/
public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}

@Override
@@ -632,12 +609,19 @@
/**
* Wrapper allowing Ehcache to serialize ByteArrayWrapper.
*/
private static class ByteArrayWrapperSerializer implements org.ehcache.spi.serialization.Serializer<ByteArrayWrapper> {
public static class ByteArrayWrapperSerializer implements org.ehcache.spi.serialization.Serializer<ByteArrayWrapper> {
/**
* Default constructor
*/
public ByteArrayWrapperSerializer() {}

// This constructor must be present, but does not have to work as we are not actually persisting the disk
// cache after a restart.
// See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
/**
* This constructor must be present, but does not have to work as we are not actually persisting the disk
* cache after a restart. See https://www.ehcache.org/documentation/3.0/serializers-copiers
* .html#persistent-vs-transient-caches
* @param classLoader
* @param persistenceContext
*/
public ByteArrayWrapperSerializer(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}

@Override
@@ -906,9 +890,13 @@
* A wrapper over byte[], with equals() that works using Arrays.equals().
* Necessary due to a limitation in how Ehcache compares byte[].
*/
static class ByteArrayWrapper {
public static class ByteArrayWrapper {
private final byte[] value;

/**
* Constructor for byte array wrapper.
* @param value value to wrap.
*/
public ByteArrayWrapper(byte[] value) {
this.value = value;
}
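Taken together, these changes replace the per-instance PersistentCacheManager (previously built, closed, and its directory deleted by each EhcacheDiskCache) with a shared manager reached through EhcacheDiskCacheManager.getCacheManager, createCache, and closeCache. A minimal, self-contained sketch of that shared-manager lifecycle (hypothetical names and bookkeeping only; the real EhcacheDiskCacheManager is not shown in this diff):

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class SharedDiskCacheManagerSketch {

    // One entry per cache type; the value tracks the aliases of caches created against that manager.
    private static final Map<String, Set<String>> ALIASES_BY_CACHE_TYPE = new ConcurrentHashMap<>();

    static boolean doesCacheManagerExist(String cacheType) {
        return ALIASES_BY_CACHE_TYPE.containsKey(cacheType);
    }

    static synchronized void createCache(String cacheType, String diskCacheAlias) {
        // The first cache for a cache type implicitly creates the shared manager; later ones reuse it.
        ALIASES_BY_CACHE_TYPE.computeIfAbsent(cacheType, type -> new HashSet<>()).add(diskCacheAlias);
    }

    static synchronized void closeCache(String cacheType, String diskCacheAlias, String storagePath) {
        Set<String> aliases = ALIASES_BY_CACHE_TYPE.get(cacheType);
        if (aliases == null) {
            return;
        }
        aliases.remove(diskCacheAlias);
        if (aliases.isEmpty()) {
            // Last cache for this type is gone: this is where the real manager would be closed
            // and the shared directory under storagePath deleted, exactly once.
            ALIASES_BY_CACHE_TYPE.remove(cacheType);
        }
    }

    public static void main(String[] args) {
        createCache("indices_request_cache", "tiered_disk_cache#0");
        createCache("indices_request_cache", "tiered_disk_cache#1");
        closeCache("indices_request_cache", "tiered_disk_cache#0", "/tmp/opensearch/disk_cache");
        System.out.println(doesCacheManagerExist("indices_request_cache")); // true: one cache is still open
    }
}

The alias bookkeeping mirrors the constructor's doesCacheManagerExist check above: directory cleanup and manager shutdown happen once per cache type rather than once per segment.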