Skip to content

Commit c7eac4e

Browse files
committed
DynamicRESTMapper: add mapping for core k8s and kcp types
The mapper is now split into two controllers: * DynamicTypesController: manages dynamic, cluster-local types added through bindings or local CRDs * BuiltinTypesController: manages core k8s and kcp types, common to all clusters On-behalf-of: @SAP [email protected] Signed-off-by: Robert Vasek <[email protected]>
1 parent 0f97ee0 commit c7eac4e

File tree

9 files changed

+501
-142
lines changed

9 files changed

+501
-142
lines changed

pkg/reconciler/dynamicrestmapper/defaultrestmapper_mutable.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package dynamicrestmapper
1818

1919
import (
20+
"slices"
21+
2022
"k8s.io/apimachinery/pkg/api/meta"
2123
"k8s.io/apimachinery/pkg/runtime/schema"
2224
)
@@ -41,6 +43,24 @@ func (m *DefaultRESTMapper) add(typeMeta typeMeta) {
4143

4244
m.kindToPluralResource[kind] = plural
4345
m.kindToScope[kind] = meta.RESTScopeRoot
46+
47+
foundDefaultVersion := false
48+
for i := range m.defaultGroupVersions {
49+
if m.defaultGroupVersions[i].Group == typeMeta.Group {
50+
if typeMeta.Version > m.defaultGroupVersions[i].Version {
51+
m.defaultGroupVersions[i].Version = typeMeta.Version
52+
}
53+
foundDefaultVersion = true
54+
break
55+
}
56+
}
57+
58+
if !foundDefaultVersion {
59+
m.defaultGroupVersions = append(m.defaultGroupVersions, schema.GroupVersion{
60+
Group: typeMeta.Group,
61+
Version: typeMeta.Version,
62+
})
63+
}
4464
}
4565

4666
func (m *DefaultRESTMapper) remove(typeMeta typeMeta) {
@@ -56,6 +76,37 @@ func (m *DefaultRESTMapper) remove(typeMeta typeMeta) {
5676

5777
delete(m.kindToPluralResource, kind)
5878
delete(m.kindToScope, kind)
79+
80+
versionIdx := slices.IndexFunc(m.defaultGroupVersions, func(gv schema.GroupVersion) bool {
81+
return gv.Group == typeMeta.Group
82+
})
83+
if versionIdx < 0 {
84+
return
85+
}
86+
87+
// Fixup the default version too.
88+
89+
// First, try to find resources with an older version in this group.
90+
// In case there are any, we'll need to "downgrade", i.e. set the default
91+
// group version to that.
92+
93+
latestGroupVersion := ""
94+
for gvr := range m.pluralToSingular {
95+
if gvr.Group != typeMeta.Group {
96+
continue
97+
}
98+
if gvr.Version > latestGroupVersion {
99+
latestGroupVersion = gvr.Version
100+
}
101+
}
102+
103+
if latestGroupVersion != "" {
104+
m.defaultGroupVersions[versionIdx].Version = latestGroupVersion
105+
} else {
106+
// There are no more resources in this group.
107+
// Delete the default version too.
108+
m.defaultGroupVersions = slices.Delete(m.defaultGroupVersions, versionIdx, versionIdx+1)
109+
}
59110
}
60111

61112
func (m *DefaultRESTMapper) getGVKR(gvr schema.GroupVersionResource) typeMeta {

pkg/reconciler/dynamicrestmapper/dynamicrestmapper.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,39 @@ package dynamicrestmapper
1919
import (
2020
"sync"
2121

22-
"k8s.io/apimachinery/pkg/runtime/schema"
23-
2422
"github.com/kcp-dev/logicalcluster/v3"
2523
)
2624

2725
// DynamicRESTMapper is a thread-safe RESTMapper with per-cluster GVK/GVR mappings.
2826
// The mapping data is fed from its associated Controller (in this package), triggered
29-
// on CRDs and APIBindings (and their respective APIResourceSchemas). Additionally,
30-
// mappings for built-in types (pkg/virtual/apiexport/schemas/builtin/builtin.go) are
31-
// added for each LogicalCluster by default.
27+
// on CRDs and APIBindings (and their respective APIResourceSchemas).
3228
type DynamicRESTMapper struct {
33-
lock sync.RWMutex
34-
byCluster map[logicalcluster.Name]*DefaultRESTMapper
29+
lock sync.RWMutex
30+
// Built-in types consist of core k8s types and system CRDs.
31+
// They are present in all clusters.
32+
builtin *DefaultRESTMapper
33+
// Dynamic types consist of bound types and CRDs local to the cluster.
34+
dynamic map[logicalcluster.Name]*DefaultRESTMapper
3535
}
3636

37-
func NewDynamicRESTMapper(defaultGroupVersions []schema.GroupVersion) *DynamicRESTMapper {
37+
func NewDynamicRESTMapper() *DynamicRESTMapper {
3838
return &DynamicRESTMapper{
39-
byCluster: make(map[logicalcluster.Name]*DefaultRESTMapper),
39+
builtin: NewDefaultRESTMapper(nil),
40+
dynamic: make(map[logicalcluster.Name]*DefaultRESTMapper),
4041
}
4142
}
4243

4344
func (d *DynamicRESTMapper) deleteMappingsForCluster(clusterName logicalcluster.Name) {
4445
d.lock.Lock()
4546
defer d.lock.Unlock()
46-
delete(d.byCluster, clusterName)
47+
delete(d.dynamic, clusterName)
4748
}
4849

4950
// ForCluster returns a RESTMapper for the specified cluster name.
5051
// The method never returns nil. If the cluster doesn't exist at the time
5152
// of calling ForCluster, or if it is deleted while holding the returned
52-
// RESTMapper instance, all RESTMapper's methods will empty matches and
53-
// NoResourceMatchError error.
53+
// RESTMapper instance, all RESTMapper's methods will return empty matches
54+
// and a NoResourceMatchError error.
5455
func (d *DynamicRESTMapper) ForCluster(clusterName logicalcluster.Name) *ForCluster {
5556
return newForCluster(clusterName, d)
5657
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
Copyright 2025 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dynamicrestmapper
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"strings"
23+
"time"
24+
25+
"github.com/go-logr/logr"
26+
27+
apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
28+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
29+
apierrors "k8s.io/apimachinery/pkg/api/errors"
30+
"k8s.io/apimachinery/pkg/api/meta"
31+
"k8s.io/apimachinery/pkg/runtime/schema"
32+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
33+
"k8s.io/apimachinery/pkg/util/wait"
34+
"k8s.io/client-go/tools/cache"
35+
"k8s.io/client-go/util/workqueue"
36+
"k8s.io/klog/v2"
37+
38+
kcpapiextensionsv1informers "github.com/kcp-dev/client-go/apiextensions/informers/apiextensions/v1"
39+
"github.com/kcp-dev/logicalcluster/v3"
40+
41+
"github.com/kcp-dev/kcp/pkg/logging"
42+
"github.com/kcp-dev/kcp/pkg/tombstone"
43+
builtinschemas "github.com/kcp-dev/kcp/pkg/virtual/apiexport/schemas/builtin"
44+
)
45+
46+
const (
47+
BuiltinTypesControllerName = "kcp-dynamicrestmapper-builtin"
48+
)
49+
50+
var systemCRDClusterName = logicalcluster.Name("system:system-crds")
51+
52+
type BuiltinTypesController struct {
53+
queue workqueue.TypedRateLimitingInterface[string]
54+
55+
state *DynamicRESTMapper
56+
groupVersions map[string]string
57+
58+
getCRD func(clusterName logicalcluster.Name, name string) (*apiextensionsv1.CustomResourceDefinition, error)
59+
}
60+
61+
func NewBuiltinTypesController(
62+
ctx context.Context,
63+
state *DynamicRESTMapper,
64+
crdInformer kcpapiextensionsv1informers.CustomResourceDefinitionClusterInformer,
65+
) (*BuiltinTypesController, error) {
66+
c := &BuiltinTypesController{
67+
state: state,
68+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
69+
workqueue.DefaultTypedControllerRateLimiter[string](),
70+
workqueue.TypedRateLimitingQueueConfig[string]{
71+
Name: BuiltinTypesControllerName,
72+
},
73+
),
74+
groupVersions: make(map[string]string),
75+
getCRD: func(clusterName logicalcluster.Name, name string) (*apiextensionsv1.CustomResourceDefinition, error) {
76+
return crdInformer.Lister().Cluster(clusterName).Get(name)
77+
},
78+
}
79+
80+
// Populate the builtin RESTMapper with core types.
81+
// We assume those never change, so we add mapping for them only during initialization.
82+
83+
for i := range builtinschemas.BuiltInAPIs {
84+
group := builtinschemas.BuiltInAPIs[i].GroupVersion.Group
85+
version := builtinschemas.BuiltInAPIs[i].GroupVersion.Version
86+
if version > c.groupVersions[group] {
87+
c.groupVersions[group] = version
88+
}
89+
c.state.builtin.add(newTypeMeta(
90+
builtinschemas.BuiltInAPIs[i].GroupVersion.Group,
91+
builtinschemas.BuiltInAPIs[i].GroupVersion.Version,
92+
builtinschemas.BuiltInAPIs[i].Names.Kind,
93+
builtinschemas.BuiltInAPIs[i].Names.Singular,
94+
builtinschemas.BuiltInAPIs[i].Names.Plural,
95+
resourceScopeToRESTScope(builtinschemas.BuiltInAPIs[i].ResourceScope)),
96+
)
97+
}
98+
99+
logger := logging.WithReconciler(klog.Background(), BuiltinTypesControllerName)
100+
101+
// System CRDs could change over time, and so we build that part of the mapper dynamically.
102+
103+
// We are only interested in system CRDs.
104+
_, _ = crdInformer.Informer().Cluster("system:system-crds").AddEventHandler(cache.ResourceEventHandlerFuncs{
105+
AddFunc: func(obj interface{}) {
106+
c.enqueueCRD(tombstone.Obj[*apiextensionsv1.CustomResourceDefinition](obj), logger)
107+
},
108+
UpdateFunc: func(oldObj, newObj interface{}) {
109+
c.enqueueCRD(tombstone.Obj[*apiextensionsv1.CustomResourceDefinition](newObj), logger)
110+
},
111+
DeleteFunc: func(obj interface{}) {
112+
c.enqueueCRD(tombstone.Obj[*apiextensionsv1.CustomResourceDefinition](obj), logger)
113+
},
114+
})
115+
116+
return c, nil
117+
}
118+
119+
func (c *BuiltinTypesController) enqueueCRD(crd *apiextensionsv1.CustomResourceDefinition, logger logr.Logger) {
120+
if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
121+
// The CRD is not ready yet. Nothing to do, we'll get notified on the next update event.
122+
return
123+
}
124+
125+
key := fmt.Sprintf("%s.%s/%s", crd.Spec.Group, crd.Spec.Names.Plural, crd.Name)
126+
127+
logger.V(4).Info("queueing system CRD")
128+
c.queue.Add(key)
129+
}
130+
131+
func (c *BuiltinTypesController) Start(ctx context.Context, numThreads int) {
132+
defer utilruntime.HandleCrash()
133+
defer c.queue.ShutDown()
134+
135+
logger := logging.WithReconciler(klog.FromContext(ctx), BuiltinTypesControllerName)
136+
ctx = klog.NewContext(ctx, logger)
137+
logger.Info("Starting controller")
138+
defer logger.Info("Shutting down controller")
139+
140+
for range numThreads {
141+
go wait.UntilWithContext(ctx, c.startWorker, time.Second)
142+
}
143+
144+
<-ctx.Done()
145+
}
146+
147+
func (c *BuiltinTypesController) startWorker(ctx context.Context) {
148+
for c.processNextWorkItem(ctx) {
149+
}
150+
}
151+
152+
func (c *BuiltinTypesController) processNextWorkItem(ctx context.Context) bool {
153+
// Wait until there is a new item in the working queue
154+
key, quit := c.queue.Get()
155+
if quit {
156+
return false
157+
}
158+
159+
logger := logging.WithQueueKey(klog.FromContext(ctx), key)
160+
ctx = klog.NewContext(ctx, logger)
161+
logger.Info("processing key")
162+
163+
// No matter what, tell the queue we're done with this key, to unblock
164+
// other workers.
165+
defer c.queue.Done(key)
166+
167+
if err := c.process(ctx, key); err != nil {
168+
utilruntime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", DynamicTypesControllerName, key, err))
169+
c.queue.AddRateLimited(key)
170+
return true
171+
}
172+
c.queue.Forget(key)
173+
return true
174+
}
175+
176+
func (c *BuiltinTypesController) gatherGVKRsForCRD(crd *apiextensionsv1.CustomResourceDefinition) []typeMeta {
177+
if crd == nil {
178+
return nil
179+
}
180+
gvkrs := make([]typeMeta, 0, len(crd.Spec.Versions))
181+
for _, version := range crd.Spec.Versions {
182+
if !version.Served {
183+
continue
184+
}
185+
186+
gvkrs = append(gvkrs, newTypeMeta(
187+
crd.Spec.Group,
188+
version.Name,
189+
crd.Status.AcceptedNames.Kind,
190+
crd.Status.AcceptedNames.Singular,
191+
crd.Status.AcceptedNames.Plural,
192+
resourceScopeToRESTScope(crd.Spec.Scope),
193+
))
194+
}
195+
return gvkrs
196+
}
197+
198+
func (c *BuiltinTypesController) gatherGVKRsForMappedGroupResource(gr schema.GroupResource) ([]typeMeta, error) {
199+
gvkrs, err := c.state.builtin.getGVKRs(gr)
200+
if err != nil {
201+
if meta.IsNoMatchError(err) {
202+
return nil, nil
203+
}
204+
return nil, err
205+
}
206+
return gvkrs, nil
207+
}
208+
209+
func (c *BuiltinTypesController) process(ctx context.Context, key string) error {
210+
logger := logging.WithQueueKey(klog.FromContext(ctx), key)
211+
212+
parts := strings.Split(key, "/")
213+
gr := schema.ParseGroupResource(parts[0])
214+
crdName := parts[1]
215+
216+
crd, err := c.getCRD(systemCRDClusterName, crdName)
217+
if err != nil && apierrors.IsNotFound(err) {
218+
return err
219+
}
220+
221+
// Remove and add the mapping -- this way we can refresh any existing mappings.
222+
223+
typeMetaToRemove, err := c.gatherGVKRsForMappedGroupResource(gr)
224+
if err != nil {
225+
return err
226+
}
227+
typeMetaToAdd := c.gatherGVKRsForCRD(crd)
228+
229+
logger.V(4).Info("applying mappings")
230+
231+
c.state.lock.Lock()
232+
defer c.state.lock.Unlock()
233+
c.state.builtin.apply(typeMetaToRemove, typeMetaToAdd)
234+
return nil
235+
}

0 commit comments

Comments
 (0)