Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 75 additions & 3 deletions pkg/adaptation/adaptation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/containerd/ttrpc"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"

"google.golang.org/protobuf/proto"
)

const (
Expand Down Expand Up @@ -67,6 +69,7 @@ type Adaptation struct {
serverOpts []ttrpc.ServerOpt
listener net.Listener
plugins []*plugin
validators []*plugin
syncLock sync.RWMutex
wasmService *api.PluginPlugin
}
Expand Down Expand Up @@ -248,8 +251,22 @@ func (r *Adaptation) CreateContainer(ctx context.Context, req *CreateContainerRe
defer r.Unlock()
defer r.removeClosedPlugins()

result := collectCreateContainerResult(req)
var (
result = collectCreateContainerResult(req)
validate *ValidateContainerAdjustmentRequest
)

if r.hasValidators() {
validate = &ValidateContainerAdjustmentRequest{
Pod: req.Pod,
Container: proto.Clone(req.Container).(*Container),
}
}

for _, plugin := range r.plugins {
if validate != nil {
validate.AddPlugin(plugin.base, plugin.idx)
}
rpl, err := plugin.createContainer(ctx, req)
if err != nil {
return nil, err
Expand All @@ -260,7 +277,7 @@ func (r *Adaptation) CreateContainer(ctx context.Context, req *CreateContainerRe
}
}

return result.createContainerResponse(), nil
return r.validateContainerAdjustment(ctx, validate, result)
}

// PostCreateContainer relays the corresponding CRI event to plugins.
Expand Down Expand Up @@ -363,6 +380,40 @@ func (r *Adaptation) updateContainers(ctx context.Context, req []*ContainerUpdat
return r.updateFn(ctx, req)
}

// Validate requested container adjustments.
func (r *Adaptation) validateContainerAdjustment(ctx context.Context, req *ValidateContainerAdjustmentRequest, result *result) (*CreateContainerResponse, error) {
rpl := result.createContainerResponse()

if req == nil || len(r.validators) == 0 {
return rpl, nil
}

req.AddResponse(rpl)
req.AddOwners(result.owners)

errors := make(chan error, len(r.validators))
wg := sync.WaitGroup{}

for _, p := range r.validators {
wg.Add(1)
go func(p *plugin) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note for the future that we might want to revisit this slightly and think about whether we want to limit the concurrency a bit; might become an issue if there are many validator plugins. No need to change as part of this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any places where we might need to update comments to explain that validation happens in parallel? (Just wondering if customers might try creating plugins with low plugin_idx and wonder why they don't receive priority)

@chrishenzie Yes, I think it will be a good idea to emphasize in the documentation that during the validation phase there is no 'plugin ordering by index' as validating plugins are run in parallel.

But regarding users wondering "why they don't get priority", I think they should not. Any presumed ordering/priority would be relevant only if it was possible to force validation both to fail and to succeed. But our semantics is deliberately such that the latter is not possible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true and users of the api may want ordering regardless of our scheme... E.g. I will only approve after ___ approval, if my check is also valid. Allocate claim foo before claim foo'...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typically one would have a set operating in parallel at a level where the higher levels must finish first? built-in, set-a.. set b with a dependency in set-a..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g. I will only approve after ___ approval, if my check is also valid

The validations are a logical AND, so if any fails the container is rejected. I don't think you can model "I will only approve after $foo approval" since if $foo rejects the container is rejected anyway, and if $foo succeeds then you'd also succeed your validation?

defer wg.Done()
errors <- p.ValidateContainerAdjustment(ctx, req)
}(p)
}

wg.Wait()
close(errors)

for err := range errors {
if err != nil {
return nil, err
}
}

return rpl, nil
}

// Start up pre-installed plugins.
func (r *Adaptation) startPlugins() (retErr error) {
var plugins []*plugin
Expand Down Expand Up @@ -439,12 +490,15 @@ func (r *Adaptation) stopPlugins() {
}

func (r *Adaptation) removeClosedPlugins() {
var active, closed []*plugin
var active, closed, validators []*plugin
for _, p := range r.plugins {
if p.isClosed() {
closed = append(closed, p)
} else {
active = append(active, p)
if p.isContainerAdjustmentValidator() {
validators = append(validators, p)
}
}
}

Expand All @@ -455,7 +509,9 @@ func (r *Adaptation) removeClosedPlugins() {
}
}()
}

r.plugins = active
r.validators = validators
}

func (r *Adaptation) startListener() error {
Expand Down Expand Up @@ -519,6 +575,9 @@ func (r *Adaptation) acceptPluginConnections(l net.Listener) error {
} else {
r.Lock()
r.plugins = append(r.plugins, p)
if p.isContainerAdjustmentValidator() {
r.validators = append(r.validators, p)
}
r.sortPlugins()
r.Unlock()
log.Infof(ctx, "plugin %q connected and synchronized", p.name())
Expand Down Expand Up @@ -588,12 +647,25 @@ func (r *Adaptation) sortPlugins() {
sort.Slice(r.plugins, func(i, j int) bool {
return r.plugins[i].idx < r.plugins[j].idx
})
sort.Slice(r.validators, func(i, j int) bool {
return r.validators[i].idx < r.validators[j].idx
})
if len(r.plugins) > 0 {
log.Infof(noCtx, "plugin invocation order")
for i, p := range r.plugins {
log.Infof(noCtx, " #%d: %q (%s)", i+1, p.name(), p.qualifiedName())
}
}
if len(r.validators) > 0 {
log.Infof(noCtx, "validator plugins")
for _, p := range r.validators {
log.Infof(noCtx, " %q (%s)", p.name(), p.qualifiedName())
}
}
}

func (r *Adaptation) hasValidators() bool {
return len(r.validators) > 0
}

func (r *Adaptation) requestPluginSync() {
Expand Down
94 changes: 94 additions & 0 deletions pkg/adaptation/adaptation_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package adaptation_test

import (
"context"
"fmt"
"os"
"path/filepath"
"slices"
Expand Down Expand Up @@ -921,6 +922,99 @@ var _ = Describe("Plugin container creation adjustments", func() {
)
})

When("there are validating plugins", func() {
BeforeEach(func() {
s.Prepare(
&mockRuntime{},
&mockPlugin{idx: "00", name: "foo"},
&mockPlugin{idx: "00", name: "validator"},
)
})

DescribeTable("validation result should be honored",
func(subject string, shouldFail bool, expected *api.ContainerAdjustment) {
var (
runtime = s.runtime
plugins = s.plugins
ctx = context.Background()

pod = &api.PodSandbox{
Id: "pod0",
Name: "pod0",
Uid: "uid0",
Namespace: "default",
}
ctr = &api.Container{
Id: "ctr0",
PodSandboxId: "pod0",
Name: "ctr0",
State: api.ContainerState_CONTAINER_CREATED, // XXX FIXME-kludge
Args: []string{
"echo",
"original",
"argument",
"list",
},
}

forbidden = "no-no"
)

create := func(p *mockPlugin, _ *api.PodSandbox, _ *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
plugin := p.idx + "-" + p.name
a := &api.ContainerAdjustment{}
switch subject {
case "annotation":
key := "key"
if shouldFail {
key = forbidden
}
a.AddAnnotation(key, plugin)
}

return a, nil, nil
}

validate := func(_ *mockPlugin, req *api.ValidateContainerAdjustmentRequest) error {
_, ok := req.Owners.AnnotationOwner(req.Container.Id, forbidden)
if ok {
return fmt.Errorf("forbidden annotation %q adjusted", forbidden)
}
return nil
}

plugins[0].createContainer = create
plugins[1].validateAdjustment = validate
s.Startup()

podReq := &api.RunPodSandboxRequest{Pod: pod}
Expect(runtime.RunPodSandbox(ctx, podReq)).To(Succeed())
ctrReq := &api.CreateContainerRequest{
Pod: pod,
Container: ctr,
}
reply, err := runtime.CreateContainer(ctx, ctrReq)
if shouldFail {
Expect(err).ToNot(BeNil())
} else {
Expect(err).To(BeNil())
Expect(protoEqual(reply.Adjust.Strip(), expected.Strip())).Should(BeTrue(),
protoDiff(reply.Adjust, expected))
}
},

Entry("adjust allowed annotation", "annotation", false,
&api.ContainerAdjustment{
Annotations: map[string]string{
"key": "00-foo",
},
},
),

Entry("adjust forbidden annotation", "annotation", true, nil),
)
})

})

// --------------------------------------------
Expand Down
35 changes: 20 additions & 15 deletions pkg/adaptation/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type (
PostUpdateContainerRequest = api.PostUpdateContainerRequest
PostUpdateContainerResponse = api.PostUpdateContainerResponse

ValidateContainerAdjustmentRequest = api.ValidateContainerAdjustmentRequest
ValidateContainerAdjustmentResponse = api.ValidateContainerAdjustmentResponse
PluginInstance = api.PluginInstance

PodSandbox = api.PodSandbox
LinuxPodSandbox = api.LinuxPodSandbox
Container = api.Container
Expand Down Expand Up @@ -94,21 +98,22 @@ type (
// Aliased consts for api/api.proto.
// nolint
const (
Event_UNKNOWN = api.Event_UNKNOWN
Event_RUN_POD_SANDBOX = api.Event_RUN_POD_SANDBOX
Event_UPDATE_POD_SANDBOX = api.Event_UPDATE_POD_SANDBOX
Event_POST_UPDATE_POD_SANDBOX = api.Event_POST_UPDATE_POD_SANDBOX
Event_STOP_POD_SANDBOX = api.Event_STOP_POD_SANDBOX
Event_REMOVE_POD_SANDBOX = api.Event_REMOVE_POD_SANDBOX
Event_CREATE_CONTAINER = api.Event_CREATE_CONTAINER
Event_POST_CREATE_CONTAINER = api.Event_POST_CREATE_CONTAINER
Event_START_CONTAINER = api.Event_START_CONTAINER
Event_POST_START_CONTAINER = api.Event_POST_START_CONTAINER
Event_UPDATE_CONTAINER = api.Event_UPDATE_CONTAINER
Event_POST_UPDATE_CONTAINER = api.Event_POST_UPDATE_CONTAINER
Event_STOP_CONTAINER = api.Event_STOP_CONTAINER
Event_REMOVE_CONTAINER = api.Event_REMOVE_CONTAINER
ValidEvents = api.ValidEvents
Event_UNKNOWN = api.Event_UNKNOWN
Event_RUN_POD_SANDBOX = api.Event_RUN_POD_SANDBOX
Event_UPDATE_POD_SANDBOX = api.Event_UPDATE_POD_SANDBOX
Event_POST_UPDATE_POD_SANDBOX = api.Event_POST_UPDATE_POD_SANDBOX
Event_STOP_POD_SANDBOX = api.Event_STOP_POD_SANDBOX
Event_REMOVE_POD_SANDBOX = api.Event_REMOVE_POD_SANDBOX
Event_CREATE_CONTAINER = api.Event_CREATE_CONTAINER
Event_POST_CREATE_CONTAINER = api.Event_POST_CREATE_CONTAINER
Event_START_CONTAINER = api.Event_START_CONTAINER
Event_POST_START_CONTAINER = api.Event_POST_START_CONTAINER
Event_UPDATE_CONTAINER = api.Event_UPDATE_CONTAINER
Event_POST_UPDATE_CONTAINER = api.Event_POST_UPDATE_CONTAINER
Event_STOP_CONTAINER = api.Event_STOP_CONTAINER
Event_REMOVE_CONTAINER = api.Event_REMOVE_CONTAINER
Event_VALIDATE_CONTAINER_ADJUSTMENT = api.Event_VALIDATE_CONTAINER_ADJUSTMENT
ValidEvents = api.ValidEvents

ContainerState_CONTAINER_UNKNOWN = api.ContainerState_CONTAINER_UNKNOWN
ContainerState_CONTAINER_CREATED = api.ContainerState_CONTAINER_CREATED
Expand Down
25 changes: 25 additions & 0 deletions pkg/adaptation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ func (p *plugin) isExternal() bool {
return p.cmd == nil
}

// Check if the plugin is a container adjustment validator.
func (p *plugin) isContainerAdjustmentValidator() bool {
return p.events.IsSet(Event_VALIDATE_CONTAINER_ADJUSTMENT)
}

// 'connect' a plugin, setting up multiplexing on its socket.
func (p *plugin) connect(conn stdnet.Conn) (retErr error) {
mux := multiplex.Multiplex(conn, multiplex.WithBlockedRead())
Expand Down Expand Up @@ -675,6 +680,26 @@ func (p *plugin) StateChange(ctx context.Context, evt *StateChangeEvent) (err er
return nil
}

func (p *plugin) ValidateContainerAdjustment(ctx context.Context, req *ValidateContainerAdjustmentRequest) error {
if !p.events.IsSet(Event_VALIDATE_CONTAINER_ADJUSTMENT) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we make use of isContainerAdjustmentValidator() above since we're doing the same thing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I'd prefer to keep, in that regard, the boilerplate to forward the request to the plugin identical to the other boilerplates. The is*-checker is used in a different context, where it is more readable to hide the test behind a function with a self-explanatory name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a strong feeling against having both, then I think I'd rather get rid of isContainerAdjustmentValidator() and write it out in the other context.

return nil
}

ctx, cancel := context.WithTimeout(ctx, getPluginRequestTimeout())
defer cancel()

rpl, err := p.impl.ValidateContainerAdjustment(ctx, req)
if err != nil {
if isFatalError(err) {
log.Errorf(ctx, "closing plugin %s, failed to validate request: %v", p.name(), err)
p.close()
}
return fmt.Errorf("validator plugin %s failed: %v", p.name(), err)
}

return rpl.ValidationResult(p.name())
}

// isFatalError returns true if the error is fatal and the plugin connection should be closed.
func isFatalError(err error) bool {
switch {
Expand Down
7 changes: 7 additions & 0 deletions pkg/adaptation/plugin_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,10 @@ func (p *pluginType) StateChange(ctx context.Context, req *StateChangeEvent) (er
}
return err
}

func (p *pluginType) ValidateContainerAdjustment(ctx context.Context, req *ValidateContainerAdjustmentRequest) (*ValidateContainerAdjustmentResponse, error) {
if p.wasmImpl != nil {
return p.wasmImpl.ValidateContainerAdjustment(ctx, req)
}
return p.ttrpcImpl.ValidateContainerAdjustment(ctx, req)
}
Loading
Loading