diff --git a/Dockerfile b/Dockerfile
index 3ba57d6c..7bf13904 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,6 @@
 ARG BASE_IMAGE
 ARG BUILD_IMAGE
-ARG GORUNNER_VERSION=public.ecr.aws/eks-distro/kubernetes/go-runner:v0.18.0-eks-1-32-16
+ARG GORUNNER_VERSION=public.ecr.aws/eks-distro/kubernetes/go-runner:v0.18.0-eks-1-34-latest
 ARG ARCH
 # Build the controller binary
 FROM $BUILD_IMAGE AS builder
diff --git a/config/controller/controller.yaml b/config/controller/controller.yaml
index c4afcff5..0a0b13b8 100644
--- a/config/controller/controller.yaml
+++ b/config/controller/controller.yaml
@@ -35,6 +35,7 @@ spec:
         - --metrics-bind-address=:8443
         - --introspect-bind-addr=:22775
         - --vpc-id=VPC_ID
+        - --aws-region=AWS_REGION
         image: controller:latest
         name: controller
         resources:
diff --git a/controllers/crds/cninode_controller.go b/controllers/crds/cninode_controller.go
index 18d03978..1eded890 100644
--- a/controllers/crds/cninode_controller.go
+++ b/controllers/crds/cninode_controller.go
@@ -54,6 +54,23 @@ var (
 			Help: "The number of requests that failed when controller tried to recreate the CNINode",
 		},
 	)
+	cninodeOperationLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "cninode_operation_latency",
+		Help:    "The latency of CNINode operation",
+		Buckets: prometheus.DefBuckets,
+	}, []string{"operation"})
+)
+
+type CleanupTask struct {
+	cniNode    *v1alpha1.CNINode
+	retryAfter time.Duration
+	hasRetried int
+}
+
+const (
+	cleanupTaskRetryFactor = 2
+	cleanupTaskMaxRetry    = 5
+	initialRetryDelay      = 20
 )
 
 func prometheusRegister() {
@@ -61,7 +78,9 @@ func prometheusRegister() {
 	metrics.Registry.MustRegister(
 		recreateCNINodeCallCount,
-		recreateCNINodeErrCount)
+		recreateCNINodeErrCount,
+		cninodeOperationLatency,
+	)
 
 	prometheusRegistered = true
 }
 
@@ -79,6 +98,7 @@ type CNINodeReconciler struct {
 	finalizerManager   k8s.FinalizerManager
 	deletePool         *semaphore.Weighted
 	newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner
+	cleanupChan        chan any
 }
 
 func NewCNINodeReconciler(
@@ -106,6 +126,9 @@ func NewCNINodeReconciler(
 		finalizerManager:   finalizerManager,
 		deletePool:         semaphore.NewWeighted(int64(maxConcurrentWorkers)),
 		newResourceCleaner: newResourceCleaner,
+		// buffer the cleanup channel at 200% of the worker count for higher throughput
+		// TODO: tune this value based on UX
+		cleanupChan: make(chan any, maxConcurrentWorkers*2),
 	}
 }
 
@@ -134,10 +157,11 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 
 	if cniNode.GetDeletionTimestamp().IsZero() {
 		cniNodeCopy := cniNode.DeepCopy()
-		shouldPatch, err := r.ensureTagsAndLabels(cniNodeCopy, node)
-		shouldPatch = controllerutil.AddFinalizer(cniNodeCopy, config.NodeTerminationFinalizer) || shouldPatch
+		shouldPatchTags, err := r.ensureTagsAndLabels(cniNodeCopy, node)
+		shouldPatchFinalizer := controllerutil.AddFinalizer(cniNodeCopy, config.NodeTerminationFinalizer)
+		createAt := time.Now()
 
-		if shouldPatch {
+		if shouldPatchTags || shouldPatchFinalizer {
 			r.log.Info("patching CNINode to add fields Tags, Labels and finalizer", "cninode", cniNode.Name)
 			if err := r.Client.Patch(ctx, cniNodeCopy, client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{})); err != nil {
 				if apierrors.IsConflict(err) {
@@ -146,29 +170,25 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 				}
 				return ctrl.Result{}, err
 			}
+			if shouldPatchTags {
+				cninodeOperationLatency.WithLabelValues("add_tag").Observe(time.Since(createAt).Seconds())
+			}
+			if shouldPatchFinalizer {
+				cninodeOperationLatency.WithLabelValues("add_finalizer").Observe(time.Since(createAt).Seconds())
+			}
 		}
 		return ctrl.Result{}, err
 	} else {
 		// CNINode is marked for deletion
+		startAt := time.Now()
 		if !nodeFound {
 			// node is also deleted, proceed with running the cleanup routine and remove the finalizer
 			// run cleanup for Linux nodes only
 			if val, ok := cniNode.ObjectMeta.Labels[config.NodeLabelOS]; ok && val == config.OSLinux {
-				r.log.Info("running the finalizer routine on cniNode", "cniNode", cniNode.Name)
-				// run cleanup when node id is present
-				if nodeID, ok := cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
-					if !r.deletePool.TryAcquire(1) {
-						r.log.Info("d, will requeue request")
-						return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
-					}
-					go func(nodeID string) {
-						defer r.deletePool.Release(1)
-						childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
-						defer cancel()
-						if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
-							r.log.Error(err, "failed to cleanup resources during node termination")
-							ec2API.NodeTerminationENICleanupFailure.Inc()
-						}
-					}(nodeID)
+				// add the CNINode to the cleanup channel to run the cleanup routine
+				r.cleanupChan <- CleanupTask{
+					cniNode:    cniNode,
+					retryAfter: initialRetryDelay * time.Millisecond,
+					hasRetried: 0,
 				}
 			}
@@ -179,6 +199,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 				}
 				return ctrl.Result{}, err
 			}
+			cninodeOperationLatency.WithLabelValues("remove_finalizer").Observe(time.Since(startAt).Seconds())
 			return ctrl.Result{}, nil
 		} else {
 			// node exists, do not run the cleanup routine(periodic cleanup routine will delete leaked ENIs), remove the finalizer,
@@ -200,6 +221,8 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 				r.log.Error(err, "failed to remove finalizer on CNINode, will retry")
 				return ctrl.Result{}, err
 			}
+			cninodeOperationLatency.WithLabelValues("remove_finalizer").Observe(time.Since(startAt).Seconds())
+
 			// wait till CNINode is deleted before recreation as the new object will be created with same name to avoid "object already exists" error
 			if err := r.waitTillCNINodeDeleted(client.ObjectKeyFromObject(newCNINode)); err != nil {
 				// raise event if CNINode was not deleted after removing the finalizer
@@ -219,6 +242,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 				// return nil as object is deleted and we cannot recreate the object now
 				return ctrl.Result{}, nil
 			}
+			cninodeOperationLatency.WithLabelValues("re_create").Observe(time.Since(startAt).Seconds())
 			r.log.Info("successfully recreated CNINode", "cniNode", newCNINode.Name)
 		}
 	}
@@ -230,12 +254,58 @@ func (r *CNINodeReconciler) SetupWithManager(mgr ctrl.Manager, maxNodeConcurrent
 	if !prometheusRegistered {
 		prometheusRegister()
 	}
+
+	// start a goroutine that watches for CNINode cleanup tasks
+	go r.watchCleanupTasks()
+
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&v1alpha1.CNINode{}).
 		WithOptions(controller.Options{MaxConcurrentReconciles: maxNodeConcurrentReconciles}).
 		Complete(r)
 }
 
+func (r *CNINodeReconciler) watchCleanupTasks() {
+	for {
+		select {
+		case task := <-r.cleanupChan:
+			r.processCleanupTasks(r.context, task.(CleanupTask))
+		case <-r.context.Done():
+			r.log.Info("context cancelled, stopping the CNINode cleanup task watcher")
+			return
+		}
+	}
+}
+
+func (r *CNINodeReconciler) processCleanupTasks(ctx context.Context, task CleanupTask) {
+	log := r.log.WithValues("cniNode", task.cniNode.Name)
+	log.Info("running the finalizer routine on cniNode", "cniNode", task.cniNode.Name)
+	// run cleanup when node id is present
+	if nodeID, ok := task.cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
+		if !r.deletePool.TryAcquire(1) {
+			if task.hasRetried >= cleanupTaskMaxRetry {
+				log.Info("will not requeue request as max retries are already done")
+				return
+			}
+			log.Info("will requeue request after", "after", task.retryAfter)
+			time.Sleep(task.retryAfter)
+			task.retryAfter *= cleanupTaskRetryFactor
+			task.hasRetried += 1
+			r.cleanupChan <- task
+			return
+		}
+		go func(nodeID string) {
+			defer r.deletePool.Release(1)
+			childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
+			defer cancel()
+			if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
+				log.Error(err, "failed to cleanup resources during node termination")
+				ec2API.NodeTerminationENICleanupFailure.Inc()
+			}
+			log.Info("successfully cleaned up resources during node termination", "nodeID", nodeID)
+		}(nodeID)
+	}
+}
+
 // waitTillCNINodeDeleted waits for CNINode to be deleted with timeout and returns error
 func (r *CNINodeReconciler) waitTillCNINodeDeleted(nameSpacedCNINode types.NamespacedName) error {
 	oldCNINode := &v1alpha1.CNINode{}
diff --git a/controllers/crds/cninode_controller_test.go b/controllers/crds/cninode_controller_test.go
index 3d54e32d..a1b2ead9 100644
--- a/controllers/crds/cninode_controller_test.go
+++ b/controllers/crds/cninode_controller_test.go
@@ -14,7 +14,6 @@ import (
 	"github.com/go-logr/logr"
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/assert"
-	"golang.org/x/sync/semaphore"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -56,14 +55,7 @@ func NewCNINodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) *CNIN
 	_ = v1alpha1.AddToScheme(scheme)
 	client := fakeClient.NewClientBuilder().WithScheme(scheme).WithObjects(mockObjects...).Build()
 	return &CNINodeMock{
-		Reconciler: CNINodeReconciler{
-			Client:      client,
-			scheme:      scheme,
-			log:         zap.New(),
-			clusterName: mockClusterName,
-			vpcId:       "vpc-000000000000",
-			deletePool:  semaphore.NewWeighted(10),
-		},
+		Reconciler: *NewCNINodeReconciler(client, scheme, context.Background(), zap.New(), nil, nil, mockClusterName, "vpc-000000000000", nil, 10, nil),
 	}
 }
 
@@ -210,6 +202,7 @@ func TestCNINodeReconcile(t *testing.T) {
 			if tt.prepare != nil {
 				tt.prepare(&f)
 			}
+			go mock.Reconciler.watchCleanupTasks()
 			res, err := mock.Reconciler.Reconcile(context.Background(), reconcileRequest)
 
 			cniNode := &v1alpha1.CNINode{}
diff --git a/go.mod b/go.mod
index a6139c4d..b31abbc2 100644
--- a/go.mod
+++ b/go.mod
@@ -1,8 +1,6 @@
 module github.com/aws/amazon-vpc-resource-controller-k8s
 
-go 1.24.1
-
-toolchain go1.24.2
+go 1.24.6
 
 require (
 	github.com/aws/amazon-vpc-cni-k8s v1.19.4
@@ -51,6 +49,7 @@
 	github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
 	github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
 	github.com/x448/float16 v0.8.4 // indirect
+	golang.org/x/sync v0.13.0 // indirect
 	gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
 	sigs.k8s.io/randfill v1.0.0 // indirect
 )
diff --git a/pkg/aws/ec2/api/wrapper.go b/pkg/aws/ec2/api/wrapper.go
index 19169713..b5ab30c3 100644
--- a/pkg/aws/ec2/api/wrapper.go
+++ b/pkg/aws/ec2/api/wrapper.go
@@ -22,12 +22,14 @@ import (
 
 	vpc_rc_config "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
 	"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
+	"github.com/samber/lo"
 
 	"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/version"
 	smithymiddleware "github.com/aws/smithy-go/middleware"
 	smithyhttp "github.com/aws/smithy-go/transport/http"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/aws/arn"
 	awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
 	"github.com/aws/aws-sdk-go-v2/aws/retry"
 	"github.com/aws/aws-sdk-go-v2/config"
@@ -441,7 +443,7 @@ func NewEC2Wrapper(roleARN, clusterName, region string, instanceClientQPS, insta
 
 	ec2Wrapper := &ec2Wrapper{log: log}
 
-	cfg, err := ec2Wrapper.getInstanceConfig()
+	cfg, err := ec2Wrapper.getInstanceConfig(region, lo.Must1(arn.Parse(roleARN)).AccountID)
 	if err != nil {
 		return nil, err
 	}
@@ -481,7 +483,7 @@ func NewEC2Wrapper(roleARN, clusterName, region string, instanceClientQPS, insta
 	return ec2Wrapper, nil
 }
 
-func (e *ec2Wrapper) getInstanceConfig() (*aws.Config, error) {
+func (e *ec2Wrapper) getInstanceConfig(regionStr, accountID string) (*aws.Config, error) {
 	// Create a new config
 	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithAPIOptions([]func(stack *smithymiddleware.Stack) error{
 		awsmiddleware.AddUserAgentKeyValue(AppName, version.GitVersion),
@@ -491,17 +493,27 @@ func (e *ec2Wrapper) getInstanceConfig() (*aws.Config, error) {
 	}
 
 	ec2Metadata := imds.NewFromConfig(cfg)
-	region, err := ec2Metadata.GetRegion(context.TODO(), &imds.GetRegionInput{})
-	if err != nil {
-		return nil, fmt.Errorf("failed to find the region from ec2 metadata: %v", err)
+	if regionStr == "" {
+		region, err := ec2Metadata.GetRegion(context.TODO(), &imds.GetRegionInput{})
+		if err != nil {
+			return nil, fmt.Errorf("failed to find the region from ec2 metadata: %v", err)
+		}
+		cfg.Region = region.Region
+	} else {
+		cfg.Region = regionStr
 	}
-	cfg.Region = region.Region
-	instanceIdentity, err := ec2Metadata.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
-	if err != nil {
-		return nil, fmt.Errorf("failed to get the instance identity document %v", err)
+
+	if accountID == "" {
+		instanceIdentity, err := ec2Metadata.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
+		if err != nil {
+			return nil, fmt.Errorf("failed to get the instance identity document %v", err)
+		}
+		// Set the Account ID
+		e.accountID = instanceIdentity.AccountID
+	} else {
+		e.accountID = accountID
 	}
-	// Set the Account ID
-	e.accountID = instanceIdentity.AccountID
+
 	return &cfg, nil
 }
 
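Illustration (editor's sketch, not part of the patch): when the delete pool is saturated, processCleanupTasks re-queues a CleanupTask with exponential backoff. The delay starts at initialRetryDelay milliseconds, is multiplied by cleanupTaskRetryFactor on each failed acquire, and after cleanupTaskMaxRetry attempts the task is dropped. A minimal standalone Go sketch of that schedule, reusing only the constants introduced in the patch:

package main

import (
	"fmt"
	"time"
)

// Constants mirrored from controllers/crds/cninode_controller.go in the patch.
const (
	cleanupTaskRetryFactor = 2
	cleanupTaskMaxRetry    = 5
	initialRetryDelay      = 20
)

func main() {
	// First delay handed to a CleanupTask in the Reconcile deletion branch.
	delay := initialRetryDelay * time.Millisecond
	for attempt := 1; attempt <= cleanupTaskMaxRetry; attempt++ {
		fmt.Printf("attempt %d: wait %v before re-queueing the task\n", attempt, delay)
		// processCleanupTasks doubles the delay before putting the task back on the channel.
		delay *= cleanupTaskRetryFactor
	}
	// A task that still cannot acquire a worker after cleanupTaskMaxRetry attempts is dropped.
}

Running the sketch prints waits of 20ms, 40ms, 80ms, 160ms and 320ms, after which the task would be dropped.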