From dbf861a63cc1ad37a3c7954505c8bf2c1732d790 Mon Sep 17 00:00:00 2001
From: Swetha Guptha
Date: Tue, 16 Sep 2025 11:01:57 +0530
Subject: [PATCH] Cache serialised cluster state during node-join based on cluster state version and node version.

Signed-off-by: Swetha Guptha
---
 CHANGELOG.md                                  |   2 +
 .../coordination/CompressedStreamUtils.java   |   1 +
 .../cluster/coordination/JoinHelper.java      |  84 +++++++++--
 .../cluster/coordination/JoinHelperTests.java | 130 +++++++++++++++++-
 4 files changed, 205 insertions(+), 12 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2b31303556992..fba33a4a2d64f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -78,6 +78,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Fix ingestion state xcontent serialization in IndexMetadata and fail fast on mapping errors([#19320](https://github.com/opensearch-project/OpenSearch/pull/19320))
 - Fix updated keyword field params leading to stale responses from request cache ([#19385](https://github.com/opensearch-project/OpenSearch/pull/19385))
 - Implement SslHandler retrieval logic for transport-reactor-netty4 plugin ([#19458](https://github.com/opensearch-project/OpenSearch/pull/19458))
+- Cache serialised cluster state based on cluster state version and node version ([#19307](https://github.com/opensearch-project/OpenSearch/pull/19307))
+
 
 ### Dependencies
 - Bump `com.gradleup.shadow:shadow-gradle-plugin` from 8.3.5 to 8.3.9 ([#19400](https://github.com/opensearch-project/OpenSearch/pull/19400))
diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java
index dc7b203eb7c4b..c0f5ba60cfa83 100644
--- a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java
+++ b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java
@@ -38,6 +38,7 @@ public static BytesReference createCompressedStream(Version version, CheckedCons
         throws IOException {
         final BytesStreamOutput bStream = new BytesStreamOutput();
         try (StreamOutput stream = new OutputStreamStreamOutput(CompressorRegistry.defaultCompressor().threadLocalOutputStream(bStream))) {
+            // The version is set for serialization but is not transmitted over the wire.
             stream.setVersion(version);
             outputConsumer.accept(stream);
         }
diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java
index 9bf6bac07da53..4b04ea6464360 100644
--- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java
+++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java
@@ -129,7 +129,7 @@ public class JoinHelper {
     private final Supplier<JoinTaskExecutor> joinTaskExecutorGenerator;
     private final Consumer<Boolean> nodeCommissioned;
     private final NamedWriteableRegistry namedWriteableRegistry;
-    private final AtomicReference<Tuple<Long, BytesReference>> serializedState = new AtomicReference<>();
+    private final AtomicReference<SerializedClusterStateCache> serializedClusterStateCache = new AtomicReference<>();
 
     JoinHelper(
         Settings settings,
@@ -263,7 +263,7 @@ private void runJoinValidators(
         joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState));
     }
 
-    private void handleCompressedValidateJoinRequest(
+    protected void handleCompressedValidateJoinRequest(
         Supplier<ClusterState> currentStateSupplier,
         Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
         BytesTransportRequest request
@@ -464,21 +464,25 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti
             );
         } else {
             try {
-                final BytesReference bytes = serializedState.updateAndGet(cachedState -> {
-                    if (cachedState == null || cachedState.v1() != state.version()) {
+                final BytesReference bytes = serializedClusterStateCache.updateAndGet(currentCache -> {
+                    if (currentCache == null || state.version() != currentCache.getClusterStateVersion()) {
+                        currentCache = new SerializedClusterStateCache(state.version());
+                    }
+                    if (currentCache.containsStateForNodeVersion(state.version(), node.getVersion()) == false) {
+                        BytesReference compressedStream;
                         try {
-                            return new Tuple<>(
-                                state.version(),
-                                CompressedStreamUtils.createCompressedStream(node.getVersion(), state::writeTo)
-                            );
+                            compressedStream = CompressedStreamUtils.createCompressedStream(node.getVersion(), state::writeTo);
                         } catch (IOException e) {
                             // mandatory as AtomicReference doesn't rethrow IOException.
                             throw new RuntimeException(e);
                         }
-                    } else {
-                        return cachedState;
+                        return SerializedClusterStateCache.createNewCache(currentCache, node.getVersion(), compressedStream);
                     }
-                }).v2();
+                    return currentCache;
+                    // This will not be null because the returned cache contains the serialized cluster state for the
+                    // joining node's version.
+                }).getStateForNodeVersion(state.version(), node.getVersion());
+                // The joining node's version is used when deserializing the cluster state
                 final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
                 transportService.sendRequest(
                     node,
@@ -493,6 +497,64 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti
             }
         }
     }
 
+    /**
+     * Cache of serialized cluster states, keyed by cluster state version and OpenSearch node version.
+     *
+     * @opensearch.internal
+     */
+    public static final class SerializedClusterStateCache {
+
+        private final Long clusterStateVersion;
+        private final Map<Version, BytesReference> serialisedClusterStateBySoftwareVersion;
+        private static final int MAX_VERSIONS_SIZE = 2;
+
+        public SerializedClusterStateCache(Long clusterStateVersion) {
+            this.clusterStateVersion = clusterStateVersion;
+            this.serialisedClusterStateBySoftwareVersion = Collections.emptyMap();
+        }
+
+        private SerializedClusterStateCache(
+            Long clusterStateVersion,
+            Map<Version, BytesReference> serialisedClusterStateBySoftwareVersion
+        ) {
+            this.clusterStateVersion = clusterStateVersion;
+            this.serialisedClusterStateBySoftwareVersion = Collections.unmodifiableMap(
+                new HashMap<>(serialisedClusterStateBySoftwareVersion)
+            );
+        }
+
+        public Long getClusterStateVersion() {
+            return clusterStateVersion;
+        }
+
+        private boolean containsStateForNodeVersion(Long clusterStateVersion, Version softwareVersion) {
+            if (this.clusterStateVersion == null || !this.clusterStateVersion.equals(clusterStateVersion)) {
+                return false;
+            }
+            return serialisedClusterStateBySoftwareVersion.containsKey(softwareVersion);
+        }
+
+        private BytesReference getStateForNodeVersion(Long clusterStateVersion, Version softwareVersion) {
+            if (this.clusterStateVersion == null || !this.clusterStateVersion.equals(clusterStateVersion)) {
+                return null;
+            }
+            return serialisedClusterStateBySoftwareVersion.get(softwareVersion);
+        }
+
+        private static SerializedClusterStateCache createNewCache(
+            SerializedClusterStateCache serializedClusterStateCache,
+            Version versionToSerialize,
+            BytesReference bytes
+        ) {
+            Map<Version, BytesReference> newMap = new HashMap<>(serializedClusterStateCache.serialisedClusterStateBySoftwareVersion);
+            if (newMap.size() == MAX_VERSIONS_SIZE) {
+                newMap.remove(newMap.keySet().iterator().next());
+            }
+            newMap.put(versionToSerialize, bytes);
+            return new SerializedClusterStateCache(serializedClusterStateCache.clusterStateVersion, newMap);
+        }
+    }
+
     /**
      * The callback interface.
      *
diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java
index 78c3b5d45a9ab..98deca8902a0e 100644
--- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java
+++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java
@@ -34,10 +34,17 @@
 import org.apache.logging.log4j.Level;
 import org.opensearch.Version;
 import org.opensearch.action.ActionListenerResponseHandler;
+import org.opensearch.action.admin.indices.rollover.RolloverInfo;
 import org.opensearch.action.support.PlainActionFuture;
 import org.opensearch.cluster.ClusterName;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.NotClusterManagerException;
+import org.opensearch.cluster.metadata.AliasMetadata;
+import org.opensearch.cluster.metadata.Context;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.IndexTemplateMetadata;
+import org.opensearch.cluster.metadata.IngestionStatus;
+import org.opensearch.cluster.metadata.MappingMetadata;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.service.ClusterService;
@@ -46,6 +53,7 @@
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.transport.TransportResponse;
 import org.opensearch.monitor.StatusInfo;
 import org.opensearch.node.remotestore.RemoteStoreNodeService;
@@ -64,7 +72,11 @@
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -454,6 +466,71 @@ public void onFailure(Exception e) {
         assertTrue(t.getCause().getMessage().contains("different cluster uuid"));
     }
 
+    public void testCompressedValidateJoinRequestForDifferentNodeVersions() throws Exception {
+        TestClusterSetup testCluster = getTestClusterSetup(Version.CURRENT, true); // Use capturing transport
+
+        ClusterState clusterState = testCluster.localClusterState;
+
+        // Test with 2.19 node
+        DiscoveryNode node219 = new DiscoveryNode("node219", buildNewFakeTransportAddress(), Version.V_2_19_0);
+        testCluster.joinHelper.sendValidateJoinRequest(node219, clusterState, new ActionListener<>() {
+            @Override
+            public void onResponse(TransportResponse.Empty empty) {
+                logger.info("validation successful for 2.19 node");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.error("validation failed for 2.19 node", e);
+            }
+        });
+
+        // Test with 3.1 node
+        DiscoveryNode node31 = new DiscoveryNode("node31", buildNewFakeTransportAddress(), Version.V_3_1_0);
+        testCluster.joinHelper.sendValidateJoinRequest(node31, clusterState, new ActionListener<>() {
+            @Override
+            public void onResponse(TransportResponse.Empty empty) {
+                logger.info("validation successful for 3.1 node");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.error("validation failed for 3.1 node", e);
+            }
+        });
+
+        // Verify both requests were sent with VALIDATE_COMPRESSED_JOIN_ACTION_NAME
+        CapturedRequest[] requests = testCluster.capturingTransport.getCapturedRequestsAndClear();
+        assertEquals(2, requests.length);
+
+        // Deserialize the request sent to the 2.19 node (same cluster UUID) and verify the cluster state contents
+        BytesTransportRequest request219 = (BytesTransportRequest) requests[0].request;
+        try (StreamInput input = CompressedStreamUtils.decompressBytes(request219, namedWriteableRegistry)) {
+            ClusterState incomingState = ClusterState.readFrom(input, node219);
+            IndexMetadata indexMetadata = incomingState.metadata().index("test-index");
+            assertNotNull(indexMetadata.context());
+            assertEquals("context", indexMetadata.context().name());
+            assertNotNull(indexMetadata.context().params());
+            // IngestionStatus is not serialized for a 2.19 node, so the IndexMetadata builder default applies
+            assertNotNull(indexMetadata.getIngestionStatus());
+            assertFalse(indexMetadata.getIngestionStatus().isPaused());
+        }
+        logger.info("Decompression test passed for 2.19 node");
+
+        // Deserialize the request sent to the 3.1 node (same cluster UUID) and verify the cluster state contents
+        BytesTransportRequest request31 = (BytesTransportRequest) requests[1].request;
+        try (StreamInput input = CompressedStreamUtils.decompressBytes(request31, namedWriteableRegistry)) {
+            ClusterState incomingState = ClusterState.readFrom(input, node31);
+            IndexMetadata indexMetadata = incomingState.metadata().index("test-index");
+            assertNotNull(indexMetadata.context());
+            assertEquals("context", indexMetadata.context().name());
+            assertNotNull(indexMetadata.context().params());
+            assertNotNull(indexMetadata.getIngestionStatus());
+            assertTrue(indexMetadata.getIngestionStatus().isPaused());
+        }
+        logger.info("Decompression test passed for 3.1 node");
+    }
+
     private TestClusterSetup getTestClusterSetup(Version version, boolean isCapturingTransport) {
         version = version == null ? Version.CURRENT : version;
         DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
@@ -464,8 +541,59 @@ private TestClusterSetup getTestClusterSetup(Version version, boolean isCapturin
 
         CapturingTransport capturingTransport = new CapturingTransport();
         DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), version);
+        // Create IndexMetadata with all fields set
+        String indexName = "test-index";
+        IndexMetadata.Builder indexBuilder = IndexMetadata.builder(indexName)
+            .settings(
+                Settings.builder()
+                    .put(IndexMetadata.SETTING_VERSION_CREATED, version)
+                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                    .put(IndexMetadata.SETTING_CREATION_DATE, System.currentTimeMillis())
+                    .put(IndexMetadata.SETTING_INDEX_UUID, "test-uuid")
+                    .put(IndexMetadata.SETTING_PRIORITY, 100)
+                    .put(IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING.getKey(), 1)
+                    .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
+                    .put(IndexMetadata.SETTING_REPLICATION_TYPE, "DOCUMENT")
+                    .build()
+            )
+            .numberOfShards(1)
+            .numberOfReplicas(1)
+            .creationDate(System.currentTimeMillis())
+            .version(1)
+            .mappingVersion(1)
+            .settingsVersion(1)
+            .aliasesVersion(1)
+            .state(IndexMetadata.State.OPEN);
+
+        // Add mapping
+        Map<String, Object> mappingSource = new HashMap<>();
+        mappingSource.put("properties", Map.of("field1", Map.of("type", "text")));
+        indexBuilder.putMapping(new MappingMetadata("_doc", mappingSource));
+
+        // Add alias
+        indexBuilder.putAlias(AliasMetadata.builder("test-alias").build());
+
+        // Add custom data
+        indexBuilder.putCustom("test-custom", Map.of("key1", "value1"));
+
+        // Add in-sync allocation IDs
+        indexBuilder.putInSyncAllocationIds(0, Set.of("alloc-1", "alloc-2"));
+
+        // Add rollover info
+        indexBuilder.putRolloverInfo(new RolloverInfo("test-alias", Collections.emptyList(), 1000L));
RolloverInfo("test-alias", Collections.emptyList(), 1000L)); + indexBuilder.context(new Context("context")); + indexBuilder.ingestionStatus(new IngestionStatus(true)); + IndexMetadata indexMetadata = indexBuilder.build(); + final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) + .metadata( + Metadata.builder() + .generateClusterUuidIfNeeded() + .clusterUUIDCommitted(true) + .indices(Map.of(indexName, indexMetadata)) + .templates(Map.of(indexName, IndexTemplateMetadata.builder(indexName).patterns(List.of("pattern")).build())) + ) .build(); TransportService transportService; if (isCapturingTransport) {