Closed
Changes from 1 commit
21 commits
4992123
DRA: extract interacting with the scheduler framework out of Predicat…
towca Sep 26, 2024
2d55ff2
DRA: introduce internal NodeInfo/PodInfo with DRA objects attached
towca Sep 27, 2024
7c1f8d5
DRA: migrate all of CA to use the new internal NodeInfo/PodInfo
towca Sep 27, 2024
dfd0234
DRA: remove AddNodeWithPods from ClusterSnapshot, replace uses with A…
towca Sep 30, 2024
fafb78a
DRA: add Initialize to ClusterSnapshot, remove AddNodes
towca Sep 30, 2024
c249f46
DRA: remove redundant IsPVCUsedByPods from ClusterSnapshot
towca Sep 30, 2024
f876a51
DRA: remove AddNode from ClusterSnapshot
towca Sep 30, 2024
bb87555
DRA: refactor utils related to NodeInfos
towca Sep 30, 2024
fad6868
DRA: propagate schedulerframework handle and DRA feature flag to Clus…
towca Sep 30, 2024
26e4787
DRA: Implement a Snapshot of DRA objects, its Provider, and utils
towca Sep 26, 2024
9e32e07
DRA: grab a snapshot of DRA objects and plumb to ClusterSnapshot.Init…
towca Sep 30, 2024
006685c
DRA: propagate DRA objects through NodeInfos in node_info utils
towca Sep 30, 2024
c5edd3b
DRA: rename ClusterSnapshot methods to better reflect their purpose
towca Oct 1, 2024
bdef0a7
DRA: extend ClusterSnapshot.SchedulePod, propagate scheduling state f…
towca Oct 1, 2024
0e055c4
DRA: plumb the DRA snapshot into scheduler framework through ClusterS…
towca Oct 1, 2024
0a11e9c
DRA: implement calculating utilization for DRA resources
towca Oct 1, 2024
ef9d420
DRA: integrate BasicClusterSnapshot with the DRA snapshot
towca Oct 1, 2024
38fb034
DRA: add integration tests
towca Sep 26, 2024
7e70b41
DRA: handle expendable pods using DRA
towca Oct 3, 2024
3544bb4
DRA: handle duplicating unschedulable pods using DRA
towca Oct 4, 2024
2e7eeea
DRA TMP: vendor in the required scheduler framework channges
towca Oct 7, 2024
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -331,7 +331,7 @@ func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.
}

gpuConfig := a.ctx.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, gpuConfig, time.Now())
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, a.ctx.EnableDynamicResources, gpuConfig, time.Now())
if err != nil {
return nil, err
}
@@ -139,7 +139,7 @@ func (c *Checker) unremovableReasonAndNodeUtilization(context *context.Autoscali
}

gpuConfig := context.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, gpuConfig, timestamp)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, context.EnableDynamicResources, gpuConfig, timestamp)
if err != nil {
klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
}
19 changes: 19 additions & 0 deletions cluster-autoscaler/dynamicresources/resource_claim_utils.go
@@ -23,6 +23,7 @@ import (
resourceapi "k8s.io/api/resource/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/utils/ptr"
)

@@ -97,6 +98,24 @@ func AddPodReservationIfNeededInPlace(claim *resourceapi.ResourceClaim, pod *api
return nil
}

// NodeInfoResourceClaims returns all ResourceClaims contained in the PodInfos in this NodeInfo. Shared claims
// are taken into account; each claim is returned only once.
func NodeInfoResourceClaims(nodeInfo *framework.NodeInfo) []*resourceapi.ResourceClaim {
processedClaims := map[resourceClaimRef]bool{}
var result []*resourceapi.ResourceClaim
for _, pod := range nodeInfo.Pods {
for _, claim := range pod.NeededResourceClaims {
if processedClaims[resourceClaimRef{Namespace: claim.Namespace, Name: claim.Name}] {
// Shared claim, already added to the result.
continue
}
result = append(result, claim)
processedClaims[resourceClaimRef{Namespace: claim.Namespace, Name: claim.Name}] = true
}
}
return result
}

func claimConsumerReferenceMatchesPod(pod *apiv1.Pod, ref resourceapi.ResourceClaimConsumerReference) bool {
return ref.APIGroup == "" && ref.Resource == "pods" && ref.Name == pod.Name && ref.UID == pod.UID
}
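
For context, a minimal sketch of how the shared-claim deduplication above behaves. It assumes the internal framework.NodeInfo/framework.PodInfo types introduced in this PR can be populated as keyed struct literals (the claim names and namespace are made up); the real code may go through dedicated constructors instead.

package main

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

func main() {
	// One claim shared by two pods on the node, plus one claim used by a single pod.
	shared := &resourceapi.ResourceClaim{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "shared-claim"}}
	own := &resourceapi.ResourceClaim{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pod2-claim"}}

	// Assumption: the internal NodeInfo/PodInfo wrappers can be built as keyed struct literals.
	nodeInfo := &framework.NodeInfo{Pods: []*framework.PodInfo{
		{NeededResourceClaims: []*resourceapi.ResourceClaim{shared}},
		{NeededResourceClaims: []*resourceapi.ResourceClaim{shared, own}},
	}}

	claims := dynamicresources.NodeInfoResourceClaims(nodeInfo)
	fmt.Println(len(claims)) // 2 -- the shared claim is returned only once
}
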
87 changes: 87 additions & 0 deletions cluster-autoscaler/dynamicresources/resource_slice_utils.go
@@ -21,6 +21,7 @@ import (

v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// GetAllDevices aggregates all Devices from the provided ResourceSlices into one list.
@@ -75,6 +76,92 @@ func AllCurrentGenSlices(slices []*resourceapi.ResourceSlice) ([]*resourceapi.Re
return maxGenSlices, nil
}

// GroupAllocatedDevices groups the devices from claim allocations by their driver and pool. Returns an error
// if any of the claims isn't allocated.
func GroupAllocatedDevices(claims []*resourceapi.ResourceClaim) (map[string]map[string][]string, error) {
result := map[string]map[string][]string{}
for _, claim := range claims {
alloc := claim.Status.Allocation
if alloc == nil {
return nil, fmt.Errorf("claim %s/%s not allocated", claim.Namespace, claim.Name)
}

for _, deviceAlloc := range alloc.Devices.Results {
if result[deviceAlloc.Driver] == nil {
result[deviceAlloc.Driver] = map[string][]string{}
}
result[deviceAlloc.Driver][deviceAlloc.Pool] = append(result[deviceAlloc.Driver][deviceAlloc.Pool], deviceAlloc.Device)
}
}
return result, nil
}

// CalculateDynamicResourceUtils calculates the utilization of every ResourceSlice pool exposed on the node, grouped by driver and pool. Returns
// an error if the NodeInfo doesn't have all ResourceSlices from a pool.
func CalculateDynamicResourceUtils(nodeInfo *framework.NodeInfo) (map[string]map[string]float64, error) {
result := map[string]map[string]float64{}
claims := NodeInfoResourceClaims(nodeInfo)
allocatedDevices, err := GroupAllocatedDevices(claims)
if err != nil {
return nil, err
}
for driverName, slicesByPool := range GroupSlices(nodeInfo.LocalResourceSlices) {
result[driverName] = map[string]float64{}
for poolName, poolSlices := range slicesByPool {
currentSlices, err := AllCurrentGenSlices(poolSlices)
if err != nil {
return nil, fmt.Errorf("pool %q error: %v", poolName, err)
}
poolDevices := GetAllDevices(currentSlices)
allocatedDeviceNames := allocatedDevices[driverName][poolName]
unallocated, allocated := splitDevicesByAllocation(poolDevices, allocatedDeviceNames)
result[driverName][poolName] = calculatePoolUtil(unallocated, allocated)
}
}
return result, nil
}

// HighestDynamicResourceUtil returns the highest utilization among the node's ResourceSlice pools, together with a driver/pool resource name identifying that pool.
func HighestDynamicResourceUtil(nodeInfo *framework.NodeInfo) (v1.ResourceName, float64, error) {
utils, err := CalculateDynamicResourceUtils(nodeInfo)
if err != nil {
return "", 0, err
}

highestUtil := 0.0
var highestResourceName v1.ResourceName
for driverName, utilsByPool := range utils {
for poolName, util := range utilsByPool {
if util >= highestUtil {
highestUtil = util
highestResourceName = v1.ResourceName(driverName + "/" + poolName)
}
}
}
return highestResourceName, highestUtil, nil
}

func calculatePoolUtil(unallocated, allocated []resourceapi.Device) float64 {
numAllocated := float64(len(allocated))
numUnallocated := float64(len(unallocated))
return numAllocated / (numAllocated + numUnallocated)
}

func splitDevicesByAllocation(devices []resourceapi.Device, allocatedNames []string) (unallocated, allocated []resourceapi.Device) {
allocatedNamesSet := map[string]bool{}
for _, allocatedName := range allocatedNames {
allocatedNamesSet[allocatedName] = true
}
for _, device := range devices {
if allocatedNamesSet[device.Name] {
allocated = append(allocated, device)
} else {
unallocated = append(unallocated, device)
}
}
return unallocated, allocated
}

func nodeSelectorSingleNode(selector *v1.NodeSelector) string {
if selector == nil {
// Nil selector means all nodes, so not a single node.
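
To make the grouping and utilization arithmetic above concrete, here is a rough, self-contained sketch using only the exported GroupAllocatedDevices helper. The Driver/Pool/Device field names come from the code above; the concrete v1alpha3 literal types (AllocationResult, DeviceAllocationResult, DeviceRequestAllocationResult) and the driver/pool names are assumptions for illustration.

package main

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
)

func main() {
	// A claim that was allocated two devices from the same driver and pool.
	claim := &resourceapi.ResourceClaim{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "claim-1"},
		Status: resourceapi.ResourceClaimStatus{
			Allocation: &resourceapi.AllocationResult{
				Devices: resourceapi.DeviceAllocationResult{
					Results: []resourceapi.DeviceRequestAllocationResult{
						{Driver: "gpu.example.com", Pool: "pool-a", Device: "dev-0"},
						{Driver: "gpu.example.com", Pool: "pool-a", Device: "dev-3"},
					},
				},
			},
		},
	}

	// driver -> pool -> allocated device names, i.e. {"gpu.example.com": {"pool-a": ["dev-0", "dev-3"]}}.
	allocated, err := dynamicresources.GroupAllocatedDevices([]*resourceapi.ResourceClaim{claim})
	if err != nil {
		panic(err)
	}
	fmt.Println(allocated)

	// If the pool's current-generation ResourceSlices expose 8 devices in total, the pool
	// utilization computed by CalculateDynamicResourceUtils would be 2/8 = 0.25, and
	// HighestDynamicResourceUtil would report it under the resource name "gpu.example.com/pool-a".
}
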
18 changes: 14 additions & 4 deletions cluster-autoscaler/simulator/utilization/info.go
@@ -21,6 +21,7 @@ import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
@@ -34,9 +35,10 @@ import (

// Info contains utilization information for a node.
type Info struct {
CpuUtil float64
MemUtil float64
GpuUtil float64
CpuUtil float64
MemUtil float64
GpuUtil float64
DynamicResourceUtil float64
// Resource name of highest utilization resource
ResourceName apiv1.ResourceName
// Max(CpuUtil, MemUtil) or GpuUtils
@@ -47,7 +49,7 @@ type Info struct {
// memory) or gpu utilization based on if the node has GPU or not. Per resource
// utilization is the sum of requests for it divided by allocatable. It also
// returns the individual cpu, memory and gpu utilization.
Contributor: Is it worth also mentioning DRA in this comment alongside CPU/GPU/memory?

func Calculate(nodeInfo *framework.NodeInfo, skipDaemonSetPods, skipMirrorPods bool, gpuConfig *cloudprovider.GpuConfig, currentTime time.Time) (utilInfo Info, err error) {
func Calculate(nodeInfo *framework.NodeInfo, skipDaemonSetPods, skipMirrorPods, draEnabled bool, gpuConfig *cloudprovider.GpuConfig, currentTime time.Time) (utilInfo Info, err error) {
if gpuConfig != nil {
gpuUtil, err := CalculateUtilizationOfResource(nodeInfo, gpuConfig.ResourceName, skipDaemonSetPods, skipMirrorPods, currentTime)
if err != nil {
@@ -59,6 +61,14 @@ func Calculate(nodeInfo *framework.NodeInfo, skipDaemonSetPods, skipMirrorPods b
return Info{GpuUtil: gpuUtil, ResourceName: gpuConfig.ResourceName, Utilization: gpuUtil}, err
}

if draEnabled && len(nodeInfo.LocalResourceSlices) > 0 {
resourceName, highestUtil, err := dynamicresources.HighestDynamicResourceUtil(nodeInfo)
if err != nil {
return Info{}, err
}
return Info{DynamicResourceUtil: highestUtil, Utilization: highestUtil, ResourceName: resourceName}, nil
}

cpu, err := CalculateUtilizationOfResource(nodeInfo, apiv1.ResourceCPU, skipDaemonSetPods, skipMirrorPods, currentTime)
if err != nil {
return Info{}, err
24 changes: 13 additions & 11 deletions cluster-autoscaler/simulator/utilization/info_test.go
@@ -30,6 +30,8 @@ import (
"github.com/stretchr/testify/assert"
)

// TODO(DRA): Add DRA-specific test cases.

func TestCalculate(t *testing.T) {
testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
pod := BuildTestPod("p1", 100, 200000)
@@ -77,7 +79,7 @@ func TestCalculate(t *testing.T) {
nodeInfo := framework.NewTestNodeInfo(node, pod, pod, pod2)

gpuConfig := GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err := Calculate(nodeInfo, false, false, gpuConfig, testTime)
utilInfo, err := Calculate(nodeInfo, false, false, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01)
assert.Equal(t, 0.1, utilInfo.CpuUtil)
@@ -86,15 +88,15 @@ func TestCalculate(t *testing.T) {
nodeInfo = framework.NewTestNodeInfo(node2, pod, pod, pod2)

gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
_, err = Calculate(nodeInfo, false, false, gpuConfig, testTime)
_, err = Calculate(nodeInfo, false, false, false, gpuConfig, testTime)
assert.Error(t, err)

node3 := BuildTestNode("node3", 2000, 2000000)
SetNodeReadyState(node3, true, time.Time{})
nodeInfo = framework.NewTestNodeInfo(node3, pod, podWithInitContainers, podWithLargeNonRestartableInitContainers)

gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err = Calculate(nodeInfo, false, false, gpuConfig, testTime)
utilInfo, err = Calculate(nodeInfo, false, false, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.InEpsilon(t, 50.25, utilInfo.Utilization, 0.01)
assert.Equal(t, 25.125, utilInfo.CpuUtil)
@@ -108,21 +110,21 @@ func TestCalculate(t *testing.T) {

nodeInfo = framework.NewTestNodeInfo(node, pod, pod, pod2, daemonSetPod3, daemonSetPod4)
gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err = Calculate(nodeInfo, true, false, gpuConfig, testTime)
utilInfo, err = Calculate(nodeInfo, true, false, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.InEpsilon(t, 2.5/10, utilInfo.Utilization, 0.01)

nodeInfo = framework.NewTestNodeInfo(node, pod, pod2, daemonSetPod3)
gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err = Calculate(nodeInfo, false, false, gpuConfig, testTime)
utilInfo, err = Calculate(nodeInfo, false, false, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01)

terminatedPod := BuildTestPod("podTerminated", 100, 200000)
terminatedPod.DeletionTimestamp = &metav1.Time{Time: testTime.Add(-10 * time.Minute)}
nodeInfo = framework.NewTestNodeInfo(node, pod, pod, pod2, terminatedPod)
gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err = Calculate(nodeInfo, false, false, gpuConfig, testTime)
utilInfo, err = Calculate(nodeInfo, false, false, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01)

@@ -133,19 +135,19 @@ func TestCalculate(t *testing.T) {

nodeInfo = framework.NewTestNodeInfo(node, pod, pod, pod2, mirrorPod)
gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err = Calculate(nodeInfo, false, true, gpuConfig, testTime)
utilInfo, err = Calculate(nodeInfo, false, true, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.InEpsilon(t, 2.0/9.0, utilInfo.Utilization, 0.01)

nodeInfo = framework.NewTestNodeInfo(node, pod, pod2, mirrorPod)
gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err = Calculate(nodeInfo, false, false, gpuConfig, testTime)
utilInfo, err = Calculate(nodeInfo, false, false, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01)

nodeInfo = framework.NewTestNodeInfo(node, pod, mirrorPod, daemonSetPod3)
gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err = Calculate(nodeInfo, true, true, gpuConfig, testTime)
utilInfo, err = Calculate(nodeInfo, true, true, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.InEpsilon(t, 1.0/8.0, utilInfo.Utilization, 0.01)

@@ -156,7 +158,7 @@ func TestCalculate(t *testing.T) {
TolerateGpuForPod(gpuPod)
nodeInfo = framework.NewTestNodeInfo(gpuNode, pod, pod, gpuPod)
gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err = Calculate(nodeInfo, false, false, gpuConfig, testTime)
utilInfo, err = Calculate(nodeInfo, false, false, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.InEpsilon(t, 1/1, utilInfo.Utilization, 0.01)

@@ -165,7 +167,7 @@ func TestCalculate(t *testing.T) {
AddGpuLabelToNode(gpuNode)
nodeInfo = framework.NewTestNodeInfo(gpuNode, pod, pod)
gpuConfig = GetGpuConfigFromNode(nodeInfo.Node())
utilInfo, err = Calculate(nodeInfo, false, false, gpuConfig, testTime)
utilInfo, err = Calculate(nodeInfo, false, false, false, gpuConfig, testTime)
assert.NoError(t, err)
assert.Zero(t, utilInfo.Utilization)
}
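
Finally, the TODO(DRA) note above asks for DRA-specific test cases. A very rough sketch of one such case, assuming the test NodeInfo's LocalResourceSlices and pod claims can be populated directly as struct literals and that the v1alpha3 ResourceSlice/ResourceClaim literals below match the vendored API (driver, pool and device names are made up):

func TestCalculateDynamicResourceUtilization(t *testing.T) {
	testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
	pod := BuildTestPod("p-dra", 100, 200000)

	// A single pool exposing 4 devices on the node, with 1 device allocated to the pod's claim.
	slice := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "node-dra-slice"},
		Spec: resourceapi.ResourceSliceSpec{
			Driver:   "gpu.example.com",
			NodeName: "node-dra",
			Pool:     resourceapi.ResourcePool{Name: "pool-a", ResourceSliceCount: 1},
			Devices:  []resourceapi.Device{{Name: "dev-0"}, {Name: "dev-1"}, {Name: "dev-2"}, {Name: "dev-3"}},
		},
	}
	claim := &resourceapi.ResourceClaim{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "claim-1"},
		Status: resourceapi.ResourceClaimStatus{
			Allocation: &resourceapi.AllocationResult{
				Devices: resourceapi.DeviceAllocationResult{
					Results: []resourceapi.DeviceRequestAllocationResult{
						{Driver: "gpu.example.com", Pool: "pool-a", Device: "dev-0"},
					},
				},
			},
		},
	}

	// Assumption: the internal NodeInfo/PodInfo types can be populated as keyed struct literals here.
	nodeInfo := &framework.NodeInfo{
		LocalResourceSlices: []*resourceapi.ResourceSlice{slice},
		Pods: []*framework.PodInfo{
			{Pod: pod, NeededResourceClaims: []*resourceapi.ResourceClaim{claim}},
		},
	}

	// With DRA enabled and no GPU config, utilization should come from the DRA pool: 1/4.
	utilInfo, err := Calculate(nodeInfo, false, false, true, nil, testTime)
	assert.NoError(t, err)
	assert.InEpsilon(t, 0.25, utilInfo.DynamicResourceUtil, 0.01)
	assert.InEpsilon(t, 0.25, utilInfo.Utilization, 0.01)
	assert.Equal(t, apiv1.ResourceName("gpu.example.com/pool-a"), utilInfo.ResourceName)
}
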