Skip to content

Commit 9c9b316

Browse files
committed
DynamicRESTMapper: add mapping for core k8s and kcp types
The mapper is now split into two controllers: * DynamicTypesController: manages dynamic, cluster-local types added through bindings or local CRDs * BuiltinTypesController: manages core k8s and kcp types, common to all clusters On-behalf-of: @SAP [email protected] Signed-off-by: Robert Vasek <[email protected]>
1 parent d4dbbdb commit 9c9b316

File tree

9 files changed

+602
-146
lines changed

9 files changed

+602
-146
lines changed

pkg/reconciler/dynamicrestmapper/defaultrestmapper_mutable.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package dynamicrestmapper
1818

1919
import (
20+
"slices"
21+
2022
"k8s.io/apimachinery/pkg/api/meta"
2123
"k8s.io/apimachinery/pkg/runtime/schema"
2224
)
@@ -41,6 +43,24 @@ func (m *DefaultRESTMapper) add(typeMeta typeMeta) {
4143

4244
m.kindToPluralResource[kind] = plural
4345
m.kindToScope[kind] = meta.RESTScopeRoot
46+
47+
foundDefaultVersion := false
48+
for i := range m.defaultGroupVersions {
49+
if m.defaultGroupVersions[i].Group == typeMeta.Group {
50+
if typeMeta.Version > m.defaultGroupVersions[i].Version {
51+
m.defaultGroupVersions[i].Version = typeMeta.Version
52+
}
53+
foundDefaultVersion = true
54+
break
55+
}
56+
}
57+
58+
if !foundDefaultVersion {
59+
m.defaultGroupVersions = append(m.defaultGroupVersions, schema.GroupVersion{
60+
Group: typeMeta.Group,
61+
Version: typeMeta.Version,
62+
})
63+
}
4464
}
4565

4666
func (m *DefaultRESTMapper) remove(typeMeta typeMeta) {
@@ -56,6 +76,34 @@ func (m *DefaultRESTMapper) remove(typeMeta typeMeta) {
5676

5777
delete(m.kindToPluralResource, kind)
5878
delete(m.kindToScope, kind)
79+
80+
versionIdx := slices.IndexFunc(m.defaultGroupVersions, func(gv schema.GroupVersion) bool {
81+
return gv.Group == typeMeta.Group
82+
})
83+
if versionIdx < 0 {
84+
return
85+
}
86+
87+
// Fixup the default version too.
88+
// We don't know what the current latest version is, so we need to find it.
89+
90+
latestGroupVersion := ""
91+
for gvr := range m.pluralToSingular {
92+
if gvr.Group != typeMeta.Group {
93+
continue
94+
}
95+
if gvr.Version > latestGroupVersion {
96+
latestGroupVersion = gvr.Version
97+
}
98+
}
99+
100+
if latestGroupVersion != "" {
101+
m.defaultGroupVersions[versionIdx].Version = latestGroupVersion
102+
} else {
103+
// There are no more resources in this group.
104+
// Delete the default version too.
105+
m.defaultGroupVersions = slices.Delete(m.defaultGroupVersions, versionIdx, versionIdx+1)
106+
}
59107
}
60108

61109
func (m *DefaultRESTMapper) getGVKR(gvr schema.GroupVersionResource) typeMeta {

pkg/reconciler/dynamicrestmapper/dynamicrestmapper.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,39 @@ package dynamicrestmapper
1919
import (
2020
"sync"
2121

22-
"k8s.io/apimachinery/pkg/runtime/schema"
23-
2422
"github.com/kcp-dev/logicalcluster/v3"
2523
)
2624

2725
// DynamicRESTMapper is a thread-safe RESTMapper with per-cluster GVK/GVR mappings.
2826
// The mapping data is fed from its associated Controller (in this package), triggered
29-
// on CRDs and APIBindings (and their respective APIResourceSchemas). Additionally,
30-
// mappings for built-in types (pkg/virtual/apiexport/schemas/builtin/builtin.go) are
31-
// added for each LogicalCluster by default.
27+
// on CRDs and APIBindings (and their respective APIResourceSchemas).
3228
type DynamicRESTMapper struct {
33-
lock sync.RWMutex
34-
byCluster map[logicalcluster.Name]*DefaultRESTMapper
29+
lock sync.RWMutex
30+
// Built-in types consist of core k8s types and system CRDs.
31+
// They are present in all clusters.
32+
builtin *DefaultRESTMapper
33+
// Dynamic types consist of bound types and CRDs local to the cluster.
34+
dynamic map[logicalcluster.Name]*DefaultRESTMapper
3535
}
3636

37-
func NewDynamicRESTMapper(defaultGroupVersions []schema.GroupVersion) *DynamicRESTMapper {
37+
func NewDynamicRESTMapper() *DynamicRESTMapper {
3838
return &DynamicRESTMapper{
39-
byCluster: make(map[logicalcluster.Name]*DefaultRESTMapper),
39+
builtin: NewDefaultRESTMapper(nil),
40+
dynamic: make(map[logicalcluster.Name]*DefaultRESTMapper),
4041
}
4142
}
4243

4344
func (d *DynamicRESTMapper) deleteMappingsForCluster(clusterName logicalcluster.Name) {
4445
d.lock.Lock()
4546
defer d.lock.Unlock()
46-
delete(d.byCluster, clusterName)
47+
delete(d.dynamic, clusterName)
4748
}
4849

4950
// ForCluster returns a RESTMapper for the specified cluster name.
5051
// The method never returns nil. If the cluster doesn't exist at the time
5152
// of calling ForCluster, or if it is deleted while holding the returned
52-
// RESTMapper instance, all RESTMapper's methods will empty matches and
53-
// NoResourceMatchError error.
53+
// RESTMapper instance, all RESTMapper's methods will return empty matches
54+
// and a NoResourceMatchError error.
5455
func (d *DynamicRESTMapper) ForCluster(clusterName logicalcluster.Name) *ForCluster {
5556
return newForCluster(clusterName, d)
5657
}
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
Copyright 2025 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dynamicrestmapper
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/go-logr/logr"
25+
26+
apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
27+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
28+
apierrors "k8s.io/apimachinery/pkg/api/errors"
29+
"k8s.io/apimachinery/pkg/api/meta"
30+
"k8s.io/apimachinery/pkg/runtime/schema"
31+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
32+
"k8s.io/apimachinery/pkg/util/wait"
33+
"k8s.io/client-go/tools/cache"
34+
"k8s.io/client-go/util/workqueue"
35+
"k8s.io/klog/v2"
36+
37+
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
38+
kcpapiextensionsv1informers "github.com/kcp-dev/client-go/apiextensions/informers/apiextensions/v1"
39+
"github.com/kcp-dev/logicalcluster/v3"
40+
41+
"github.com/kcp-dev/kcp/pkg/logging"
42+
"github.com/kcp-dev/kcp/pkg/tombstone"
43+
builtinschemas "github.com/kcp-dev/kcp/pkg/virtual/apiexport/schemas/builtin"
44+
)
45+
46+
const (
47+
BuiltinTypesControllerName = "kcp-dynamicrestmapper-builtin"
48+
)
49+
50+
var systemCRDClusterName = logicalcluster.Name("system:system-crds")
51+
52+
type BuiltinTypesController struct {
53+
queue workqueue.TypedRateLimitingInterface[string]
54+
55+
state *DynamicRESTMapper
56+
groupVersions map[string]string
57+
58+
getCRD func(clusterName logicalcluster.Name, name string) (*apiextensionsv1.CustomResourceDefinition, error)
59+
}
60+
61+
func NewBuiltinTypesController(
62+
ctx context.Context,
63+
state *DynamicRESTMapper,
64+
crdInformer kcpapiextensionsv1informers.CustomResourceDefinitionClusterInformer,
65+
) (*BuiltinTypesController, error) {
66+
c := &BuiltinTypesController{
67+
state: state,
68+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
69+
workqueue.DefaultTypedControllerRateLimiter[string](),
70+
workqueue.TypedRateLimitingQueueConfig[string]{
71+
Name: BuiltinTypesControllerName,
72+
},
73+
),
74+
groupVersions: make(map[string]string),
75+
getCRD: func(clusterName logicalcluster.Name, name string) (*apiextensionsv1.CustomResourceDefinition, error) {
76+
return crdInformer.Lister().Cluster(clusterName).Get(name)
77+
},
78+
}
79+
80+
// Populate the builtin RESTMapper with core types.
81+
// We assume those never change, so we add mapping for them only during initialization.
82+
83+
for i := range builtinschemas.BuiltInAPIs {
84+
group := builtinschemas.BuiltInAPIs[i].GroupVersion.Group
85+
version := builtinschemas.BuiltInAPIs[i].GroupVersion.Version
86+
if version > c.groupVersions[group] {
87+
c.groupVersions[group] = version
88+
}
89+
c.state.builtin.add(newTypeMeta(
90+
builtinschemas.BuiltInAPIs[i].GroupVersion.Group,
91+
builtinschemas.BuiltInAPIs[i].GroupVersion.Version,
92+
builtinschemas.BuiltInAPIs[i].Names.Kind,
93+
builtinschemas.BuiltInAPIs[i].Names.Singular,
94+
builtinschemas.BuiltInAPIs[i].Names.Plural,
95+
resourceScopeToRESTScope(builtinschemas.BuiltInAPIs[i].ResourceScope)),
96+
)
97+
}
98+
99+
logger := logging.WithReconciler(klog.Background(), BuiltinTypesControllerName)
100+
101+
// System CRDs could change over time, and so we build that part of the mapper dynamically.
102+
103+
// We are only interested in system CRDs.
104+
_, _ = crdInformer.Informer().Cluster("system:system-crds").AddEventHandler(cache.ResourceEventHandlerFuncs{
105+
AddFunc: func(obj interface{}) {
106+
c.enqueueCRD(tombstone.Obj[*apiextensionsv1.CustomResourceDefinition](obj), logger)
107+
},
108+
UpdateFunc: func(oldObj, newObj interface{}) {
109+
c.enqueueCRD(tombstone.Obj[*apiextensionsv1.CustomResourceDefinition](newObj), logger)
110+
},
111+
DeleteFunc: func(obj interface{}) {
112+
c.enqueueCRD(tombstone.Obj[*apiextensionsv1.CustomResourceDefinition](obj), logger)
113+
},
114+
})
115+
116+
return c, nil
117+
}
118+
119+
func (c *BuiltinTypesController) enqueueCRD(crd *apiextensionsv1.CustomResourceDefinition, logger logr.Logger) {
120+
if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
121+
// The CRD is not ready yet. Nothing to do, we'll get notified on the next update event.
122+
return
123+
}
124+
125+
// CRD name is enforced to be in format "<Resource>.<Group>", e.g. "cowboys.wildwest.dev".
126+
key, err := kcpcache.MetaClusterNamespaceKeyFunc(crd)
127+
if err != nil {
128+
utilruntime.HandleError(err)
129+
return
130+
}
131+
132+
logger.V(4).Info("queueing system CRD")
133+
c.queue.Add(key)
134+
}
135+
136+
func (c *BuiltinTypesController) Start(ctx context.Context, numThreads int) {
137+
defer utilruntime.HandleCrash()
138+
defer c.queue.ShutDown()
139+
140+
logger := logging.WithReconciler(klog.FromContext(ctx), BuiltinTypesControllerName)
141+
ctx = klog.NewContext(ctx, logger)
142+
logger.Info("Starting controller")
143+
defer logger.Info("Shutting down controller")
144+
145+
for range numThreads {
146+
go wait.UntilWithContext(ctx, c.startWorker, time.Second)
147+
}
148+
149+
<-ctx.Done()
150+
}
151+
152+
func (c *BuiltinTypesController) startWorker(ctx context.Context) {
153+
for c.processNextWorkItem(ctx) {
154+
}
155+
}
156+
157+
func (c *BuiltinTypesController) processNextWorkItem(ctx context.Context) bool {
158+
// Wait until there is a new item in the working queue
159+
key, quit := c.queue.Get()
160+
if quit {
161+
return false
162+
}
163+
164+
logger := logging.WithQueueKey(klog.FromContext(ctx), key)
165+
ctx = klog.NewContext(ctx, logger)
166+
logger.Info("processing key")
167+
168+
// No matter what, tell the queue we're done with this key, to unblock
169+
// other workers.
170+
defer c.queue.Done(key)
171+
172+
if err := c.process(ctx, key); err != nil {
173+
utilruntime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", DynamicTypesControllerName, key, err))
174+
c.queue.AddRateLimited(key)
175+
return true
176+
}
177+
c.queue.Forget(key)
178+
return true
179+
}
180+
181+
func (c *BuiltinTypesController) gatherGVKRsForCRD(crd *apiextensionsv1.CustomResourceDefinition) []typeMeta {
182+
if crd == nil {
183+
return nil
184+
}
185+
gvkrs := make([]typeMeta, 0, len(crd.Spec.Versions))
186+
for _, version := range crd.Spec.Versions {
187+
if !version.Served {
188+
continue
189+
}
190+
191+
gvkrs = append(gvkrs, newTypeMeta(
192+
crd.Spec.Group,
193+
version.Name,
194+
crd.Status.AcceptedNames.Kind,
195+
crd.Status.AcceptedNames.Singular,
196+
crd.Status.AcceptedNames.Plural,
197+
resourceScopeToRESTScope(crd.Spec.Scope),
198+
))
199+
}
200+
return gvkrs
201+
}
202+
203+
func (c *BuiltinTypesController) gatherGVKRsForMappedGroupResource(gr schema.GroupResource) ([]typeMeta, error) {
204+
gvkrs, err := c.state.builtin.getGVKRs(gr)
205+
if err != nil {
206+
if meta.IsNoMatchError(err) {
207+
return nil, nil
208+
}
209+
return nil, err
210+
}
211+
return gvkrs, nil
212+
}
213+
214+
func (c *BuiltinTypesController) process(ctx context.Context, key string) error {
215+
_, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
216+
if err != nil {
217+
return err
218+
}
219+
220+
logger := logging.WithQueueKey(klog.FromContext(ctx), key)
221+
222+
// CRD name is enforced to be in format "<Resource>.<Group>", e.g. "cowboys.wildwest.dev".
223+
gr := schema.ParseGroupResource(name)
224+
225+
crd, err := c.getCRD(systemCRDClusterName, name)
226+
if err != nil && !apierrors.IsNotFound(err) {
227+
return err
228+
}
229+
230+
// Remove and add the mapping -- this way we can refresh any existing mappings.
231+
typeMetaToRemove, err := c.gatherGVKRsForMappedGroupResource(gr)
232+
if err != nil {
233+
return err
234+
}
235+
typeMetaToAdd := c.gatherGVKRsForCRD(crd)
236+
237+
logger.V(4).Info("applying mappings")
238+
239+
c.state.lock.Lock()
240+
defer c.state.lock.Unlock()
241+
c.state.builtin.apply(typeMetaToRemove, typeMetaToAdd)
242+
return nil
243+
}

0 commit comments

Comments
 (0)