Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion api/v1beta1/schemareplication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,33 @@ type SchemaReplicationSpec struct {
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
// Defines a Secret which contains credentials to be used for schema replication.
// The Secret must contain the keys `username` and `password` in its Data field, or operator will error.
// +kubebuilder:validation:Required
// Have to set either secretBackend.vault.secretPath or spec.upstreamSecret, but not both.
// +kubebuilder:validation:Optional
UpstreamSecret *corev1.LocalObjectReference `json:"upstreamSecret,omitempty"`
// endpoints should be one or multiple endpoints separated by ','.
// Must provide either spec.endpoints or endpoints in spec.upstreamSecret.
// When endpoints are provided in both spec.endpoints and spec.upstreamSecret, spec.endpoints takes
// precedence.
Endpoints string `json:"endpoints,omitempty"`
// Secret backend configuration for the RabbitmqCluster.
// Enables to fetch default user credentials and certificates from K8s external secret stores.
SecretBackend SecretBackend `json:"secretBackend,omitempty"`
}

// SecretBackend configures a single secret backend.
// Today, only Vault exists as supported secret backend.
// Future secret backends could be Secrets Store CSI Driver.
// If not configured, K8s Secrets will be used.
type SecretBackend struct {
Vault *VaultSpec `json:"vault,omitempty"`
}

type VaultSpec struct {
// Path in Vault to access a KV (Key-Value) secret with the fields username and password to be used for schema replication.
// For example "secret/data/rabbitmq/config".
// Optional; if not provided, username and password will come from spec.upstreamSecret.
// Have to set either secretBackend.vault.secretPath or spec.upstreamSecret, but not both.
SecretPath string `json:"secretPath,omitempty"`
}

// SchemaReplicationStatus defines the observed state of SchemaReplication
Expand Down
30 changes: 25 additions & 5 deletions api/v1beta1/schemareplication_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ func (s *SchemaReplication) SetupWebhookWithManager(mgr ctrl.Manager) error {

var _ webhook.Validator = &SchemaReplication{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
// either rabbitmqClusterReference.name or rabbitmqClusterReference.connectionSecret must be provided but not both
// ValidateCreate implements webhook.Validator so a webhook will be registered for the type.
// either secretBackend.vault.secretPath or upstreamSecret must be provided but not both.
// either rabbitmqClusterReference.name or rabbitmqClusterReference.connectionSecret must be provided but not both.
func (s *SchemaReplication) ValidateCreate() error {
if err := s.validateSecret(); err != nil {
return err
}
return s.Spec.RabbitmqClusterReference.ValidateOnCreate(s.GroupResource(), s.Name)
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type.
// either secretBackend.vault.secretPath or upstreamSecret must be provided but not both.
func (s *SchemaReplication) ValidateUpdate(old runtime.Object) error {
oldReplication, ok := old.(*SchemaReplication)
if !ok {
Expand All @@ -36,10 +41,25 @@ func (s *SchemaReplication) ValidateUpdate(old runtime.Object) error {
return apierrors.NewForbidden(s.GroupResource(), s.Name,
field.Forbidden(field.NewPath("spec", "rabbitmqClusterReference"), "update on rabbitmqClusterReference is forbidden"))
}
return nil
return s.validateSecret()
}

// no validation on delete
// ValidateDelete no validation on delete
func (s *SchemaReplication) ValidateDelete() error {
return nil
}

func (s *SchemaReplication) validateSecret() error {
if s.Spec.UpstreamSecret != nil && s.Spec.UpstreamSecret.Name != "" && s.Spec.SecretBackend.Vault != nil && s.Spec.SecretBackend.Vault.SecretPath != "" {
return apierrors.NewForbidden(s.GroupResource(), s.Name,
field.Forbidden(field.NewPath("spec"),
"do not provide both secretBackend.vault.secretPath and upstreamSecret"))
}

if (s.Spec.UpstreamSecret == nil || s.Spec.UpstreamSecret.Name == "") && (s.Spec.SecretBackend.Vault == nil || s.Spec.SecretBackend.Vault.SecretPath == "") {
return apierrors.NewForbidden(s.GroupResource(), s.Name,
field.Forbidden(field.NewPath("spec"),
"must provide either secretBackend.vault.secretPath or upstreamSecret"))
}
return nil
}
57 changes: 53 additions & 4 deletions api/v1beta1/schemareplication_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ var _ = Describe("schema-replication webhook", func() {
notAllowed.Spec.RabbitmqClusterReference.ConnectionSecret = nil
Expect(apierrors.IsForbidden(notAllowed.ValidateCreate())).To(BeTrue())
})

It("does not allow both spec.upstreamSecret and spec.secretBackend.vault.userPath be configured", func() {
notAllowed := replication.DeepCopy()
notAllowed.Spec.SecretBackend = SecretBackend{
Vault: &VaultSpec{
SecretPath: "not-good",
},
}
Expect(apierrors.IsForbidden(notAllowed.ValidateCreate())).To(BeTrue())
})

It("spec.upstreamSecret and spec.secretBackend.vault.userPath cannot both be not configured", func() {
notAllowed := replication.DeepCopy()
notAllowed.Spec.SecretBackend = SecretBackend{
Vault: &VaultSpec{},
}
notAllowed.Spec.UpstreamSecret.Name = ""
Expect(apierrors.IsForbidden(notAllowed.ValidateCreate())).To(BeTrue())
})
})

Context("ValidateUpdate", func() {
Expand All @@ -48,7 +67,37 @@ var _ = Describe("schema-replication webhook", func() {
Expect(apierrors.IsForbidden(updated.ValidateUpdate(&replication))).To(BeTrue())
})

It("does not allow updates on rabbitmqClusterReference.connectionSecret", func() {
It("does not allow both spec.upstreamSecret and spec.secretBackend.vault.userPath be configured", func() {
updated := replication.DeepCopy()
updated.Spec.SecretBackend = SecretBackend{
Vault: &VaultSpec{
SecretPath: "not-good",
},
}
Expect(apierrors.IsForbidden(updated.ValidateUpdate(&replication))).To(BeTrue())
})

It("spec.upstreamSecret and spec.secretBackend.vault.userPath cannot both be not configured", func() {
updated := replication.DeepCopy()
updated.Spec.SecretBackend = SecretBackend{
Vault: &VaultSpec{},
}
updated.Spec.UpstreamSecret.Name = ""
Expect(apierrors.IsForbidden(updated.ValidateUpdate(&replication))).To(BeTrue())
})

It("allows update on spec.secretBackend.vault.userPath", func() {
updated := replication.DeepCopy()
updated.Spec.SecretBackend = SecretBackend{
Vault: &VaultSpec{
SecretPath: "a-new-path",
},
}
updated.Spec.UpstreamSecret.Name = ""
Expect(updated.ValidateUpdate(&replication)).To(Succeed())
})

It("allows updates on rabbitmqClusterReference.connectionSecret", func() {
connectionScr := SchemaReplication{
ObjectMeta: metav1.ObjectMeta{
Name: "test-replication",
Expand All @@ -65,9 +114,9 @@ var _ = Describe("schema-replication webhook", func() {
},
},
}
new := connectionScr.DeepCopy()
new.Spec.RabbitmqClusterReference.ConnectionSecret.Name = "new-secret"
Expect(apierrors.IsForbidden(new.ValidateUpdate(&connectionScr))).To(BeTrue())
newObj := connectionScr.DeepCopy()
newObj.Spec.RabbitmqClusterReference.ConnectionSecret.Name = "newObj-secret"
Expect(apierrors.IsForbidden(newObj.ValidateUpdate(&connectionScr))).To(BeTrue())
})

It("allows updates on spec.upstreamSecret", func() {
Expand Down
36 changes: 36 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion config/crd/bases/rabbitmq.com_schemareplications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,29 @@ spec:
Defaults to the namespace of the requested resource if omitted.
type: string
type: object
secretBackend:
description: Secret backend configuration for the RabbitmqCluster.
Enables to fetch default user credentials and certificates from
K8s external secret stores.
properties:
vault:
properties:
secretPath:
description: Path in Vault to access a KV (Key-Value) secret
with the fields username and password to be used for schema
replication. For example "secret/data/rabbitmq/config".
Optional; if not provided, username and password will come
from spec.upstreamSecret. Have to set either secretBackend.vault.secretPath
or spec.upstreamSecret, but not both.
type: string
type: object
type: object
upstreamSecret:
description: Defines a Secret which contains credentials to be used
for schema replication. The Secret must contain the keys `username`
and `password` in its Data field, or operator will error.
and `password` in its Data field, or operator will error. Have to
set either secretBackend.vault.secretPath or spec.upstreamSecret,
but not both.
properties:
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
Expand Down
40 changes: 27 additions & 13 deletions controllers/schemareplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
)

const schemaReplicationParameterName = "schema_definition_sync_upstream"
const SchemaReplicationParameterName = "schema_definition_sync_upstream"

// SchemaReplicationReconciler reconciles a SchemaReplication object
type SchemaReplicationReconciler struct {
Expand Down Expand Up @@ -113,14 +113,14 @@ func (r *SchemaReplicationReconciler) setSchemaReplicationUpstream(ctx context.C
return err
}

if err := validateResponse(client.PutGlobalParameter(schemaReplicationParameterName, endpoints)); err != nil {
msg := fmt.Sprintf("failed to set '%s' global parameter", schemaReplicationParameterName)
if err := validateResponse(client.PutGlobalParameter(SchemaReplicationParameterName, endpoints)); err != nil {
msg := fmt.Sprintf("failed to set '%s' global parameter", SchemaReplicationParameterName)
r.Recorder.Event(replication, corev1.EventTypeWarning, "FailedUpdate", msg)
logger.Error(err, msg, "upstream secret", replication.Spec.UpstreamSecret)
return err
}

msg := fmt.Sprintf("successfully set '%s' global parameter", schemaReplicationParameterName)
msg := fmt.Sprintf("successfully set '%s' global parameter", SchemaReplicationParameterName)
logger.Info(msg)
r.Recorder.Event(replication, corev1.EventTypeNormal, "SuccessfulUpdate", msg)
return nil
Expand All @@ -129,29 +129,43 @@ func (r *SchemaReplicationReconciler) setSchemaReplicationUpstream(ctx context.C
func (r *SchemaReplicationReconciler) deleteSchemaReplicationParameters(ctx context.Context, client rabbitmqclient.Client, replication *topology.SchemaReplication) error {
logger := ctrl.LoggerFrom(ctx)

err := validateResponseForDeletion(client.DeleteGlobalParameter(schemaReplicationParameterName))
err := validateResponseForDeletion(client.DeleteGlobalParameter(SchemaReplicationParameterName))
if errors.Is(err, NotFound) {
logger.Info("cannot find global parameter; no need to delete it", "parameter", schemaReplicationParameterName)
logger.Info("cannot find global parameter; no need to delete it", "parameter", SchemaReplicationParameterName)
} else if err != nil {
msg := fmt.Sprintf("failed to delete global parameter '%s'", schemaReplicationParameterName)
msg := fmt.Sprintf("failed to delete global parameter '%s'", SchemaReplicationParameterName)
r.Recorder.Event(replication, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return err
}

msg := fmt.Sprintf("successfully delete '%s' global parameter", schemaReplicationParameterName)
msg := fmt.Sprintf("successfully delete '%s' global parameter", SchemaReplicationParameterName)
logger.Info(msg)
r.Recorder.Event(replication, corev1.EventTypeNormal, "SuccessfulDelete", msg)
return removeFinalizer(ctx, r.Client, replication)
}

func (r *SchemaReplicationReconciler) getUpstreamEndpoints(ctx context.Context, replication *topology.SchemaReplication) (internal.UpstreamEndpoints, error) {
if replication.Spec.UpstreamSecret == nil {
return internal.UpstreamEndpoints{}, fmt.Errorf("no upstream secret provided")
}
secret := &corev1.Secret{}
if err := r.Get(ctx, types.NamespacedName{Name: replication.Spec.UpstreamSecret.Name, Namespace: replication.Namespace}, secret); err != nil {
return internal.UpstreamEndpoints{}, err
if replication.Spec.SecretBackend.Vault != nil && replication.Spec.SecretBackend.Vault.SecretPath != "" {
secretStoreClient, err := rabbitmqclient.SecretStoreClientProvider()
if err != nil {
return internal.UpstreamEndpoints{}, fmt.Errorf("unable to create a vault client connection to secret store: %w", err)
}

user, pass, err := secretStoreClient.ReadCredentials(replication.Spec.SecretBackend.Vault.SecretPath)
if err != nil {
return internal.UpstreamEndpoints{}, fmt.Errorf("unable to retrieve credentials from secret store: %w", err)
}
secret.Data = make(map[string][]byte)
secret.Data["username"] = []byte(user)
secret.Data["password"] = []byte(pass)
} else if replication.Spec.UpstreamSecret == nil {
return internal.UpstreamEndpoints{}, fmt.Errorf("no upstream secret or secretBackend provided")
} else {
if err := r.Get(ctx, types.NamespacedName{Name: replication.Spec.UpstreamSecret.Name, Namespace: replication.Namespace}, secret); err != nil {
return internal.UpstreamEndpoints{}, err
}
}

endpoints, err := internal.GenerateSchemaReplicationParameters(secret, replication.Spec.Endpoints)
Expand Down
Loading