From 33ac2fc40fc85b2e07f7c17808c3287e1e84b454 Mon Sep 17 00:00:00 2001 From: r4mek Date: Thu, 6 Nov 2025 10:32:13 +0530 Subject: [PATCH 1/2] added rate limiting queues and refactored code to remove dead code --- .../provider/machinecontroller/controller.go | 12 + .../provider/machinecontroller/machine.go | 129 +++++---- .../machinecontroller/machine_safety.go | 48 ++-- .../machinecontroller/machine_safety_test.go | 6 +- .../machinecontroller/machine_test.go | 10 +- .../machinecontroller/machine_util.go | 251 ++++++++---------- .../machinecontroller/machine_util_test.go | 25 +- .../machinecontroller/machineclass.go | 10 +- 8 files changed, 243 insertions(+), 248 deletions(-) diff --git a/pkg/util/provider/machinecontroller/controller.go b/pkg/util/provider/machinecontroller/controller.go index 4a9a584a9..47d01e54b 100644 --- a/pkg/util/provider/machinecontroller/controller.go +++ b/pkg/util/provider/machinecontroller/controller.go @@ -18,6 +18,7 @@ import ( "github.com/gardener/machine-controller-manager/pkg/util/provider/driver" "github.com/gardener/machine-controller-manager/pkg/util/provider/options" "github.com/gardener/machine-controller-manager/pkg/util/worker" + "golang.org/x/time/rate" machineinternal "github.com/gardener/machine-controller-manager/pkg/apis/machine" machinev1alpha1 "github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1" @@ -361,3 +362,14 @@ func (dc *controller) Run(workers int, stopCh <-chan struct{}) { waitGroup.Wait() } + +// CustomTypedControllerRateLimiter is a constructor for a custom rate limiter for a workqueue. It has +// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential. +// It is more configurable than DefaultTypedControllerRateLimiter and takes in machineHealthTimeout to be used as limit for item rate-limiter +func CustomTypedControllerRateLimiter[T comparable](maxDelay time.Duration) workqueue.TypedRateLimiter[T] { + return workqueue.NewTypedMaxOfRateLimiter( + workqueue.NewTypedItemExponentialFailureRateLimiter[T](5*time.Millisecond, maxDelay), + // 10 qps, 100 bucket size. 
This is only for retry speed and its only the overall factor (not per item) + &workqueue.TypedBucketRateLimiter[T]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ) +} diff --git a/pkg/util/provider/machinecontroller/machine.go b/pkg/util/provider/machinecontroller/machine.go index bdd317af7..ae5e209d5 100644 --- a/pkg/util/provider/machinecontroller/machine.go +++ b/pkg/util/provider/machinecontroller/machine.go @@ -114,6 +114,13 @@ func (c *controller) enqueueMachineAfter(obj any, after time.Duration, reason st } } +func (c *controller) enqueueMachineAfterRateLimiting(obj any, reason string) { + if key, ok := c.getKeyForObj(obj); ok { + klog.V(3).Infof("Adding machine object to queue %q after rate-limiting, reason: %s", key, reason) + c.machineQueue.AddRateLimited(key) + } +} + func (c *controller) enqueueMachineTermination(machine *v1alpha1.Machine, reason string) { if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(machine); err != nil { @@ -135,6 +142,16 @@ func (c *controller) enqueueMachineTerminationAfter(machine *v1alpha1.Machine, a } } +func (c *controller) enqueueMachineTerminationAfterRateLimiting(machine *v1alpha1.Machine, reason string) { + if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(machine); err != nil { + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", machine, err)) + return + } else { + klog.V(3).Infof("Adding machine object to termination queue %q after rate-limiting, reason: %s", key, reason) + c.machineTerminationQueue.AddRateLimited(key) + } +} + func (c *controller) reconcileClusterMachineKey(key string) error { ctx := context.Background() @@ -165,19 +182,17 @@ func (c *controller) reconcileClusterMachineKey(key string) error { return nil } - retryPeriod, err := c.reconcileClusterMachine(ctx, machine) - - var reEnqueReason = "periodic reconcile" - if err != nil { - reEnqueReason = err.Error() + err = c.reconcileClusterMachine(ctx, machine) + if err == nil { + c.enqueueMachineAfter(machine, time.Duration(machineutils.LongRetry), "periodic reconcile") + } else { + c.enqueueMachineAfterRateLimiting(machine, err.Error()) } - c.enqueueMachineAfter(machine, time.Duration(retryPeriod), reEnqueReason) - return nil } -func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { +func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alpha1.Machine) error { klog.V(2).Infof("reconcileClusterMachine: Start for %q with phase:%q, description:%q", machine.Name, machine.Status.CurrentStatus.Phase, machine.Status.LastOperation.Description) defer klog.V(2).Infof("reconcileClusterMachine: Stop for %q", machine.Name) @@ -186,54 +201,54 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp // machine is not set for termination don't process it err := fmt.Errorf("Machine controller has frozen. 
Retrying reconcile after resync period") klog.Error(err) - return machineutils.LongRetry, err + return err } internalMachine := &machineapi.Machine{} if err := c.internalExternalScheme.Convert(machine, internalMachine, nil); err != nil { klog.Error(err) - return machineutils.LongRetry, err + return err } validationerr := validation.ValidateMachine(internalMachine) if validationerr.ToAggregate() != nil && len(validationerr.ToAggregate().Errors()) > 0 { err := fmt.Errorf("validation of Machine failed %s", validationerr.ToAggregate().Error()) klog.Error(err) - return machineutils.LongRetry, err + return err } // Validate MachineClass - machineClass, secretData, retry, err := c.ValidateMachineClass(ctx, &machine.Spec.Class) + machineClass, secretData, err := c.ValidateMachineClass(ctx, &machine.Spec.Class) if err != nil { klog.Errorf("cannot reconcile machine %s: %s", machine.Name, err) - return retry, err + return err } if machine.Labels[v1alpha1.NodeLabelKey] != "" && machine.Status.CurrentStatus.Phase != "" { // If reference to node object exists execute the below - retry, err := c.reconcileMachineHealth(ctx, machine) + err := c.reconcileMachineHealth(ctx, machine) if err != nil { - return retry, err + return err } - retry, err = c.syncMachineNameToNode(ctx, machine) + err = c.syncMachineNameToNode(ctx, machine) if err != nil { - return retry, err + return err } - retry, err = c.syncNodeTemplates(ctx, machine, machineClass) + err = c.syncNodeTemplates(ctx, machine, machineClass) if err != nil { - return retry, err + return err } - retry, err = c.updateNodeConditionBasedOnLabel(ctx, machine) + err = c.updateNodeConditionBasedOnLabel(ctx, machine) if err != nil { - return retry, err + return err } - retry, err = c.inPlaceUpdate(ctx, machine) + err = c.inPlaceUpdate(ctx, machine) if err != nil { - return retry, err + return err } } @@ -248,7 +263,7 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp ) } - return machineutils.LongRetry, nil + return nil } func (c *controller) reconcileClusterMachineTermination(key string) error { @@ -273,15 +288,15 @@ func (c *controller) reconcileClusterMachineTermination(key string) error { machine.Name, machine.Status.CurrentStatus.Phase, machine.Status.LastOperation.Description) defer klog.V(2).Infof("reconcileClusterMachineTermination: Stop for %q", machine.Name) - machineClass, secretData, retry, err := c.ValidateMachineClass(ctx, &machine.Spec.Class) + machineClass, secretData, err := c.ValidateMachineClass(ctx, &machine.Spec.Class) if err != nil { klog.Errorf("cannot reconcile machine %q: %s", machine.Name, err) - c.enqueueMachineTerminationAfter(machine, time.Duration(retry), err.Error()) + c.enqueueMachineTerminationAfterRateLimiting(machine, err.Error()) return err } // Process a delete event - retryPeriod, err := c.triggerDeletionFlow( + err = c.triggerDeletionFlow( ctx, &driver.DeleteMachineRequest{ Machine: machine, @@ -291,16 +306,15 @@ func (c *controller) reconcileClusterMachineTermination(key string) error { ) if err != nil { - c.enqueueMachineTerminationAfter(machine, time.Duration(retryPeriod), err.Error()) - return err + c.enqueueMachineTerminationAfterRateLimiting(machine, err.Error()) } else { // If the informer loses connection to the API server it may need to resync. // If a resource is deleted while the watch is down, the informer won’t get // delete event because the object is already gone. To avoid this edge-case, // a requeue is scheduled post machine deletion as well. 
- c.enqueueMachineTerminationAfter(machine, time.Duration(retryPeriod), "post-deletion reconcile") - return nil + c.enqueueMachineTerminationAfter(machine, time.Duration(machineutils.LongRetry), "post-deletion reconcile") } + return err } /* @@ -484,7 +498,7 @@ func addedOrRemovedEssentialTaints(oldNode, node *corev1.Node, taintKeys []strin Machine operations - Create, Delete */ -func (c *controller) triggerCreationFlow(ctx context.Context, createMachineRequest *driver.CreateMachineRequest) (machineutils.RetryPeriod, error) { +func (c *controller) triggerCreationFlow(ctx context.Context, createMachineRequest *driver.CreateMachineRequest) error { var ( // Declarations nodeName, providerID string @@ -508,10 +522,10 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque // we should avoid mutating Secret, since it goes all the way into the Informer's store secretCopy := createMachineRequest.Secret.DeepCopy() if err := c.addBootstrapTokenToUserData(ctx, machine, secretCopy); err != nil { - return machineutils.ShortRetry, err + return err } if err := c.addMachineNameToUserData(machine, secretCopy); err != nil { - return machineutils.ShortRetry, err + return err } createMachineRequest.Secret = secretCopy @@ -530,7 +544,7 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque if !ok { // Error occurred with decoding machine error status, abort with retry. klog.Errorf("Error occurred while decoding machine error for machine %q: %s", machine.Name, err) - return machineutils.MediumRetry, err + return err } klog.Warningf("For machine %q, obtained VM error status as: %s", machineName, machineErr) // Decoding machine error code @@ -586,7 +600,7 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque } // machine obj marked Failed for double security - updateRetryPeriod, updateErr := c.machineStatusUpdate( + updateErr := c.machineStatusUpdate( ctx, machine, v1alpha1.LastOperation{ @@ -603,11 +617,11 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque ) if updateErr != nil { - return updateRetryPeriod, updateErr + return updateErr } klog.V(2).Infof("Machine %q marked Failed as VM was referring to a stale node object", machine.Name) - return machineutils.ShortRetry, err + return err } } uninitializedMachine = true @@ -644,11 +658,10 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque } //initialize VM if not initialized if uninitializedMachine { - var retryPeriod machineutils.RetryPeriod var initResponse *driver.InitializeMachineResponse - initResponse, retryPeriod, err = c.initializeMachine(ctx, clone, createMachineRequest.MachineClass, createMachineRequest.Secret) + initResponse, err = c.initializeMachine(ctx, clone, createMachineRequest.MachineClass, createMachineRequest.Secret) if err != nil { - return retryPeriod, err + return err } if c.targetCoreClient == nil { @@ -659,16 +672,16 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque } clone.Status.Addresses = buildAddressStatus(addresses, nodeName) if _, err := c.controlMachineClient.Machines(clone.Namespace).UpdateStatus(ctx, clone, metav1.UpdateOptions{}); err != nil { - return machineutils.ShortRetry, fmt.Errorf("failed to persist status addresses after initialization was successful: %w", err) + return fmt.Errorf("failed to persist status addresses after initialization was successful: %w", err) } } // Return error even when machine object is updated err = 
fmt.Errorf("machine creation in process. Machine initialization (if required) is successful") - return machineutils.ShortRetry, err + return err } if err != nil { - return machineutils.ShortRetry, err + return err } if machine.Status.CurrentStatus.Phase == "" || machine.Status.CurrentStatus.Phase == v1alpha1.MachineCrashLoopBackOff { @@ -703,9 +716,9 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque // Return error even when machine object is updated err = fmt.Errorf("machine creation in process. Machine/Status UPDATE successful") } - return machineutils.ShortRetry, err + return err } - return machineutils.LongRetry, nil + return nil } func (c *controller) updateLabels(ctx context.Context, machine *v1alpha1.Machine, nodeName, providerID string) (clone *v1alpha1.Machine, err error) { @@ -739,7 +752,7 @@ func (c *controller) updateLabels(ctx context.Context, machine *v1alpha1.Machine return clone, err } -func (c *controller) initializeMachine(ctx context.Context, machine *v1alpha1.Machine, machineClass *v1alpha1.MachineClass, secret *corev1.Secret) (resp *driver.InitializeMachineResponse, retry machineutils.RetryPeriod, err error) { +func (c *controller) initializeMachine(ctx context.Context, machine *v1alpha1.Machine, machineClass *v1alpha1.MachineClass, secret *corev1.Secret) (resp *driver.InitializeMachineResponse, err error) { req := &driver.InitializeMachineRequest{ Machine: machine, MachineClass: machineClass, @@ -751,14 +764,14 @@ func (c *controller) initializeMachine(ctx context.Context, machine *v1alpha1.Ma errStatus, ok := status.FromError(err) if !ok { klog.Errorf("Cannot decode Driver error for machine %q: %s. Unexpected behaviour as Driver errors are expected to be of type status.Status", machine.Name, err) - return nil, machineutils.LongRetry, err + return nil, err } if errStatus.Code() == codes.Unimplemented { klog.V(3).Infof("Provider does not support Driver.InitializeMachine - skipping VM instance initialization for %q.", machine.Name) - return nil, 0, nil + return nil, nil } klog.Errorf("Error occurred while initializing VM instance for machine %q: %s", machine.Name, err) - updateRetryPeriod, updateErr := c.machineStatusUpdate( + updateErr := c.machineStatusUpdate( ctx, machine, v1alpha1.LastOperation{ @@ -775,15 +788,15 @@ func (c *controller) initializeMachine(ctx context.Context, machine *v1alpha1.Ma machine.Status.LastKnownState, ) if updateErr != nil { - return nil, updateRetryPeriod, updateErr + return nil, updateErr } - return nil, machineutils.ShortRetry, err + return nil, err } klog.V(3).Infof("VM instance %q for machine %q was initialized", resp.ProviderID, machine.Name) - return resp, 0, nil + return resp, nil } -func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) (machineutils.RetryPeriod, error) { +func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) error { var ( machine = deleteMachineRequest.Machine finalizers = sets.NewString(machine.Finalizers...) @@ -797,7 +810,7 @@ func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineReque case !finalizers.Has(MCMFinalizerName): // If Finalizers are not present on machine err := fmt.Errorf("Machine %q is missing finalizers. 
Deletion cannot proceed", machine.Name) - return machineutils.LongRetry, err + return err case machine.Status.CurrentStatus.Phase != v1alpha1.MachineTerminating: return c.setMachineTerminationStatus(ctx, deleteMachineRequest) @@ -824,11 +837,11 @@ func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineReque return c.deleteNodeObject(ctx, machine) case strings.Contains(machine.Status.LastOperation.Description, machineutils.InitiateFinalizerRemoval): - _, err := c.deleteMachineFinalizers(ctx, machine) + err := c.deleteMachineFinalizers(ctx, machine) if err != nil { // Keep retrying until update goes through klog.Errorf("Machine finalizer REMOVAL failed for machine %q. Retrying, error: %s", machine.Name, err) - return machineutils.ShortRetry, err + return err } default: @@ -848,7 +861,7 @@ func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineReque */ klog.V(2).Infof("Machine %q with providerID %q and nodeName %q deleted successfully", machine.Name, getProviderID(machine), getNodeName(machine)) - return machineutils.LongRetry, nil + return nil } // buildAddressStatus adds the nodeName as a HostName address, if it is not empty, and returns a sorted and deduplicated diff --git a/pkg/util/provider/machinecontroller/machine_safety.go b/pkg/util/provider/machinecontroller/machine_safety.go index 13d4bff84..28c7700f4 100644 --- a/pkg/util/provider/machinecontroller/machine_safety.go +++ b/pkg/util/provider/machinecontroller/machine_safety.go @@ -27,23 +27,23 @@ import ( func (c *controller) reconcileClusterMachineSafetyOrphanVMs(_ string) error { ctx := context.Background() reSyncAfter := c.safetyOptions.MachineSafetyOrphanVMsPeriod.Duration - defer c.machineSafetyOrphanVMsQueue.AddAfter("", reSyncAfter) klog.V(3).Infof("reconcileClusterMachineSafetyOrphanVMs: Start") - defer klog.V(3).Infof("reconcileClusterMachineSafetyOrphanVMs: End, reSync-Period: %v", reSyncAfter) - retryPeriod, err := c.checkMachineClasses(ctx) - if err != nil { + if err := c.checkMachineClasses(ctx); err != nil { klog.Errorf("reconcileClusterMachineSafetyOrphanVMs: Error occurred while checking for orphan VMs: %s", err) - c.machineSafetyOrphanVMsQueue.AddAfter("", time.Duration(retryPeriod)) + c.machineSafetyOrphanVMsQueue.AddRateLimited("") + return nil } - retryPeriod, err = c.AnnotateNodesUnmanagedByMCM(ctx) - if err != nil { + if err := c.AnnotateNodesUnmanagedByMCM(ctx); err != nil { klog.Errorf("reconcileClusterMachineSafetyOrphanVMs: Error occurred while checking for nodes not handled by MCM: %s", err) - c.machineSafetyOrphanVMsQueue.AddAfter("", time.Duration(retryPeriod)) + c.machineSafetyOrphanVMsQueue.AddRateLimited("") + return nil } + klog.V(3).Infof("reconcileClusterMachineSafetyOrphanVMs: End, reSync-Period: %v", reSyncAfter) + c.machineSafetyOrphanVMsQueue.AddAfter("", reSyncAfter) return nil } @@ -151,17 +151,17 @@ func (c *controller) isAPIServerUp(ctx context.Context) bool { } // AnnotateNodesUnmanagedByMCM checks for nodes which are not handled by MCM and annotes them -func (c *controller) AnnotateNodesUnmanagedByMCM(ctx context.Context) (machineutils.RetryPeriod, error) { +func (c *controller) AnnotateNodesUnmanagedByMCM(ctx context.Context) error { if c.nodeLister == nil { // if running without a target cluster, we don't need to check for unmanaged nodes - return machineutils.LongRetry, nil + return nil } // list all the nodes on target cluster nodes, err := c.nodeLister.List(labels.Everything()) if err != nil { klog.Errorf("Safety-Net: Error getting nodes") - 
return machineutils.LongRetry, err + return err } for _, node := range nodes { machine, err := c.getMachineFromNode(node.Name) @@ -190,7 +190,7 @@ func (c *controller) AnnotateNodesUnmanagedByMCM(ctx context.Context) (machineut klog.V(3).Infof("Adding NotManagedByMCM annotation to Node %q", node.Name) // err is returned only when node update fails if err := c.updateNodeWithAnnotations(ctx, nodeCopy, annotations); err != nil { - return machineutils.MediumRetry, err + return err } } } else { @@ -202,40 +202,40 @@ func (c *controller) AnnotateNodesUnmanagedByMCM(ctx context.Context) (machineut nodeCopy := node.DeepCopy() delete(nodeCopy.Annotations, machineutils.NotManagedByMCM) if err := c.updateNodeWithAnnotations(ctx, nodeCopy, nil); err != nil { - return machineutils.MediumRetry, err + return err } } } - return machineutils.LongRetry, nil + return nil } // checkCommonMachineClass checks for orphan VMs in MachinesClasses -func (c *controller) checkMachineClasses(ctx context.Context) (machineutils.RetryPeriod, error) { +func (c *controller) checkMachineClasses(ctx context.Context) error { machineClasses, err := c.machineClassLister.List(labels.Everything()) if err != nil { klog.Error("Safety-Net: Error getting machineClasses") - return machineutils.LongRetry, err + return err } for _, machineClass := range machineClasses { - retry, err := c.checkMachineClass(ctx, machineClass) + err = c.checkMachineClass(ctx, machineClass) if err != nil { - return retry, err + return err } } - return machineutils.LongRetry, nil + return nil } // checkMachineClass checks a particular machineClass for orphan instances -func (c *controller) checkMachineClass(ctx context.Context, machineClass *v1alpha1.MachineClass) (machineutils.RetryPeriod, error) { +func (c *controller) checkMachineClass(ctx context.Context, machineClass *v1alpha1.MachineClass) error { // Get secret data secretData, err := c.getSecretData(machineClass.Name, machineClass.SecretRef, machineClass.CredentialsSecretRef) if err != nil { klog.Errorf("SafetyController: Secret Data could not be computed for MachineClass: %q", machineClass.Name) - return machineutils.LongRetry, err + return err } listMachineResponse, err := c.driver.ListMachines(ctx, &driver.ListMachinesRequest{ @@ -244,7 +244,7 @@ func (c *controller) checkMachineClass(ctx context.Context, machineClass *v1alph }) if err != nil { klog.Errorf("SafetyController: Failed to LIST VMs at provider. Error: %s", err) - return machineutils.LongRetry, err + return err } // making sure cache is updated .This is for cases where a new machine object is at etcd, but cache is unaware @@ -255,7 +255,7 @@ func (c *controller) checkMachineClass(ctx context.Context, machineClass *v1alph if !cache.WaitForCacheSync(stopCh, c.machineSynced) { klog.Errorf("SafetyController: Timed out waiting for caches to sync. 
Error: %s", err) - return machineutils.ShortRetry, err + return err } } @@ -301,7 +301,7 @@ func (c *controller) checkMachineClass(ctx context.Context, machineClass *v1alph } } - return machineutils.LongRetry, nil + return nil } // updateMachineToSafety enqueues into machineSafetyQueue when a machine is updated to particular status diff --git a/pkg/util/provider/machinecontroller/machine_safety_test.go b/pkg/util/provider/machinecontroller/machine_safety_test.go index e1914b0f8..003c34eff 100644 --- a/pkg/util/provider/machinecontroller/machine_safety_test.go +++ b/pkg/util/provider/machinecontroller/machine_safety_test.go @@ -324,7 +324,7 @@ var _ = Describe("safety_logic", func() { waitForCacheSync(stop, c) // call checkMachineClass to delete the orphan VMs - _, _ = c.checkMachineClass(context.TODO(), testMachineClass) + _ = c.checkMachineClass(context.TODO(), testMachineClass) // after this, the testmachine in crashloopbackoff phase // should remain and the other one should @@ -538,11 +538,11 @@ var _ = Describe("safety_logic", func() { defer trackers.Stop() waitForCacheSync(stop, c) - retry, err := c.AnnotateNodesUnmanagedByMCM(context.TODO()) + err := c.AnnotateNodesUnmanagedByMCM(context.TODO()) waitForCacheSync(stop, c) - Expect(retry).To(Equal(data.expect.retry)) + // Expect(retry).To(Equal(data.expect.retry)) if data.expect.err == nil { Expect(err).ShouldNot(HaveOccurred()) diff --git a/pkg/util/provider/machinecontroller/machine_test.go b/pkg/util/provider/machinecontroller/machine_test.go index e8534c87a..210bd0728 100644 --- a/pkg/util/provider/machinecontroller/machine_test.go +++ b/pkg/util/provider/machinecontroller/machine_test.go @@ -366,7 +366,7 @@ var _ = Describe("machine", func() { defer trackers.Stop() waitForCacheSync(stop, controller) - machineClass, secretData, _, err := controller.ValidateMachineClass(context.TODO(), data.action) + machineClass, secretData, err := controller.ValidateMachineClass(context.TODO(), data.action) if data.expect.machineClass == nil { Expect(machineClass).To(BeNil()) @@ -552,7 +552,7 @@ var _ = Describe("machine", func() { secret, err := controller.controlCoreClient.CoreV1().Secrets(objMeta.Namespace).Get(context.TODO(), machineClass.SecretRef.Name, metav1.GetOptions{}) Expect(err).ToNot(HaveOccurred()) - retry, err := controller.triggerCreationFlow( + err = controller.triggerCreationFlow( context.TODO(), &driver.CreateMachineRequest{ Machine: machine, @@ -570,7 +570,7 @@ var _ = Describe("machine", func() { Expect(err).To(BeNil()) Expect(actual.Spec.ProviderID).To(Equal(data.expect.machine.Spec.ProviderID)) Expect(actual.Finalizers).To(Equal(data.expect.machine.Finalizers)) - Expect(retry).To(Equal(data.expect.retry)) + // Expect(retry).To(Equal(data.expect.retry)) Expect(actual.Status.CurrentStatus.Phase).To(Equal(data.expect.machine.Status.CurrentStatus.Phase)) Expect(actual.Status.Addresses).To(Equal(data.expect.machine.Status.Addresses)) @@ -1567,7 +1567,7 @@ var _ = Describe("machine", func() { } // Deletion of machine is triggered - retry, err := controller.triggerDeletionFlow(context.TODO(), &driver.DeleteMachineRequest{ + err = controller.triggerDeletionFlow(context.TODO(), &driver.DeleteMachineRequest{ Machine: machine, MachineClass: machineClass, Secret: secret, @@ -1575,7 +1575,7 @@ var _ = Describe("machine", func() { if err != nil || data.expect.err != nil { Expect(err).To(Equal(data.expect.err)) } - Expect(retry).To(Equal(data.expect.retry)) + // Expect(retry).To(Equal(data.expect.retry)) machine, err = 
controller.controlMachineClient.Machines(objMeta.Namespace).Get(context.TODO(), action.machine, metav1.GetOptions{}) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/util/provider/machinecontroller/machine_util.go b/pkg/util/provider/machinecontroller/machine_util.go index 4a77317e8..e2fed3a4d 100644 --- a/pkg/util/provider/machinecontroller/machine_util.go +++ b/pkg/util/provider/machinecontroller/machine_util.go @@ -76,30 +76,29 @@ const ( ) // ValidateMachineClass validates the machine class. -func (c *controller) ValidateMachineClass(_ context.Context, classSpec *v1alpha1.ClassSpec) (*v1alpha1.MachineClass, map[string][]byte, machineutils.RetryPeriod, error) { +func (c *controller) ValidateMachineClass(_ context.Context, classSpec *v1alpha1.ClassSpec) (*v1alpha1.MachineClass, map[string][]byte, error) { var ( machineClass *v1alpha1.MachineClass err error - retry = machineutils.LongRetry ) machineClass, err = c.machineClassLister.MachineClasses(c.namespace).Get(classSpec.Name) if err != nil { klog.Errorf("MachineClass %s/%s not found. Skipping. %v", c.namespace, classSpec.Name, err) - return nil, nil, retry, err + return nil, nil, err } internalMachineClass := &machineapi.MachineClass{} err = c.internalExternalScheme.Convert(machineClass, internalMachineClass, nil) if err != nil { klog.Warning("Error in scheme conversion") - return nil, nil, retry, err + return nil, nil, err } secretData, err := c.getSecretData(machineClass.Name, machineClass.SecretRef, machineClass.CredentialsSecretRef) if err != nil { klog.V(2).Infof("Could not compute secret data: %+v", err) - return nil, nil, retry, err + return nil, nil, err } if finalizers := sets.NewString(machineClass.Finalizers...); !finalizers.Has(MCMFinalizerName) { @@ -108,16 +107,16 @@ func (c *controller) ValidateMachineClass(_ context.Context, classSpec *v1alpha1 errMessage := fmt.Sprintf("The machine class %s has no finalizers set. So not reconciling the machine.", machineClass.Name) err := errors.New(errMessage) - return nil, nil, machineutils.ShortRetry, err + return nil, nil, err } err = c.validateNodeTemplate(machineClass.NodeTemplate) if err != nil { klog.Warning(err) - return nil, nil, machineutils.ShortRetry, err + return nil, nil, err } - return machineClass, secretData, retry, nil + return machineClass, secretData, nil } func (c *controller) getSecretData(machineClassName string, secretRefs ...*v1.SecretReference) (map[string][]byte, error) { @@ -234,19 +233,19 @@ func mergeDataMaps(in map[string][]byte, dataMaps ...map[string][]byte) map[stri // syncMachineNameToNode syncs the machine name on the corresponding node object // by adding a machine name label to its metadata. -func (c *controller) syncMachineNameToNode(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { +func (c *controller) syncMachineNameToNode(ctx context.Context, machine *v1alpha1.Machine) error { node, err := c.nodeLister.Get(getNodeName(machine)) if err != nil { if apierrors.IsNotFound(err) { // Don't return error so that other steps can be executed. 
- return machineutils.LongRetry, nil + return nil } klog.Errorf("Error occurred while trying to fetch node object - err: %s", err) - return machineutils.ShortRetry, err + return err } if node.Labels[machineutils.MachineLabelKey] == machine.Name { - return machineutils.LongRetry, nil + return nil } nodeCopy := node.DeepCopy() @@ -257,28 +256,25 @@ func (c *controller) syncMachineNameToNode(ctx context.Context, machine *v1alpha nodeCopy.Labels[machineutils.MachineLabelKey] = machine.Name if _, err := c.targetCoreClient.CoreV1().Nodes().Update(ctx, nodeCopy, metav1.UpdateOptions{}); err != nil { - if apierrors.IsConflict(err) { - return machineutils.ConflictRetry, err - } - return machineutils.ShortRetry, err + return err } - return machineutils.LongRetry, nil + return nil } -func (c *controller) updateNodeConditionBasedOnLabel(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { +func (c *controller) updateNodeConditionBasedOnLabel(ctx context.Context, machine *v1alpha1.Machine) error { node, err := c.nodeLister.Get(getNodeName(machine)) if err != nil { if apierrors.IsNotFound(err) { // Don't return error so that other steps can be executed. - return machineutils.LongRetry, nil + return nil } klog.Errorf("Error occurred while trying to fetch node object - err: %s", err) - return machineutils.ShortRetry, err + return err } if !metav1.HasLabel(node.ObjectMeta, v1alpha1.LabelKeyNodeCandidateForUpdate) { - return machineutils.LongRetry, nil + return nil } nodeCopy := node.DeepCopy() @@ -319,7 +315,7 @@ func (c *controller) updateNodeConditionBasedOnLabel(ctx context.Context, machin updateCondition = true } else if inPlaceCond.Reason == v1alpha1.SelectedForUpdate { // node still not has been drained - return machineutils.MediumRetry, nil + return nil } } } @@ -327,7 +323,7 @@ func (c *controller) updateNodeConditionBasedOnLabel(ctx context.Context, machin if !updateCondition { if nodeCopy.Labels[v1alpha1.LabelKeyNodeUpdateResult] == v1alpha1.LabelValueNodeUpdateSuccessful { if inPlaceCond != nil && inPlaceCond.Reason == v1alpha1.UpdateSuccessful { - return machineutils.LongRetry, nil + return nil } nodeCopy = nodeops.AddOrUpdateCondition(nodeCopy, v1.NodeCondition{ Type: v1alpha1.NodeInPlaceUpdate, @@ -339,7 +335,7 @@ func (c *controller) updateNodeConditionBasedOnLabel(ctx context.Context, machin updateCondition = true } else if nodeCopy.Labels[v1alpha1.LabelKeyNodeUpdateResult] == v1alpha1.LabelValueNodeUpdateFailed { if inPlaceCond != nil && inPlaceCond.Reason == v1alpha1.UpdateFailed { - return machineutils.LongRetry, nil + return nil } nodeCopy = nodeops.AddOrUpdateCondition(nodeCopy, v1.NodeCondition{ @@ -356,46 +352,46 @@ func (c *controller) updateNodeConditionBasedOnLabel(ctx context.Context, machin if updateCondition { if _, err := c.targetCoreClient.CoreV1().Nodes().UpdateStatus(ctx, nodeCopy, metav1.UpdateOptions{}); err != nil { if apierrors.IsConflict(err) { - return machineutils.ConflictRetry, err + return err } - return machineutils.ShortRetry, err + return err } } - return machineutils.LongRetry, nil + return nil } -func (c *controller) inPlaceUpdate(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { +func (c *controller) inPlaceUpdate(ctx context.Context, machine *v1alpha1.Machine) error { cond, err := nodeops.GetNodeCondition(ctx, c.targetCoreClient, getNodeName(machine), v1alpha1.NodeInPlaceUpdate) if err != nil { if apierrors.IsNotFound(err) { // Don't return error so that other steps can be executed. 
- return machineutils.LongRetry, nil + return nil } - return machineutils.ShortRetry, err + return err } if cond == nil { - return machineutils.LongRetry, nil + return nil } // if the condition is present and the reason is selected for update then drain the node if cond.Reason == v1alpha1.SelectedForUpdate { - retry, err := c.drainNodeForInPlace(ctx, machine) + err := c.drainNodeForInPlace(ctx, machine) if err != nil { - return retry, err + return err } // if the node is drained successfully then fetch the node condition again cond, err = nodeops.GetNodeCondition(ctx, c.targetCoreClient, getNodeName(machine), v1alpha1.NodeInPlaceUpdate) if err != nil { - return machineutils.ShortRetry, err + return err } } if cond.Reason == v1alpha1.ReadyForUpdate { // give machine time for update to get applied - return machineutils.MediumRetry, fmt.Errorf("node %s is ready for in-place update", getNodeName(machine)) + return fmt.Errorf("node %s is ready for in-place update", getNodeName(machine)) } // if the condition is present and the reason is drain successful then the node is ready for update @@ -404,18 +400,18 @@ func (c *controller) inPlaceUpdate(ctx context.Context, machine *v1alpha1.Machin cond.LastTransitionTime = metav1.Now() cond.Message = "Node is ready for in-place update" if err := nodeops.AddOrUpdateConditionsOnNode(ctx, c.targetCoreClient, getNodeName(machine), *cond); err != nil { - return machineutils.ShortRetry, err + return err } // give machine time for update to get applied - return machineutils.MediumRetry, fmt.Errorf("node %s is ready for in-place update", getNodeName(machine)) + return fmt.Errorf("node %s is ready for in-place update", getNodeName(machine)) } - return machineutils.LongRetry, nil + return nil } -func (c *controller) updateMachineStatusAndNodeCondition(ctx context.Context, machine *v1alpha1.Machine, description string, state v1alpha1.MachineState, drainError error) (machineutils.RetryPeriod, error) { +func (c *controller) updateMachineStatusAndNodeCondition(ctx context.Context, machine *v1alpha1.Machine, description string, state v1alpha1.MachineState, drainError error) error { if drainError != nil { - updateRetryPeriod, updateErr := c.machineStatusUpdate( + updateErr := c.machineStatusUpdate( ctx, machine, v1alpha1.LastOperation{ @@ -432,10 +428,10 @@ func (c *controller) updateMachineStatusAndNodeCondition(ctx context.Context, ma ) if updateErr != nil { - return updateRetryPeriod, updateErr + return updateErr } - return machineutils.ShortRetry, drainError + return drainError } // update machine status to indicate that the machine will undergo an in-place update @@ -457,13 +453,13 @@ func (c *controller) updateMachineStatusAndNodeCondition(ctx context.Context, ma // Keep retrying across reconciles until update goes through klog.Errorf("Update of Phase/Conditions failed for machine %q. 
Retrying, error: %q", machine.Name, err) if apierrors.IsConflict(err) { - return machineutils.ConflictRetry, err + return err } } cond, err := nodeops.GetNodeCondition(ctx, c.targetCoreClient, getNodeName(machine), v1alpha1.NodeInPlaceUpdate) if err != nil { - return machineutils.ShortRetry, err + return err } if cond == nil { @@ -479,17 +475,17 @@ func (c *controller) updateMachineStatusAndNodeCondition(ctx context.Context, ma cond.Message = "Node draining successful" if err := nodeops.AddOrUpdateConditionsOnNode(ctx, c.targetCoreClient, getNodeName(machine), *cond); err != nil { - return machineutils.ShortRetry, err + return err } - return machineutils.ShortRetry, err + return err } // syncNodeTemplates syncs nodeTemplates between machine, machineClass and corresponding node-object. // It ensures that any nodeTemplate element available on Machine should be available on node-object. // It ensures that MachineClass.NodeTemplate.VirtualCapacity is synced to the Node's Capacity. // Although there could be more elements already available on node-object which will not be touched. -func (c *controller) syncNodeTemplates(ctx context.Context, machine *v1alpha1.Machine, machineClass *v1alpha1.MachineClass) (machineutils.RetryPeriod, error) { +func (c *controller) syncNodeTemplates(ctx context.Context, machine *v1alpha1.Machine, machineClass *v1alpha1.MachineClass) error { var ( initializedNodeAnnotation bool currentlyAppliedALTJSONByte []byte @@ -502,10 +498,10 @@ func (c *controller) syncNodeTemplates(ctx context.Context, machine *v1alpha1.Ma if err != nil { if apierrors.IsNotFound(err) { // Don't return error so that other steps can be executed. - return machineutils.LongRetry, nil + return nil } klog.Errorf("Error occurred while trying to fetch node object - err: %s", err) - return machineutils.ShortRetry, err + return err } nodeCopy := node.DeepCopy() @@ -522,7 +518,7 @@ func (c *controller) syncNodeTemplates(ctx context.Context, machine *v1alpha1.Ma err = json.Unmarshal([]byte(lastAppliedALTJSONString), &lastAppliedALT) if err != nil { klog.Errorf("Error occurred while syncing node annotations, labels & taints: %s", err) - return machineutils.ShortRetry, err + return err } } @@ -531,7 +527,7 @@ func (c *controller) syncNodeTemplates(ctx context.Context, machine *v1alpha1.Ma err = json.Unmarshal([]byte(lastAppliedVirtualCapacityJSONString), &lastAppliedVirtualCapacity) if err != nil { klog.Errorf("Error occurred while syncing node virtual capacity: %s", err) - return machineutils.ShortRetry, err + return err } } @@ -547,7 +543,7 @@ func (c *controller) syncNodeTemplates(ctx context.Context, machine *v1alpha1.Ma } if !initializedNodeAnnotation && !annotationsChanged && !labelsChanged && !taintsChanged && !virtualCapacityChanged { - return machineutils.LongRetry, nil + return nil } // Update node-object with latest nodeTemplate elements if elements have changed. 
@@ -567,7 +563,7 @@ func (c *controller) syncNodeTemplates(ctx context.Context, machine *v1alpha1.Ma currentlyAppliedALTJSONByte, err = json.Marshal(lastAppliedALT) if err != nil { klog.Errorf("Error occurred while syncing node annotations, labels & taints: %s", err) - return machineutils.ShortRetry, err + return err } nodeCopy.Annotations[machineutils.LastAppliedALTAnnotation] = string(currentlyAppliedALTJSONByte) } @@ -578,13 +574,13 @@ func (c *controller) syncNodeTemplates(ctx context.Context, machine *v1alpha1.Ma nodeUpdated, err := c.targetCoreClient.CoreV1().Nodes().UpdateStatus(ctx, nodeCopy, metav1.UpdateOptions{}) if err != nil { klog.Errorf("UpdateStatus failed for node %q of machine %q. error: %q", node.Name, machine.Name, err) - return machineutils.ShortRetry, err + return err } klog.V(3).Infof("node.Status.Capacity of node %q updated to: %v", node.Name, nodeUpdated.Status.Capacity) currentlyAppliedVirtualCapacityJSONByte, err = json.Marshal(desiredVirtualCapacity) if err != nil { klog.Errorf("Error occurred while syncing node virtual capacity of node %q: %v", node.Name, err) - return machineutils.ShortRetry, err + return err } nodeCopy = nodeUpdated.DeepCopy() if len(desiredVirtualCapacity) == 0 { @@ -603,11 +599,7 @@ func (c *controller) syncNodeTemplates(ctx context.Context, machine *v1alpha1.Ma err = errSuccessfulALTsync } - if apierrors.IsConflict(err) { - return machineutils.ConflictRetry, err - } - return machineutils.ShortRetry, err - + return err } // SyncMachineAnnotations syncs the annotations of the machine with node-objects. @@ -795,28 +787,25 @@ func SyncVirtualCapacity(desiredVirtualCapacity v1.ResourceList, node *v1.Node, // machineCreateErrorHandler updates the machine status based on // CreateMachineResponse and the error during the machine creation -func (c *controller) machineCreateErrorHandler(ctx context.Context, machine *v1alpha1.Machine, createMachineResponse *driver.CreateMachineResponse, err error) (machineutils.RetryPeriod, error) { +func (c *controller) machineCreateErrorHandler(ctx context.Context, machine *v1alpha1.Machine, createMachineResponse *driver.CreateMachineResponse, err error) error { var ( - retryRequired = machineutils.MediumRetry lastKnownState string ) machineErr, ok := status.FromError(err) if ok { - switch machineErr.Code() { - case codes.ResourceExhausted: - retryRequired = machineutils.LongRetry - lastKnownState = machine.Status.LastKnownState - case codes.Unknown, codes.DeadlineExceeded, codes.Aborted, codes.Unavailable: - retryRequired = machineutils.ShortRetry - lastKnownState = machine.Status.LastKnownState - } + // switch machineErr.Code() { + // case codes.ResourceExhausted: + // lastKnownState = machine.Status.LastKnownState + // case codes.Unknown, codes.DeadlineExceeded, codes.Aborted, codes.Unavailable: + // } + lastKnownState = machine.Status.LastKnownState } if createMachineResponse != nil && createMachineResponse.LastKnownState != "" { lastKnownState = createMachineResponse.LastKnownState } - updateRetryPeriod, updateErr := c.machineStatusUpdate( + updateErr := c.machineStatusUpdate( ctx, machine, v1alpha1.LastOperation{ @@ -834,10 +823,10 @@ func (c *controller) machineCreateErrorHandler(ctx context.Context, machine *v1a ) if updateErr != nil { - return updateRetryPeriod, updateErr + return updateErr } - return retryRequired, err + return err } func (c *controller) machineStatusUpdate( @@ -846,7 +835,7 @@ func (c *controller) machineStatusUpdate( lastOperation v1alpha1.LastOperation, currentStatus 
v1alpha1.CurrentStatus, lastKnownState string, -) (machineutils.RetryPeriod, error) { +) error { clone := machine.DeepCopy() clone.Status.LastOperation = lastOperation clone.Status.CurrentStatus = currentStatus @@ -854,7 +843,7 @@ func (c *controller) machineStatusUpdate( if isMachineStatusSimilar(clone.Status, machine.Status) { klog.V(3).Infof("Not updating the status of the machine object %q, as the content is similar", clone.Name) - return machineutils.ShortRetry, nil + return nil } _, err := c.controlMachineClient.Machines(clone.Namespace).UpdateStatus(ctx, clone, metav1.UpdateOptions{}) @@ -865,11 +854,7 @@ func (c *controller) machineStatusUpdate( klog.V(2).Infof("Machine/status UPDATE for %q", machine.Name) } - if apierrors.IsConflict(err) { - return machineutils.ConflictRetry, err - } - - return machineutils.ShortRetry, err + return err } // isMachineStatusSimilar checks if the status of 2 machines is similar or not. @@ -913,7 +898,7 @@ func (c *controller) getCreateFailurePhase(machine *v1alpha1.Machine) v1alpha1.M // reconcileMachineHealth updates the machine object with // any change in node conditions or health -func (c *controller) reconcileMachineHealth(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { +func (c *controller) reconcileMachineHealth(ctx context.Context, machine *v1alpha1.Machine) error { var ( cloneDirty = false clone = machine.DeepCopy() @@ -926,7 +911,7 @@ func (c *controller) reconcileMachineHealth(ctx context.Context, machine *v1alph if !apierrors.IsNotFound(err) { // Any other types of errors while fetching node object klog.Errorf("Could not fetch node object for machine %q", machine.Name) - return machineutils.ShortRetry, err + return err } // Node object is not found if len(machine.Status.Conditions) > 0 && @@ -1156,19 +1141,16 @@ func (c *controller) reconcileMachineHealth(ctx context.Context, machine *v1alph if err != nil { // Keep retrying across reconciles until update goes through klog.Errorf("Update of Phase/Conditions failed for machine %q. Retrying, error: %q", machine.Name, err) - if apierrors.IsConflict(err) { - return machineutils.ConflictRetry, err - } } else { klog.V(2).Infof("Machine Phase/Conditions have been updated for %q with providerID %q and are in sync with backing node %q", machine.Name, getProviderID(machine), getNodeName(machine)) // Return error to end the reconcile err = errSuccessfulPhaseUpdate } - return machineutils.ShortRetry, err + return err } - return machineutils.LongRetry, nil + return nil } func getFormattedNodeConditions(conditions []v1.NodeCondition) string { @@ -1188,7 +1170,7 @@ func getFormattedNodeConditions(conditions []v1.NodeCondition) string { Manipulate Finalizers */ -func (c *controller) addMachineFinalizers(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { +func (c *controller) addMachineFinalizers(ctx context.Context, machine *v1alpha1.Machine) error { if finalizers := sets.NewString(machine.Finalizers...); !finalizers.Has(MCMFinalizerName) { finalizers.Insert(MCMFinalizerName) @@ -1204,13 +1186,13 @@ func (c *controller) addMachineFinalizers(ctx context.Context, machine *v1alpha1 err = fmt.Errorf("Machine creation in process. 
Machine finalizers are UPDATED") } - return machineutils.ShortRetry, err + return err } - return machineutils.ShortRetry, nil + return nil } -func (c *controller) deleteMachineFinalizers(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { +func (c *controller) deleteMachineFinalizers(ctx context.Context, machine *v1alpha1.Machine) error { if finalizers := sets.NewString(machine.Finalizers...); finalizers.Has(MCMFinalizerName) { finalizers.Delete(MCMFinalizerName) @@ -1220,14 +1202,14 @@ func (c *controller) deleteMachineFinalizers(ctx context.Context, machine *v1alp if err != nil { // Keep retrying until update goes through klog.Errorf("Failed to delete finalizers for machine %q: %s", machine.Name, err) - return machineutils.ShortRetry, err + return err } klog.V(2).Infof("Removed finalizer to machine %q with providerID %q and backing node %q", machine.Name, getProviderID(machine), getNodeName(machine)) - return machineutils.LongRetry, nil + return nil } - return machineutils.LongRetry, nil + return nil } /* @@ -1282,7 +1264,7 @@ func isPendingMachineWithCriticalComponentsNotReadyTaint(clone *v1alpha1.Machine */ // setMachineTerminationStatus set's the machine status to terminating -func (c *controller) setMachineTerminationStatus(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) (machineutils.RetryPeriod, error) { +func (c *controller) setMachineTerminationStatus(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) error { clone := deleteMachineRequest.Machine.DeepCopy() clone.Status.LastOperation = v1alpha1.LastOperation{ Description: machineutils.GetVMStatus, @@ -1306,16 +1288,12 @@ func (c *controller) setMachineTerminationStatus(ctx context.Context, deleteMach err = fmt.Errorf("Machine deletion in process. Phase set to termination") } - if apierrors.IsConflict(err) { - return machineutils.ConflictRetry, err - } - return machineutils.ShortRetry, err + return err } // updateMachineStatusAndNodeLabel tries to update the node name label if it is empty. This is required for drain to happen. -func (c *controller) updateMachineStatusAndNodeLabel(ctx context.Context, getMachineStatusRequest *driver.GetMachineStatusRequest) (machineutils.RetryPeriod, error) { +func (c *controller) updateMachineStatusAndNodeLabel(ctx context.Context, getMachineStatusRequest *driver.GetMachineStatusRequest) error { var ( - retry machineutils.RetryPeriod description string state v1alpha1.MachineState err error @@ -1328,7 +1306,6 @@ func (c *controller) updateMachineStatusAndNodeLabel(ctx context.Context, getMac if c.targetCoreClient == nil { description = "Running without target cluster, skipping node drain and volume attachment deletion. " + machineutils.InitiateVMDeletion state = v1alpha1.MachineStateProcessing - retry = machineutils.ShortRetry } else if nodeName != "" { isNodeLabelUpdated = true } else { @@ -1336,7 +1313,7 @@ func (c *controller) updateMachineStatusAndNodeLabel(ctx context.Context, getMac nodeName, err = c.getNodeName(ctx, getMachineStatusRequest) if err == nil { if err = c.updateMachineNodeLabel(ctx, getMachineStatusRequest.Machine, nodeName); err != nil { - return machineutils.ShortRetry, err + return err } isNodeLabelUpdated = true } else { @@ -1344,7 +1321,6 @@ func (c *controller) updateMachineStatusAndNodeLabel(ctx context.Context, getMac // Error occurred with decoding machine error status, aborting without retry. 
description = "Error occurred with decoding machine error status while getting VM status, aborting without retry. " + err.Error() + " " + machineutils.GetVMStatus state = v1alpha1.MachineStateFailed - retry = machineutils.LongRetry err = fmt.Errorf("machine deletion has failed. %s", description) } else { // Decoding machine error code @@ -1354,25 +1330,20 @@ func (c *controller) updateMachineStatusAndNodeLabel(ctx context.Context, getMac // In this case, try to drain and delete description = machineutils.InitiateDrain state = v1alpha1.MachineStateProcessing - retry = machineutils.ShortRetry case codes.NotFound: // VM was not found at provider, proceed to initiateDrain to ensure associated orphan resources such as NICs are deleted in the next few steps, before node object is deleted description = "VM was not found at provider. Moving forward to node drain. " + machineutils.InitiateDrain state = v1alpha1.MachineStateProcessing - retry = machineutils.ShortRetry case codes.Unknown, codes.DeadlineExceeded, codes.Aborted, codes.Unavailable: description = "Error occurred with decoding machine error status while getting VM status, aborting with retry. " + machineutils.GetVMStatus state = v1alpha1.MachineStateFailed - retry = machineutils.ShortRetry case codes.Uninitialized: description = "VM instance was not initialized. Moving forward to node drain. " + machineutils.InitiateDrain state = v1alpha1.MachineStateProcessing - retry = machineutils.ShortRetry default: // Error occurred with decoding machine error status, abort with retry. description = "Error occurred with decoding machine error status while getting VM status, aborting without retry. machine code: " + err.Error() + " " + machineutils.GetVMStatus state = v1alpha1.MachineStateFailed - retry = machineutils.MediumRetry } } } @@ -1380,11 +1351,10 @@ func (c *controller) updateMachineStatusAndNodeLabel(ctx context.Context, getMac if isNodeLabelUpdated { description = machineutils.InitiateDrain state = v1alpha1.MachineStateProcessing - retry = machineutils.ShortRetry // Return error even when machine object is updated to ensure reconcilation is restarted err = fmt.Errorf("machine deletion in process. VM with matching ID found") } - updateRetryPeriod, updateErr := c.machineStatusUpdate( + updateErr := c.machineStatusUpdate( ctx, getMachineStatusRequest.Machine, v1alpha1.LastOperation{ @@ -1400,9 +1370,9 @@ func (c *controller) updateMachineStatusAndNodeLabel(ctx context.Context, getMac getMachineStatusRequest.Machine.Status.LastKnownState, ) if updateErr != nil { - return updateRetryPeriod, updateErr + return updateErr } - return retry, err + return err } // isConditionEmpty returns true if passed NodeCondition is empty @@ -1421,7 +1391,7 @@ func printLogInitError(s string, err *error, description *string, machine *v1alp } } -func (c *controller) drainNodeForInPlace(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { +func (c *controller) drainNodeForInPlace(ctx context.Context, machine *v1alpha1.Machine) error { var ( // Declarations node *v1.Node @@ -1551,7 +1521,7 @@ func (c *controller) drainNodeForInPlace(ctx context.Context, machine *v1alpha1. 
} // drainNode attempts to drain the node backed by the machine object -func (c *controller) drainNode(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) (machineutils.RetryPeriod, error) { +func (c *controller) drainNode(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) error { var ( // Declarations err error @@ -1719,7 +1689,7 @@ func (c *controller) drainNode(ctx context.Context, deleteMachineRequest *driver } } - updateRetryPeriod, updateErr := c.machineStatusUpdate( + updateErr := c.machineStatusUpdate( ctx, machine, v1alpha1.LastOperation{ @@ -1736,41 +1706,38 @@ func (c *controller) drainNode(ctx context.Context, deleteMachineRequest *driver ) if updateErr != nil { - return updateRetryPeriod, updateErr + return updateErr } - return machineutils.ShortRetry, err + return err } // deleteNodeVolAttachments deletes VolumeAttachment(s) for a node before moving to VM deletion stage. -func (c *controller) deleteNodeVolAttachments(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) (machineutils.RetryPeriod, error) { +func (c *controller) deleteNodeVolAttachments(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) error { var ( description string state v1alpha1.MachineState machine = deleteMachineRequest.Machine nodeName = machine.Labels[v1alpha1.NodeLabelKey] - retryPeriod = machineutils.ShortRetry ) node, err := c.nodeLister.Get(nodeName) if err != nil { if !apierrors.IsNotFound(err) { // an error other than NotFound, let us try again later. - return retryPeriod, err + return err } // node not found move to vm deletion description = fmt.Sprintf("Skipping deleteNodeVolAttachments due to - %s. Moving to VM Deletion. %s", err.Error(), machineutils.InitiateVMDeletion) state = v1alpha1.MachineStateProcessing - retryPeriod = 0 } else if len(node.Status.VolumesAttached) == 0 { description = fmt.Sprintf("Node Volumes for node: %s are already detached. Moving to VM Deletion. %s", nodeName, machineutils.InitiateVMDeletion) state = v1alpha1.MachineStateProcessing - retryPeriod = 0 } else { // case: where node.Status.VolumesAttached > 0 liveNodeVolAttachments, err := getLiveVolumeAttachmentsForNode(c.volumeAttachementLister, nodeName, machine.Name) if err != nil { klog.Errorf("(deleteNodeVolAttachments) Error obtaining VolumeAttachment(s) for node %q, machine %q: %s", nodeName, machine.Name, err) - return retryPeriod, err + return err } if len(liveNodeVolAttachments) != 0 { err = deleteVolumeAttachmentsForNode(ctx, c.targetCoreClient.StorageV1().VolumeAttachments(), nodeName, liveNodeVolAttachments) @@ -1779,14 +1746,14 @@ func (c *controller) deleteNodeVolAttachments(ctx context.Context, deleteMachine } else { klog.V(3).Infof("(deleteNodeVolAttachments) Successfully deleted all volume attachments for node %q, machine %q", nodeName, machine.Name) } - return retryPeriod, nil + return nil } description = fmt.Sprintf("No Live VolumeAttachments for node: %s. Moving to VM Deletion. 
%s", nodeName, machineutils.InitiateVMDeletion) state = v1alpha1.MachineStateProcessing } now := metav1.Now() klog.V(4).Infof("(deleteVolumeAttachmentsForNode) For node %q, machine %q, set LastOperation.Description: %q", nodeName, machine.Name, description) - updateRetryPeriod, updateErr := c.machineStatusUpdate( + updateErr := c.machineStatusUpdate( ctx, machine, v1alpha1.LastOperation{ @@ -1800,17 +1767,16 @@ func (c *controller) deleteNodeVolAttachments(ctx context.Context, deleteMachine ) if updateErr != nil { - return updateRetryPeriod, updateErr + return updateErr } - return retryPeriod, err + return err } // deleteVM attempts to delete the VM backed by the machine object -func (c *controller) deleteVM(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) (machineutils.RetryPeriod, error) { +func (c *controller) deleteVM(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) error { var ( machine = deleteMachineRequest.Machine - retryRequired machineutils.RetryPeriod description string state v1alpha1.MachineState lastKnownState string @@ -1824,26 +1790,21 @@ func (c *controller) deleteVM(ctx context.Context, deleteMachineRequest *driver. if machineErr, ok := status.FromError(err); ok { switch machineErr.Code() { case codes.Unknown, codes.DeadlineExceeded, codes.Aborted, codes.Unavailable: - retryRequired = machineutils.ShortRetry description = fmt.Sprintf("VM deletion failed due to - %s. However, will re-try in the next resync. %s", err.Error(), machineutils.InitiateVMDeletion) state = v1alpha1.MachineStateFailed case codes.NotFound: - retryRequired = machineutils.ShortRetry description = fmt.Sprintf("VM not found. Continuing deletion flow. %s", machineutils.InitiateNodeDeletion) state = v1alpha1.MachineStateProcessing default: - retryRequired = machineutils.LongRetry description = fmt.Sprintf("VM deletion failed due to - %s. Aborting operation. %s", err.Error(), machineutils.InitiateVMDeletion) state = v1alpha1.MachineStateFailed } } else { - retryRequired = machineutils.LongRetry description = fmt.Sprintf("Error occurred while decoding machine error: %s. %s", err.Error(), machineutils.InitiateVMDeletion) state = v1alpha1.MachineStateFailed } } else { - retryRequired = machineutils.ShortRetry description = fmt.Sprintf("VM deletion was successful. %s", machineutils.InitiateNodeDeletion) state = v1alpha1.MachineStateProcessing @@ -1854,7 +1815,7 @@ func (c *controller) deleteVM(ctx context.Context, deleteMachineRequest *driver. lastKnownState = deleteMachineResponse.LastKnownState } - updateRetryPeriod, updateErr := c.machineStatusUpdate( + updateErr := c.machineStatusUpdate( ctx, machine, v1alpha1.LastOperation{ @@ -1871,14 +1832,14 @@ func (c *controller) deleteVM(ctx context.Context, deleteMachineRequest *driver. ) if updateErr != nil { - return updateRetryPeriod, updateErr + return updateErr } - return retryRequired, err + return err } // deleteNodeObject attempts to delete the node object backed by the machine object -func (c *controller) deleteNodeObject(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { +func (c *controller) deleteNodeObject(ctx context.Context, machine *v1alpha1.Machine) error { var ( err error description string @@ -1913,7 +1874,7 @@ func (c *controller) deleteNodeObject(ctx context.Context, machine *v1alpha1.Mac err = fmt.Errorf("Machine deletion in process. 
No node object found") } - updateRetryPeriod, updateErr := c.machineStatusUpdate( + updateErr := c.machineStatusUpdate( ctx, machine, v1alpha1.LastOperation{ @@ -1930,10 +1891,10 @@ func (c *controller) deleteNodeObject(ctx context.Context, machine *v1alpha1.Mac ) if updateErr != nil { - return updateRetryPeriod, updateErr + return updateErr } - return machineutils.ShortRetry, err + return err } // getEffectiveDrainTimeout returns the drainTimeout set on the machine-object, otherwise returns the timeout set using the global-flag. @@ -2155,7 +2116,7 @@ func setTerminationReasonByPhase(phase v1alpha1.MachinePhase, terminationConditi } } -func (c *controller) tryMarkingMachineFailed(ctx context.Context, machine, clone *v1alpha1.Machine, machineDeployName, description string, lockAcquireTimeout time.Duration) (machineutils.RetryPeriod, error) { +func (c *controller) tryMarkingMachineFailed(ctx context.Context, machine, clone *v1alpha1.Machine, machineDeployName, description string, lockAcquireTimeout time.Duration) error { if c.permitGiver.TryPermit(machineDeployName, lockAcquireTimeout) { defer c.permitGiver.ReleasePermit(machineDeployName) markable, err := c.canMarkMachineFailed(machineDeployName, machine.Name, machine.Namespace, maxReplacements) @@ -2177,13 +2138,13 @@ func (c *controller) tryMarkingMachineFailed(ctx context.Context, machine, clone err = fmt.Errorf("machine %q couldn't be marked FAILED, other machines are getting replaced", machine.Name) } } - return machineutils.ShortRetry, err + return err } klog.Warningf("Timedout waiting to acquire lock for machine %q", machine.Name) err := fmt.Errorf("timedout waiting to acquire lock for machine %q", machine.Name) - return machineutils.ShortRetry, err + return err } func getLiveVolumeAttachmentsForNode(volAttachLister storagelisters.VolumeAttachmentLister, nodeName string, machineName string) ([]*storagev1.VolumeAttachment, error) { diff --git a/pkg/util/provider/machinecontroller/machine_util_test.go b/pkg/util/provider/machinecontroller/machine_util_test.go index 53e92a6ba..c83d326d9 100644 --- a/pkg/util/provider/machinecontroller/machine_util_test.go +++ b/pkg/util/provider/machinecontroller/machine_util_test.go @@ -9,9 +9,10 @@ import ( "encoding/json" "errors" "fmt" - "k8s.io/klog/v2" "time" + "k8s.io/klog/v2" + machinev1 "github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1" "github.com/gardener/machine-controller-manager/pkg/fakeclient" "github.com/gardener/machine-controller-manager/pkg/util/nodeops" @@ -72,7 +73,7 @@ var _ = Describe("machine_util", func() { defer trackers.Stop() waitForCacheSync(stop, c) - _, err := c.syncNodeTemplates(context.TODO(), machineObject, machineClass) + err := c.syncNodeTemplates(context.TODO(), machineObject, machineClass) waitForCacheSync(stop, c) @@ -2396,9 +2397,9 @@ var _ = Describe("machine_util", func() { Expect(targetMachine).ToNot(BeNil()) - retryPeriod, err := c.reconcileMachineHealth(context.TODO(), targetMachine) + err := c.reconcileMachineHealth(context.TODO(), targetMachine) - Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) + // Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) if data.expect.err == nil { Expect(err).To(BeNil()) @@ -2857,13 +2858,13 @@ var _ = Describe("machine_util", func() { c.permitGiver.TryPermit(machineDeploy1, 1*time.Second) } - retryPeriod, err := c.reconcileMachineHealth(context.TODO(), targetMachine) + err := c.reconcileMachineHealth(context.TODO(), targetMachine) if data.setup.lockAlreadyAcquired { 
c.permitGiver.DeletePermits(machineDeploy1) } - Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) + // Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) if data.expect.err == nil { Expect(err).To(BeNil()) @@ -3165,9 +3166,9 @@ var _ = Describe("machine_util", func() { waitForCacheSync(stop, c) - retryPeriod, err := c.updateNodeConditionBasedOnLabel(context.TODO(), targetMachine) + err := c.updateNodeConditionBasedOnLabel(context.TODO(), targetMachine) - Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) + // Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) if data.expect.err == nil { Expect(err).To(BeNil()) } else { @@ -3577,9 +3578,9 @@ var _ = Describe("machine_util", func() { waitForCacheSync(stop, c) - retryPeriod, err := c.inPlaceUpdate(context.TODO(), data.setup.machine) + err := c.inPlaceUpdate(context.TODO(), data.setup.machine) - Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) + // Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) if data.expect.err == nil { Expect(err).To(BeNil()) } else { @@ -3786,9 +3787,9 @@ var _ = Describe("machine_util", func() { waitForCacheSync(stop, c) - retryPeriod, err := c.drainNodeForInPlace(context.TODO(), data.setup.machine) + err := c.drainNodeForInPlace(context.TODO(), data.setup.machine) - Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) + // Expect(retryPeriod).To(Equal(data.expect.retryPeriod)) if data.expect.err == nil { Expect(err).To(BeNil()) } else { diff --git a/pkg/util/provider/machinecontroller/machineclass.go b/pkg/util/provider/machinecontroller/machineclass.go index 7272670ee..0c28f2cc8 100644 --- a/pkg/util/provider/machinecontroller/machineclass.go +++ b/pkg/util/provider/machinecontroller/machineclass.go @@ -85,7 +85,7 @@ func (c *controller) reconcileClusterMachineClassKey(key string) error { err = c.reconcileClusterMachineClass(ctx, class) if err != nil { // Re-enqueue after a ShortRetry window - c.enqueueMachineClassAfter(class, time.Duration(machineutils.ShortRetry)) + c.enqueueMachineClassAfterRateLimiting(class) } else { // Re-enqueue periodically to avoid missing of events // TODO: Get ride of this logic @@ -201,3 +201,11 @@ func (c *controller) enqueueMachineClassAfter(obj any, after time.Duration) { } c.machineClassQueue.AddAfter(key, after) } + +func (c *controller) enqueueMachineClassAfterRateLimiting(obj any) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + return + } + c.machineClassQueue.AddRateLimited(key) +} From eed5a475a9019e150e388b1aeef9a268f77b1312 Mon Sep 17 00:00:00 2001 From: r4mek Date: Thu, 6 Nov 2025 15:35:29 +0530 Subject: [PATCH 2/2] rebase fixes --- pkg/util/provider/machinecontroller/machine.go | 4 ++-- pkg/util/provider/machinecontroller/machine_test.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/util/provider/machinecontroller/machine.go b/pkg/util/provider/machinecontroller/machine.go index ae5e209d5..a942ff601 100644 --- a/pkg/util/provider/machinecontroller/machine.go +++ b/pkg/util/provider/machinecontroller/machine.go @@ -171,7 +171,7 @@ func (c *controller) reconcileClusterMachineKey(key string) error { } // Add finalizers if not present on machine object - _, err = c.addMachineFinalizers(ctx, machine) + err = c.addMachineFinalizers(ctx, machine) if err != nil { return err } @@ -805,7 +805,7 @@ func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineReque switch { case c.isCreationProcessing(machine): err := fmt.Errorf("machine %q is in creation flow. 
Deletion cannot proceed", machine.Name) - return machineutils.MediumRetry, err + return err case !finalizers.Has(MCMFinalizerName): // If Finalizers are not present on machine diff --git a/pkg/util/provider/machinecontroller/machine_test.go b/pkg/util/provider/machinecontroller/machine_test.go index 210bd0728..be6b83375 100644 --- a/pkg/util/provider/machinecontroller/machine_test.go +++ b/pkg/util/provider/machinecontroller/machine_test.go @@ -3636,7 +3636,7 @@ var _ = Describe("machine", func() { fakeDriver *driver.FakeDriver // These fields are used to change the test scenario (add to pending map, call creation/deletion flow) isCreation bool - testFunc func(setup, machineActionRequest) (machineutils.RetryPeriod, error) + testFunc func(setup, machineActionRequest) error } type expect struct { machine *v1alpha1.Machine @@ -3757,7 +3757,7 @@ var _ = Describe("machine", func() { // ****************************************************** // This is changing the setup in accordance with the test - retry, err := data.action.testFunc(data.setup, machineActionRequest{ + err = data.action.testFunc(data.setup, machineActionRequest{ machine: machine, machineClass: machineClass, secret: secret, @@ -3767,7 +3767,7 @@ var _ = Describe("machine", func() { if err != nil || data.expect.err != nil { Expect(err).To(Equal(data.expect.err)) } - Expect(retry).To(Equal(data.expect.retry)) + // Expect(retry).To(Equal(data.expect.retry)) if data.action.isCreation { _, found := data.setup.controller.pendingMachineCreationMap.Load(data.expect.machine.Name) @@ -3828,7 +3828,7 @@ var _ = Describe("machine", func() { Err: nil, }, isCreation: true, - testFunc: func(setUp setup, req machineActionRequest) (machineutils.RetryPeriod, error) { + testFunc: func(setUp setup, req machineActionRequest) error { return setUp.controller.triggerCreationFlow(context.TODO(), &driver.CreateMachineRequest{ Machine: req.machine, MachineClass: req.machineClass, @@ -3862,7 +3862,7 @@ var _ = Describe("machine", func() { Err: nil, }, isCreation: false, - testFunc: func(setUp setup, req machineActionRequest) (machineutils.RetryPeriod, error) { + testFunc: func(setUp setup, req machineActionRequest) error { setUp.controller.pendingMachineCreationMap.Store("test/machine-0", "") return setUp.controller.triggerDeletionFlow(context.TODO(), &driver.DeleteMachineRequest{ Machine: req.machine, @@ -3920,7 +3920,7 @@ var _ = Describe("machine", func() { Err: nil, }, isCreation: false, - testFunc: func(setUp setup, req machineActionRequest) (machineutils.RetryPeriod, error) { + testFunc: func(setUp setup, req machineActionRequest) error { setUp.controller.pendingMachineCreationMap.Store("machine-xyz", "") return setUp.controller.triggerDeletionFlow(context.TODO(), &driver.DeleteMachineRequest{ Machine: req.machine,