Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions k8s/negotiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,42 @@ func (*KindNegotiatedSerializer) DecoderToVersion(d runtime.Decoder, _ runtime.G
return d
}

// DeferredNegotiatedSerializer deserializes only the metadata of the resource, relying on a downstream processor to complete the unmarshaling.
// It uses *resource.PartialObject and *resource.TypedList[*resource.PartialObject] for its decoding types.
//
// DeferredNegotiatedSerializer is _Experimental_ and may be removed in a future release
type DeferredNegotiatedSerializer struct{}

// SupportedMediaTypes returns the JSON supported media type with a GenericJSONDecoder and kubernetes JSON Framer.
func (*DeferredNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
serializer := &CodecDecoder{
SampleObject: &resource.PartialObject{},
SampleList: &resource.TypedList[*resource.PartialObject]{},
Codec: resource.NewPassthroughJSONCodec(),
Decoder: json.Unmarshal,
}

return []runtime.SerializerInfo{{
MediaType: string(resource.KindEncodingJSON),
Serializer: serializer,
StreamSerializer: &runtime.StreamSerializerInfo{
Serializer: serializer,
Framer: jsonserializer.Framer,
},
}}
}

// EncoderForVersion returns the `serializer` input
func (*DeferredNegotiatedSerializer) EncoderForVersion(serializer runtime.Encoder,
_ runtime.GroupVersioner) runtime.Encoder {
return serializer
}

// DecoderToVersion returns a GenericJSONDecoder
func (*DeferredNegotiatedSerializer) DecoderToVersion(d runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder {
return d
}

// CodecDecoder implements runtime.Serializer and works with Untyped* objects to implement runtime.Object
type CodecDecoder struct {
SampleObject resource.Object
Expand Down
170 changes: 170 additions & 0 deletions resource/partialobject.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package resource

import (
"encoding/json"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)

var _ Object = &PartialObject{}

// PartialObject implements resource.Object but only actually contains metadata information, and the raw payload that was used for unmarshaling.
// This is useful in accelerating the unmarshal process that is done serially with a NegotiatedSerializer in kubernetes watch requests,
// but does consume more memory as the entire original payload is embedded to avoid needing to copy or attempt to understand the non-metadata fields.
//
// PartialObject is _Experimental_ and may be removed in a future release
type PartialObject struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`
Raw []byte `json:"-"`
}

type metadataOnlyObject struct {
*metav1.TypeMeta `json:",inline"`
*metav1.ObjectMeta `json:"metadata"`
}

func (p *PartialObject) UnmarshalJSON(b []byte) error {
md := metadataOnlyObject{}
if err := json.Unmarshal(b, &md); err != nil {
return err
}
p.TypeMeta = *md.TypeMeta
p.ObjectMeta = *md.ObjectMeta
Comment on lines +37 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These could be nil, couldn't they?

p.Raw = b
return nil
}

func (p *PartialObject) GetRaw() []byte {
return p.Raw
}

func (p *PartialObject) DeepCopyObject() runtime.Object {
return p.Copy()
}

func (*PartialObject) GetSpec() any {
return nil
}

func (*PartialObject) SetSpec(any) error {
return fmt.Errorf("spec cannot be set on a PartialObject")
}

func (*PartialObject) GetSubresources() map[string]any {
return map[string]any{}
}

func (*PartialObject) GetSubresource(string) (any, bool) {
return nil, false
}

func (*PartialObject) SetSubresource(string, any) error {
return fmt.Errorf("subresource cannot be set on a PartialObject")
}

func (p *PartialObject) GetStaticMetadata() StaticMetadata {
return StaticMetadata{
Name: p.ObjectMeta.Name,
Namespace: p.ObjectMeta.Namespace,
Group: p.GroupVersionKind().Group,
Version: p.GroupVersionKind().Version,
Kind: p.GroupVersionKind().Kind,
}
}

func (p *PartialObject) SetStaticMetadata(metadata StaticMetadata) {
p.Name = metadata.Name
p.Namespace = metadata.Namespace
p.SetGroupVersionKind(schema.GroupVersionKind{
Group: metadata.Group,
Version: metadata.Version,
Kind: metadata.Kind,
})
}

// GetCommonMetadata returns CommonMetadata for the object
//
//nolint:dupl
func (p *PartialObject) GetCommonMetadata() CommonMetadata {
var err error
dt := p.DeletionTimestamp
var deletionTimestamp *time.Time
if dt != nil {
deletionTimestamp = &dt.Time
}
updt := time.Time{}
createdBy := ""
updatedBy := ""
if p.Annotations != nil {
strUpdt, ok := p.Annotations[AnnotationUpdateTimestamp]
if ok {
updt, err = time.Parse(time.RFC3339, strUpdt)
if err != nil { //nolint:staticcheck,revive
// HMMMM
}
}
createdBy = p.Annotations[AnnotationCreatedBy]
updatedBy = p.Annotations[AnnotationUpdatedBy]
}
return CommonMetadata{
UID: string(p.UID),
ResourceVersion: p.ResourceVersion,
Generation: p.Generation,
Labels: p.Labels,
CreationTimestamp: p.CreationTimestamp.Time,
DeletionTimestamp: deletionTimestamp,
Finalizers: p.Finalizers,
UpdateTimestamp: updt,
CreatedBy: createdBy,
UpdatedBy: updatedBy,
// TODO: populate ExtraFields in PartialObject?
}
}

// SetCommonMetadata sets CommonMetadata for the object
//
//nolint:dupl
func (p *PartialObject) SetCommonMetadata(metadata CommonMetadata) {
p.UID = types.UID(metadata.UID)
p.ResourceVersion = metadata.ResourceVersion
p.Generation = metadata.Generation
p.Labels = metadata.Labels
p.CreationTimestamp = metav1.NewTime(metadata.CreationTimestamp)
if metadata.DeletionTimestamp != nil {
dt := metav1.NewTime(*metadata.DeletionTimestamp)
p.DeletionTimestamp = &dt
} else {
p.DeletionTimestamp = nil
}
p.Finalizers = metadata.Finalizers
if p.Annotations == nil {
p.Annotations = make(map[string]string)
}
if !metadata.UpdateTimestamp.IsZero() {
p.Annotations[AnnotationUpdateTimestamp] = metadata.UpdateTimestamp.Format(time.RFC3339)
}
if metadata.CreatedBy != "" {
p.Annotations[AnnotationCreatedBy] = metadata.CreatedBy
}
if metadata.UpdatedBy != "" {
p.Annotations[AnnotationUpdatedBy] = metadata.UpdatedBy
}
}

func (p *PartialObject) Copy() Object {
cpy := PartialObject{}
cpy.TypeMeta = metav1.TypeMeta{
Kind: p.Kind,
APIVersion: p.APIVersion,
}
p.ObjectMeta.DeepCopyInto(&cpy.ObjectMeta)
cpy.Raw = make([]byte, len(p.Raw))
copy(cpy.Raw, p.Raw)
return &cpy
}
2 changes: 1 addition & 1 deletion resource/untypedobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (u *UntypedObject) SetStaticMetadata(metadata StaticMetadata) {
}

// GetCommonMetadata returns CommonMetadata for the object
// nolint:revive,staticcheck
// nolint:revive,staticcheck,dupl
func (u *UntypedObject) GetCommonMetadata() CommonMetadata {
var err error
dt := u.DeletionTimestamp
Expand Down
21 changes: 14 additions & 7 deletions simple/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,15 @@ type App struct {

// AppConfig is the configuration used by App
type AppConfig struct {
Name string
KubeConfig rest.Config
InformerConfig AppInformerConfig
ManagedKinds []AppManagedKind
UnmanagedKinds []AppUnmanagedKind
Converters map[schema.GroupKind]Converter
Name string
KubeConfig rest.Config
// ClientGenerator is the ClientGenerator to use when constructing informers.
// It is optional and will default to k8s.NewClientRegistry(KubeConfig, k8s.DefaultClientConfig()) if not present.
ClientGenerator resource.ClientGenerator
InformerConfig AppInformerConfig
ManagedKinds []AppManagedKind
UnmanagedKinds []AppUnmanagedKind
Converters map[schema.GroupKind]Converter
// VersionedCustomRoutes is a map of version string => custom route handlers for
// custom routes attached at the version level rather than attached to a specific kind.
// Custom route paths for each version should not conflict with plural names of kinds for the version.
Expand Down Expand Up @@ -267,10 +270,14 @@ type AppVersionRouteHandlers map[AppVersionRoute]AppCustomRouteHandler
// AppConfig MUST contain a valid KubeConfig to be valid.
// Watcher/Reconciler error handling, retry, and dequeue logic can be managed with AppConfig.InformerConfig.
func NewApp(config AppConfig) (*App, error) {
clients := config.ClientGenerator
if clients == nil {
k8s.NewClientRegistry(config.KubeConfig, k8s.DefaultClientConfig())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
k8s.NewClientRegistry(config.KubeConfig, k8s.DefaultClientConfig())
clients = k8s.NewClientRegistry(config.KubeConfig, k8s.DefaultClientConfig())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, that this should go away after syncing with main, since #1023 was merged.

}
a := &App{
informerController: operator.NewInformerController(operator.DefaultInformerControllerConfig()),
runner: app.NewMultiRunner(),
clientGenerator: k8s.NewClientRegistry(config.KubeConfig, k8s.DefaultClientConfig()),
clientGenerator: clients,
kinds: make(map[string]AppManagedKind),
gvrToGVK: make(map[string]string),
internalKinds: make(map[string]resource.Kind),
Expand Down
Loading