Skip to content

Commit f67db62

Browse files
committed
CA: rename ClusterSnapshot AddPod, RemovePod, RemoveNode
RemoveNode is renamed to RemoveNodeInfo for consistency with other NodeInfo methods. For DRA, the snapshot will have to potentially allocate ResourceClaims when adding a Pod to a Node, and deallocate them when removing a Pod from a Node. This will happen in new methods added to ClusterSnapshot in later commits - SchedulePod and UnschedulePod. These new methods should be the "default" way of moving pods around the snapshot going forward. However, we'll still need to be able to add and remove pods from the snapshot "forcefully" to handle some corner cases (e.g. expendable pods). AddPod is renamed to ForceAddPod, and RemovePod to ForceRemovePod to highlight that these are no longer the "default" methods of moving pods around the snapshot, and are bypassing something important.
1 parent a81aa5c commit f67db62

File tree

11 files changed

+58
-56
lines changed

11 files changed

+58
-56
lines changed

cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"k8s.io/autoscaler/cluster-autoscaler/context"
2424
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
2525
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
26-
klog "k8s.io/klog/v2"
26+
"k8s.io/klog/v2"
2727
)
2828

2929
type filterOutExpendable struct {
@@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
5656
// CA logic from before migration to scheduler framework. So let's keep it for now
5757
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
5858
for _, p := range pods {
59-
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
59+
if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
6060
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
6161
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
6262
}

cluster-autoscaler/core/static_autoscaler.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
464464
allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
465465
// Remove the nodes from the snapshot as well so that the state is consistent.
466466
for _, notStartedNodeName := range allRegisteredUpcoming {
467-
err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
467+
err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName)
468468
if err != nil {
469469
klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
470470
// ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the
@@ -660,16 +660,16 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[
660660
nodeGroups := a.nodeGroupsById()
661661
upcomingNodeGroups := make(map[string]int)
662662
upcomingNodesFromUpcomingNodeGroups := 0
663-
for nodeGroupName, upcomingNodes := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
663+
for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
664664
nodeGroup := nodeGroups[nodeGroupName]
665665
if nodeGroup == nil {
666666
return fmt.Errorf("failed to find node group: %s", nodeGroupName)
667667
}
668668
isUpcomingNodeGroup := a.processors.AsyncNodeGroupStateChecker.IsUpcoming(nodeGroup)
669-
for _, upcomingNode := range upcomingNodes {
670-
err := a.ClusterSnapshot.AddNodeInfo(upcomingNode)
669+
for _, upcomingNodeInfo := range upcomingNodeInfos {
670+
err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo)
671671
if err != nil {
672-
return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNode.Node().Name, err)
672+
return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err)
673673
}
674674
if isUpcomingNodeGroup {
675675
upcomingNodesFromUpcomingNodeGroups++

cluster-autoscaler/estimator/binpacking_estimator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
2626
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
2727
"k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
28-
klog "k8s.io/klog/v2"
28+
"k8s.io/klog/v2"
2929
)
3030

3131
// BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
@@ -225,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode(
225225
pod *apiv1.Pod,
226226
nodeName string,
227227
) error {
228-
if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
228+
if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil {
229229
return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
230230
}
231231
estimationState.newNodesWithPods[nodeName] = true

cluster-autoscaler/simulator/cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
3333
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
3434

35-
klog "k8s.io/klog/v2"
35+
"k8s.io/klog/v2"
3636
)
3737

3838
// NodeToBeRemoved contains information about a node that can be removed.
@@ -223,7 +223,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n
223223

224224
// remove pods from clusterSnapshot first
225225
for _, pod := range pods {
226-
if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
226+
if err := r.clusterSnapshot.ForceRemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
227227
// just log error
228228
klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err)
229229
}

cluster-autoscaler/simulator/clustersnapshot/basic.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func (data *internalBasicSnapshotData) addNode(node *apiv1.Node) error {
153153
return nil
154154
}
155155

156-
func (data *internalBasicSnapshotData) removeNode(nodeName string) error {
156+
func (data *internalBasicSnapshotData) removeNodeInfo(nodeName string) error {
157157
if _, found := data.nodeInfoMap[nodeName]; !found {
158158
return ErrNodeNotFound
159159
}
@@ -253,18 +253,18 @@ func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched
253253
return nil
254254
}
255255

256-
// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
257-
func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error {
258-
return snapshot.getInternalData().removeNode(nodeName)
256+
// RemoveNodeInfo removes the node (and the pods scheduled to it) from the snapshot.
257+
func (snapshot *BasicClusterSnapshot) RemoveNodeInfo(nodeName string) error {
258+
return snapshot.getInternalData().removeNodeInfo(nodeName)
259259
}
260260

261-
// AddPod adds pod to the snapshot and schedules it to given node.
262-
func (snapshot *BasicClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error {
261+
// ForceAddPod adds pod to the snapshot and schedules it to given node.
262+
func (snapshot *BasicClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error {
263263
return snapshot.getInternalData().addPod(pod, nodeName)
264264
}
265265

266-
// RemovePod removes pod from the snapshot.
267-
func (snapshot *BasicClusterSnapshot) RemovePod(namespace, podName, nodeName string) error {
266+
// ForceRemovePod removes pod from the snapshot.
267+
func (snapshot *BasicClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error {
268268
return snapshot.getInternalData().removePod(namespace, podName, nodeName)
269269
}
270270

cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,17 @@ type ClusterSnapshot interface {
3434
// with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
3535
SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error
3636

37-
// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
38-
RemoveNode(nodeName string) error
39-
// AddPod adds pod to the snapshot and schedules it to given node.
40-
AddPod(pod *apiv1.Pod, nodeName string) error
41-
// RemovePod removes pod from the snapshot.
42-
RemovePod(namespace string, podName string, nodeName string) error
37+
// ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot.
38+
ForceAddPod(pod *apiv1.Pod, nodeName string) error
39+
// ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot.
40+
ForceRemovePod(namespace string, podName string, nodeName string) error
4341

4442
// AddNodeInfo adds the given NodeInfo to the snapshot. The Node and the Pods are added, as well as
4543
// any DRA objects passed along them.
4644
AddNodeInfo(nodeInfo *framework.NodeInfo) error
45+
// RemoveNodeInfo removes the given NodeInfo from the snapshot. The Node and the Pods are removed, as well as
46+
// any DRA objects owned by them.
47+
RemoveNodeInfo(nodeName string) error
4748
// GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot.
4849
// This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos
4950
// obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo.

cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func BenchmarkAddPods(b *testing.B) {
133133
err := clusterSnapshot.SetClusterState(nodes, nil)
134134
assert.NoError(b, err)
135135
b.ResetTimer()
136-
b.Run(fmt.Sprintf("%s: AddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
136+
b.Run(fmt.Sprintf("%s: ForceAddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
137137
for i := 0; i < b.N; i++ {
138138
b.StopTimer()
139139

@@ -143,7 +143,7 @@ func BenchmarkAddPods(b *testing.B) {
143143
}
144144
b.StartTimer()
145145
for _, pod := range pods {
146-
err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName)
146+
err = clusterSnapshot.ForceAddPod(pod, pod.Spec.NodeName)
147147
if err != nil {
148148
assert.NoError(b, err)
149149
}

cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,22 +115,22 @@ func validTestCases(t *testing.T) []modificationTestCase {
115115
},
116116
},
117117
{
118-
name: "remove node",
118+
name: "remove nodeInfo",
119119
state: snapshotState{
120120
nodes: []*apiv1.Node{node},
121121
},
122122
op: func(snapshot ClusterSnapshot) {
123-
err := snapshot.RemoveNode(node.Name)
123+
err := snapshot.RemoveNodeInfo(node.Name)
124124
assert.NoError(t, err)
125125
},
126126
},
127127
{
128-
name: "remove node, then add it back",
128+
name: "remove nodeInfo, then add it back",
129129
state: snapshotState{
130130
nodes: []*apiv1.Node{node},
131131
},
132132
op: func(snapshot ClusterSnapshot) {
133-
err := snapshot.RemoveNode(node.Name)
133+
err := snapshot.RemoveNodeInfo(node.Name)
134134
assert.NoError(t, err)
135135

136136
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
@@ -141,14 +141,14 @@ func validTestCases(t *testing.T) []modificationTestCase {
141141
},
142142
},
143143
{
144-
name: "add pod, then remove node",
144+
name: "add pod, then remove nodeInfo",
145145
state: snapshotState{
146146
nodes: []*apiv1.Node{node},
147147
},
148148
op: func(snapshot ClusterSnapshot) {
149-
err := snapshot.AddPod(pod, node.Name)
149+
err := snapshot.ForceAddPod(pod, node.Name)
150150
assert.NoError(t, err)
151-
err = snapshot.RemoveNode(node.Name)
151+
err = snapshot.RemoveNodeInfo(node.Name)
152152
assert.NoError(t, err)
153153
},
154154
},
@@ -326,7 +326,7 @@ func TestClear(t *testing.T) {
326326
}
327327

328328
for _, pod := range extraPods {
329-
err := snapshot.AddPod(pod, pod.Spec.NodeName)
329+
err := snapshot.ForceAddPod(pod, pod.Spec.NodeName)
330330
assert.NoError(t, err)
331331
}
332332

@@ -349,17 +349,17 @@ func TestNode404(t *testing.T) {
349349
op func(ClusterSnapshot) error
350350
}{
351351
{"add pod", func(snapshot ClusterSnapshot) error {
352-
return snapshot.AddPod(BuildTestPod("p1", 0, 0), "node")
352+
return snapshot.ForceAddPod(BuildTestPod("p1", 0, 0), "node")
353353
}},
354354
{"remove pod", func(snapshot ClusterSnapshot) error {
355-
return snapshot.RemovePod("default", "p1", "node")
355+
return snapshot.ForceRemovePod("default", "p1", "node")
356356
}},
357357
{"get node", func(snapshot ClusterSnapshot) error {
358358
_, err := snapshot.NodeInfos().Get("node")
359359
return err
360360
}},
361-
{"remove node", func(snapshot ClusterSnapshot) error {
362-
return snapshot.RemoveNode("node")
361+
{"remove nodeInfo", func(snapshot ClusterSnapshot) error {
362+
return snapshot.RemoveNodeInfo("node")
363363
}},
364364
}
365365

@@ -385,7 +385,7 @@ func TestNode404(t *testing.T) {
385385
snapshot.Fork()
386386
assert.NoError(t, err)
387387

388-
err = snapshot.RemoveNode("node")
388+
err = snapshot.RemoveNodeInfo("node")
389389
assert.NoError(t, err)
390390

391391
// Node deleted after fork - shouldn't be able to operate on it.
@@ -408,7 +408,7 @@ func TestNode404(t *testing.T) {
408408
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
409409
assert.NoError(t, err)
410410

411-
err = snapshot.RemoveNode("node")
411+
err = snapshot.RemoveNodeInfo("node")
412412
assert.NoError(t, err)
413413

414414
// Node deleted from base - shouldn't be able to operate on it.
@@ -625,7 +625,7 @@ func TestPVCUsedByPods(t *testing.T) {
625625
assert.Equal(t, tc.exists, volumeExists)
626626

627627
if tc.removePod != "" {
628-
err = snapshot.RemovePod("default", tc.removePod, "node")
628+
err = snapshot.ForceRemovePod("default", tc.removePod, "node")
629629
assert.NoError(t, err)
630630

631631
volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName))
@@ -698,7 +698,7 @@ func TestPVCClearAndFork(t *testing.T) {
698698
volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1"))
699699
assert.Equal(t, true, volumeExists)
700700

701-
err = snapshot.AddPod(pod2, "node")
701+
err = snapshot.ForceAddPod(pod2, "node")
702702
assert.NoError(t, err)
703703

704704
volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2"))

cluster-autoscaler/simulator/clustersnapshot/delta.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (data *internalDeltaSnapshotData) clearPodCaches() {
177177
data.pvcNamespaceMap = nil
178178
}
179179

180-
func (data *internalDeltaSnapshotData) removeNode(nodeName string) error {
180+
func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error {
181181
_, foundInDelta := data.addedNodeInfoMap[nodeName]
182182
if foundInDelta {
183183
// If node was added within this delta, delete this change.
@@ -296,12 +296,12 @@ func (data *internalDeltaSnapshotData) commit() (*internalDeltaSnapshotData, err
296296
return data, nil
297297
}
298298
for node := range data.deletedNodeInfos {
299-
if err := data.baseData.removeNode(node); err != nil {
299+
if err := data.baseData.removeNodeInfo(node); err != nil {
300300
return nil, err
301301
}
302302
}
303303
for _, node := range data.modifiedNodeInfoMap {
304-
if err := data.baseData.removeNode(node.Node().Name); err != nil {
304+
if err := data.baseData.removeNodeInfo(node.Node().Name); err != nil {
305305
return nil, err
306306
}
307307
if err := data.baseData.addNodeInfo(node); err != nil {
@@ -442,18 +442,18 @@ func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched
442442
return nil
443443
}
444444

445-
// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
446-
func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error {
447-
return snapshot.data.removeNode(nodeName)
445+
// RemoveNodeInfo removes the node (and the pods scheduled to it) from the snapshot.
446+
func (snapshot *DeltaClusterSnapshot) RemoveNodeInfo(nodeName string) error {
447+
return snapshot.data.removeNodeInfo(nodeName)
448448
}
449449

450-
// AddPod adds pod to the snapshot and schedules it to given node.
451-
func (snapshot *DeltaClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error {
450+
// ForceAddPod adds pod to the snapshot and schedules it to given node.
451+
func (snapshot *DeltaClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error {
452452
return snapshot.data.addPod(pod, nodeName)
453453
}
454454

455-
// RemovePod removes pod from the snapshot.
456-
func (snapshot *DeltaClusterSnapshot) RemovePod(namespace, podName, nodeName string) error {
455+
// ForceRemovePod removes pod from the snapshot.
456+
func (snapshot *DeltaClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error {
457457
return snapshot.data.removePod(namespace, podName, nodeName)
458458
}
459459

cluster-autoscaler/simulator/clustersnapshot/test_utils.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"testing"
2121

2222
"github.com/stretchr/testify/assert"
23+
2324
apiv1 "k8s.io/api/core/v1"
2425
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
2526
)
@@ -42,10 +43,10 @@ func InitializeClusterSnapshotOrDie(
4243

4344
for _, pod := range pods {
4445
if pod.Spec.NodeName != "" {
45-
err = snapshot.AddPod(pod, pod.Spec.NodeName)
46+
err = snapshot.ForceAddPod(pod, pod.Spec.NodeName)
4647
assert.NoError(t, err, "error while adding pod %s/%s to node %s", pod.Namespace, pod.Name, pod.Spec.NodeName)
4748
} else if pod.Status.NominatedNodeName != "" {
48-
err = snapshot.AddPod(pod, pod.Status.NominatedNodeName)
49+
err = snapshot.ForceAddPod(pod, pod.Status.NominatedNodeName)
4950
assert.NoError(t, err, "error while adding pod %s/%s to nominated node %s", pod.Namespace, pod.Name, pod.Status.NominatedNodeName)
5051
} else {
5152
assert.Fail(t, "pod %s/%s does not have Spec.NodeName nor Status.NominatedNodeName set", pod.Namespace, pod.Name)

0 commit comments

Comments
 (0)