Commit 30e57c9

Merge pull request #7466 from towca/jtuznik/dra-snapshot-cleanup
CA: refactor ClusterSnapshot methods
2 parents 4c37ff3 + 473a1a8 · commit 30e57c9

20 files changed: +199 −287 lines changed


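Taken together, the hunks below replace the piecemeal AddNode/AddNodes/AddNodeWithPods/AddPod/RemoveNode calls with a smaller set of snapshot operations. A rough sketch of the method set as it can be inferred from the call sites in this diff (signatures assumed from usage, not copied from the clustersnapshot package; apiv1 is k8s.io/api/core/v1, framework is the cluster-autoscaler simulator/framework package):

// Inferred-from-usage sketch, not the actual interface definition.
type ClusterSnapshot interface {
    // SetClusterState resets the snapshot to the given nodes and the scheduled
    // pods assigned to them; it replaces the old Clear + AddNode(s) + AddPod loops.
    SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error
    // AddNodeInfo adds a node together with its pods in one call, replacing
    // AddNode, AddNodeWithPods and the per-pod AddPod loops.
    AddNodeInfo(nodeInfo *framework.NodeInfo) error
    // RemoveNodeInfo removes a node and its pods (formerly RemoveNode).
    RemoveNodeInfo(nodeName string) error
    // ForceAddPod assigns a pod to a node; the Force prefix presumably signals
    // that no scheduling checks are performed (formerly AddPod).
    ForceAddPod(pod *apiv1.Pod, nodeName string) error
    // Fork/Revert appear unchanged in this commit and are shown only for context.
    Fork()
    Revert()
}
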
cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
Lines changed: 2 additions & 2 deletions

@@ -23,7 +23,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/context"
     core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
     caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
-    klog "k8s.io/klog/v2"
+    "k8s.io/klog/v2"
 )

 type filterOutExpendable struct {
@@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
 // CA logic from before migration to scheduler framework. So let's keep it for now
 func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
     for _, p := range pods {
-        if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
+        if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
             klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
             return caerrors.ToAutoscalerError(caerrors.InternalError, err)
         }

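For context on the hunk above: pods that won a preemption decision but are not yet bound carry their target node in Status.NominatedNodeName, and the processor reserves their resources by force-adding them to that node. A minimal sketch of the same pattern against the renamed method (hypothetical snapshot variable, error handling trimmed):

// Reserve capacity for preempting pods on the nodes nominated for them.
for _, p := range pods {
    if p.Status.NominatedNodeName == "" {
        continue // not waiting for preemption, nothing to reserve
    }
    if err := snapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
        return err
    }
}
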
cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go
Lines changed: 3 additions & 1 deletion

@@ -21,6 +21,7 @@ import (
     "testing"

     "github.com/stretchr/testify/assert"
+
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/config"
     "k8s.io/autoscaler/cluster-autoscaler/context"
@@ -109,7 +110,8 @@ func TestFilterOutExpendable(t *testing.T) {
         t.Run(tc.name, func(t *testing.T) {
             processor := NewFilterOutExpendablePodListProcessor()
             snapshot := clustersnapshot.NewBasicClusterSnapshot()
-            snapshot.AddNodes(tc.nodes)
+            err := snapshot.SetClusterState(tc.nodes, nil)
+            assert.NoError(t, err)

             pods, err := processor.Process(&context.AutoscalingContext{
                 ClusterSnapshot: snapshot,

cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go
Lines changed: 4 additions & 12 deletions

@@ -22,6 +22,7 @@ import (
     "time"

     "github.com/stretchr/testify/assert"
+
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -183,16 +184,12 @@ func TestFilterOutSchedulable(t *testing.T) {
             allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...)

             for node, pods := range tc.nodesWithPods {
-                err := clusterSnapshot.AddNode(node)
-                assert.NoError(t, err)
-
                 for _, pod := range pods {
                     pod.Spec.NodeName = node.Name
-                    err = clusterSnapshot.AddPod(pod, node.Name)
-                    assert.NoError(t, err)
-
                     allExpectedScheduledPods = append(allExpectedScheduledPods, pod)
                 }
+                err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
+                assert.NoError(t, err)
             }

             clusterSnapshot.Fork()
@@ -286,15 +283,10 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
         assert.NoError(b, err)

         clusterSnapshot := snapshotFactory()
-        if err := clusterSnapshot.AddNodes(nodes); err != nil {
+        if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
             assert.NoError(b, err)
         }

-        for _, pod := range scheduledPods {
-            if err := clusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
-                assert.NoError(b, err)
-            }
-        }
         b.ResetTimer()

         for i := 0; i < b.N; i++ {

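The test-side replacement for the removed AddNode + AddPod loops is framework.NewTestNodeInfo, which bundles a node and its pods into a single NodeInfo. A minimal sketch of the new setup pattern shared by the touched tests (BuildTestNode/BuildTestPod are the existing dot-imported test helpers; exact signatures assumed):

snapshot := clustersnapshot.NewBasicClusterSnapshot()

node := BuildTestNode("n1", 1000, 1000)
pod := BuildTestPod("p1", 100, 100)
pod.Spec.NodeName = node.Name

// One call registers the node and its scheduled pods together.
if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod)); err != nil {
    t.Fatalf("AddNodeInfo failed: %v", err)
}
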
cluster-autoscaler/core/scaledown/actuation/actuator.go
Lines changed: 3 additions & 16 deletions

@@ -356,7 +356,6 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
 }

 func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
-    knownNodes := make(map[string]bool)
     snapshot := clustersnapshot.NewBasicClusterSnapshot()
     pods, err := a.ctx.AllPodLister().List()
     if err != nil {
@@ -366,22 +365,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
     scheduledPods := kube_util.ScheduledPods(pods)
     nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

-    for _, node := range nodes {
-        if err := snapshot.AddNode(node); err != nil {
-            return nil, err
-        }
-
-        knownNodes[node.Name] = true
-    }
-
-    for _, pod := range nonExpendableScheduledPods {
-        if knownNodes[pod.Spec.NodeName] {
-            if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
-                return nil, err
-            }
-        }
+    err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
+    if err != nil {
+        return nil, err
     }
-
     return snapshot, nil
 }

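One behavioural detail in the hunk above: the removed loop silently skipped pods whose Spec.NodeName did not match any listed node (the knownNodes map), so the single SetClusterState call is presumably expected to tolerate such pods on its own. If a caller ever needed that filtering explicitly, it would look roughly like this hypothetical helper (not part of the commit):

// filterPodsToKnownNodes keeps only pods scheduled onto one of the given
// nodes, mirroring the knownNodes check performed by the removed loop.
func filterPodsToKnownNodes(pods []*apiv1.Pod, nodes []*apiv1.Node) []*apiv1.Pod {
    known := make(map[string]bool, len(nodes))
    for _, n := range nodes {
        known[n.Name] = true
    }
    var filtered []*apiv1.Pod
    for _, p := range pods {
        if known[p.Spec.NodeName] {
            filtered = append(filtered, p)
        }
    }
    return filtered
}
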
cluster-autoscaler/core/scaledown/actuation/actuator_test.go
Lines changed: 3 additions & 2 deletions

@@ -43,6 +43,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
+    "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -1159,7 +1160,7 @@ func TestStartDeletion(t *testing.T) {
     csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
     for _, bucket := range emptyNodeGroupViews {
         for _, node := range bucket.Nodes {
-            err := ctx.ClusterSnapshot.AddNodeWithPods(node, tc.pods[node.Name])
+            err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods[node.Name]...))
             if err != nil {
                 t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
             }
@@ -1171,7 +1172,7 @@
             if !found {
                 t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name)
             }
-            err := ctx.ClusterSnapshot.AddNodeWithPods(node, pods)
+            err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
             if err != nil {
                 t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
             }

cluster-autoscaler/core/scaledown/actuation/drain_test.go
Lines changed: 2 additions & 1 deletion

@@ -37,6 +37,7 @@ import (
     . "k8s.io/autoscaler/cluster-autoscaler/core/test"
     "k8s.io/autoscaler/cluster-autoscaler/core/utils"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+    "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -612,7 +613,7 @@ func TestPodsToEvict(t *testing.T) {
         t.Run(tn, func(t *testing.T) {
             snapshot := clustersnapshot.NewBasicClusterSnapshot()
             node := BuildTestNode("test-node", 1000, 1000)
-            err := snapshot.AddNodeWithPods(node, tc.pods)
+            err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods...))
             if err != nil {
                 t.Errorf("AddNodeWithPods unexpected error: %v", err)
             }

cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Lines changed: 1 addition & 5 deletions

@@ -569,11 +569,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
     defer o.autoscalingContext.ClusterSnapshot.Revert()

     // Add test node to snapshot.
-    var allPods []*apiv1.Pod
-    for _, podInfo := range nodeInfo.Pods() {
-        allPods = append(allPods, podInfo.Pod)
-    }
-    if err := o.autoscalingContext.ClusterSnapshot.AddNodeWithPods(nodeInfo.Node(), allPods); err != nil {
+    if err := o.autoscalingContext.ClusterSnapshot.AddNodeInfo(nodeInfo); err != nil {
         klog.Errorf("Error while adding test Node: %v", err)
         return []estimator.PodEquivalenceGroup{}
     }

cluster-autoscaler/core/static_autoscaler.go
Lines changed: 9 additions & 35 deletions

@@ -34,7 +34,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner"
     scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
-    orchestrator "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
     core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
     "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/estimator"
@@ -58,7 +58,7 @@ import (

     apiv1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/labels"
-    klog "k8s.io/klog/v2"
+    "k8s.io/klog/v2"
 )

 const (
@@ -242,28 +242,6 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
     a.initialized = true
 }

-func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError {
-    a.ClusterSnapshot.Clear()
-
-    knownNodes := make(map[string]bool)
-    for _, node := range nodes {
-        if err := a.ClusterSnapshot.AddNode(node); err != nil {
-            klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err)
-            return caerrors.ToAutoscalerError(caerrors.InternalError, err)
-        }
-        knownNodes[node.Name] = true
-    }
-    for _, pod := range scheduledPods {
-        if knownNodes[pod.Spec.NodeName] {
-            if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
-                klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err)
-                return caerrors.ToAutoscalerError(caerrors.InternalError, err)
-            }
-        }
-    }
-    return nil
-}
-
 func (a *StaticAutoscaler) initializeRemainingPdbTracker() caerrors.AutoscalerError {
     a.RemainingPdbTracker.Clear()

@@ -361,8 +339,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
     }
     nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
     // Initialize cluster state to ClusterSnapshot
-    if typedErr := a.initializeClusterSnapshot(allNodes, nonExpendableScheduledPods); typedErr != nil {
-        return typedErr.AddPrefix("failed to initialize ClusterSnapshot: ")
+    if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
+        return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
     }
     // Initialize Pod Disruption Budget tracking
     if typedErr := a.initializeRemainingPdbTracker(); typedErr != nil {
@@ -486,7 +464,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
     allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
     // Remove the nodes from the snapshot as well so that the state is consistent.
     for _, notStartedNodeName := range allRegisteredUpcoming {
-        err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
+        err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName)
         if err != nil {
             klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
             // ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the
@@ -682,20 +660,16 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[
     nodeGroups := a.nodeGroupsById()
     upcomingNodeGroups := make(map[string]int)
     upcomingNodesFromUpcomingNodeGroups := 0
-    for nodeGroupName, upcomingNodes := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
+    for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
         nodeGroup := nodeGroups[nodeGroupName]
         if nodeGroup == nil {
             return fmt.Errorf("failed to find node group: %s", nodeGroupName)
         }
        isUpcomingNodeGroup := a.processors.AsyncNodeGroupStateChecker.IsUpcoming(nodeGroup)
-        for _, upcomingNode := range upcomingNodes {
-            var pods []*apiv1.Pod
-            for _, podInfo := range upcomingNode.Pods() {
-                pods = append(pods, podInfo.Pod)
-            }
-            err := a.ClusterSnapshot.AddNodeWithPods(upcomingNode.Node(), pods)
+        for _, upcomingNodeInfo := range upcomingNodeInfos {
+            err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo)
             if err != nil {
-                return fmt.Errorf("Failed to add upcoming node %s to cluster snapshot: %w", upcomingNode.Node().Name, err)
+                return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err)
             }
             if isUpcomingNodeGroup {
                 upcomingNodesFromUpcomingNodeGroups++

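Read together, the static_autoscaler.go hunks describe the snapshot handling of one RunOnce iteration: load the full cluster state, drop registered-but-not-started nodes, then add upcoming nodes from their templated NodeInfos. A compressed sketch of that flow (names taken from the diff, surrounding control flow elided):

// Load the observed cluster state into the snapshot in one call.
if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
    return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
}

// Keep registered-but-not-started nodes out of the simulated state.
for _, notStartedNodeName := range allRegisteredUpcoming {
    if err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName); err != nil {
        klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
    }
}

// Add each upcoming node, pods included, in a single call.
for _, upcomingNodeInfo := range upcomingNodeInfos {
    if err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo); err != nil {
        return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err)
    }
}
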
cluster-autoscaler/estimator/binpacking_estimator.go
Lines changed: 3 additions & 7 deletions

@@ -25,7 +25,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
     "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
-    klog "k8s.io/klog/v2"
+    "k8s.io/klog/v2"
 )

 // BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
@@ -211,11 +211,7 @@ func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
     template *framework.NodeInfo,
 ) error {
     newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex))
-    var pods []*apiv1.Pod
-    for _, podInfo := range newNodeInfo.Pods() {
-        pods = append(pods, podInfo.Pod)
-    }
-    if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
+    if err := e.clusterSnapshot.AddNodeInfo(newNodeInfo); err != nil {
         return err
     }
     estimationState.newNodeNameIndex++
@@ -229,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode(
     pod *apiv1.Pod,
     nodeName string,
 ) error {
-    if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
+    if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil {
         return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
     }
     estimationState.newNodesWithPods[nodeName] = true

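In the estimator the same two calls now cover a whole bin-packing step: a templated node is materialized with DeepCopyTemplateNode and added via AddNodeInfo, and a pod that fits is then committed with ForceAddPod. A condensed sketch of that sequence (estimationState bookkeeping and the predicate checks of the real estimator are elided):

// Add a fresh estimation node cloned from the node group template.
newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex))
if err := e.clusterSnapshot.AddNodeInfo(newNodeInfo); err != nil {
    return err
}
estimationState.newNodeNameIndex++

// Once a pod is known to fit, commit it to the snapshot on that node.
if err := e.clusterSnapshot.ForceAddPod(pod, newNodeInfo.Node().Name); err != nil {
    return fmt.Errorf("error adding pod %s/%s to node %s: %v", pod.Namespace, pod.Name, newNodeInfo.Node().Name, err)
}
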
cluster-autoscaler/estimator/binpacking_estimator_test.go
Lines changed: 4 additions & 2 deletions

@@ -214,7 +214,8 @@ func TestBinpackingEstimate(t *testing.T) {
         t.Run(tc.name, func(t *testing.T) {
             clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
             // Add one node in different zone to trigger topology spread constraints
-            clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
+            err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
+            assert.NoError(t, err)

             predicateChecker, err := predicatechecker.NewTestPredicateChecker()
             assert.NoError(t, err)
@@ -268,7 +269,8 @@ func BenchmarkBinpackingEstimate(b *testing.B) {

     for i := 0; i < b.N; i++ {
         clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
-        clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
+        err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
+        assert.NoError(b, err)

         predicateChecker, err := predicatechecker.NewTestPredicateChecker()
         assert.NoError(b, err)
