From 6997f5afe745ae920f0c6edce21b604211fb44d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grant=20Pal=C3=A1u=20Spencer?= Date: Mon, 10 Mar 2025 15:05:35 -0700 Subject: [PATCH 1/8] Change enabling hard constraint logging statement from INFO to DEBUG level (#3002) --- .../rebalancer/waged/constraints/ConstraintBasedAlgorithm.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java index b03af0771c..c40ad88480 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java @@ -143,7 +143,7 @@ private Optional getNodeWithHighestPoints(AssignableReplica repl return Optional.empty(); } - LOG.info("Disabling hard constraint level logging for cluster: {}", clusterContext.getClusterName()); + LOG.debug("Disabling hard constraint level logging for cluster: {}", clusterContext.getClusterName()); removeFullLoggingForCluster(); return candidateNodes.parallelStream().map(node -> new HashMap.SimpleEntry<>(node, From aa592c6c5ea79df18865fcf85530036f6c7cb3a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grant=20Pal=C3=A1u=20Spencer?= Date: Wed, 2 Apr 2025 13:34:01 -0700 Subject: [PATCH 2/8] Clear removed instances from the cached offline time map (#3012) Clear removed instances from the cached offline time map. This fixes a bug where stale entries in the map caused participants to be incorrectly deregistered after leaving and re-joining a cluster leveraging the participant auto deregistration feature. --- .../BaseControllerDataProvider.java | 3 ++ .../ParticipantDeregistrationStage.java | 2 - .../TestParticipantDeregistrationStage.java | 53 ++++++++++++++++++- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java index 997f0f8aae..1b4c3b026c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java @@ -1049,6 +1049,9 @@ public void notifyDataChange(HelixConstants.ChangeType changeType, String pathCh private void updateOfflineInstanceHistory(HelixDataAccessor accessor) { if (!_updateInstanceOfflineTime) { + // Clean up entries for nodes that have been removed from the cluster. This prevents a stale offline time from + // being used when the node is re-added to the cluster but before it updates its offline time. + _instanceOfflineTimeMap.keySet().retainAll(_allInstanceConfigCache.getPropertyMap().keySet()); return; } List offlineNodes = new ArrayList<>(_allInstanceConfigCache.getPropertyMap().keySet()); diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java index a4fcab3e1f..ef950a185d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java @@ -10,8 +10,6 @@ import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.ParticipantHistory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.helix.util.RebalanceUtil.scheduleOnDemandPipeline; diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java index 7e9bfabca2..0fad7e2d1c 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java @@ -8,14 +8,18 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.IdealState; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.ParticipantHistory; import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -31,6 +35,7 @@ public class TestParticipantDeregistrationStage extends ZkTestBase { private HelixDataAccessor _dataAccessor; private ClusterControllerManager _controller; private ConfigAccessor _configAccessor; + private BestPossibleExternalViewVerifier _verifier; @BeforeClass public void beforeClass() { @@ -53,6 +58,9 @@ public void beforeClass() { _dataAccessor = _controller.getHelixDataAccessor(); setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT); + + _verifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); } // Asserts that a node will be removed from the cluster after it exceedsthe deregister timeout set in the cluster config @@ -208,7 +216,7 @@ public void testDeregisterAfterConfigTimeoutShortened() throws Exception { + new Date(System.currentTimeMillis())); long longDeregisterTimeout = 1000*60*60*24; long shortDeregisterTimeout = 1000; - setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT); + setAutoDeregisterConfigs(CLUSTER_NAME, longDeregisterTimeout); // Create and immediately kill participants List killedParticipants = new ArrayList<>(); @@ -222,7 +230,7 @@ public void testDeregisterAfterConfigTimeoutShortened() throws Exception { Thread.sleep(shortDeregisterTimeout); // Trigger on shorten deregister timeout - setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT); + setAutoDeregisterConfigs(CLUSTER_NAME, shortDeregisterTimeout); // Assert participants have been deregistered boolean result = TestHelper.verify(() -> { @@ -236,10 +244,51 @@ public void testDeregisterAfterConfigTimeoutShortened() throws Exception { killedParticipants.forEach(participant -> { dropParticipant(CLUSTER_NAME, participant); }); + setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); } + @Test + public void testParticipantDeregisteredAndRejoins() throws Exception { + String firstDB = "firstDB"; + _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, 3, "LeaderStandby", + IdealState.RebalanceMode.FULL_AUTO.name(), null); + IdealState idealStateOne = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB); + idealStateOne.setMinActiveReplicas(2); + idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName()); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne); + // Set replica count to be # instances in cluster + 1, so that we can ensure that each participant assigned a replica + int replicaCount = _gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME).size(); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, replicaCount); + + // Wait for cluster to converge + Assert.assertTrue(_verifier.verifyByPolling()); + + MockParticipantManager participantToDeregister = _participants.get(0); + participantToDeregister.syncStop(); + boolean result = TestHelper.verify(() -> !_admin.getInstancesInCluster(CLUSTER_NAME) + .contains(participantToDeregister.getInstanceName()), TestHelper.WAIT_DURATION); + Assert.assertTrue(result, "Participant should have been deregistered"); + + addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName()); + + // Wait for cluster to converge + Assert.assertTrue(_verifier.verifyByPolling()); + + // loop through partitions and ensure that the participant is assigned to at least one partition + ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, firstDB); + boolean assigned = false; + for (String partition : ev.getPartitionSet()) { + if (ev.getStateMap(partition).containsKey(participantToDeregister.getInstanceName())) { + assigned = true; + break; + } + } + Assert.assertTrue(assigned, "Participant should have been assigned to at least one partition"); + } + @Override public void dropParticipant(String clusterName, MockParticipantManager participant) { _participants.remove(participant); From e25ac5d8d6745195fc3bd51e54fa9178cdcb3b20 Mon Sep 17 00:00:00 2001 From: Junkai Date: Thu, 3 Apr 2025 14:15:50 -0700 Subject: [PATCH 3/8] Add custom ideal state test --- .../rebalancer/TestCustomIdealState.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomIdealState.java index cd04ef56e6..7fc13bd912 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomIdealState.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomIdealState.java @@ -19,11 +19,15 @@ * under the License. */ +import java.util.Arrays; import java.util.Date; +import org.apache.helix.HelixAdmin; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.TestDriver; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.model.IdealState; import org.apache.helix.tools.ClusterSetup; import org.testng.annotations.Test; @@ -128,4 +132,39 @@ public void testDrop() throws Exception { deleteCluster(uniqClusterName); System.out.println("STOP " + uniqClusterName + " at " + new Date(System.currentTimeMillis())); } + + @Test + public void testCustomIdealState() throws Exception { + int numInstance = 5; + + String uniqClusterName = "TestCustomISForOFFLINEState"; + TestDriver.setupClusterWithoutRebalance(uniqClusterName, ZK_ADDR, 1, 1, numInstance, 1); + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + + admin.addResource(uniqClusterName, "OFFLINE_TEST", 1, "LeaderStandby", + IdealState.RebalanceMode.CUSTOMIZED.name()); + IdealState idealState = admin.getResourceIdealState(uniqClusterName, "OFFLINE_TEST"); + idealState.setPreferenceList("p0", + Arrays.asList("localhost_12918")); // clear the preference list + idealState.setPartitionState("p0", "localhost_12918", "OFFLINE"); + admin.updateIdealState(uniqClusterName, "OFFLINE_TEST", idealState); + for (int i = 0; i < numInstance; i++) { + TestDriver.startDummyParticipant(uniqClusterName, i); + } + TestDriver.startController(uniqClusterName);// set the partition state to OFFLINE + TestDriver.verifyCluster(uniqClusterName, 3000, 50 * 1000); + + // drop resource group + ClusterSetup setup = new ClusterSetup(ZK_ADDR); + setup.dropResourceFromCluster(uniqClusterName, "TestDB0"); + + TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 30 * 1000, uniqClusterName, + "TestDB0", + TestHelper.setOf("localhost_12918", "localhost_12919", "localhost_12920", "localhost_12921", + "localhost_12922"), ZK_ADDR); + + TestDriver.stopCluster(uniqClusterName); + deleteCluster(uniqClusterName); + System.out.println("STOP " + uniqClusterName + " at " + new Date(System.currentTimeMillis())); + } } From f87d948598b33de0788653c2438307bf089dece6 Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Tue, 15 Apr 2025 09:37:40 -0700 Subject: [PATCH 4/8] Improve setInstanceOperation performance (#3017) This change will switch to parallel/async get on all instance configs, using HelixDataAccessor, and avoid calling findInstancesWithMatchingLogicalId for instance operation transitions where this check is not required. --- .../apache/helix/manager/zk/ZKHelixAdmin.java | 9 +- .../org/apache/helix/util/InstanceUtil.java | 179 ++++++++++++++---- 2 files changed, 142 insertions(+), 46 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index ae914682b4..0d72ac4aaa 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -211,7 +211,7 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { } List matchingLogicalIdInstances = - InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName, + InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, clusterName, instanceConfig); if (matchingLogicalIdInstances.size() > 1) { throw new HelixException( @@ -224,7 +224,8 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { InstanceConstants.InstanceOperation attemptedInstanceOperation = instanceConfig.getInstanceOperation().getOperation(); try { - InstanceUtil.validateInstanceOperationTransition(_configAccessor, clusterName, instanceConfig, + InstanceUtil.validateInstanceOperationTransition(_baseDataAccessor, clusterName, + instanceConfig, InstanceConstants.InstanceOperation.UNKNOWN, attemptedInstanceOperation); } catch (HelixException e) { instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN); @@ -616,7 +617,7 @@ public boolean canCompleteSwap(String clusterName, String instanceName) { } List swappingInstances = - InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName, + InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, clusterName, instanceConfig); if (swappingInstances.size() != 1) { logger.warn( @@ -655,7 +656,7 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName, } List swappingInstances = - InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName, + InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, clusterName, instanceConfig); if (swappingInstances.size() != 1) { logger.warn( diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java index 967d561e74..e632f08570 100644 --- a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java @@ -20,23 +20,25 @@ */ import java.util.List; -import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; + import com.google.common.collect.ImmutableMap; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.constants.InstanceConstants; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.zookeeper.zkclient.DataUpdater; public class InstanceUtil { @@ -45,27 +47,43 @@ private InstanceUtil() { } // Validators for instance operation transitions - private static final Function, Boolean> ALWAYS_ALLOWED = - (matchingInstances) -> true; - private static final Function, Boolean> ALL_MATCHES_ARE_UNKNOWN = - (matchingInstances) -> matchingInstances.isEmpty() || matchingInstances.stream().allMatch( - instance -> instance.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.UNKNOWN)); - private static final Function, Boolean> ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE = - (matchingInstances) -> matchingInstances.isEmpty() || matchingInstances.stream().allMatch( - instance -> instance.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.UNKNOWN) - || instance.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.EVACUATE)); - private static final Function, Boolean> ANY_MATCH_ENABLE_OR_DISABLE = - (matchingInstances) -> !matchingInstances.isEmpty() && matchingInstances.stream().anyMatch( - instance -> instance.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.ENABLE) || instance.getInstanceOperation() - .getOperation().equals(InstanceConstants.InstanceOperation.DISABLE)); + private static final InstanceOperationValidator ALWAYS_ALLOWED = + (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> true; + private static final InstanceOperationValidator ALL_MATCHES_ARE_UNKNOWN = + (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> { + List matchingInstances = + findInstancesWithMatchingLogicalId(baseDataAccessor, configAccessor, clusterName, + instanceConfig); + return matchingInstances.isEmpty() || matchingInstances.stream().allMatch( + instance -> instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.UNKNOWN)); + }; + private static final InstanceOperationValidator ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE = + (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> { + List matchingInstances = + findInstancesWithMatchingLogicalId(baseDataAccessor, configAccessor, clusterName, + instanceConfig); + return matchingInstances.isEmpty() || matchingInstances.stream().allMatch(instance -> + instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.UNKNOWN) + || instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.EVACUATE)); + }; + private static final InstanceOperationValidator ANY_MATCH_ENABLE_OR_DISABLE = + (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> { + List matchingInstances = + findInstancesWithMatchingLogicalId(baseDataAccessor, configAccessor, clusterName, + instanceConfig); + return !matchingInstances.isEmpty() && matchingInstances.stream().anyMatch(instance -> + instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.ENABLE) + || instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.DISABLE)); + }; // Validator map for valid instance operation transitions :: - private static final ImmutableMap, Boolean>>> - validInstanceOperationTransitions = + private static final ImmutableMap> + VALID_INSTANCE_OPERATION_TRANSITIONS = ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE, // ENABLE and DISABLE can be set to UNKNOWN when matching instance is in SWAP_IN and set to ENABLE in a transaction. ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE, ALWAYS_ALLOWED, @@ -100,22 +118,55 @@ private InstanceUtil() { * @param instanceConfig The current instance configuration * @param currentOperation The current operation * @param targetOperation The target operation + * @deprecated Use {@link #validateInstanceOperationTransition(BaseDataAccessor, String, InstanceConfig, InstanceConstants.InstanceOperation, InstanceConstants.InstanceOperation)} + * instead for better performance. */ + @Deprecated public static void validateInstanceOperationTransition(ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig, InstanceConstants.InstanceOperation currentOperation, InstanceConstants.InstanceOperation targetOperation) { - // Check if the current operation and target operation are in the valid transitions map - if (!validInstanceOperationTransitions.containsKey(currentOperation) - || !validInstanceOperationTransitions.get(currentOperation).containsKey(targetOperation)) { + + validateInstanceOperationTransition(null, configAccessor, clusterName, instanceConfig, + currentOperation, targetOperation); + } + + /** + * Validates if the transition from the current operation to the target operation is valid. + * + * @param baseDataAccessor The BaseDataAccessor instance + * @param clusterName The cluster name + * @param instanceConfig The current instance configuration + * @param currentOperation The current operation + * @param targetOperation The target operation + */ + public static void validateInstanceOperationTransition( + BaseDataAccessor baseDataAccessor, String clusterName, + InstanceConfig instanceConfig, + InstanceConstants.InstanceOperation currentOperation, + InstanceConstants.InstanceOperation targetOperation) { + + validateInstanceOperationTransition(baseDataAccessor, null, clusterName, instanceConfig, + currentOperation, targetOperation); + } + + private static void validateInstanceOperationTransition( + @Nullable BaseDataAccessor baseDataAccessor, + @Nullable ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig, + InstanceConstants.InstanceOperation currentOperation, + InstanceConstants.InstanceOperation targetOperation) { + ImmutableMap transitionMap = + VALID_INSTANCE_OPERATION_TRANSITIONS.get(currentOperation); + + if (transitionMap == null || !transitionMap.containsKey(targetOperation)) { throw new HelixException( "Invalid instance operation transition from " + currentOperation + " to " + targetOperation); } - // Throw exception if the validation fails - if (!validInstanceOperationTransitions.get(currentOperation).get(targetOperation) - .apply(findInstancesWithMatchingLogicalId(configAccessor, clusterName, instanceConfig))) { + InstanceOperationValidator validator = transitionMap.get(targetOperation); + if (validator == null || !validator.validate(baseDataAccessor, configAccessor, clusterName, + instanceConfig)) { throw new HelixException( "Failed validation for instance operation transition from " + currentOperation + " to " + targetOperation); @@ -130,6 +181,7 @@ public static void validateInstanceOperationTransition(ConfigAccessor configAcce * @param instanceConfig The instance configuration to match * @return A list of matching instances */ + @Deprecated public static List findInstancesWithMatchingLogicalId( ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig) { String logicalIdKey = @@ -148,17 +200,57 @@ public static List findInstancesWithMatchingLogicalId( .collect(Collectors.toList()); } + /** + * Finds the instances that have a matching logical ID with the given instance. + * + * @param clusterName The cluster name + * @param instanceConfig The instance configuration to match + * @return A list of matching instances + */ + public static List findInstancesWithMatchingLogicalId( + BaseDataAccessor baseDataAccessor, String clusterName, + InstanceConfig instanceConfig) { + HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor); + + ClusterConfig clusterConfig = + helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().clusterConfig()); + String logicalIdKey = + ClusterTopologyConfig.createFromClusterConfig(clusterConfig).getEndNodeType(); + + List instanceConfigs = + helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().instanceConfigs(), true); + + // Retrieve and filter instances with matching logical ID + return instanceConfigs.stream().filter(potentialInstanceConfig -> + !potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName()) + && potentialInstanceConfig.getLogicalId(logicalIdKey) + .equals(instanceConfig.getLogicalId(logicalIdKey))).collect(Collectors.toList()); + } + + private static List findInstancesWithMatchingLogicalId( + @Nullable BaseDataAccessor baseDataAccessor, + @Nullable ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig) { + if (baseDataAccessor == null && configAccessor == null) { + throw new HelixException( + "Both BaseDataAccessor and ConfigAccessor cannot be null at the same time"); + } + + return baseDataAccessor != null ? findInstancesWithMatchingLogicalId(baseDataAccessor, + clusterName, instanceConfig) + : findInstancesWithMatchingLogicalId(configAccessor, clusterName, instanceConfig); + } + /** * Sets the instance operation for the given instance. * * @param configAccessor The ConfigAccessor instance - * @param baseAccessor The BaseDataAccessor instance + * @param baseDataAccessor The BaseDataAccessor instance * @param clusterName The cluster name * @param instanceName The instance name * @param instanceOperation The instance operation to set */ public static void setInstanceOperation(ConfigAccessor configAccessor, - BaseDataAccessor baseAccessor, String clusterName, String instanceName, + BaseDataAccessor baseDataAccessor, String clusterName, String instanceName, InstanceConfig.InstanceOperation instanceOperation) { String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); @@ -170,24 +262,22 @@ public static void setInstanceOperation(ConfigAccessor configAccessor, } // Validate the instance operation transition - validateInstanceOperationTransition(configAccessor, clusterName, instanceConfig, + validateInstanceOperationTransition(baseDataAccessor, configAccessor, clusterName, + instanceConfig, instanceConfig.getInstanceOperation().getOperation(), instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE : instanceOperation.getOperation()); // Update the instance operation - boolean succeeded = baseAccessor.update(path, new DataUpdater() { - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData == null) { - throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName - + ", participant config is null"); - } - - InstanceConfig config = new InstanceConfig(currentData); - config.setInstanceOperation(instanceOperation); - return config.getRecord(); + boolean succeeded = baseDataAccessor.update(path, currentData -> { + if (currentData == null) { + throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName + + ", participant config is null"); } + + InstanceConfig config = new InstanceConfig(currentData); + config.setInstanceOperation(instanceOperation); + return config.getRecord(); }, AccessOption.PERSISTENT); if (!succeeded) { @@ -195,4 +285,9 @@ public ZNRecord update(ZNRecord currentData) { "Failed to update instance operation. Please check if instance is disabled."); } } + + private interface InstanceOperationValidator { + boolean validate(@Nullable BaseDataAccessor baseDataAccessor, + @Nullable ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig); + } } From a56e05dc3cae73b3d5ddbe9a3822364ee8b4b552 Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Fri, 21 Feb 2025 11:08:52 -0800 Subject: [PATCH 5/8] pin xstreams in helix-rest/pom.xml to 1.4.19 as 1.4.21 is banned at LI --- helix-rest/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-rest/pom.xml b/helix-rest/pom.xml index 24ddf0d621..e13a8c2c67 100644 --- a/helix-rest/pom.xml +++ b/helix-rest/pom.xml @@ -121,7 +121,7 @@ com.thoughtworks.xstream xstream - 1.4.21 + 1.4.19 com.fasterxml.jackson.core From d96a63e3a02f0963e3836510798797dc64392e56 Mon Sep 17 00:00:00 2001 From: Anubhav Agarwal Date: Thu, 27 Mar 2025 11:40:31 +0530 Subject: [PATCH 6/8] Add check for customized resources in isEvacuatedFinished - Add isInstanceDrained method in HelixAdmin - Expose the method via instance update rest end point - Change the conditional checks order in isEvacuateFinished to improve latency --- .../java/org/apache/helix/HelixAdmin.java | 2 ++ .../apache/helix/manager/zk/ZKHelixAdmin.java | 28 +++++++++++-------- .../org/apache/helix/mock/MockHelixAdmin.java | 5 ++++ .../server/resources/AbstractResource.java | 1 + .../resources/helix/PerInstanceAccessor.java | 10 +++++++ .../rest/server/TestPerInstanceAccessor.java | 6 ++++ 6 files changed, 41 insertions(+), 11 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 5c2ef10f20..a15dc50f66 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -807,6 +807,8 @@ default boolean isEvacuateFinished(String clusterName, String instancesNames) { throw new UnsupportedOperationException("isEvacuateFinished is not implemented."); } + boolean isInstanceDrained(String clusterName, String instanceName); + /** * Check to see if swapping between two instances can be completed. Either the swapOut or * swapIn instance can be passed in. diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 0d72ac4aaa..5dbc4d31a5 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -460,12 +460,17 @@ public void setInstanceOperation(String clusterName, String instanceName, @Override public boolean isEvacuateFinished(String clusterName, String instanceName) { - if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) { - InstanceConfig config = getInstanceConfig(clusterName, instanceName); - return config != null && config.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.EVACUATE); + InstanceConfig config = getInstanceConfig(clusterName, instanceName); + if (config == null || !config.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.EVACUATE)) { + return false; } - return false; + return !instanceHasCurrentStateOrMessage(clusterName, instanceName); + } + + @Override + public boolean isInstanceDrained(String clusterName, String instanceName) { + return !instanceHasCurrentStateOrMessage(clusterName, instanceName); } /** @@ -721,7 +726,7 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName, @Override public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) { - if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) { + if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) { InstanceConfig config = getInstanceConfig(clusterName, instanceName); return config != null && INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains( config.getInstanceOperation().getOperation()); @@ -763,7 +768,7 @@ public boolean forceKillInstance(String clusterName, String instanceName, String * @param instanceName * @return */ - private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, + private boolean instanceHasCurrentStateOrMessage(String clusterName, String instanceName) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -803,13 +808,14 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, return true; } - // Get set of FULL_AUTO resources + // Get set of FULL_AUTO and CUSTOMIZED resources List idealStates = accessor.getChildValues(keyBuilder.idealStates(), true); - Set fullAutoResources = idealStates != null ? idealStates.stream() - .filter(idealState -> idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) + Set resources = idealStates != null ? idealStates.stream() + .filter(idealState -> idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO || + idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) .map(IdealState::getResourceName).collect(Collectors.toSet()) : Collections.emptySet(); - return currentStates.stream().anyMatch(fullAutoResources::contains); + return currentStates.stream().anyMatch(resources::contains); } @Override diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index 5a1a8a5bcb..7f516345ae 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -577,6 +577,11 @@ public boolean isEvacuateFinished(String clusterName, String instancesNames) { return false; } + @Override + public boolean isInstanceDrained(String clusterName, String instancesNames) { + return false; + } + @Override public boolean canCompleteSwap(String clusterName, String instancesNames) { return false; diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java index 847eb802fa..13cabb1ee4 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java @@ -90,6 +90,7 @@ public enum Command { completeSwapIfPossible, onDemandRebalance, isEvacuateFinished, + isInstanceDrained, setPartitionsToError, forceKillInstance } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index d82c177962..1a16e3b9e8 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -508,6 +508,16 @@ public Response updateInstance(@PathParam("clusterId") String clusterId, return serverError(e); } return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", evacuateFinished))); + case isInstanceDrained: + boolean instanceDrained; + try { + instanceDrained = admin.isInstanceDrained(clusterId, instanceName); + } catch (HelixException e) { + LOG.error(String.format("Encountered error when checking if instance is drained for cluster: " + + "{}, instance: {}", clusterId, instanceName), e); + return serverError(e); + } + return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", instanceDrained))); case forceKillInstance: boolean instanceForceKilled = admin.forceKillInstance(clusterId, instanceName, reason, instanceOperationSource); if (!instanceForceKilled) { diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java index 32f47baae4..3ed432f00a 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java @@ -580,6 +580,12 @@ public void updateInstance() throws Exception { Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); Assert.assertTrue(evacuateFinishedResult.get("successful")); + response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isInstanceDrained") + .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); + Map instanceDrainedResult = OBJECT_MAPPER.readValue(response.readEntity(String.class), Map.class); + Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); + Assert.assertTrue(instanceDrainedResult.get("successful")); + // test isEvacuateFinished on instance with EVACUATE and no currentState // Create new instance so no currentState or messages assigned to it String test_instance_name = INSTANCE_NAME + "_foo"; From af4c7bcec0e17dbc83b9dcd4075994efeb2530af Mon Sep 17 00:00:00 2001 From: Anubhav Agarwal Date: Thu, 27 Mar 2025 12:39:56 +0530 Subject: [PATCH 7/8] Updated the documentation --- .../main/java/org/apache/helix/HelixAdmin.java | 18 ++++++++++++++---- .../apache/helix/manager/zk/ZKHelixAdmin.java | 7 ++++--- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index a15dc50f66..68bfba86c2 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -800,14 +800,24 @@ Map validateInstancesForWagedRebalance(String clusterName, /** * Return if instance operation 'Evacuate' is finished. * @param clusterName - * @param instancesNames - * @return Return true if there is no current state nor pending message on the instance. + * @param instancesName + * @return Return true if there is no FULL_AUTO or CUSTOMIZED resources in the current state nor + * any pending message on the instance. */ - default boolean isEvacuateFinished(String clusterName, String instancesNames) { + default boolean isEvacuateFinished(String clusterName, String instancesName) { throw new UnsupportedOperationException("isEvacuateFinished is not implemented."); } - boolean isInstanceDrained(String clusterName, String instanceName); + /** + * Check to see if instance is drained. + * @param clusterName + * @param instanceName + * @return Return true if there is no FULL_AUTO or CUSTOMIZED resources in the current state nor + * any pending message on the instance. + */ + default boolean isInstanceDrained(String clusterName, String instanceName) { + throw new UnsupportedOperationException("isInstanceDrained is not implemented."); + } /** * Check to see if swapping between two instances can be completed. Either the swapOut or diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 5dbc4d31a5..29216cf4b4 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -461,8 +461,8 @@ public void setInstanceOperation(String clusterName, String instanceName, @Override public boolean isEvacuateFinished(String clusterName, String instanceName) { InstanceConfig config = getInstanceConfig(clusterName, instanceName); - if (config == null || !config.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.EVACUATE)) { + if (config == null || config.getInstanceOperation().getOperation() != + InstanceConstants.InstanceOperation.EVACUATE ) { return false; } return !instanceHasCurrentStateOrMessage(clusterName, instanceName); @@ -762,7 +762,8 @@ public boolean forceKillInstance(String clusterName, String instanceName, String } /** - * Return true if Instance has any current state or pending message. Otherwise, return false if instance is offline, + * Return true if instance has any resource with FULL_AUTO or CUSTOMIZED rebalance mode in current state or + * if instance has any pending message. Otherwise, return false if instance is offline, * instance has no active session, or if instance is online but has no current state or pending message. * @param clusterName * @param instanceName From 109154f1c45578bdaa262b37c36b77ef34ad9985 Mon Sep 17 00:00:00 2001 From: Xiaxuan Gao Date: Thu, 27 Mar 2025 12:08:27 -0700 Subject: [PATCH 8/8] fix interface name --- helix-core/src/main/java/org/apache/helix/HelixAdmin.java | 4 ++-- .../main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 68bfba86c2..5228ee499f 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -800,11 +800,11 @@ Map validateInstancesForWagedRebalance(String clusterName, /** * Return if instance operation 'Evacuate' is finished. * @param clusterName - * @param instancesName + * @param instancesNames * @return Return true if there is no FULL_AUTO or CUSTOMIZED resources in the current state nor * any pending message on the instance. */ - default boolean isEvacuateFinished(String clusterName, String instancesName) { + default boolean isEvacuateFinished(String clusterName, String instancesNames) { throw new UnsupportedOperationException("isEvacuateFinished is not implemented."); } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 29216cf4b4..48af677141 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -459,13 +459,13 @@ public void setInstanceOperation(String clusterName, String instanceName, } @Override - public boolean isEvacuateFinished(String clusterName, String instanceName) { - InstanceConfig config = getInstanceConfig(clusterName, instanceName); + public boolean isEvacuateFinished(String clusterName, String instanceNames) { + InstanceConfig config = getInstanceConfig(clusterName, instanceNames); if (config == null || config.getInstanceOperation().getOperation() != InstanceConstants.InstanceOperation.EVACUATE ) { return false; } - return !instanceHasCurrentStateOrMessage(clusterName, instanceName); + return !instanceHasCurrentStateOrMessage(clusterName, instanceNames); } @Override