2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -78,6 +78,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix ingestion state xcontent serialization in IndexMetadata and fail fast on mapping errors([#19320](https://github.com/opensearch-project/OpenSearch/pull/19320))
- Fix updated keyword field params leading to stale responses from request cache ([#19385](https://github.com/opensearch-project/OpenSearch/pull/19385))
- Implement SslHandler retrieval logic for transport-reactor-netty4 plugin ([#19458](https://github.com/opensearch-project/OpenSearch/pull/19458))
- Cache serialised cluster state based on cluster state version and node version ([#19307](https://github.com/opensearch-project/OpenSearch/pull/19307))


### Dependencies
- Bump `com.gradleup.shadow:shadow-gradle-plugin` from 8.3.5 to 8.3.9 ([#19400](https://github.com/opensearch-project/OpenSearch/pull/19400))
@@ -38,6 +38,7 @@ public static BytesReference createCompressedStream(Version version, CheckedCons
throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
try (StreamOutput stream = new OutputStreamStreamOutput(CompressorRegistry.defaultCompressor().threadLocalOutputStream(bStream))) {
// Version is set for performing serialization but is not transmitted over the wire.
stream.setVersion(version);
outputConsumer.accept(stream);
}
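For orientation before the JoinHelper changes below, here is a minimal sketch (not part of the diff) of how a version-gated writer interacts with `createCompressedStream`: the version set on the stream controls which fields are emitted, while the compressed bytes themselves carry no version marker, so the receiver's version must be supplied out of band. `SampleState` is a hypothetical `Writeable`, and the `CompressedStreamUtils` import path is assumed:

```java
import java.io.IOException;

import org.opensearch.Version;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.transport.CompressedStreamUtils; // import path assumed

// Hypothetical payload: emits a field only when the stream version is new enough.
final class SampleState implements Writeable {
    private final String name = "demo";
    private final long fieldAddedIn31 = 42L;

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(name);
        // getVersion() returns the version set on the stream before writing,
        // so an older receiver is never sent a field it cannot parse.
        if (out.getVersion().onOrAfter(Version.V_3_1_0)) {
            out.writeLong(fieldAddedIn31);
        }
    }
}

final class VersionedSerializationSketch {
    // Serialize for a specific receiver: the version gates the layout but is
    // not itself written into the compressed bytes.
    static BytesReference serializeFor(Version receiverVersion, SampleState state) throws IOException {
        return CompressedStreamUtils.createCompressedStream(receiverVersion, state::writeTo);
    }
}
```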
@@ -129,7 +129,7 @@ public class JoinHelper {
private final Supplier<JoinTaskExecutor> joinTaskExecutorGenerator;
private final Consumer<Boolean> nodeCommissioned;
private final NamedWriteableRegistry namedWriteableRegistry;
-    private final AtomicReference<Tuple<Long, BytesReference>> serializedState = new AtomicReference<>();
+    private final AtomicReference<SerializedClusterStateCache> serializedClusterStateCache = new AtomicReference<>();

JoinHelper(
Settings settings,
@@ -263,7 +263,7 @@ private void runJoinValidators(
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState));
}

-    private void handleCompressedValidateJoinRequest(
+    protected void handleCompressedValidateJoinRequest(
Supplier<ClusterState> currentStateSupplier,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
BytesTransportRequest request
@@ -464,21 +464,25 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti
);
} else {
try {
-                final BytesReference bytes = serializedState.updateAndGet(cachedState -> {
-                    if (cachedState == null || cachedState.v1() != state.version()) {
+                final BytesReference bytes = serializedClusterStateCache.updateAndGet(currentCache -> {
+                    if (currentCache == null || state.version() != currentCache.getClusterStateVersion()) {
+                        currentCache = new SerializedClusterStateCache(state.version());
+                    }
+                    if (currentCache.containsStateForNodeVersion(state.version(), node.getVersion()) == false) {
+                        BytesReference compressedStream;
                         try {
-                            return new Tuple<>(
-                                state.version(),
-                                CompressedStreamUtils.createCompressedStream(node.getVersion(), state::writeTo)
-                            );
+                            compressedStream = CompressedStreamUtils.createCompressedStream(node.getVersion(), state::writeTo);
                         } catch (IOException e) {
+                            // Wrap in an unchecked exception: the updateAndGet lambda cannot throw the checked IOException.
                             throw new RuntimeException(e);
                         }
-                    } else {
-                        return cachedState;
+                        return SerializedClusterStateCache.createNewCache(currentCache, node.getVersion(), compressedStream);
                     }
-                }).v2();
+                    return currentCache;
+                    // Never null here: updateAndGet returns the new cache, which holds the serialized cluster state
+                    // for this node version.
+                }).getStateForNodeVersion(state.version(), node.getVersion());
+                // The joining node's version is read back when deserializing the cluster state.
                 final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
transportService.sendRequest(
node,
@@ -493,6 +497,64 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti
}
}

/**
* Cache for the serialized cluster state, keyed by cluster state version and OpenSearch node version.
*
* @opensearch.internal
*/
public static final class SerializedClusterStateCache {

private final Long clusterStateVersion;
private final Map<Version, BytesReference> serialisedClusterStateBySoftwareVersion;
private static final int MAX_VERSIONS_SIZE = 2;

public SerializedClusterStateCache(Long clusterStateVersion) {
this.clusterStateVersion = clusterStateVersion;
this.serialisedClusterStateBySoftwareVersion = Collections.emptyMap();
}

private SerializedClusterStateCache(
Long clusterStateVersion,
Map<Version, BytesReference> serialisedClusterStateBySoftwareVersion
) {
this.clusterStateVersion = clusterStateVersion;
this.serialisedClusterStateBySoftwareVersion = Collections.unmodifiableMap(
new HashMap<>(serialisedClusterStateBySoftwareVersion)
);
}

public Long getClusterStateVersion() {
return clusterStateVersion;
}

private boolean containsStateForNodeVersion(Long clusterStateVersion, Version softwareVersion) {
if (this.clusterStateVersion == null || !this.clusterStateVersion.equals(clusterStateVersion)) {
return false;
}
return serialisedClusterStateBySoftwareVersion.containsKey(softwareVersion);
}

private BytesReference getStateForNodeVersion(Long clusterStateVersion, Version softwareVersion) {
if (this.clusterStateVersion == null || !this.clusterStateVersion.equals(clusterStateVersion)) {
return null;
}
return serialisedClusterStateBySoftwareVersion.get(softwareVersion);
}

private static SerializedClusterStateCache createNewCache(
SerializedClusterStateCache serializedClusterStateCache,
Version versionToSerialize,
BytesReference bytes
) {
Map<Version, BytesReference> newMap = new HashMap<>(serializedClusterStateCache.serialisedClusterStateBySoftwareVersion);
if (newMap.size() == MAX_VERSIONS_SIZE) {
newMap.remove(newMap.keySet().iterator().next());
}
newMap.put(versionToSerialize, bytes);
return new SerializedClusterStateCache(serializedClusterStateCache.clusterStateVersion, newMap);
}
}

/**
* The callback interface.
*
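The caching pattern above deserves spelling out before the tests: `updateAndGet` installs an immutable snapshot, so concurrent join validations either see a fully built cache or race to rebuild it, and the update function may run more than once under contention. That is safe here because re-serializing the same state yields equivalent bytes; the cost is wasted work, never corruption. A stripped-down sketch of the same pattern with illustrative types, not the actual JoinHelper internals:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Immutable snapshot swapped atomically, keyed by a primary version
// plus a per-entry secondary key (here: a string standing in for node version).
final class SnapshotCache {
    private static final int MAX_ENTRIES = 2; // mirrors MAX_VERSIONS_SIZE

    private final long primaryVersion;
    private final Map<String, byte[]> byKey;

    SnapshotCache(long primaryVersion, Map<String, byte[]> byKey) {
        this.primaryVersion = primaryVersion;
        this.byKey = Collections.unmodifiableMap(new HashMap<>(byKey));
    }

    static byte[] getOrCompute(AtomicReference<SnapshotCache> ref, long version, String key) {
        // updateAndGet may invoke this function more than once under contention,
        // so it stays side-effect free apart from building a fresh snapshot.
        return ref.updateAndGet(cur -> {
            if (cur == null || cur.primaryVersion != version) {
                cur = new SnapshotCache(version, Map.of()); // new state version invalidates everything
            }
            if (cur.byKey.containsKey(key)) {
                return cur; // cache hit: reuse the existing snapshot
            }
            Map<String, byte[]> next = new HashMap<>(cur.byKey);
            if (next.size() == MAX_ENTRIES) {
                next.remove(next.keySet().iterator().next()); // evict an arbitrary entry
            }
            next.put(key, expensiveSerialization(version, key));
            return new SnapshotCache(version, next);
        }).byKey.get(key);
    }

    private static byte[] expensiveSerialization(long version, String key) {
        return (version + ":" + key).getBytes(StandardCharsets.UTF_8); // stand-in for compression
    }
}
```

The two-entry bound matches `MAX_VERSIONS_SIZE`, presumably because a rolling upgrade leaves at most two distinct node versions joining at any one time.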
@@ -34,10 +34,17 @@
import org.apache.logging.log4j.Level;
import org.opensearch.Version;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.admin.indices.rollover.RolloverInfo;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.Context;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.IngestionStatus;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
@@ -46,6 +53,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
@@ -64,7 +72,11 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -454,6 +466,71 @@ public void onFailure(Exception e) {
assertTrue(t.getCause().getMessage().contains("different cluster uuid"));
}

public void testCompressedValidateJoinRequestForDifferentNodeVersions() throws Exception {
TestClusterSetup testCluster = getTestClusterSetup(Version.CURRENT, true); // Use capturing transport

ClusterState clusterState = testCluster.localClusterState;

// Test with 2.19 node
DiscoveryNode node219 = new DiscoveryNode("node219", buildNewFakeTransportAddress(), Version.V_2_19_0);
testCluster.joinHelper.sendValidateJoinRequest(node219, clusterState, new ActionListener<>() {
@Override
public void onResponse(TransportResponse.Empty empty) {
logger.info("validation successful for 2.19 node");
}

@Override
public void onFailure(Exception e) {
logger.error("validation failed for 2.19 node", e);
}
});

// Test with 3.1 node
DiscoveryNode node31 = new DiscoveryNode("node31", buildNewFakeTransportAddress(), Version.V_3_1_0);
testCluster.joinHelper.sendValidateJoinRequest(node31, clusterState, new ActionListener<>() {
@Override
public void onResponse(TransportResponse.Empty empty) {
logger.info("validation successful for 3.1 node");
}

@Override
public void onFailure(Exception e) {
logger.error("validation failed for 3.1 node", e);
}
});

// Both validate-join requests should have been captured by the transport
CapturedRequest[] requests = testCluster.capturingTransport.getCapturedRequestsAndClear();
assertEquals(2, requests.length);

// Deserialize the captured request as a 2.19 node would, within the same cluster UUID
BytesTransportRequest request219 = (BytesTransportRequest) requests[0].request;
try (StreamInput input = CompressedStreamUtils.decompressBytes(request219, namedWriteableRegistry)) {
ClusterState incomingState = ClusterState.readFrom(input, node219);
IndexMetadata indexMetadata = incomingState.metadata().index("test-index");
assertNotNull(indexMetadata.context());
assertEquals("context", indexMetadata.context().name());
assertNotNull(indexMetadata.context().params());
// IngestionStatus is not serialized for a 2.19 receiver, so the IndexMetadata builder default applies
assertNotNull(indexMetadata.getIngestionStatus());
assertFalse(indexMetadata.getIngestionStatus().isPaused());
}
logger.info("Decompression test passed for 2.19 node");

// Deserialize the captured request as a 3.1 node would; the newer version preserves the ingestion status
BytesTransportRequest request31 = (BytesTransportRequest) requests[1].request;
try (StreamInput input = CompressedStreamUtils.decompressBytes(request31, namedWriteableRegistry)) {
ClusterState incomingState = ClusterState.readFrom(input, node31);
IndexMetadata indexMetadata = incomingState.metadata().index("test-index");
assertNotNull(indexMetadata.context());
assertEquals("context", indexMetadata.context().name());
assertNotNull(indexMetadata.context().params());
assertNotNull(indexMetadata.getIngestionStatus());
assertTrue(indexMetadata.getIngestionStatus().isPaused());
}
logger.info("Decompression test passed for 3.1 node");
}

private TestClusterSetup getTestClusterSetup(Version version, boolean isCapturingTransport) {
version = version == null ? Version.CURRENT : version;
DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
@@ -464,8 +541,59 @@ private TestClusterSetup getTestClusterSetup(Version version, boolean isCapturin
CapturingTransport capturingTransport = new CapturingTransport();
DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), version);

// Create IndexMetadata with all fields set
String indexName = "test-index";
IndexMetadata.Builder indexBuilder = IndexMetadata.builder(indexName)
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, version)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_CREATION_DATE, System.currentTimeMillis())
.put(IndexMetadata.SETTING_INDEX_UUID, "test-uuid")
.put(IndexMetadata.SETTING_PRIORITY, 100)
.put(IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING.getKey(), 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, "DOCUMENT")
.build()
)
.numberOfShards(1)
.numberOfReplicas(1)
.creationDate(System.currentTimeMillis())
.version(1)
.mappingVersion(1)
.settingsVersion(1)
.aliasesVersion(1)
.state(IndexMetadata.State.OPEN);

// Add mapping
Map<String, Object> mappingSource = new HashMap<>();
mappingSource.put("properties", Map.of("field1", Map.of("type", "text")));
indexBuilder.putMapping(new MappingMetadata("_doc", mappingSource));

// Add alias
indexBuilder.putAlias(AliasMetadata.builder("test-alias").build());

// Add custom data
indexBuilder.putCustom("test-custom", Map.of("key1", "value1"));

// Add in-sync allocation IDs
indexBuilder.putInSyncAllocationIds(0, Set.of("alloc-1", "alloc-2"));

// Add rollover info
indexBuilder.putRolloverInfo(new RolloverInfo("test-alias", Collections.emptyList(), 1000L));
indexBuilder.context(new Context("context"));
indexBuilder.ingestionStatus(new IngestionStatus(true));
IndexMetadata indexMetadata = indexBuilder.build();

final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT)
-            .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true))
+            .metadata(
+                Metadata.builder()
+                    .generateClusterUuidIfNeeded()
+                    .clusterUUIDCommitted(true)
+                    .indices(Map.of(indexName, indexMetadata))
+                    .templates(Map.of(indexName, IndexTemplateMetadata.builder(indexName).patterns(List.of("pattern")).build()))
+            )
.build();
TransportService transportService;
if (isCapturingTransport) {
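One behavior the new test does not pin down, and which could make a useful follow-up assertion: two joining nodes on the same version should receive the very same cached bytes rather than triggering a second serialization. A hedged fragment on top of the test harness above, assuming `BytesTransportRequest` exposes its payload via `bytes()` and that `ActionListener.wrap(Runnable)` is available:

```java
// Hedged fragment (not in the PR): same-version joiners should hit the cache,
// so both captured requests carry the identical serialized payload.
DiscoveryNode nodeA = new DiscoveryNode("a", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode nodeB = new DiscoveryNode("b", buildNewFakeTransportAddress(), Version.CURRENT);
testCluster.joinHelper.sendValidateJoinRequest(nodeA, clusterState, ActionListener.wrap(() -> {}));
testCluster.joinHelper.sendValidateJoinRequest(nodeB, clusterState, ActionListener.wrap(() -> {}));

CapturedRequest[] captured = testCluster.capturingTransport.getCapturedRequestsAndClear();
BytesTransportRequest first = (BytesTransportRequest) captured[0].request;
BytesTransportRequest second = (BytesTransportRequest) captured[1].request;
assertSame(first.bytes(), second.bytes()); // cache hit: no second serialization
```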