Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions controlplane/kubeadm/internal/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ type ControlPlane struct {
machinesPatchHelpers map[string]*patch.Helper

// MachinesNotUpToDate is the source of truth for Machines that are not up-to-date.
// It should be used to check if a Machine is up-to-date (not machinesNotUpToDateResults).
// It should be used to check if a Machine is up-to-date (not machinesUpToDateResults).
MachinesNotUpToDate collections.Machines
// machinesNotUpToDateResults is used to store the result of the UpToDate call for all Machines
// machinesUpToDateResults is used to store the result of the UpToDate call for all Machines
// (even for Machines that are up-to-date).
// MachinesNotUpToDate should always be used instead to check if a Machine is up-to-date.
machinesNotUpToDateResults map[string]NotUpToDateResult
machinesUpToDateResults map[string]UpToDateResult

// reconciliationTime is the time of the current reconciliation, and should be used for all "now" calculations
reconciliationTime metav1.Time
Expand Down Expand Up @@ -122,9 +122,9 @@ func NewControlPlane(ctx context.Context, managementCluster ManagementCluster, c
// Select machines that should be rolled out because of an outdated configuration or because rolloutAfter/Before expired.
reconciliationTime := metav1.Now()
machinesNotUptoDate := make(collections.Machines, len(ownedMachines))
machinesNotUpToDateResults := map[string]NotUpToDateResult{}
machinesUpToDateResults := map[string]UpToDateResult{}
for _, m := range ownedMachines {
upToDate, notUpToDateResult, err := UpToDate(ctx, client, cluster, m, kcp, &reconciliationTime, infraMachines, kubeadmConfigs)
upToDate, upToDateResult, err := UpToDate(ctx, client, cluster, m, kcp, &reconciliationTime, infraMachines, kubeadmConfigs)
if err != nil {
return nil, err
}
Expand All @@ -133,20 +133,20 @@ func NewControlPlane(ctx context.Context, managementCluster ManagementCluster, c
}
// Set this even if machine is UpToDate. This is needed to complete triggering in-place updates.
// MachinesNotUpToDate should always be used instead to check if a Machine is up-to-date.
machinesNotUpToDateResults[m.Name] = *notUpToDateResult
machinesUpToDateResults[m.Name] = *upToDateResult
}

return &ControlPlane{
KCP: kcp,
Cluster: cluster,
Machines: ownedMachines,
machinesPatchHelpers: patchHelpers,
MachinesNotUpToDate: machinesNotUptoDate,
machinesNotUpToDateResults: machinesNotUpToDateResults,
KubeadmConfigs: kubeadmConfigs,
InfraResources: infraMachines,
reconciliationTime: reconciliationTime,
managementCluster: managementCluster,
KCP: kcp,
Cluster: cluster,
Machines: ownedMachines,
machinesPatchHelpers: patchHelpers,
MachinesNotUpToDate: machinesNotUptoDate,
machinesUpToDateResults: machinesUpToDateResults,
KubeadmConfigs: kubeadmConfigs,
InfraResources: infraMachines,
reconciliationTime: reconciliationTime,
managementCluster: managementCluster,
}, nil
}

Expand Down Expand Up @@ -240,15 +240,15 @@ func (c *ControlPlane) GetKubeadmConfig(machineName string) (*bootstrapv1.Kubead
}

// MachinesNeedingRollout returns a list of machines that need to be rolled out.
func (c *ControlPlane) MachinesNeedingRollout() (collections.Machines, map[string]NotUpToDateResult) {
func (c *ControlPlane) MachinesNeedingRollout() (collections.Machines, map[string]UpToDateResult) {
// Note: Machines already deleted are dropped because they will be replaced by new machines after deletion completes.
return c.MachinesNotUpToDate.Filter(collections.Not(collections.HasDeletionTimestamp)), c.machinesNotUpToDateResults
return c.MachinesNotUpToDate.Filter(collections.Not(collections.HasDeletionTimestamp)), c.machinesUpToDateResults
}

// NotUpToDateMachines returns a list of machines that are not up to date with the control
// plane's configuration.
func (c *ControlPlane) NotUpToDateMachines() (collections.Machines, map[string]NotUpToDateResult) {
return c.MachinesNotUpToDate, c.machinesNotUpToDateResults
func (c *ControlPlane) NotUpToDateMachines() (collections.Machines, map[string]UpToDateResult) {
return c.MachinesNotUpToDate, c.machinesUpToDateResults
}

// UpToDateMachines returns the machines that are up to date with the control
Expand Down
20 changes: 10 additions & 10 deletions controlplane/kubeadm/internal/control_plane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,19 @@ func TestControlPlane(t *testing.T) {

g.Expect(controlPlane.Machines).To(HaveLen(5))

machinesNotUptoDate, machinesNotUpToDateResults := controlPlane.NotUpToDateMachines()
machinesNotUptoDate, machinesUpToDateResults := controlPlane.NotUpToDateMachines()
g.Expect(machinesNotUptoDate.Names()).To(ConsistOf("m2", "m3"))
// machinesNotUpToDateResults contains results for all Machines (including up-to-date Machines).
g.Expect(machinesNotUpToDateResults).To(HaveLen(5))
g.Expect(machinesNotUpToDateResults["m2"].ConditionMessages).To(Equal([]string{"Version v1.29.0, v1.31.0 required"}))
g.Expect(machinesNotUpToDateResults["m3"].ConditionMessages).To(Equal([]string{"Version v1.29.3, v1.31.0 required"}))
// machinesUpToDateResults contains results for all Machines (including up-to-date Machines).
g.Expect(machinesUpToDateResults).To(HaveLen(5))
g.Expect(machinesUpToDateResults["m2"].ConditionMessages).To(Equal([]string{"Version v1.29.0, v1.31.0 required"}))
g.Expect(machinesUpToDateResults["m3"].ConditionMessages).To(Equal([]string{"Version v1.29.3, v1.31.0 required"}))

machinesNeedingRollout, machinesNotUpToDateResults := controlPlane.MachinesNeedingRollout()
machinesNeedingRollout, machinesUpToDateResults := controlPlane.MachinesNeedingRollout()
g.Expect(machinesNeedingRollout.Names()).To(ConsistOf("m2"))
// machinesNotUpToDateResults contains results for all Machines (including up-to-date Machines).
g.Expect(machinesNotUpToDateResults).To(HaveLen(5))
g.Expect(machinesNotUpToDateResults["m2"].LogMessages).To(Equal([]string{"Machine version \"v1.29.0\" is not equal to KCP version \"v1.31.0\""}))
g.Expect(machinesNotUpToDateResults["m3"].LogMessages).To(Equal([]string{"Machine version \"v1.29.3\" is not equal to KCP version \"v1.31.0\""}))
// machinesUpToDateResults contains results for all Machines (including up-to-date Machines).
g.Expect(machinesUpToDateResults).To(HaveLen(5))
g.Expect(machinesUpToDateResults["m2"].LogMessages).To(Equal([]string{"Machine version \"v1.29.0\" is not equal to KCP version \"v1.31.0\""}))
g.Expect(machinesUpToDateResults["m3"].LogMessages).To(Equal([]string{"Machine version \"v1.29.3\" is not equal to KCP version \"v1.31.0\""}))

upToDateMachines := controlPlane.UpToDateMachines()
g.Expect(upToDateMachines).To(HaveLen(3))
Expand Down
26 changes: 18 additions & 8 deletions controlplane/kubeadm/internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ type KubeadmControlPlaneReconciler struct {
managementCluster internal.ManagementCluster
managementClusterUncached internal.ManagementCluster
ssaCache ssa.Cache

// Only used for testing
overrideTryInPlaceUpdateFunc func(ctx context.Context, controlPlane *internal.ControlPlane, machineToInPlaceUpdate *clusterv1.Machine, machineUpToDateResult internal.UpToDateResult) (bool, ctrl.Result, error)
overrideScaleUpControlPlaneFunc func(ctx context.Context, controlPlane *internal.ControlPlane) (ctrl.Result, error)
overrideScaleDownControlPlaneFunc func(ctx context.Context, controlPlane *internal.ControlPlane, machineToDelete *clusterv1.Machine) (ctrl.Result, error)
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand Down Expand Up @@ -467,16 +472,16 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, controlPl
}

// Control plane machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
machinesNeedingRollout, machinesNeedingRolloutResults := controlPlane.MachinesNeedingRollout()
machinesNeedingRollout, machinesUpToDateResults := controlPlane.MachinesNeedingRollout()
switch {
case len(machinesNeedingRollout) > 0:
var allMessages []string
for machine, machinesNeedingRolloutResult := range machinesNeedingRolloutResults {
allMessages = append(allMessages, fmt.Sprintf("Machine %s needs rollout: %s", machine, strings.Join(machinesNeedingRolloutResult.LogMessages, ",")))
for machine, machineUpToDateResult := range machinesUpToDateResults {
allMessages = append(allMessages, fmt.Sprintf("Machine %s needs rollout: %s", machine, strings.Join(machineUpToDateResult.LogMessages, ",")))
}
log.Info(fmt.Sprintf("Rolling out Control Plane machines: %s", strings.Join(allMessages, ",")), "machinesNeedingRollout", machinesNeedingRollout.Names())
v1beta1conditions.MarkFalse(controlPlane.KCP, controlplanev1.MachinesSpecUpToDateV1Beta1Condition, controlplanev1.RollingUpdateInProgressV1Beta1Reason, clusterv1.ConditionSeverityWarning, "Rolling %d replicas with outdated spec (%d replicas up to date)", len(machinesNeedingRollout), len(controlPlane.Machines)-len(machinesNeedingRollout))
return r.upgradeControlPlane(ctx, controlPlane, machinesNeedingRollout)
return r.updateControlPlane(ctx, controlPlane, machinesNeedingRollout, machinesUpToDateResults)
default:
// make sure last upgrade operation is marked as completed.
// NOTE: we are checking the condition already exists in order to avoid to set this condition at the first
Expand Down Expand Up @@ -506,7 +511,12 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, controlPl
case numMachines > desiredReplicas:
log.Info("Scaling down control plane", "desired", desiredReplicas, "existing", numMachines)
// The last parameter (i.e. machines needing to be rolled out) should always be empty here.
return r.scaleDownControlPlane(ctx, controlPlane, collections.Machines{})
// Pick the Machine that we should scale down.
machineToDelete, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, controlPlane, collections.Machines{})
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to select machine for scale down")
}
return r.scaleDownControlPlane(ctx, controlPlane, machineToDelete)
}

// Get the workload cluster client.
Expand Down Expand Up @@ -975,16 +985,16 @@ func (r *KubeadmControlPlaneReconciler) reconcileControlPlaneAndMachinesConditio
}

func reconcileMachineUpToDateCondition(_ context.Context, controlPlane *internal.ControlPlane) {
machinesNotUptoDate, machinesNotUpToDateResults := controlPlane.NotUpToDateMachines()
machinesNotUptoDate, machinesUpToDateResults := controlPlane.NotUpToDateMachines()
machinesNotUptoDateNames := sets.New(machinesNotUptoDate.Names()...)

for _, machine := range controlPlane.Machines {
if machinesNotUptoDateNames.Has(machine.Name) {
// Note: the code computing the message for KCP's RolloutOut condition is making assumptions on the format/content of this message.
message := ""
if machinesNotUpToDateResult, ok := machinesNotUpToDateResults[machine.Name]; ok && len(machinesNotUpToDateResult.ConditionMessages) > 0 {
if machineUpToDateResult, ok := machinesUpToDateResults[machine.Name]; ok && len(machineUpToDateResult.ConditionMessages) > 0 {
var reasons []string
for _, conditionMessage := range machinesNotUpToDateResult.ConditionMessages {
for _, conditionMessage := range machineUpToDateResult.ConditionMessages {
reasons = append(reasons, fmt.Sprintf("* %s", conditionMessage))
}
message = strings.Join(reasons, "\n")
Expand Down
40 changes: 40 additions & 0 deletions controlplane/kubeadm/internal/controllers/inplace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"

ctrl "sigs.k8s.io/controller-runtime"

clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
)

func (r *KubeadmControlPlaneReconciler) tryInPlaceUpdate(
ctx context.Context,
controlPlane *internal.ControlPlane,
machineToInPlaceUpdate *clusterv1.Machine,
machineUpToDateResult internal.UpToDateResult,
) (fallbackToScaleDown bool, _ ctrl.Result, _ error) {
if r.overrideTryInPlaceUpdateFunc != nil {
return r.overrideTryInPlaceUpdateFunc(ctx, controlPlane, machineToInPlaceUpdate, machineUpToDateResult)
}

// Always fallback to scale down until in-place is implemented.
return true, ctrl.Result{}, nil
}
29 changes: 17 additions & 12 deletions controlplane/kubeadm/internal/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
}

func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
if r.overrideScaleUpControlPlaneFunc != nil {
return r.overrideScaleUpControlPlaneFunc(ctx, controlPlane)
}

log := ctrl.LoggerFrom(ctx)

// Run preflight checks to ensure that the control plane is stable before proceeding with a scale up/scale down operation; if not, wait.
Expand Down Expand Up @@ -95,16 +99,14 @@ func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context,
func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
ctx context.Context,
controlPlane *internal.ControlPlane,
outdatedMachines collections.Machines,
machineToDelete *clusterv1.Machine,
) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Pick the Machine that we should scale down.
machineToDelete, err := selectMachineForScaleDown(ctx, controlPlane, outdatedMachines)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to select machine for scale down")
if r.overrideScaleDownControlPlaneFunc != nil {
return r.overrideScaleDownControlPlaneFunc(ctx, controlPlane, machineToDelete)
}

log := ctrl.LoggerFrom(ctx)

// Run preflight checks ensuring the control plane is stable before proceeding with a scale up/scale down operation; if not, wait.
// Given that we're scaling down, we can exclude the machineToDelete from the preflight checks.
if result, err := r.preflightChecks(ctx, controlPlane, machineToDelete); err != nil || !result.IsZero() {
Expand Down Expand Up @@ -265,7 +267,8 @@ func preflightCheckCondition(kind string, obj *clusterv1.Machine, conditionType
return nil
}

// selectMachineForScaleDown select a machine candidate for scaling down. The selection is a two phase process:
// selectMachineForInPlaceUpdateOrScaleDown selects a machine candidate for scaling down or for in-place update.
// The selection is a two-phase process:
//
// In the first phase it selects a subset of machines eligible for deletion:
// - if there are outdated machines with the delete machine annotation, use them as eligible subset (priority to user requests, part 1)
Expand All @@ -276,18 +279,20 @@ func preflightCheckCondition(kind string, obj *clusterv1.Machine, conditionType
//
// Once the subset of machines eligible for deletion is identified, one machine is picked out of this subset by
// selecting the machine in the failure domain with most machines (including both eligible and not eligible machines).
func selectMachineForScaleDown(ctx context.Context, controlPlane *internal.ControlPlane, outdatedMachines collections.Machines) (*clusterv1.Machine, error) {
func selectMachineForInPlaceUpdateOrScaleDown(ctx context.Context, controlPlane *internal.ControlPlane, outdatedMachines collections.Machines) (*clusterv1.Machine, error) {
// Select the subset of machines eligible for scale down.
eligibleMachines := controlPlane.Machines
var eligibleMachines collections.Machines
switch {
case controlPlane.MachineWithDeleteAnnotation(outdatedMachines).Len() > 0:
eligibleMachines = controlPlane.MachineWithDeleteAnnotation(outdatedMachines)
case controlPlane.MachineWithDeleteAnnotation(eligibleMachines).Len() > 0:
eligibleMachines = controlPlane.MachineWithDeleteAnnotation(eligibleMachines)
case controlPlane.MachineWithDeleteAnnotation(controlPlane.Machines).Len() > 0:
eligibleMachines = controlPlane.MachineWithDeleteAnnotation(controlPlane.Machines)
case controlPlane.UnhealthyMachinesWithUnhealthyControlPlaneComponents(outdatedMachines).Len() > 0:
eligibleMachines = controlPlane.UnhealthyMachinesWithUnhealthyControlPlaneComponents(outdatedMachines)
case outdatedMachines.Len() > 0:
eligibleMachines = outdatedMachines
default:
eligibleMachines = controlPlane.Machines
}

// Pick an eligible machine from the failure domain with most machines in (including both eligible and not eligible machines)
Expand Down
16 changes: 11 additions & 5 deletions controlplane/kubeadm/internal/controllers/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,9 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
}
controlPlane.InjectTestManagementCluster(r.managementCluster)

result, err := r.scaleDownControlPlane(context.Background(), controlPlane, controlPlane.Machines)
machineToDelete, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, controlPlane, controlPlane.Machines)
g.Expect(err).ToNot(HaveOccurred())
result, err := r.scaleDownControlPlane(context.Background(), controlPlane, machineToDelete)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(result).To(BeComparableTo(ctrl.Result{Requeue: true}))

Expand Down Expand Up @@ -326,7 +328,9 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
}
controlPlane.InjectTestManagementCluster(r.managementCluster)

result, err := r.scaleDownControlPlane(context.Background(), controlPlane, controlPlane.Machines)
machineToDelete, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, controlPlane, controlPlane.Machines)
g.Expect(err).ToNot(HaveOccurred())
result, err := r.scaleDownControlPlane(context.Background(), controlPlane, machineToDelete)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(result).To(BeComparableTo(ctrl.Result{Requeue: true}))

Expand Down Expand Up @@ -364,7 +368,9 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
}
controlPlane.InjectTestManagementCluster(r.managementCluster)

result, err := r.scaleDownControlPlane(context.Background(), controlPlane, controlPlane.Machines)
machineToDelete, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, controlPlane, controlPlane.Machines)
g.Expect(err).ToNot(HaveOccurred())
result, err := r.scaleDownControlPlane(context.Background(), controlPlane, machineToDelete)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(result).To(BeComparableTo(ctrl.Result{RequeueAfter: preflightFailedRequeueAfter}))

Expand All @@ -374,7 +380,7 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
})
}

func TestSelectMachineForScaleDown(t *testing.T) {
func TestSelectMachineForInPlaceUpdateOrScaleDown(t *testing.T) {
kcp := controlplanev1.KubeadmControlPlane{
Spec: controlplanev1.KubeadmControlPlaneSpec{},
}
Expand Down Expand Up @@ -503,7 +509,7 @@ func TestSelectMachineForScaleDown(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)

selectedMachine, err := selectMachineForScaleDown(ctx, tc.cp, tc.outDatedMachines)
selectedMachine, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, tc.cp, tc.outDatedMachines)

if tc.expectErr {
g.Expect(err).To(HaveOccurred())
Expand Down
Loading