2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,6 +1,6 @@
ARG BASE_IMAGE
ARG BUILD_IMAGE
ARG GORUNNER_VERSION=public.ecr.aws/eks-distro/kubernetes/go-runner:v0.18.0-eks-1-32-16
ARG GORUNNER_VERSION=public.ecr.aws/eks-distro/kubernetes/go-runner:v0.18.0-eks-1-34-latest
ARG ARCH
# Build the controller binary
FROM $BUILD_IMAGE AS builder
1 change: 1 addition & 0 deletions config/controller/controller.yaml
@@ -35,6 +35,7 @@ spec:
- --metrics-bind-address=:8443
- --introspect-bind-addr=:22775
- --vpc-id=VPC_ID
- --aws-region=AWS_REGION
image: controller:latest
name: controller
resources:
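For context on the new `--aws-region=AWS_REGION` placeholder: a minimal sketch of how such a flag could be parsed and forwarded to the EC2 wrapper's region parameter. The flag name comes from the manifest above, but the registration code below is an assumption for illustration, not taken from this PR's main.go.

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Hypothetical wiring: the AWS_REGION placeholder in controller.yaml would be
	// substituted at deploy time and land here; an empty value keeps the existing
	// IMDS-based region lookup as the fallback (see the wrapper.go changes below).
	awsRegion := flag.String("aws-region", "", "AWS region; empty falls back to IMDS")
	flag.Parse()
	fmt.Println("aws-region:", *awsRegion)
}
```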
110 changes: 90 additions & 20 deletions controllers/crds/cninode_controller.go
@@ -54,14 +54,33 @@ var (
Help: "The number of requests that failed when controller tried to recreate the CNINode",
},
)
cninodeOperationLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "cninode_operation_latency",
Help: "The latency of CNINode operation",
Buckets: prometheus.DefBuckets,
}, []string{"operation"})
)

type CleanupTask struct {
cniNode *v1alpha1.CNINode
retryAfter time.Duration
hasRetried int
}

const (
cleanupTaskRetryFactor = 2
cleanupTaskMaxRetry = 5
initialRetryDelay = 20 // milliseconds; multiplied by time.Millisecond at the call site
)

func prometheusRegister() {
prometheusRegistered = true

metrics.Registry.MustRegister(
recreateCNINodeCallCount,
recreateCNINodeErrCount)
recreateCNINodeErrCount,
cninodeOperationLatency,
)

prometheusRegistered = true
}
@@ -79,6 +98,7 @@ type CNINodeReconciler struct {
finalizerManager k8s.FinalizerManager
deletePool *semaphore.Weighted
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner
cleanupChan chan any
}

func NewCNINodeReconciler(
@@ -106,6 +126,9 @@ func NewCNINodeReconciler(
finalizerManager: finalizerManager,
deletePool: semaphore.NewWeighted(int64(maxConcurrentWorkers)),
newResourceCleaner: newResourceCleaner,
// buffer the channel at 2x the worker count for higher throughput
// TODO: tune this value based on UX
cleanupChan: make(chan any, maxConcurrentWorkers*2),
}
}

@@ -134,10 +157,11 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

if cniNode.GetDeletionTimestamp().IsZero() {
cniNodeCopy := cniNode.DeepCopy()
shouldPatch, err := r.ensureTagsAndLabels(cniNodeCopy, node)
shouldPatch = controllerutil.AddFinalizer(cniNodeCopy, config.NodeTerminationFinalizer) || shouldPatch
shouldPatchTags, err := r.ensureTagsAndLabels(cniNodeCopy, node)
shouldPatchFinalizer := controllerutil.AddFinalizer(cniNodeCopy, config.NodeTerminationFinalizer)
createAt := time.Now()

if shouldPatch {
if shouldPatchTags || shouldPatchFinalizer {
r.log.Info("patching CNINode to add fields Tags, Labels and finalizer", "cninode", cniNode.Name)
if err := r.Client.Patch(ctx, cniNodeCopy, client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{})); err != nil {
if apierrors.IsConflict(err) {
@@ -146,29 +170,25 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
return ctrl.Result{}, err
}
if shouldPatchTags {
cninodeOperationLatency.WithLabelValues("add_tag").Observe(time.Since(createAt).Seconds())
}
if shouldPatchFinalizer {
cninodeOperationLatency.WithLabelValues("add_finalizer").Observe(time.Since(createAt).Seconds())
}
}
return ctrl.Result{}, err
} else { // CNINode is marked for deletion
startAt := time.Now()
if !nodeFound {
// node is also deleted, proceed with running the cleanup routine and remove the finalizer
// run cleanup for Linux nodes only
if val, ok := cniNode.ObjectMeta.Labels[config.NodeLabelOS]; ok && val == config.OSLinux {
r.log.Info("running the finalizer routine on cniNode", "cniNode", cniNode.Name)
// run cleanup when node id is present
if nodeID, ok := cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
if !r.deletePool.TryAcquire(1) {
r.log.Info("d, will requeue request")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
go func(nodeID string) {
defer r.deletePool.Release(1)
childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
defer cancel()
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
r.log.Error(err, "failed to cleanup resources during node termination")
ec2API.NodeTerminationENICleanupFailure.Inc()
}
}(nodeID)
// add the CNINode to the cleanup channel to run the cleanup routine
r.cleanupChan <- CleanupTask{
cniNode: cniNode,
retryAfter: initialRetryDelay * time.Millisecond,
hasRetried: 0,
}
}

@@ -179,6 +199,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
return ctrl.Result{}, err
}
cninodeOperationLatency.WithLabelValues("remove_finalizer").Observe(time.Since(startAt).Seconds())
return ctrl.Result{}, nil
} else {
// node exists, do not run the cleanup routine(periodic cleanup routine will delete leaked ENIs), remove the finalizer,
@@ -200,6 +221,8 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
r.log.Error(err, "failed to remove finalizer on CNINode, will retry")
return ctrl.Result{}, err
}
cninodeOperationLatency.WithLabelValues("remove_finalizer").Observe(time.Since(startAt).Seconds())

// wait till CNINode is deleted before recreation as the new object will be created with same name to avoid "object already exists" error
if err := r.waitTillCNINodeDeleted(client.ObjectKeyFromObject(newCNINode)); err != nil {
// raise event if CNINode was not deleted after removing the finalizer
@@ -219,6 +242,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// return nil as object is deleted and we cannot recreate the object now
return ctrl.Result{}, nil
}
cninodeOperationLatency.WithLabelValues("re_create").Observe(time.Since(startAt).Seconds())
r.log.Info("successfully recreated CNINode", "cniNode", newCNINode.Name)
}
}
@@ -230,12 +254,58 @@ func (r *CNINodeReconciler) SetupWithManager(mgr ctrl.Manager, maxNodeConcurrent
if !prometheusRegistered {
prometheusRegister()
}

// start a goroutine that watches for and processes CNINode cleanup tasks
go r.watchCleanupTasks()

return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.CNINode{}).
WithOptions(controller.Options{MaxConcurrentReconciles: maxNodeConcurrentReconciles}).
Complete(r)
}

func (r *CNINodeReconciler) watchCleanupTasks() {
for {
select {
case task := <-r.cleanupChan:
r.processCleanupTasks(r.context, task.(CleanupTask))
case <-r.context.Done():
r.log.Info("context cancelled and stop cninodes cleanup task")
return
}
}
}

func (r *CNINodeReconciler) processCleanupTasks(ctx context.Context, task CleanupTask) {
log := r.log.WithValues("cniNode", task.cniNode.Name)
log.Info("running the finalizer routine on cniNode", "cniNode", task.cniNode.Name)
// run cleanup when node id is present
if nodeID, ok := task.cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
if !r.deletePool.TryAcquire(1) {
if task.hasRetried >= cleanupTaskMaxRetry {
log.Info("will not requeue request as max retries are already done")
return
}
log.Info("will requeue request after", "after", task.retryAfter)
time.Sleep(task.retryAfter)
task.retryAfter *= cleanupTaskRetryFactor
task.hasRetried += 1
r.cleanupChan <- task
return
}
go func(nodeID string) {
defer r.deletePool.Release(1)
childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
defer cancel()
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
log.Error(err, "failed to cleanup resources during node termination")
ec2API.NodeTerminationENICleanupFailure.Inc()
return
}
log.Info("successfully cleaned up resources during node termination", "nodeID", nodeID)
}(nodeID)
}
}

// waitTillCNINodeDeleted waits for CNINode to be deleted with timeout and returns error
func (r *CNINodeReconciler) waitTillCNINodeDeleted(nameSpacedCNINode types.NamespacedName) error {
oldCNINode := &v1alpha1.CNINode{}
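Taken together, the new constants and `processCleanupTasks` give a bounded exponential backoff when the delete pool is saturated: a task sleeps 20 ms, 40 ms, 80 ms, 160 ms and 320 ms across its five retries, and is dropped if acquisition still fails after that. A standalone sketch of that schedule (the values are derived from the constants above, not spelled out in the PR):

```go
package main

import (
	"fmt"
	"time"
)

const (
	cleanupTaskRetryFactor = 2
	cleanupTaskMaxRetry    = 5
	initialRetryDelay      = 20 * time.Millisecond
)

func main() {
	// Mirrors processCleanupTasks: the delay doubles after each failed TryAcquire,
	// and the task is dropped once hasRetried reaches cleanupTaskMaxRetry.
	delay, total := initialRetryDelay, time.Duration(0)
	for retry := 1; retry <= cleanupTaskMaxRetry; retry++ {
		fmt.Printf("retry %d: sleep %v\n", retry, delay)
		total += delay
		delay *= cleanupTaskRetryFactor
	}
	fmt.Println("total wait before the task is dropped:", total) // 620ms
}
```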
11 changes: 2 additions & 9 deletions controllers/crds/cninode_controller_test.go
@@ -14,7 +14,6 @@ import (
"github.com/go-logr/logr"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/semaphore"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -56,14 +55,7 @@ func NewCNINodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) *CNIN
_ = v1alpha1.AddToScheme(scheme)
client := fakeClient.NewClientBuilder().WithScheme(scheme).WithObjects(mockObjects...).Build()
return &CNINodeMock{
Reconciler: CNINodeReconciler{
Client: client,
scheme: scheme,
log: zap.New(),
clusterName: mockClusterName,
vpcId: "vpc-000000000000",
deletePool: semaphore.NewWeighted(10),
},
Reconciler: *NewCNINodeReconciler(client, scheme, context.Background(), zap.New(), nil, nil, mockClusterName, "vpc-000000000000", nil, 10, nil),
}
}

@@ -210,6 +202,7 @@ func TestCNINodeReconcile(t *testing.T) {
if tt.prepare != nil {
tt.prepare(&f)
}
go mock.Reconciler.watchCleanupTasks()
res, err := mock.Reconciler.Reconcile(context.Background(), reconcileRequest)

cniNode := &v1alpha1.CNINode{}
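The mock now goes through `NewCNINodeReconciler`, so the test gets a real `cleanupChan` and starts `watchCleanupTasks` to drain it, mirroring `SetupWithManager`. Sends on that channel stay non-blocking only while its 2x-worker buffer has room; a small standalone illustration of that property (not taken from the repo):

```go
package main

import "fmt"

func main() {
	// cleanupChan is buffered at 2 * maxConcurrentWorkers; sends succeed until the
	// buffer fills, after which a send blocks until the watcher goroutine receives.
	workers := 10
	ch := make(chan int, workers*2)
	for i := 0; i < cap(ch); i++ {
		ch <- i // non-blocking while the buffer has room
	}
	select {
	case ch <- 99:
		fmt.Println("unexpected: buffer still had room")
	default:
		fmt.Println("buffer full: a plain send here would block until a consumer drains it")
	}
}
```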
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/aws/amazon-vpc-resource-controller-k8s

go 1.24.1

toolchain go1.24.2
go 1.24.6

require (
github.com/aws/amazon-vpc-cni-k8s v1.19.4
@@ -51,6 +49,7 @@ require (
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/sync v0.13.0 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
)
34 changes: 23 additions & 11 deletions pkg/aws/ec2/api/wrapper.go
@@ -22,12 +22,14 @@ import (

vpc_rc_config "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
"github.com/samber/lo"

"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/version"
smithymiddleware "github.com/aws/smithy-go/middleware"
smithyhttp "github.com/aws/smithy-go/transport/http"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/arn"
awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/config"
@@ -441,7 +443,7 @@ func NewEC2Wrapper(roleARN, clusterName, region string, instanceClientQPS, insta

ec2Wrapper := &ec2Wrapper{log: log}

cfg, err := ec2Wrapper.getInstanceConfig()
cfg, err := ec2Wrapper.getInstanceConfig(region, lo.Must1(arn.Parse(roleARN)).AccountID)
if err != nil {
return nil, err
}
@@ -481,7 +483,7 @@ func NewEC2Wrapper(roleARN, clusterName, region string, instanceClientQPS, insta
return ec2Wrapper, nil
}

func (e *ec2Wrapper) getInstanceConfig() (*aws.Config, error) {
func (e *ec2Wrapper) getInstanceConfig(regionStr, accountID string) (*aws.Config, error) {
// Create a new config
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithAPIOptions([]func(stack *smithymiddleware.Stack) error{
awsmiddleware.AddUserAgentKeyValue(AppName, version.GitVersion),
@@ -491,17 +493,27 @@ func (e *ec2Wrapper) getInstanceConfig() (*aws.Config, error) {
}

ec2Metadata := imds.NewFromConfig(cfg)
region, err := ec2Metadata.GetRegion(context.TODO(), &imds.GetRegionInput{})
if err != nil {
return nil, fmt.Errorf("failed to find the region from ec2 metadata: %v", err)
if regionStr == "" {
region, err := ec2Metadata.GetRegion(context.TODO(), &imds.GetRegionInput{})
if err != nil {
return nil, fmt.Errorf("failed to find the region from ec2 metadata: %v", err)
}
cfg.Region = region.Region
} else {
cfg.Region = regionStr
}
cfg.Region = region.Region
instanceIdentity, err := ec2Metadata.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
if err != nil {
return nil, fmt.Errorf("failed to get the instance identity document %v", err)

if accountID == "" {
instanceIdentity, err := ec2Metadata.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
if err != nil {
return nil, fmt.Errorf("failed to get the instance identity document %v", err)
}
// Set the Account ID
e.accountID = instanceIdentity.AccountID
} else {
e.accountID = accountID
}
// Set the Account ID
e.accountID = instanceIdentity.AccountID

return &cfg, nil
}

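The `getInstanceConfig` change above only falls back to IMDS when the region or the account ID is missing; with the new `--aws-region` flag and a parsable role ARN, both can be supplied up front. A minimal sketch (not from this PR) of the account-ID extraction that `lo.Must1(arn.Parse(roleARN)).AccountID` performs at the call site:

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws/arn"
)

func main() {
	// Hypothetical role ARN for illustration; the real value comes from the
	// controller's role-ARN configuration, which this diff does not show.
	roleARN := "arn:aws:iam::123456789012:role/example-vpc-rc-role"

	parsed, err := arn.Parse(roleARN)
	if err != nil {
		// NewEC2Wrapper wraps this call in lo.Must1, so a malformed ARN panics
		// there instead of being returned as an error.
		panic(err)
	}
	fmt.Println("account ID passed to getInstanceConfig:", parsed.AccountID) // 123456789012
}
```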