Withdraw site resource when VM creation fails #300
base: master
Changes from 21 commits
```diff
@@ -26,11 +26,13 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	apitypes "k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	//"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog"
 	clusterv1 "k8s.io/kubernetes/globalscheduler/pkg/apis/cluster/v1"
 	"k8s.io/kubernetes/globalscheduler/pkg/scheduler/common/constants"
 	"k8s.io/kubernetes/globalscheduler/pkg/scheduler/types"
+	"k8s.io/kubernetes/globalscheduler/pkg/scheduler/utils"
 	"k8s.io/kubernetes/pkg/controller"
 	statusutil "k8s.io/kubernetes/pkg/util/pod"
 )
```
```diff
@@ -113,6 +115,31 @@ func AddAllEventHandlers(sched *Scheduler) {
 			},
 		},
 	)
+	// failed pod queue
+	sched.PodInformer.Informer().AddEventHandler(
+		cache.FilteringResourceEventHandler{
+			FilterFunc: func(obj interface{}) bool {
+				switch t := obj.(type) {
+				case *v1.Pod:
+					return failedToSchedule(t) && responsibleForPod(t, sched.SchedulerName)
+				case cache.DeletedFinalStateUnknown:
+					if pod, ok := t.Obj.(*v1.Pod); ok {
+						return failedToSchedule(pod) && responsibleForPod(pod, sched.SchedulerName)
+					}
+					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
+					return false
+				default:
+					utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
+					return false
+				}
+			},
+			Handler: cache.ResourceEventHandlerFuncs{
+				AddFunc:    sched.addPodWithdrawResource,
+				UpdateFunc: sched.updatePodWithdrawResource,
+				DeleteFunc: sched.deletePodWithdrawResource,
+			},
+		},
+	)
 	sched.ClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    sched.addCluster,
 		UpdateFunc: sched.updateCluster,
```
```diff
@@ -135,10 +162,14 @@ func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
 	return schedulerName == pod.Status.AssignedScheduler.Name
 }
 
+// failedToSchedule selects pods that scheduled but failed to create vm
+func failedToSchedule(pod *v1.Pod) bool {
+	return pod.Status.Phase == v1.PodFailed
+}
+
 // addPodToCache add pod to the stack cache of the scheduler
 func (sched *Scheduler) addPodToCache(obj interface{}) {
 	pod, ok := obj.(*v1.Pod)
 	klog.Infof("Add a pod: %v", pod)
 	if !ok {
 		klog.Errorf("cannot convert to *v1.Pod: %v", obj)
 		return
```
```diff
@@ -160,7 +191,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
 		return
 	}
 	newPod, ok := newObj.(*v1.Pod)
-	klog.Infof("Update a pod: %v", newPod)
+	klog.V(4).Infof("Update a pod: %v", newPod)
 	if !ok {
 		klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
 		return
```
```diff
@@ -178,7 +209,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
 	switch t := obj.(type) {
 	case *v1.Pod:
 		pod = t
-		klog.Infof("Delete a pod: %v", pod)
+		klog.V(4).Infof("Delete a pod: %v", pod.Name)
 	case cache.DeletedFinalStateUnknown:
 		var ok bool
 		pod, ok = t.Obj.(*v1.Pod)
```
```diff
@@ -301,15 +332,13 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
 		return
 	}
 	newPod, ok := newObj.(*v1.Pod)
-	klog.Infof("updatePodToSchedulingQueue : %v", newPod)
+	klog.V(4).Infof("updatePodToSchedulingQueue : %v", newPod)
 	if !ok {
 		klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
 		return
 	}
 
 	oldStack := getStackFromPod(oldPod)
 	newStack := getStackFromPod(newPod)
 
 	if sched.skipStackUpdate(newStack) {
 		return
 	}
```
```diff
@@ -323,7 +352,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
 	switch t := obj.(type) {
 	case *v1.Pod:
 		pod = obj.(*v1.Pod)
-		klog.Infof("deletePodToSchedulingQueue : %v", pod)
+		klog.V(4).Infof("deletePodToSchedulingQueue : %v", pod.Name)
 	case cache.DeletedFinalStateUnknown:
 		var ok bool
 		pod, ok = t.Obj.(*v1.Pod)
```
```diff
@@ -372,14 +401,14 @@ func (sched *Scheduler) skipStackUpdate(stack *types.Stack) bool {
 	if !reflect.DeepEqual(assumedStackCopy, stackCopy) {
 		return false
 	}
-	klog.V(3).Infof("Skipping stack %s/%s/%s update", stack.Tenant, stack.PodNamespace, stack.PodName)
+	klog.V(4).Infof("Skipping stack %s/%s/%s update", stack.Tenant, stack.PodNamespace, stack.PodName)
 	return true
 }
 
 func (sched *Scheduler) bindStacks(assumedStacks []types.Stack) {
-	klog.Infof("assumedStacks: %v", assumedStacks)
+	klog.V(4).Infof("assumedStacks: %v", assumedStacks)
 	for _, newStack := range assumedStacks {
-		klog.Infof("newStack: %v", newStack)
+		klog.V(4).Infof("newStack: %v", newStack)
 		clusterName := newStack.Selected.ClusterName
 		sched.bindToSite(clusterName, &newStack)
 	}
```
```diff
@@ -398,15 +427,15 @@ func (sched *Scheduler) setPodScheduleErr(reqStack *types.Stack) error {
 	newStatus := v1.PodStatus{
 		Phase: v1.PodNoSchedule,
 	}
-	klog.Infof("Attempting to update pod status from %v to %v", pod.Status, newStatus)
+	klog.V(4).Infof("Attempting to update pod status from %v to %v", pod.Status, newStatus)
 	_, _, err = statusutil.PatchPodStatus(sched.Client, reqStack.Tenant, reqStack.PodNamespace, reqStack.PodName, pod.Status, newStatus)
 	if err != nil {
 		klog.Warningf("PatchPodStatus for pod %q: %v", reqStack.PodName+"/"+reqStack.PodNamespace+"/"+
 			reqStack.Tenant+"/"+reqStack.UID, err)
 		return err
 	}
 
-	klog.Infof("Update pod status from %v to %v success", pod.Status, newStatus)
+	klog.V(4).Infof("Update pod status from %v to %v success", pod.Status, newStatus)
 	return nil
 }
```
```diff
@@ -423,36 +452,35 @@ func (sched *Scheduler) bindToSite(clusterName string, assumedStack *types.Stack
 			Name: clusterName,
 		},
 	}
 
-	klog.V(3).Infof("binding: %v", binding)
+	klog.V(4).Infof("binding: %v", binding)
 	// do api server update here
-	klog.Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
+	klog.V(4).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
 	err := sched.Client.CoreV1().PodsWithMultiTenancy(binding.Namespace, binding.Tenant).Bind(binding)
 	if err != nil {
 		klog.Errorf("Failed to bind stack: %v/%v/%v", assumedStack.Tenant, assumedStack.PodNamespace,
 			assumedStack.PodName)
 		if err := sched.SchedulerCache.ForgetStack(assumedStack); err != nil {
 			klog.Errorf("scheduler cache ForgetStack failed: %v", err)
 		}
 
 		return err
 	}
-	//
 	return nil
 }
 
 func (sched *Scheduler) addCluster(object interface{}) {
 	resource := object.(*clusterv1.Cluster)
 	clusterCopy := resource.DeepCopy()
 	if sched.verifyClusterInfo(clusterCopy) == false {
-		klog.Infof(" Cluster data is not correct: %v", clusterCopy)
+		klog.V(4).Infof(" Cluster data is not correct: %v", clusterCopy)
 	}
 	key, err := controller.KeyFunc(object)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("couldn't get key for object: %v, error: %v", object, err))
 		return
 	}
 	sched.Enqueue(key, EventType_Create)
-	klog.Infof("Enqueue Create cluster: %v", key)
+	klog.V(4).Infof("Enqueue Create cluster: %v", key)
 }
 
 func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) {
```
```diff
@@ -461,7 +489,7 @@ func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) {
 	oldClusterCopy := oldResource.DeepCopy()
 	newClusterCopy := newResource.DeepCopy()
 	if sched.verifyClusterInfo(newClusterCopy) {
-		klog.Infof(" Cluster data is not correct: %v", newResource)
+		klog.V(4).Infof(" Cluster data is not correct: %v", newResource)
 	}
 	key1, err1 := controller.KeyFunc(oldObject)
 	key2, err2 := controller.KeyFunc(newObject)
```
```diff
@@ -478,13 +506,13 @@ func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) {
 	switch eventType {
 	case ClusterUpdateNo:
 		{
-			klog.Infof("No actual change in clusters, discarding: %v", newClusterCopy.Name)
+			klog.V(4).Infof("No actual change in clusters, discarding: %v", newClusterCopy.Name)
 			break
 		}
 	case ClusterUpdateYes:
 		{
 			sched.Enqueue(key2, EventType_Update)
-			klog.Infof("Enqueue Update Cluster: %v", key2)
+			klog.V(4).Infof("Enqueue Update Cluster: %v", key2)
 			break
 		}
 	default:
```
```diff
@@ -499,7 +527,7 @@ func (sched *Scheduler) deleteCluster(object interface{}) {
 	resource := object.(*clusterv1.Cluster)
 	clusterCopy := resource.DeepCopy()
 	if sched.verifyClusterInfo(clusterCopy) == false {
-		klog.Infof(" Cluster data is not correct: %v", clusterCopy)
+		klog.V(4).Infof(" Cluster data is not correct: %v", clusterCopy)
 		return
 	}
 	key, err := controller.KeyFunc(object)
```
```diff
@@ -510,7 +538,7 @@ func (sched *Scheduler) deleteCluster(object interface{}) {
 	sched.Enqueue(key, EventType_Delete)
 	siteID := clusterCopy.Spec.Region.Region + constants.SiteDelimiter + clusterCopy.Spec.Region.AvailabilityZone
 	sched.deletedClusters[key] = siteID
-	klog.Infof("Enqueue Delete Cluster: %v", key)
+	klog.V(4).Infof("Enqueue Delete Cluster: %v", key)
 }
 
 // Enqueue puts key of the cluster object in the work queue
```
```diff
@@ -532,3 +560,96 @@ func (sched *Scheduler) verifyClusterInfo(cluster *clusterv1.Cluster) (verified
 	verified = true
 	return verified
 }
+
+func (sched *Scheduler) verifyPodInfo(pod *v1.Pod) (verified bool) {
+	verified = false
+	name := pod.Name
+	if pod.Name == "" {
+		klog.Errorf("pod name:%s is empty", name)
+		return verified
+	}
+	verified = true
+	return verified
+}
+
+func (sched *Scheduler) addPodWithdrawResource(object interface{}) {
+	pod, ok := object.(*v1.Pod)
+	klog.V(4).Infof("Add a pod to withdraw resource: %v", pod.Name)
+	if !ok {
+		klog.Errorf("cannot convert to *v1.Pod: %v", object)
+		return
+	}
+	podCopy := pod.DeepCopy()
+	if sched.verifyPodInfo(podCopy) == false {
+		klog.V(4).Infof(" Pod data is not correct: %v", podCopy)
+	}
+	err := sched.withdrawResource(pod.Name)
+	if err != nil {
+		klog.Errorf("withdraw resource of pod %s failed", pod.Name)
+	}
+}
+
+func (sched *Scheduler) updatePodWithdrawResource(oldObj, newObj interface{}) {
+	oldPod, ok := oldObj.(*v1.Pod)
+	if !ok {
+		klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
+		return
+	}
+	newPod, ok := newObj.(*v1.Pod)
+	klog.V(4).Infof("Update a pod: %v", newPod)
+	if !ok {
+		klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
+		return
+	}
+	if oldPod.Name != newPod.Name {
+		klog.Errorf("old pod name and new pod name should be equal: %s, %s", oldPod.Name, newPod.Name)
+		return
+	}
+	err := sched.withdrawResource(newPod.Name)
+	if err != nil {
+		klog.Errorf("withdraw resource of pod %s failed", oldPod.Name)
+	}
+}
+
+func (sched *Scheduler) deletePodWithdrawResource(obj interface{}) {
+	var pod *v1.Pod
+	switch t := obj.(type) {
+	case *v1.Pod:
+		pod = t
+		klog.V(4).Infof("Delete a pod: %v", pod.Name)
+	case cache.DeletedFinalStateUnknown:
+		var ok bool
+		pod, ok = t.Obj.(*v1.Pod)
+		if !ok {
+			klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
+			return
+		}
+	default:
+		klog.Errorf("cannot convert to *v1.Pod: %v", t)
+		return
+	}
+	err := sched.withdrawResource(pod.Name)
+	if err != nil {
+		klog.Errorf("withdraw resource of pod %s failed", pod.Name)
+	}
+}
+
+//withdraw reserved resources to a pod & add it to cache to other pods
+func (sched *Scheduler) withdrawResource(podName string) error {
+	resource := sched.PodSiteResourceMap[podName]
+	if resource == nil {
+		klog.V(4).Infof("there is no preserved resource for pod: %s", podName)
+		return nil
+	}
+	allResInfo := resource.Resource
+	regionName := utils.GetRegionName(resource.SiteID)
+	regionFlavor, err := sched.siteCacheInfoSnapshot.GetRegionFlavors(regionName)
+	if err != nil {
+		klog.Errorf("there is no valid flavor for region: %s", regionName)
+		return err
+	}
+	siteCacheInfo := sched.siteCacheInfoSnapshot.SiteCacheInfoMap[resource.SiteID]
+	siteCacheInfo.UpdateSiteResInfo(allResInfo, regionFlavor, false)
+	delete(sched.PodSiteResourceMap, podName)
+	return nil
+}
```

Review comments on the `sched.PodSiteResourceMap[podName]` lookup in `withdrawResource`:

- The map is not thread safe. When multiple events of one pod are triggered, how can we prevent synchronization issues here?
- My concern is that a scheduled job will sync resources (CPU & memory) from OpenStack. Before the informers get failed pods from the API server, the scheduled job may already have synchronized from OpenStack, which means the resource has already been claimed back. In that case, is it possible that the failed pod adds resources back twice?
- Please reply to my comments instead of editing them directly. The assumption is that informers take fewer than 60 seconds to trigger the events. However, if at that time the resource map is not empty and the scheduled resource-synchronization job happens to fetch resource information from OpenStack, the resources might be wrong. => The resource map is empty, so the conflict won't happen. If this function causes too many issues, we should reconsider whether to keep it. It would be better to remove this 60-second gap, or the resource collector should collect information in advance rather than waiting 60 seconds. The 60 seconds is mainly the resource collector's issue; resource collection has to keep collecting information to close this gap.
- Please reply to my comments instead of editing them directly. For example, when you get a failed-pod event and want to withdraw resources, we assume the resource collector does not update the OpenStack resource information at that time. But if it does, we withdraw the resource information twice. We could get a failed-pod event just after the resource collector updates the most recent OpenStack resource information, right? Please correct me if I am wrong.
- Please reply to my comments instead of editing them directly. The resource collector used to be the only source from which we know the resource information (memory & CPU). If you introduce the current code to update resources, please make them work with each other. Thanks. => It will be better for you to check the resource collector's requirements.
- Please reply to my comments instead of editing them directly. Can you tell us whether, besides the resource collector, the pull request also reduces/withdraws resources to update the OpenStack resources? Thanks.
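One way to address the thread-safety and double-withdrawal concerns raised above is to guard the map with a lock and delete the entry in the same critical section as the lookup, so concurrent add/update/delete events for the same failed pod can give the reservation back at most once. The sketch below is only illustrative, not the PR's code: `resourceLedger`, `siteResource`, `reserve`, `claim`, and `withdraw` are hypothetical names, and the real `PodSiteResourceMap`, `SiteCacheInfo`, and resource-collector types are not reproduced.

```go
package main

import (
	"fmt"
	"sync"
)

// siteResource stands in for the reserved capacity tracked per pod; the real
// scheduler's PodSiteResourceMap entries carry a SiteID plus a richer Resource struct.
type siteResource struct {
	SiteID string
	CPU    int64
	MemMB  int64
}

// resourceLedger is a hypothetical wrapper that makes the pod->resource map safe
// for concurrent informer callbacks and makes withdrawal idempotent: the lookup
// and the delete happen under one lock.
type resourceLedger struct {
	mu        sync.Mutex
	resources map[string]*siteResource
}

func newResourceLedger() *resourceLedger {
	return &resourceLedger{resources: make(map[string]*siteResource)}
}

// reserve records the resources set aside for a pod.
func (l *resourceLedger) reserve(podName string, res *siteResource) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.resources[podName] = res
}

// claim atomically removes and returns the reservation for podName, if any.
func (l *resourceLedger) claim(podName string) (*siteResource, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	res, ok := l.resources[podName]
	if ok {
		delete(l.resources, podName)
	}
	return res, ok
}

// withdraw gives a pod's reservation back to its site exactly once.
func (l *resourceLedger) withdraw(podName string, giveBack func(*siteResource)) {
	res, ok := l.claim(podName)
	if !ok {
		// Never reserved, or already withdrawn by an earlier event for the same pod.
		return
	}
	giveBack(res)
}

func main() {
	ledger := newResourceLedger()
	ledger.reserve("pod-a", &siteResource{SiteID: "region1|az1", CPU: 2, MemMB: 4096})

	// Simulate the add/update/delete handlers all firing for the same failed pod.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ledger.withdraw("pod-a", func(r *siteResource) {
				fmt.Printf("returning %d CPU / %d MB to site %s\n", r.CPU, r.MemMB, r.SiteID)
			})
		}()
	}
	wg.Wait()
}
```

Even with such a guard, the interleaving with the resource collector's periodic OpenStack sync described in the review is a separate problem: the two code paths would still need an explicit agreement about which one applies the adjustment, otherwise the freed capacity could be counted twice.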