Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/util/provider/machinecontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
"github.com/gardener/machine-controller-manager/pkg/util/provider/options"
"github.com/gardener/machine-controller-manager/pkg/util/worker"
"golang.org/x/time/rate"

machineinternal "github.com/gardener/machine-controller-manager/pkg/apis/machine"
machinev1alpha1 "github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1"
Expand Down Expand Up @@ -361,3 +362,14 @@ func (dc *controller) Run(workers int, stopCh <-chan struct{}) {

waitGroup.Wait()
}

// CustomTypedControllerRateLimiter is a constructor for a custom rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential.
// It is more configurable than DefaultTypedControllerRateLimiter and takes in machineHealthTimeout to be used as limit for item rate-limiter
func CustomTypedControllerRateLimiter[T comparable](maxDelay time.Duration) workqueue.TypedRateLimiter[T] {
return workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[T](5*time.Millisecond, maxDelay),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[T]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
133 changes: 73 additions & 60 deletions pkg/util/provider/machinecontroller/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ func (c *controller) enqueueMachineAfter(obj any, after time.Duration, reason st
}
}

func (c *controller) enqueueMachineAfterRateLimiting(obj any, reason string) {
if key, ok := c.getKeyForObj(obj); ok {
klog.V(3).Infof("Adding machine object to queue %q after rate-limiting, reason: %s", key, reason)
c.machineQueue.AddRateLimited(key)
}
}

func (c *controller) enqueueMachineTermination(machine *v1alpha1.Machine, reason string) {

if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(machine); err != nil {
Expand All @@ -135,6 +142,16 @@ func (c *controller) enqueueMachineTerminationAfter(machine *v1alpha1.Machine, a
}
}

func (c *controller) enqueueMachineTerminationAfterRateLimiting(machine *v1alpha1.Machine, reason string) {
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(machine); err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", machine, err))
return
} else {
klog.V(3).Infof("Adding machine object to termination queue %q after rate-limiting, reason: %s", key, reason)
c.machineTerminationQueue.AddRateLimited(key)
}
}

func (c *controller) reconcileClusterMachineKey(key string) error {
ctx := context.Background()

Expand All @@ -154,7 +171,7 @@ func (c *controller) reconcileClusterMachineKey(key string) error {
}

// Add finalizers if not present on machine object
_, err = c.addMachineFinalizers(ctx, machine)
err = c.addMachineFinalizers(ctx, machine)
if err != nil {
return err
}
Expand All @@ -165,19 +182,17 @@ func (c *controller) reconcileClusterMachineKey(key string) error {
return nil
}

retryPeriod, err := c.reconcileClusterMachine(ctx, machine)

var reEnqueReason = "periodic reconcile"
if err != nil {
reEnqueReason = err.Error()
err = c.reconcileClusterMachine(ctx, machine)
if err == nil {
c.enqueueMachineAfter(machine, time.Duration(machineutils.LongRetry), "periodic reconcile")
} else {
c.enqueueMachineAfterRateLimiting(machine, err.Error())
}

c.enqueueMachineAfter(machine, time.Duration(retryPeriod), reEnqueReason)

return nil
}

func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) {
func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alpha1.Machine) error {
klog.V(2).Infof("reconcileClusterMachine: Start for %q with phase:%q, description:%q", machine.Name, machine.Status.CurrentStatus.Phase, machine.Status.LastOperation.Description)
defer klog.V(2).Infof("reconcileClusterMachine: Stop for %q", machine.Name)

Expand All @@ -186,54 +201,54 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp
// machine is not set for termination don't process it
err := fmt.Errorf("Machine controller has frozen. Retrying reconcile after resync period")
klog.Error(err)
return machineutils.LongRetry, err
return err
}

internalMachine := &machineapi.Machine{}
if err := c.internalExternalScheme.Convert(machine, internalMachine, nil); err != nil {
klog.Error(err)
return machineutils.LongRetry, err
return err
}

validationerr := validation.ValidateMachine(internalMachine)
if validationerr.ToAggregate() != nil && len(validationerr.ToAggregate().Errors()) > 0 {
err := fmt.Errorf("validation of Machine failed %s", validationerr.ToAggregate().Error())
klog.Error(err)
return machineutils.LongRetry, err
return err
}

// Validate MachineClass
machineClass, secretData, retry, err := c.ValidateMachineClass(ctx, &machine.Spec.Class)
machineClass, secretData, err := c.ValidateMachineClass(ctx, &machine.Spec.Class)
if err != nil {
klog.Errorf("cannot reconcile machine %s: %s", machine.Name, err)
return retry, err
return err
}

if machine.Labels[v1alpha1.NodeLabelKey] != "" && machine.Status.CurrentStatus.Phase != "" {
// If reference to node object exists execute the below
retry, err := c.reconcileMachineHealth(ctx, machine)
err := c.reconcileMachineHealth(ctx, machine)
if err != nil {
return retry, err
return err
}

retry, err = c.syncMachineNameToNode(ctx, machine)
err = c.syncMachineNameToNode(ctx, machine)
if err != nil {
return retry, err
return err
}

retry, err = c.syncNodeTemplates(ctx, machine, machineClass)
err = c.syncNodeTemplates(ctx, machine, machineClass)
if err != nil {
return retry, err
return err
}

retry, err = c.updateNodeConditionBasedOnLabel(ctx, machine)
err = c.updateNodeConditionBasedOnLabel(ctx, machine)
if err != nil {
return retry, err
return err
}

retry, err = c.inPlaceUpdate(ctx, machine)
err = c.inPlaceUpdate(ctx, machine)
if err != nil {
return retry, err
return err
}
}

Expand All @@ -248,7 +263,7 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp
)
}

return machineutils.LongRetry, nil
return nil
}

func (c *controller) reconcileClusterMachineTermination(key string) error {
Expand All @@ -273,15 +288,15 @@ func (c *controller) reconcileClusterMachineTermination(key string) error {
machine.Name, machine.Status.CurrentStatus.Phase, machine.Status.LastOperation.Description)
defer klog.V(2).Infof("reconcileClusterMachineTermination: Stop for %q", machine.Name)

machineClass, secretData, retry, err := c.ValidateMachineClass(ctx, &machine.Spec.Class)
machineClass, secretData, err := c.ValidateMachineClass(ctx, &machine.Spec.Class)
if err != nil {
klog.Errorf("cannot reconcile machine %q: %s", machine.Name, err)
c.enqueueMachineTerminationAfter(machine, time.Duration(retry), err.Error())
c.enqueueMachineTerminationAfterRateLimiting(machine, err.Error())
return err
}

// Process a delete event
retryPeriod, err := c.triggerDeletionFlow(
err = c.triggerDeletionFlow(
ctx,
&driver.DeleteMachineRequest{
Machine: machine,
Expand All @@ -291,16 +306,15 @@ func (c *controller) reconcileClusterMachineTermination(key string) error {
)

if err != nil {
c.enqueueMachineTerminationAfter(machine, time.Duration(retryPeriod), err.Error())
return err
c.enqueueMachineTerminationAfterRateLimiting(machine, err.Error())
} else {
// If the informer loses connection to the API server it may need to resync.
// If a resource is deleted while the watch is down, the informer won’t get
// delete event because the object is already gone. To avoid this edge-case,
// a requeue is scheduled post machine deletion as well.
c.enqueueMachineTerminationAfter(machine, time.Duration(retryPeriod), "post-deletion reconcile")
return nil
c.enqueueMachineTerminationAfter(machine, time.Duration(machineutils.LongRetry), "post-deletion reconcile")
}
return err
}

/*
Expand Down Expand Up @@ -484,7 +498,7 @@ func addedOrRemovedEssentialTaints(oldNode, node *corev1.Node, taintKeys []strin
Machine operations - Create, Delete
*/

func (c *controller) triggerCreationFlow(ctx context.Context, createMachineRequest *driver.CreateMachineRequest) (machineutils.RetryPeriod, error) {
func (c *controller) triggerCreationFlow(ctx context.Context, createMachineRequest *driver.CreateMachineRequest) error {
var (
// Declarations
nodeName, providerID string
Expand All @@ -508,10 +522,10 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque
// we should avoid mutating Secret, since it goes all the way into the Informer's store
secretCopy := createMachineRequest.Secret.DeepCopy()
if err := c.addBootstrapTokenToUserData(ctx, machine, secretCopy); err != nil {
return machineutils.ShortRetry, err
return err
}
if err := c.addMachineNameToUserData(machine, secretCopy); err != nil {
return machineutils.ShortRetry, err
return err
}
createMachineRequest.Secret = secretCopy

Expand All @@ -530,7 +544,7 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque
if !ok {
// Error occurred with decoding machine error status, abort with retry.
klog.Errorf("Error occurred while decoding machine error for machine %q: %s", machine.Name, err)
return machineutils.MediumRetry, err
return err
}
klog.Warningf("For machine %q, obtained VM error status as: %s", machineName, machineErr)
// Decoding machine error code
Expand Down Expand Up @@ -586,7 +600,7 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque
}

// machine obj marked Failed for double security
updateRetryPeriod, updateErr := c.machineStatusUpdate(
updateErr := c.machineStatusUpdate(
ctx,
machine,
v1alpha1.LastOperation{
Expand All @@ -603,11 +617,11 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque
)

if updateErr != nil {
return updateRetryPeriod, updateErr
return updateErr
}

klog.V(2).Infof("Machine %q marked Failed as VM was referring to a stale node object", machine.Name)
return machineutils.ShortRetry, err
return err
}
}
uninitializedMachine = true
Expand Down Expand Up @@ -644,11 +658,10 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque
}
//initialize VM if not initialized
if uninitializedMachine {
var retryPeriod machineutils.RetryPeriod
var initResponse *driver.InitializeMachineResponse
initResponse, retryPeriod, err = c.initializeMachine(ctx, clone, createMachineRequest.MachineClass, createMachineRequest.Secret)
initResponse, err = c.initializeMachine(ctx, clone, createMachineRequest.MachineClass, createMachineRequest.Secret)
if err != nil {
return retryPeriod, err
return err
}

if c.targetCoreClient == nil {
Expand All @@ -659,16 +672,16 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque
}
clone.Status.Addresses = buildAddressStatus(addresses, nodeName)
if _, err := c.controlMachineClient.Machines(clone.Namespace).UpdateStatus(ctx, clone, metav1.UpdateOptions{}); err != nil {
return machineutils.ShortRetry, fmt.Errorf("failed to persist status addresses after initialization was successful: %w", err)
return fmt.Errorf("failed to persist status addresses after initialization was successful: %w", err)
}
}

// Return error even when machine object is updated
err = fmt.Errorf("machine creation in process. Machine initialization (if required) is successful")
return machineutils.ShortRetry, err
return err
}
if err != nil {
return machineutils.ShortRetry, err
return err
}

if machine.Status.CurrentStatus.Phase == "" || machine.Status.CurrentStatus.Phase == v1alpha1.MachineCrashLoopBackOff {
Expand Down Expand Up @@ -703,9 +716,9 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque
// Return error even when machine object is updated
err = fmt.Errorf("machine creation in process. Machine/Status UPDATE successful")
}
return machineutils.ShortRetry, err
return err
}
return machineutils.LongRetry, nil
return nil
}

func (c *controller) updateLabels(ctx context.Context, machine *v1alpha1.Machine, nodeName, providerID string) (clone *v1alpha1.Machine, err error) {
Expand Down Expand Up @@ -739,7 +752,7 @@ func (c *controller) updateLabels(ctx context.Context, machine *v1alpha1.Machine
return clone, err
}

func (c *controller) initializeMachine(ctx context.Context, machine *v1alpha1.Machine, machineClass *v1alpha1.MachineClass, secret *corev1.Secret) (resp *driver.InitializeMachineResponse, retry machineutils.RetryPeriod, err error) {
func (c *controller) initializeMachine(ctx context.Context, machine *v1alpha1.Machine, machineClass *v1alpha1.MachineClass, secret *corev1.Secret) (resp *driver.InitializeMachineResponse, err error) {
req := &driver.InitializeMachineRequest{
Machine: machine,
MachineClass: machineClass,
Expand All @@ -751,14 +764,14 @@ func (c *controller) initializeMachine(ctx context.Context, machine *v1alpha1.Ma
errStatus, ok := status.FromError(err)
if !ok {
klog.Errorf("Cannot decode Driver error for machine %q: %s. Unexpected behaviour as Driver errors are expected to be of type status.Status", machine.Name, err)
return nil, machineutils.LongRetry, err
return nil, err
}
if errStatus.Code() == codes.Unimplemented {
klog.V(3).Infof("Provider does not support Driver.InitializeMachine - skipping VM instance initialization for %q.", machine.Name)
return nil, 0, nil
return nil, nil
}
klog.Errorf("Error occurred while initializing VM instance for machine %q: %s", machine.Name, err)
updateRetryPeriod, updateErr := c.machineStatusUpdate(
updateErr := c.machineStatusUpdate(
ctx,
machine,
v1alpha1.LastOperation{
Expand All @@ -775,15 +788,15 @@ func (c *controller) initializeMachine(ctx context.Context, machine *v1alpha1.Ma
machine.Status.LastKnownState,
)
if updateErr != nil {
return nil, updateRetryPeriod, updateErr
return nil, updateErr
}
return nil, machineutils.ShortRetry, err
return nil, err
}
klog.V(3).Infof("VM instance %q for machine %q was initialized", resp.ProviderID, machine.Name)
return resp, 0, nil
return resp, nil
}

func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) (machineutils.RetryPeriod, error) {
func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineRequest *driver.DeleteMachineRequest) error {
var (
machine = deleteMachineRequest.Machine
finalizers = sets.NewString(machine.Finalizers...)
Expand All @@ -792,12 +805,12 @@ func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineReque
switch {
case c.isCreationProcessing(machine):
err := fmt.Errorf("machine %q is in creation flow. Deletion cannot proceed", machine.Name)
return machineutils.MediumRetry, err
return err

case !finalizers.Has(MCMFinalizerName):
// If Finalizers are not present on machine
err := fmt.Errorf("Machine %q is missing finalizers. Deletion cannot proceed", machine.Name)
return machineutils.LongRetry, err
return err

case machine.Status.CurrentStatus.Phase != v1alpha1.MachineTerminating:
return c.setMachineTerminationStatus(ctx, deleteMachineRequest)
Expand All @@ -824,11 +837,11 @@ func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineReque
return c.deleteNodeObject(ctx, machine)

case strings.Contains(machine.Status.LastOperation.Description, machineutils.InitiateFinalizerRemoval):
_, err := c.deleteMachineFinalizers(ctx, machine)
err := c.deleteMachineFinalizers(ctx, machine)
if err != nil {
// Keep retrying until update goes through
klog.Errorf("Machine finalizer REMOVAL failed for machine %q. Retrying, error: %s", machine.Name, err)
return machineutils.ShortRetry, err
return err
}

default:
Expand All @@ -848,7 +861,7 @@ func (c *controller) triggerDeletionFlow(ctx context.Context, deleteMachineReque
*/

klog.V(2).Infof("Machine %q with providerID %q and nodeName %q deleted successfully", machine.Name, getProviderID(machine), getNodeName(machine))
return machineutils.LongRetry, nil
return nil
}

// buildAddressStatus adds the nodeName as a HostName address, if it is not empty, and returns a sorted and deduplicated
Expand Down
Loading
Loading