Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
dead945
UPSTREAM: 113151: Revert: Revert: Clean up CRD conversion code structure
sttts Sep 11, 2023
d157aea
README
sttts Sep 1, 2023
a0f3d68
UPSTREAM: <carry>: controlplane/apiserver: add miniaggregator
sttts Sep 27, 2023
7234f81
UPSTREAM: <carry>: clusterize controllers
sttts Sep 1, 2023
28ac217
UPSTREAM: <carry>: re-add GC worker locks
gman0 Feb 3, 2025
570c6ed
UPSTREAM: <carry>: Add kcp patchers
sttts Oct 8, 2023
c93e304
UPSTREAM: <carry>: re-generated pkg/controller/resourcequota/resource…
gman0 Jan 26, 2025
2016601
UPSTREAM: <carry>: re-generated pkg/controller/garbagecollector/garba…
gman0 Feb 11, 2025
f87a202
UPSTREAM: <carry>: storage: etcd cluster key computation
sttts Sep 27, 2023
aad4a42
UPSTREAM: <carry>: clusterize serviceaccounts
sttts Sep 1, 2023
86eb6a8
UPSTREAM: <carry>: registry/core/serviceaccount: do not crash without…
sttts Sep 27, 2023
0e41242
UPSTREAM: <carry>: kube-aggregator
sttts Sep 1, 2023
006648f
UPSTREAM: <carry>: INTERESTING: clusterize admission
sttts Sep 1, 2023
ba55bcf
UPSTREAM: <carry>: Clusterize ValidatingAdmissionPolicy admission plu…
embik Jul 24, 2024
aaa0e92
UPSTREAM: <carry>: clusterize storage hash
sttts Sep 27, 2023
f2ad525
UPSTREAM: <carry>: endpoints: add Cluster struct for ctx
sttts Sep 27, 2023
4ed5b98
UPSTREAM: <carry>: endpoints/patch: wire openapi for CRD strategic me…
sttts Sep 27, 2023
4739ef6
UPSTREAM: <carry>: storage/etcd3: clusterize
sttts Sep 27, 2023
ab335e9
UPSTREAM: <carry>: apiserver: partial wildcard metadata request accro…
sttts Sep 27, 2023
d868a62
UPSTREAM: <carry>: endpoints: set kcp.io/original-api-version on wild…
sttts Sep 27, 2023
7f28105
UPSTREAM: <carry>: watch(er/cache): clusterize
sttts Sep 27, 2023
e271562
UPSTREAM: <carry>: apiserver: split chain into pre and post authz
sttts Sep 27, 2023
4a2b5f9
UPSTREAM: <carry>: apiserver: clusterize listed paths
sttts Sep 27, 2023
b9669e3
UPSTREAM: <carry>: apiserver: clusterize OpenAPI v2
sttts Sep 27, 2023
b5939d8
UPSTREAM: <carry>: clusterize BuiltInAuthenticationOptions
sttts Sep 27, 2023
bf1a2a1
UPSTREAM: <carry>: storage: add UseResourceAsPrefixDefault for legacy…
sttts Sep 27, 2023
879e485
UPSTREAM: <carry>: apiextensions-apiserver
sttts Sep 27, 2023
0203da1
UPSTREAM: <carry>: cache-server: wire shard name into storage
sttts Sep 27, 2023
db825b7
UPSTREAM: <carry>: add client and informer hacks
sttts Sep 27, 2023
1159daa
UPSTREAM: <carry>: controlplane: wire informers and clients
sttts Sep 11, 2023
4c9c3df
UPSTREAM: <carry>: generic cleanup
mjudeikis Sep 3, 2024
ab917a9
UPSTREAM: <carry>: controlplane/apiserver: disable protobuf for loopback
sttts Sep 25, 2023
e142cc7
UPSTREAM: <carry>: pass system:admin clients and informers in generic…
embik Jun 13, 2024
a71d65e
UPSTREAM: <carry>: remove REST mapper from admission plugins
embik Jun 20, 2024
68b54d6
UPSTREAM: <carry>: provide supportedMediaTypes for custom resoure han…
embik Jun 21, 2024
50c9ea5
UPSTREAM: <carry>: prevent NPE if no authorization is set
embik Jun 21, 2024
2034abe
UPSTREAM: <carry>: wrap CRD group into packagePrefix for OpenAPIV3 bu…
embik Jul 12, 2024
8a207b8
UPSTREAM: <CARRY>: clusterize validatingadmissionpolicystatus controller
embik Jul 23, 2024
7a27a28
UPSTREAM: <carry>: apiserver cleaning
mjudeikis Sep 3, 2024
5c809ee
UPSTREAM: <squash>: remove syncer custom code from apiextensions-apis…
sttts Sep 18, 2024
fe0acf7
UPSTREAM: <carry>: include cluster name in authz SubjectAccessReview …
xrstf Nov 13, 2024
d57c237
UPSTREAM: <squash>: prevent conflict with pre-existing auth annotation
xrstf Nov 14, 2024
455d854
CARRY: split auth/authz chains even more
mjudeikis Dec 10, 2024
1a37a19
UPSTREAM: <carry>: authz: add scoping to default rule resolver
sttts Aug 23, 2024
b6ac17f
UPSTREAM: <carry>: authz: add warrants to default rule resolver
sttts Aug 23, 2024
dc03406
UPSTREAM: <squash>: authz: add warrants to default rule resolver: glo…
sttts Jan 24, 2025
b06d2a8
UPSTREAM: <squash>: authz: add warrants to default rule resolver: hel…
sttts Jan 24, 2025
1b0b146
UPSTREAM: <squash>: authz: add warrants to default rule resolver: glo…
sttts Jan 25, 2025
0ced631
UPSTREAM: <squash>: authz: add warrants to default rule resolver: for…
sttts Jan 25, 2025
bffed1d
UPSTREAM: <squash>: authz: add warrants to default rule resolver: sim…
sttts Jan 25, 2025
8fd95d6
UPSTREAM: <carry>: endpoints: wire in ctx to watchListTransformer
gman0 Jan 22, 2025
8befcaf
UPSTREAM: <carry>: Clusterize MutatingAdmissionPolicy admission plugi…
gman0 Jan 23, 2025
8fc9059
UPSTREAM: <carry>: don't use --prefers-protobuf with client-gen
gman0 Feb 10, 2025
03a3c1b
UPSTREAM: <drop>: Pinned kcp dependencies
gman0 Feb 12, 2025
204c308
UPSTREAM: <drop>: Updated vendor modules
gman0 Feb 12, 2025
c6ebc59
UPSTREAM: <drop>: Re-generated client code
gman0 Feb 12, 2025
89e1549
UPSTREAM 130180: Make disable lookups of SA related artifacts working
mjudeikis Feb 14, 2025
3b193fc
Merge pull request #163 from mjudeikis/mjudeikis/cherry.sa.lookup
mjudeikis Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (a customResourceStrategy) PrepareForCreate(ctx context.Context, obj runtim
}

accessor, _ := meta.Accessor(obj)
if _, found := accessor.GetAnnotations()[genericapirequest.AnnotationKey]; found {
if _, found := accessor.GetAnnotations()[genericapirequest.ShardAnnotationKey]; found {
// to avoid an additional UPDATE request (mismatch on the generation field) replicated objects have the generation field already set
return
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (a customResourceStrategy) PrepareForUpdate(ctx context.Context, obj, old r
if !apiequality.Semantic.DeepEqual(newCopyContent, oldCopyContent) {
oldAccessor, _ := meta.Accessor(oldCustomResourceObject)
newAccessor, _ := meta.Accessor(newCustomResourceObject)
if _, found := oldAccessor.GetAnnotations()[genericapirequest.AnnotationKey]; found {
if _, found := oldAccessor.GetAnnotations()[genericapirequest.ShardAnnotationKey]; found {
// the presence of the annotation indicates the object is from the cache server.
// since the objects from the cache should not be modified in any way, just return early.
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Copyright 2022 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package request

import (
"context"
)

type shardKey int

const (
// shardKey is the context key for the request.
shardContextKey shardKey = iota

// ShardAnnotationKey is the name of the annotation key used to denote an object's shard name.
ShardAnnotationKey = "kcp.io/shard"
)

// Shard describes a shard
type Shard string

// Empty returns true if the name of the shard is empty.
func (s Shard) Empty() bool {
return s == ""
}

// Wildcard checks if the given shard name matches wildcard.
// If true the query applies to all shards.
func (s Shard) Wildcard() bool {
return s == "*"
}

// String casts Shard to string type
func (s Shard) String() string {
return string(s)
}

// WithShard returns a context that holds the given shard.
func WithShard(parent context.Context, shard Shard) context.Context {
return context.WithValue(parent, shardContextKey, shard)
}

// ShardFrom returns the value of the shard key in the context, or an empty value if there is no shard key.
func ShardFrom(ctx context.Context) Shard {
shard, ok := ctx.Value(shardContextKey).(Shard)
if !ok {
return ""
}
return shard
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,18 @@ const (
// to resource directories enforcing namespace rules.
func NoNamespaceKeyRootFunc(ctx context.Context, prefix string) string {
key := prefix
shard := genericapirequest.ShardFrom(ctx)
if shard.Wildcard() {
return key
}
cluster, err := genericapirequest.ValidClusterFrom(ctx)
if err != nil {
klog.Errorf("invalid context cluster value: %v", err)
return key
}
if !shard.Empty() {
key += "/" + shard.String()
}
if !cluster.Wildcard {
key += "/" + cluster.Name.String()
}
Expand Down Expand Up @@ -514,6 +521,12 @@ func (e *Store) create(ctx context.Context, obj runtime.Object, createValidation
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}

if _, found := objectMeta.GetAnnotations()[genericapirequest.ShardAnnotationKey]; found {
// Remove the shard annotation so it is not persisted
delete(objectMeta.GetAnnotations(), genericapirequest.ShardAnnotationKey)
}

// at this point we have a fully formed object. It is time to call the validators that the apiserver
// handling chain wants to enforce.
if createValidation != nil {
Expand Down Expand Up @@ -1081,6 +1094,13 @@ func (e *Store) updateForGracefulDeletionAndFinalizers(ctx context.Context, name
if err != nil {
return nil, err
}

// the following annotation key indicates that the request is from the cache server
// in that case we've decided not to require finalization, the object will be deleted immediately
if _, hasShardAnnotation := existingAccessor.GetAnnotations()[genericapirequest.ShardAnnotationKey]; hasShardAnnotation {
return existing, nil
}

needsUpdate, newFinalizers := deletionFinalizersForGarbageCollection(ctx, e, existingAccessor, options)
if needsUpdate {
existingAccessor.SetFinalizers(newFinalizers)
Expand Down
11 changes: 11 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/registry/rest/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
)

// WipeObjectMetaSystemFields erases fields that are managed by the system on ObjectMeta.
func WipeObjectMetaSystemFields(meta metav1.Object) {
if _, found := meta.GetAnnotations()[genericapirequest.ShardAnnotationKey]; found {
// Do not wipe system fields if we are storing a cached object
return
}
meta.SetCreationTimestamp(metav1.Time{})
meta.SetUID("")
meta.SetDeletionTimestamp(nil)
Expand All @@ -34,6 +39,12 @@ func WipeObjectMetaSystemFields(meta metav1.Object) {

// FillObjectMetaSystemFields populates fields that are managed by the system on ObjectMeta.
func FillObjectMetaSystemFields(meta metav1.Object) {
if _, found := meta.GetAnnotations()[genericapirequest.ShardAnnotationKey]; found {
// In general the shard annotation is not attached to objects. Instead, it is assigned by the storage layer on the fly.
// To avoid an additional UPDATE request (mismatch on the creationTime and UID fields) replicated objects have those fields already set.
// Thus all we have to do is to simply return early.
return
}
meta.SetCreationTimestamp(metav1.Now())
meta.SetUID(uuid.NewUUID())
}
Expand Down
7 changes: 6 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/registry/rest/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ func BeforeUpdate(strategy RESTUpdateStrategy, ctx context.Context, obj, old run
if err != nil {
return err
}
objectMeta.SetGeneration(oldMeta.GetGeneration())
if len(oldMeta.GetAnnotations()[genericapirequest.ShardAnnotationKey]) == 0 || objectMeta.GetGeneration() == 0 {
// the absence of the annotation indicates the object is NOT from the cache server,
// if the new object doesn't have its generation set, just rewrite it from the old object
// otherwise we are dealing with an object from the cache server that wants its generation to be updated
objectMeta.SetGeneration(oldMeta.GetGeneration())
}

strategy.PrepareForUpdate(ctx, obj, old)

Expand Down
56 changes: 30 additions & 26 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
return storage.NewInternalError(err)
}

err = decode(s.codec, s.versioner, data, out, getResp.KV.ModRevision, clusterName)
shardName := endpointsrequest.ShardFrom(ctx)
err = decode(s.codec, s.versioner, data, out, getResp.KV.ModRevision, clusterName, shardName)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
Expand Down Expand Up @@ -306,7 +307,8 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
}

if out != nil {
err = decode(s.codec, s.versioner, data, out, txnResp.Revision, clusterName)
shardName := endpointsrequest.ShardFrom(ctx)
err = decode(s.codec, s.versioner, data, out, txnResp.Revision, clusterName, shardName)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
Expand Down Expand Up @@ -344,7 +346,9 @@ func (s *store) conditionalDelete(
if err != nil {
klog.Errorf("No cluster defined in conditionalDelete action for key %s : %s", key, err.Error())
}
getCurrentState := s.getCurrentState(ctx, key, v, false, skipTransformDecode, clusterName)
shardName := endpointsrequest.ShardFrom(ctx)

getCurrentState := s.getCurrentState(ctx, key, v, false, skipTransformDecode, clusterName, shardName)

var origState *objState
var origStateIsCurrent bool
Expand Down Expand Up @@ -422,7 +426,7 @@ func (s *store) conditionalDelete(
}
if !txnResp.Succeeded {
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(ctx, txnResp.KV, key, v, false, skipTransformDecode, clusterName)
origState, err = s.getState(ctx, txnResp.KV, key, v, false, skipTransformDecode, clusterName, shardName)
if err != nil {
return err
}
Expand All @@ -431,14 +435,7 @@ func (s *store) conditionalDelete(
}

if !skipTransformDecode {
if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil {
return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses))
}
deleteResp := txnResp.Responses[0].GetResponseDeleteRange()
if deleteResp.Header == nil {
return errors.New("invalid DeleteRange response - nil header")
}
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision, clusterName)
err = decode(s.codec, s.versioner, origState.data, out, txnResp.Revision, clusterName, shardName)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we remember why this is different from upstream?
@sttts maybe you recall?

Copy link

@gman0 gman0 Feb 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upstream's conditionalDelete was the same in v1.31.0 too: https://github.com/kubernetes/kubernetes/blob/v1.31.0/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L349-L385

Now, instead of explicitly doing Txn-If-Then-Else-Commit, there is a dedicated OptimisticDelete, and so deleteResp is not used anymore. Is this what you meant, @mjudeikis ?

if err != nil {
recordDecodeError(s.groupResourceString, key)
return err
Expand All @@ -456,6 +453,8 @@ func (s *store) GuaranteedUpdate(
if err != nil {
klog.Errorf("No cluster defined in GuaranteedUpdate action for key %s : %s", key, err.Error())
}
shardName := endpointsrequest.ShardFrom(ctx)

preparedKey, err := s.prepareKey(key)
if err != nil {
return err
Expand All @@ -473,7 +472,7 @@ func (s *store) GuaranteedUpdate(
}

skipTransformDecode := false
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound, skipTransformDecode, clusterName)
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound, skipTransformDecode, clusterName, shardName)

var origState *objState
var origStateIsCurrent bool
Expand Down Expand Up @@ -559,7 +558,7 @@ func (s *store) GuaranteedUpdate(
}
// recheck that the data from etcd is not stale before short-circuiting a write
if !origState.stale {
err = decode(s.codec, s.versioner, origState.data, destination, origState.rev, clusterName)
err = decode(s.codec, s.versioner, origState.data, destination, origState.rev, clusterName, shardName)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
Expand Down Expand Up @@ -599,7 +598,7 @@ func (s *store) GuaranteedUpdate(
span.AddEvent("Transaction committed")
if !txnResp.Succeeded {
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey)
origState, err = s.getState(ctx, txnResp.KV, preparedKey, v, ignoreNotFound, skipTransformDecode, clusterName)
origState, err = s.getState(ctx, txnResp.KV, preparedKey, v, ignoreNotFound, skipTransformDecode, clusterName, shardName)
if err != nil {
return err
}
Expand All @@ -608,7 +607,7 @@ func (s *store) GuaranteedUpdate(
continue
}

err = decode(s.codec, s.versioner, data, destination, txnResp.Revision, clusterName)
err = decode(s.codec, s.versioner, data, destination, txnResp.Revision, clusterName, shardName)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
Expand Down Expand Up @@ -757,7 +756,9 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if err != nil {
return storage.NewInternalError(fmt.Errorf("unable to get cluster for list key %q: %v", keyPrefix, err))
}
shard := endpointsrequest.ShardFrom(ctx)
crdIndicator := kcp.CustomResourceIndicatorFrom(ctx)
// end kcp

// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
Expand Down Expand Up @@ -835,8 +836,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
default:
}

clusterName := adjustClusterNameIfWildcard(cluster, keyPrefix, string(kv.Key))
obj, err := decodeListItem(ctx, data, uint64(kv.ModRevision), s.codec, s.versioner, newItemFunc, clusterName)
// kcp
clusterName := adjustClusterNameIfWildcard(shard, cluster, crdIndicator, keyPrefix, string(kv.Key))
shardName := adjustShardNameIfWildcard(shard, keyPrefix, string(kv.Key))
obj, err := decodeListItem(ctx, data, uint64(kv.ModRevision), s.codec, s.versioner, newItemFunc, clusterName, shardName)
if err != nil {
recordDecodeError(s.groupResourceString, string(kv.Key))
if done := aggregator.Aggregate(string(kv.Key), err); done {
Expand Down Expand Up @@ -967,15 +970,15 @@ func (s *store) watchContext(ctx context.Context) context.Context {
return clientv3.WithRequireLeader(ctx)
}

func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool, clusterName logicalcluster.Name) func() (*objState, error) {
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool, clusterName logicalcluster.Name, shardName endpointsrequest.Shard) func() (*objState, error) {
return func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.Kubernetes.Get(ctx, key, kubernetes.GetOptions{})
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
if err != nil {
return nil, err
}
return s.getState(ctx, getResp.KV, key, v, ignoreNotFound, skipTransformDecode, clusterName)
return s.getState(ctx, getResp.KV, key, v, ignoreNotFound, skipTransformDecode, clusterName, shardName)
}
}

Expand All @@ -985,7 +988,7 @@ func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value
// storage will be transformed and decoded.
// NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields
// of the objState will be nil, and 'stale' will be set to true.
func (s *store) getState(ctx context.Context, kv *mvccpb.KeyValue, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool, clusterName logicalcluster.Name) (*objState, error) {
func (s *store) getState(ctx context.Context, kv *mvccpb.KeyValue, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool, clusterName logicalcluster.Name, shardName endpointsrequest.Shard) (*objState, error) {
state := &objState{
meta: &storage.ResponseMeta{},
}
Expand Down Expand Up @@ -1021,7 +1024,7 @@ func (s *store) getState(ctx context.Context, kv *mvccpb.KeyValue, key string, v

state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev, clusterName); err != nil {
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev, clusterName, shardName); err != nil {
recordDecodeError(s.groupResourceString, key)
return nil, err
}
Expand Down Expand Up @@ -1117,7 +1120,7 @@ func (s *store) prepareKey(key string) (string, error) {

// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64, clusterName logicalcluster.Name) error {
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64, clusterName logicalcluster.Name, shardName endpointsrequest.Shard) error {
if _, err := conversion.EnforcePtr(objPtr); err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
Expand All @@ -1131,13 +1134,13 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
}

// kcp: apply clusterName to the decoded object, as the name is not persisted in storage.
annotateDecodedObjectWith(objPtr, clusterName)
annotateDecodedObjectWith(objPtr, clusterName, shardName)

return nil
}

// decodeListItem decodes bytes value in array into object.
func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) (runtime.Object, error) {
func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object, clusterName logicalcluster.Name, shardName endpointsrequest.Shard) (runtime.Object, error) {
startedAt := time.Now()
defer func() {
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
Expand All @@ -1152,7 +1155,8 @@ func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.
klog.Errorf("failed to update object version: %v", err)
}

annotateDecodedObjectWith(obj, clusterName)
// kcp: apply clusterName and shardName to the decoded object, as the name is not persisted in storage.
annotateDecodedObjectWith(obj, clusterName, shardName)

return obj, nil
}
Expand Down
8 changes: 6 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (

// adjustClusterNameIfWildcard determines the logical cluster name. If this is not a cluster-wildcard list/watch request,
// the cluster name is returned unmodified. Otherwise, the cluster name is extracted from the key based on whether it is
// - a shard-wildcard request: <prefix>/shardName/clusterName/<remainder>
// - CR partial metadata request: <prefix>/identity/clusterName/<remainder>
// - any other request: <prefix>/clusterName/<remainder>.
func adjustClusterNameIfWildcard(cluster *genericapirequest.Cluster, crdRequest bool, keyPrefix, key string) logicalcluster.Name {
func adjustClusterNameIfWildcard(shard genericapirequest.Shard, cluster *genericapirequest.Cluster, crdRequest bool, keyPrefix, key string) logicalcluster.Name {
if !cluster.Wildcard {
return cluster.Name
}
Expand All @@ -51,6 +52,9 @@ func adjustClusterNameIfWildcard(cluster *genericapirequest.Cluster, crdRequest
// expecting 2699f4d273d342adccdc8a32663408226ecf66de7d191113ed3d4dc9bccec2f2/root:org:ws/<remainder>
// OR customresources/root:org:ws/<remainder>
return extract(3, 1)
case shard.Wildcard():
// expecting shardName/clusterName/<remainder>
return extract(3, 1)
default:
// expecting root:org:ws/<remainder>
return extract(2, 0)
Expand Down Expand Up @@ -101,7 +105,7 @@ func annotateDecodedObjectWith(obj interface{}, clusterName logicalcluster.Name,
}
annotations[logicalcluster.AnnotationKey] = clusterName.String()
if !shardName.Empty() {
annotations[genericapirequest.AnnotationKey] = shardName.String()
annotations[genericapirequest.ShardAnnotationKey] = shardName.String()
}
s.SetAnnotations(annotations)
}
Expand Down
Loading