Skip to content

Commit 0f0f15e

Browse files
committed
KCP: Extend rollout logic for in-place updates
Signed-off-by: Stefan Büringer [email protected]
1 parent 3fe6f96 commit 0f0f15e

File tree

8 files changed

+415
-37
lines changed

8 files changed

+415
-37
lines changed

controlplane/kubeadm/internal/controllers/controller.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ type KubeadmControlPlaneReconciler struct {
9595
managementCluster internal.ManagementCluster
9696
managementClusterUncached internal.ManagementCluster
9797
ssaCache ssa.Cache
98+
99+
// Only used for testing
100+
overrideTryInPlaceUpdateFunc func(ctx context.Context, controlPlane *internal.ControlPlane, machineToInPlaceUpdate *clusterv1.Machine, machinesNeedingRolloutResult internal.NotUpToDateResult) (bool, ctrl.Result, error)
101+
overrideScaleUpControlPlaneFunc func(ctx context.Context, controlPlane *internal.ControlPlane) (ctrl.Result, error)
102+
overrideScaleDownControlPlaneFunc func(ctx context.Context, controlPlane *internal.ControlPlane, machineToDelete *clusterv1.Machine) (ctrl.Result, error)
98103
}
99104

100105
func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
@@ -476,7 +481,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, controlPl
476481
}
477482
log.Info(fmt.Sprintf("Rolling out Control Plane machines: %s", strings.Join(allMessages, ",")), "machinesNeedingRollout", machinesNeedingRollout.Names())
478483
v1beta1conditions.MarkFalse(controlPlane.KCP, controlplanev1.MachinesSpecUpToDateV1Beta1Condition, controlplanev1.RollingUpdateInProgressV1Beta1Reason, clusterv1.ConditionSeverityWarning, "Rolling %d replicas with outdated spec (%d replicas up to date)", len(machinesNeedingRollout), len(controlPlane.Machines)-len(machinesNeedingRollout))
479-
return r.upgradeControlPlane(ctx, controlPlane, machinesNeedingRollout)
484+
return r.updateControlPlane(ctx, controlPlane, machinesNeedingRollout, machinesNeedingRolloutResults)
480485
default:
481486
// make sure last upgrade operation is marked as completed.
482487
// NOTE: we are checking the condition already exists in order to avoid to set this condition at the first
@@ -506,7 +511,12 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, controlPl
506511
case numMachines > desiredReplicas:
507512
log.Info("Scaling down control plane", "desired", desiredReplicas, "existing", numMachines)
508513
// The last parameter (i.e. machines needing to be rolled out) should always be empty here.
509-
return r.scaleDownControlPlane(ctx, controlPlane, collections.Machines{})
514+
// Pick the Machine that we should scale down.
515+
machineToDelete, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, controlPlane, collections.Machines{})
516+
if err != nil {
517+
return ctrl.Result{}, errors.Wrap(err, "failed to select machine for scale down")
518+
}
519+
return r.scaleDownControlPlane(ctx, controlPlane, machineToDelete)
510520
}
511521

512522
// Get the workload cluster client.
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controllers
18+
19+
import (
20+
"context"
21+
22+
"github.com/pkg/errors"
23+
ctrl "sigs.k8s.io/controller-runtime"
24+
25+
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
26+
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
27+
)
28+
29+
func (r *KubeadmControlPlaneReconciler) tryInPlaceUpdate(
30+
ctx context.Context,
31+
controlPlane *internal.ControlPlane,
32+
machineToInPlaceUpdate *clusterv1.Machine,
33+
machinesNeedingRolloutResult internal.NotUpToDateResult,
34+
) (fallbackToScaleDown bool, _ ctrl.Result, _ error) {
35+
if r.overrideTryInPlaceUpdateFunc != nil {
36+
return r.overrideTryInPlaceUpdateFunc(ctx, controlPlane, machineToInPlaceUpdate, machinesNeedingRolloutResult)
37+
}
38+
39+
return false, ctrl.Result{}, errors.Errorf("in-place update is not implemented yet")
40+
}

controlplane/kubeadm/internal/controllers/scale.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
6363
}
6464

6565
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
66+
if r.overrideScaleUpControlPlaneFunc != nil {
67+
return r.overrideScaleUpControlPlaneFunc(ctx, controlPlane)
68+
}
69+
6670
log := ctrl.LoggerFrom(ctx)
6771

6872
// Run preflight checks to ensure that the control plane is stable before proceeding with a scale up/scale down operation; if not, wait.
@@ -95,16 +99,14 @@ func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context,
9599
func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
96100
ctx context.Context,
97101
controlPlane *internal.ControlPlane,
98-
outdatedMachines collections.Machines,
102+
machineToDelete *clusterv1.Machine,
99103
) (ctrl.Result, error) {
100-
log := ctrl.LoggerFrom(ctx)
101-
102-
// Pick the Machine that we should scale down.
103-
machineToDelete, err := selectMachineForScaleDown(ctx, controlPlane, outdatedMachines)
104-
if err != nil {
105-
return ctrl.Result{}, errors.Wrap(err, "failed to select machine for scale down")
104+
if r.overrideScaleDownControlPlaneFunc != nil {
105+
return r.overrideScaleDownControlPlaneFunc(ctx, controlPlane, machineToDelete)
106106
}
107107

108+
log := ctrl.LoggerFrom(ctx)
109+
108110
// Run preflight checks ensuring the control plane is stable before proceeding with a scale up/scale down operation; if not, wait.
109111
// Given that we're scaling down, we can exclude the machineToDelete from the preflight checks.
110112
if result, err := r.preflightChecks(ctx, controlPlane, machineToDelete); err != nil || !result.IsZero() {
@@ -265,7 +267,8 @@ func preflightCheckCondition(kind string, obj *clusterv1.Machine, conditionType
265267
return nil
266268
}
267269

268-
// selectMachineForScaleDown select a machine candidate for scaling down. The selection is a two phase process:
270+
// selectMachineForInPlaceUpdateOrScaleDown selects a machine candidate for scaling down or for in-place update.
271+
// The selection is a two phase process:
269272
//
270273
// In the first phase it selects a subset of machines eligible for deletion:
271274
// - if there are outdated machines with the delete machine annotation, use them as eligible subset (priority to user requests, part 1)
@@ -276,18 +279,20 @@ func preflightCheckCondition(kind string, obj *clusterv1.Machine, conditionType
276279
//
277280
// Once the subset of machines eligible for deletion is identified, one machine is picked out of this subset by
278281
// selecting the machine in the failure domain with most machines (including both eligible and not eligible machines).
279-
func selectMachineForScaleDown(ctx context.Context, controlPlane *internal.ControlPlane, outdatedMachines collections.Machines) (*clusterv1.Machine, error) {
282+
func selectMachineForInPlaceUpdateOrScaleDown(ctx context.Context, controlPlane *internal.ControlPlane, outdatedMachines collections.Machines) (*clusterv1.Machine, error) {
280283
// Select the subset of machines eligible for scale down.
281-
eligibleMachines := controlPlane.Machines
284+
var eligibleMachines collections.Machines
282285
switch {
283286
case controlPlane.MachineWithDeleteAnnotation(outdatedMachines).Len() > 0:
284287
eligibleMachines = controlPlane.MachineWithDeleteAnnotation(outdatedMachines)
285-
case controlPlane.MachineWithDeleteAnnotation(eligibleMachines).Len() > 0:
286-
eligibleMachines = controlPlane.MachineWithDeleteAnnotation(eligibleMachines)
288+
case controlPlane.MachineWithDeleteAnnotation(controlPlane.Machines).Len() > 0:
289+
eligibleMachines = controlPlane.MachineWithDeleteAnnotation(controlPlane.Machines)
287290
case controlPlane.UnhealthyMachinesWithUnhealthyControlPlaneComponents(outdatedMachines).Len() > 0:
288291
eligibleMachines = controlPlane.UnhealthyMachinesWithUnhealthyControlPlaneComponents(outdatedMachines)
289292
case outdatedMachines.Len() > 0:
290293
eligibleMachines = outdatedMachines
294+
default:
295+
eligibleMachines = controlPlane.Machines
291296
}
292297

293298
// Pick an eligible machine from the failure domain with most machines in (including both eligible and not eligible machines)

controlplane/kubeadm/internal/controllers/scale_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,9 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
284284
}
285285
controlPlane.InjectTestManagementCluster(r.managementCluster)
286286

287-
result, err := r.scaleDownControlPlane(context.Background(), controlPlane, controlPlane.Machines)
287+
machineToDelete, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, controlPlane, controlPlane.Machines)
288+
g.Expect(err).ToNot(HaveOccurred())
289+
result, err := r.scaleDownControlPlane(context.Background(), controlPlane, machineToDelete)
288290
g.Expect(err).ToNot(HaveOccurred())
289291
g.Expect(result).To(BeComparableTo(ctrl.Result{Requeue: true}))
290292

@@ -326,7 +328,9 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
326328
}
327329
controlPlane.InjectTestManagementCluster(r.managementCluster)
328330

329-
result, err := r.scaleDownControlPlane(context.Background(), controlPlane, controlPlane.Machines)
331+
machineToDelete, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, controlPlane, controlPlane.Machines)
332+
g.Expect(err).ToNot(HaveOccurred())
333+
result, err := r.scaleDownControlPlane(context.Background(), controlPlane, machineToDelete)
330334
g.Expect(err).ToNot(HaveOccurred())
331335
g.Expect(result).To(BeComparableTo(ctrl.Result{Requeue: true}))
332336

@@ -364,7 +368,9 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
364368
}
365369
controlPlane.InjectTestManagementCluster(r.managementCluster)
366370

367-
result, err := r.scaleDownControlPlane(context.Background(), controlPlane, controlPlane.Machines)
371+
machineToDelete, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, controlPlane, controlPlane.Machines)
372+
g.Expect(err).ToNot(HaveOccurred())
373+
result, err := r.scaleDownControlPlane(context.Background(), controlPlane, machineToDelete)
368374
g.Expect(err).ToNot(HaveOccurred())
369375
g.Expect(result).To(BeComparableTo(ctrl.Result{RequeueAfter: preflightFailedRequeueAfter}))
370376

@@ -374,7 +380,7 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
374380
})
375381
}
376382

377-
func TestSelectMachineForScaleDown(t *testing.T) {
383+
func TestSelectMachineForInPlaceUpdateOrScaleDown(t *testing.T) {
378384
kcp := controlplanev1.KubeadmControlPlane{
379385
Spec: controlplanev1.KubeadmControlPlaneSpec{},
380386
}
@@ -503,7 +509,7 @@ func TestSelectMachineForScaleDown(t *testing.T) {
503509
t.Run(tc.name, func(t *testing.T) {
504510
g := NewWithT(t)
505511

506-
selectedMachine, err := selectMachineForScaleDown(ctx, tc.cp, tc.outDatedMachines)
512+
selectedMachine, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, tc.cp, tc.outDatedMachines)
507513

508514
if tc.expectErr {
509515
g.Expect(err).To(HaveOccurred())

controlplane/kubeadm/internal/controllers/upgrade.go

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ import (
2727
bootstrapv1 "sigs.k8s.io/cluster-api/api/bootstrap/kubeadm/v1beta2"
2828
controlplanev1 "sigs.k8s.io/cluster-api/api/controlplane/kubeadm/v1beta2"
2929
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
30+
"sigs.k8s.io/cluster-api/feature"
3031
"sigs.k8s.io/cluster-api/util/collections"
3132
)
3233

33-
func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(
34+
func (r *KubeadmControlPlaneReconciler) updateControlPlane(
3435
ctx context.Context,
3536
controlPlane *internal.ControlPlane,
36-
machinesRequireUpgrade collections.Machines,
37+
machinesNeedingRollout collections.Machines,
38+
machinesNeedingRolloutResults map[string]internal.NotUpToDateResult,
3739
) (ctrl.Result, error) {
3840
log := ctrl.LoggerFrom(ctx)
3941

@@ -42,17 +44,17 @@ func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(
4244
workloadCluster, err := controlPlane.GetWorkloadCluster(ctx)
4345
if err != nil {
4446
log.Error(err, "failed to get remote client for workload cluster", "Cluster", klog.KObj(controlPlane.Cluster))
45-
return ctrl.Result{}, err
47+
return ctrl.Result{}, errors.Wrapf(err, "failed to update control plane")
4648
}
4749

4850
parsedVersion, err := semver.ParseTolerant(controlPlane.KCP.Spec.Version)
4951
if err != nil {
50-
return ctrl.Result{}, errors.Wrapf(err, "failed to parse kubernetes version %q", controlPlane.KCP.Spec.Version)
52+
return ctrl.Result{}, errors.Wrapf(err, "failed to update control plane: failed to parse Kubernetes version %q", controlPlane.KCP.Spec.Version)
5153
}
5254

5355
// Ensure kubeadm clusterRoleBinding for v1.29+ as per https://github.com/kubernetes/kubernetes/pull/121305
5456
if err := workloadCluster.AllowClusterAdminPermissions(ctx, parsedVersion); err != nil {
55-
return ctrl.Result{}, errors.Wrap(err, "failed to set cluster-admin ClusterRoleBinding for kubeadm")
57+
return ctrl.Result{}, errors.Wrap(err, "failed to update control plane: failed to set cluster-admin ClusterRoleBinding for kubeadm")
5658
}
5759

5860
kubeadmCMMutators := make([]func(*bootstrapv1.ClusterConfiguration), 0)
@@ -81,21 +83,75 @@ func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(
8183

8284
// collectively update Kubeadm config map
8385
if err = workloadCluster.UpdateClusterConfiguration(ctx, parsedVersion, kubeadmCMMutators...); err != nil {
84-
return ctrl.Result{}, err
86+
return ctrl.Result{}, errors.Wrapf(err, "failed to update control plane")
8587
}
8688

8789
switch controlPlane.KCP.Spec.Rollout.Strategy.Type {
8890
case controlplanev1.RollingUpdateStrategyType:
8991
// RolloutStrategy is currently defaulted and validated to be RollingUpdate
90-
// We can ignore MaxUnavailable because we are enforcing health checks before we get here.
91-
maxNodes := *controlPlane.KCP.Spec.Replicas + int32(controlPlane.KCP.Spec.Rollout.Strategy.RollingUpdate.MaxSurge.IntValue())
92-
if int32(controlPlane.Machines.Len()) < maxNodes {
93-
// scaleUp ensures that we don't continue scaling up while waiting for Machines to have NodeRefs
94-
return r.scaleUpControlPlane(ctx, controlPlane)
92+
res, err := r.rollingUpdate(ctx, controlPlane, machinesNeedingRollout, machinesNeedingRolloutResults)
93+
if err != nil {
94+
return ctrl.Result{}, errors.Wrapf(err, "failed to update control plane")
9595
}
96-
return r.scaleDownControlPlane(ctx, controlPlane, machinesRequireUpgrade)
96+
return res, nil
9797
default:
9898
log.Info("RolloutStrategy type is not set to RollingUpdate, unable to determine the strategy for rolling out machines")
9999
return ctrl.Result{}, nil
100100
}
101101
}
102+
103+
func (r *KubeadmControlPlaneReconciler) rollingUpdate(
104+
ctx context.Context,
105+
controlPlane *internal.ControlPlane,
106+
machinesNeedingRollout collections.Machines,
107+
machinesNeedingRolloutResults map[string]internal.NotUpToDateResult,
108+
) (ctrl.Result, error) {
109+
currentReplicas := int32(controlPlane.Machines.Len())
110+
currentUpToDateReplicas := int32(len(controlPlane.UpToDateMachines()))
111+
desiredReplicas := *controlPlane.KCP.Spec.Replicas
112+
maxSurge := int32(controlPlane.KCP.Spec.Rollout.Strategy.RollingUpdate.MaxSurge.IntValue())
113+
// Note: As MaxSurge is validated to be either 0 or 1, maxReplicas will be either desiredReplicas or desiredReplicas+1
114+
maxReplicas := desiredReplicas + maxSurge
115+
116+
// If currentReplicas < maxReplicas we have to scale up
117+
// Note: This is done to ensure we have as many Machines as allowed during rollout to maximize fault tolerance.
118+
if currentReplicas < maxReplicas {
119+
// Note: scaleUpControlPlane ensures that we don't continue scale up while waiting for Machines to have NodeRefs.
120+
return r.scaleUpControlPlane(ctx, controlPlane)
121+
}
122+
123+
// If currentReplicas >= maxReplicas we have to scale down
124+
// Note: If we are already at or above the maximum number of Machines we have to in-place update or delete a Machine
125+
// to make progress with the update (as we cannot create additional new Machines above the maximum).
126+
127+
// Pick the Machine that we should in-place update or scale down.
128+
machineToInPlaceUpdateOrScaleDown, err := selectMachineForInPlaceUpdateOrScaleDown(ctx, controlPlane, machinesNeedingRollout)
129+
if err != nil {
130+
return ctrl.Result{}, errors.Wrap(err, "failed to select next Machine for rollout")
131+
}
132+
machinesNeedingRolloutResult, ok := machinesNeedingRolloutResults[machineToInPlaceUpdateOrScaleDown.Name]
133+
if !ok {
134+
// Note: This should never happen as we store results for all Machines in machinesNeedingRolloutResults.
135+
return ctrl.Result{}, errors.Errorf("failed to check if Machine %s is up to date", machineToInPlaceUpdateOrScaleDown.Name)
136+
}
137+
138+
// If the selected Machine is eligible for in-place update and we don't already have enough up-to-date replicas, try in-place update.
139+
// Note: To be safe we only try an in-place update when we would otherwise delete a Machine. This ensures we can
140+
// afford the case where the in-place update fails and the Machine becomes unavailable (and eventually MHC kicks in and the Machine is recreated).
141+
if feature.Gates.Enabled(feature.InPlaceUpdates) &&
142+
machinesNeedingRolloutResult.EligibleForInPlaceUpdate && currentUpToDateReplicas < desiredReplicas {
143+
fallbackToScaleDown, res, err := r.tryInPlaceUpdate(ctx, controlPlane, machineToInPlaceUpdateOrScaleDown, machinesNeedingRolloutResult)
144+
if err != nil {
145+
return ctrl.Result{}, err
146+
}
147+
if !res.IsZero() {
148+
return res, nil
149+
}
150+
if fallbackToScaleDown {
151+
return r.scaleDownControlPlane(ctx, controlPlane, machineToInPlaceUpdateOrScaleDown)
152+
}
153+
// In-place update triggered
154+
return ctrl.Result{}, nil // Note: Requeue is not needed, changes to Machines trigger another reconcile.
155+
}
156+
return r.scaleDownControlPlane(ctx, controlPlane, machineToInPlaceUpdateOrScaleDown)
157+
}

0 commit comments

Comments
 (0)