Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ jobs:
kind create cluster --image kindest/node:"$K8S_VERSION"
make cert-manager
make cluster-operator
DOCKER_REGISTRY_SERVER=local-server OPERATOR_IMAGE=local-operator make deploy-kind
make system-tests
DOCKER_REGISTRY_SERVER=local-server OPERATOR_IMAGE=local-operator make deploy-kind BUILD_KIT=docker
make system-tests BUILD_KIT=docker
19 changes: 14 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ platform := $(shell uname | tr A-Z a-z)
list: ## list Makefile targets
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

### Tools
# Allows flexbility to use other build kits, like nerdctl
BUILD_KIT ?= /usr/local/bin/docker
Comment on lines +11 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love it!


define get_mod_code_generator
echo "Only go get & mod k8s.io/code-generator, but do not install it"
echo "⚠️ Keep it at the same version as captured in go.mod, otherwise we may end up with version inconsistencies"
Expand All @@ -30,6 +34,8 @@ export KUBEBUILDER_ASSETS = $(LOCAL_TESTBIN)/k8s/$(ENVTEST_K8S_VERSION)-$(platfo
$(KUBEBUILDER_ASSETS):
setup-envtest --os $(platform) --arch $(ARCHITECTURE) --bin-dir $(LOCAL_TESTBIN) use $(ENVTEST_K8S_VERSION)

### Targets

.PHONY: unit-tests
unit-tests: install-tools $(KUBEBUILDER_ASSETS) generate fmt vet manifests ## Run unit tests
ginkgo -r --randomizeAllSpecs api/ internal/
Expand All @@ -38,6 +44,9 @@ unit-tests: install-tools $(KUBEBUILDER_ASSETS) generate fmt vet manifests ## Ru
integration-tests: install-tools $(KUBEBUILDER_ASSETS) generate fmt vet manifests ## Run integration tests
ginkgo -r --randomizeAllSpecs controllers/

just-integration-tests: $(KUBEBUILDER_ASSETS) vet
ginkgo -randomizeAllSpecs -r controllers/

local-tests: unit-tests integration-tests ## Run all local tests (unit & integration)

system-tests: ## run end-to-end tests against Kubernetes cluster defined in ~/.kube/config. Expects cluster operator and messaging topology operator to be installed in the cluster
Expand Down Expand Up @@ -82,7 +91,7 @@ deploy-dev: check-env-docker-credentials docker-build-dev manifests deploy-rbac

# Load operator image and deploy operator into current KinD cluster
deploy-kind: manifests deploy-rbac
docker build --build-arg=GIT_COMMIT=$(GIT_COMMIT) -t $(DOCKER_REGISTRY_SERVER)/$(OPERATOR_IMAGE):$(GIT_COMMIT) .
$(BUILD_KIT) build --build-arg=GIT_COMMIT=$(GIT_COMMIT) -t $(DOCKER_REGISTRY_SERVER)/$(OPERATOR_IMAGE):$(GIT_COMMIT) .
kind load docker-image $(DOCKER_REGISTRY_SERVER)/$(OPERATOR_IMAGE):$(GIT_COMMIT)
kustomize build config/default/overlays/kind | sed 's@((operator_docker_image))@"$(DOCKER_REGISTRY_SERVER)/$(OPERATOR_IMAGE):$(GIT_COMMIT)"@' | kubectl apply -f -

Expand Down Expand Up @@ -131,8 +140,8 @@ ifndef DOCKER_REGISTRY_SECRET
endif

docker-build-dev: check-env-docker-repo git-commit-sha
docker build --build-arg=GIT_COMMIT=$(GIT_COMMIT) -t $(DOCKER_REGISTRY_SERVER)/$(OPERATOR_IMAGE):$(GIT_COMMIT) .
docker push $(DOCKER_REGISTRY_SERVER)/$(OPERATOR_IMAGE):$(GIT_COMMIT)
$(BUILD_KIT) build --build-arg=GIT_COMMIT=$(GIT_COMMIT) -t $(DOCKER_REGISTRY_SERVER)/$(OPERATOR_IMAGE):$(GIT_COMMIT) .
$(BUILD_KIT) push $(DOCKER_REGISTRY_SERVER)/$(OPERATOR_IMAGE):$(GIT_COMMIT)

docker-registry-secret: check-env-docker-credentials operator-namespace
echo "creating registry secret and patching default service account"
Expand Down Expand Up @@ -172,8 +181,8 @@ generate-manifests:
kustomize build config/installation/cert-manager/ > releases/messaging-topology-operator-with-certmanager.yaml

CERT_MANAGER_VERSION ?=v1.2.0
cert-manager:
cert-manager: ## Deploys Cert Manager from JetStack repo. Use CERT_MANAGER_VERSION to customise version e.g. v1.2.0
kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/$(CERT_MANAGER_VERSION)/cert-manager.yaml

destroy-cert-manager:
destroy-cert-manager: ## Deletes Cert Manager deployment created by 'make cert-manager'
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/$(CERT_MANAGER_VERSION)/cert-manager.yaml
15 changes: 10 additions & 5 deletions controllers/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
// BindingReconciler reconciles a Binding object
type BindingReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
KubernetesClusterDomain string
}

// +kubebuilder:rbac:groups=rabbitmq.com,resources=bindings,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -54,7 +55,7 @@ func (r *BindingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}

credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, r.Client, binding.Spec.RabbitmqClusterReference, binding.Namespace)
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, r.Client, binding.Spec.RabbitmqClusterReference, binding.Namespace, r.KubernetesClusterDomain)
if err != nil {
return handleRMQReferenceParseError(ctx, r.Client, r.Recorder, binding, &binding.Status.Conditions, err)
}
Expand Down Expand Up @@ -206,6 +207,10 @@ func (r *BindingReconciler) findBindingInfo(logger logr.Logger, binding *topolog
return info, nil
}

func (r *BindingReconciler) SetInternalDomainName(clusterDomain string) {
r.KubernetesClusterDomain = clusterDomain
}

func (r *BindingReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&topology.Binding{}).
Expand Down
37 changes: 24 additions & 13 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controllers
import (
"context"
"crypto/x509"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
Expand All @@ -22,20 +21,32 @@ const (

// names for each of the controllers
const (
VhostControllerName = "vhost-controller"
QueueControllerName = "queue-controller"
ExchangeControllerName = "exchange-controller"
BindingControllerName = "binding-controller"
UserControllerName = "user-controller"
PolicyControllerName = "policy-controller"
PermissionControllerName = "permission-controller"
SchemaReplicationControllerName = "schema-replication-controller"
FederationControllerName = "federation-controller"
ShovelControllerName = "shovel-controller"
SuperStreamControllerName = "super-stream-controller"
SuperStreamConsumerControllerName = "super-stream-consumer-controller"
VhostControllerName = "vhost-controller"
QueueControllerName = "queue-controller"
ExchangeControllerName = "exchange-controller"
BindingControllerName = "binding-controller"
UserControllerName = "user-controller"
PolicyControllerName = "policy-controller"
PermissionControllerName = "permission-controller"
SchemaReplicationControllerName = "schema-replication-controller"
FederationControllerName = "federation-controller"
ShovelControllerName = "shovel-controller"
SuperStreamControllerName = "super-stream-controller"
)

// names for environment variables
const (
KubernetesInternalDomainEnvVar = "MESSAGING_DOMAIN_NAME"
OperatorNamespaceEnvVar = "OPERATOR_NAMESPACE"
EnableWebhooksEnvVar = "ENABLE_WEBHOOKS"
)

type TopologyController interface {
Reconcile(context.Context, ctrl.Request) (ctrl.Result, error)
SetupWithManager(mgr ctrl.Manager) error
SetInternalDomainName(string)
}

func extractSystemCertPool(ctx context.Context, recorder record.EventRecorder, object runtime.Object) (*x509.CertPool, error) {
logger := ctrl.LoggerFrom(ctx)

Expand Down
169 changes: 169 additions & 0 deletions controllers/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package controllers_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
topologyV1alpha "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1"
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"net/http"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Controllers/Common", func() {
var (
topologyObjects []runtimeclient.Object
commonRabbitmqClusterRef = topology.RabbitmqClusterReference{
Name: "example-rabbit",
Namespace: "default",
}
commonHttpCreatedResponse = &http.Response{
Status: "201 Created",
StatusCode: http.StatusCreated,
}
commonHttpDeletedResponse = &http.Response{
Status: "204 No Content",
StatusCode: http.StatusNoContent,
}
)

BeforeEach(func() {
// The order in which these are declared matters
// Keep it sync with the order in which 'topologyControllers' are declared in 'suite_test.go`
topologyObjects = []runtimeclient.Object{
&topology.Binding{
ObjectMeta: metav1.ObjectMeta{Name: "some-binding", Namespace: "default"},
Spec: topology.BindingSpec{RabbitmqClusterReference: commonRabbitmqClusterRef},
},
&topology.Exchange{
ObjectMeta: metav1.ObjectMeta{Name: "some-exchange", Namespace: "default"},
Spec: topology.ExchangeSpec{RabbitmqClusterReference: commonRabbitmqClusterRef},
},
&topology.Permission{
ObjectMeta: metav1.ObjectMeta{Name: "some-exchange", Namespace: "default"},
Spec: topology.PermissionSpec{RabbitmqClusterReference: commonRabbitmqClusterRef},
},
&topology.Policy{
ObjectMeta: metav1.ObjectMeta{Name: "some-policy", Namespace: "default"},
Spec: topology.PolicySpec{RabbitmqClusterReference: commonRabbitmqClusterRef,
Definition: &runtime.RawExtension{
Raw: []byte(`{"key":"value"}`),
}},
},
&topology.Queue{
ObjectMeta: metav1.ObjectMeta{Name: "some-queue", Namespace: "default"},
Spec: topology.QueueSpec{RabbitmqClusterReference: commonRabbitmqClusterRef},
},
&topology.User{
ObjectMeta: metav1.ObjectMeta{Name: "some-user", Namespace: "default"},
Spec: topology.UserSpec{RabbitmqClusterReference: commonRabbitmqClusterRef},
},
&topology.Vhost{
ObjectMeta: metav1.ObjectMeta{Name: "some-vhost", Namespace: "default"},
Spec: topology.VhostSpec{RabbitmqClusterReference: commonRabbitmqClusterRef},
},
&topology.SchemaReplication{
ObjectMeta: metav1.ObjectMeta{Name: "some-vhost", Namespace: "default"},
Spec: topology.SchemaReplicationSpec{RabbitmqClusterReference: commonRabbitmqClusterRef},
},
&topology.Federation{
ObjectMeta: metav1.ObjectMeta{Name: "some-federation", Namespace: "default"},
Spec: topology.FederationSpec{RabbitmqClusterReference: commonRabbitmqClusterRef,
UriSecret: &corev1.LocalObjectReference{Name: "federation-uri"}},
},
&topology.Shovel{
ObjectMeta: metav1.ObjectMeta{Name: "some-shovel", Namespace: "default"},
Spec: topology.ShovelSpec{RabbitmqClusterReference: commonRabbitmqClusterRef,
UriSecret: &corev1.LocalObjectReference{Name: "shovel-uri-secret"}},
},
&topologyV1alpha.SuperStream{
ObjectMeta: metav1.ObjectMeta{Name: "some-super-stream", Namespace: "default"},
Spec: topologyV1alpha.SuperStreamSpec{RabbitmqClusterReference: commonRabbitmqClusterRef},
},
}

fakeRabbitMQClient.DeclareBindingReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeleteBindingReturns(commonHttpDeletedResponse, nil)
fakeRabbitMQClient.DeclareExchangeReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeleteExchangeReturns(commonHttpDeletedResponse, nil)
fakeRabbitMQClient.UpdatePermissionsInReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.ClearPermissionsInReturns(commonHttpDeletedResponse, nil)
fakeRabbitMQClient.PutPolicyReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeletePolicyReturns(commonHttpDeletedResponse, nil)
fakeRabbitMQClient.DeclareQueueReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeleteQueueReturns(commonHttpDeletedResponse, nil)
fakeRabbitMQClient.PutUserReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeleteUserReturns(commonHttpDeletedResponse, nil)
fakeRabbitMQClient.PutVhostReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeleteVhostReturns(commonHttpDeletedResponse, nil)
fakeRabbitMQClient.PutGlobalParameterReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeleteGlobalParameterReturns(commonHttpDeletedResponse, nil)
fakeRabbitMQClient.PutFederationUpstreamReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeleteFederationUpstreamReturns(commonHttpDeletedResponse, nil)
fakeRabbitMQClient.DeclareShovelReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeleteShovelReturns(commonHttpDeletedResponse, nil)
})

It("sets the domain name in the URI to connect to RabbitMQ", func() {
for _, controller := range topologyControllers {
controller.SetInternalDomainName(".some-domain.com")
}

for i, _ := range topologyControllers {
Expect(client.Create(ctx, topologyObjects[i])).To(Succeed())
Comment on lines +111 to +117
Copy link
Member Author

@Zerpet Zerpet Feb 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was a bit flaky. For some reason, sometimes the test would fail by not adding the internal domain name. By splitting in two different loops the "setup" and the "tests", the flake seems to go away.


// Wait until the client factory is called
Eventually(func() int {
return len(fakeRabbitMQClientFactoryArgsForCall)
}, 5).Should(BeNumerically(">", i))

credentials, _, _ := FakeRabbitMQClientFactoryArgsForCall(i)
uri, found := credentials.Data("uri")
Expect(found).To(BeTrue(), "expected to find key 'uri'")
// Equals() fails here because the types dont match. BeEquivalent() is more lax, allows different type comparison
Expect(uri).To(BeEquivalentTo("https://example-rabbit.default.svc.some-domain.com:15671"),
"Offender: %d", i)
}
})

When("domain name is not set", func() {
It("uses internal short name", func() {
for i, controller := range topologyControllers {
controller.SetInternalDomainName("")
Expect(client.Create(ctx, topologyObjects[i])).To(Succeed())

// Wait until the client factory is called
Eventually(func() int {
return len(fakeRabbitMQClientFactoryArgsForCall)
}, 5).Should(BeNumerically(">", i))

credentials, _, _ := FakeRabbitMQClientFactoryArgsForCall(i)
uri, found := credentials.Data("uri")
Expect(found).To(BeTrue(), "expected to find key 'uri'")
// Equals() fails here because the types dont match. BeEquivalent() is more lax, allows different type comparison
Expect(uri).To(BeEquivalentTo("https://example-rabbit.default.svc:15671"),
"Offender: %d", i)
}
})
})

AfterEach(func() {
for _, controller := range topologyControllers {
controller.SetInternalDomainName("")
}
for _, topologyObject := range topologyObjects {
Expect(client.Delete(ctx, topologyObject)).To(Succeed())
Eventually(func() bool {
dummy := topologyObject.DeepCopyObject()
err := client.Get(ctx,
types.NamespacedName{Name: topologyObject.GetName(), Namespace: topologyObject.GetNamespace()},
dummy.(runtimeclient.Object))
return apierrors.IsNotFound(err)
}, 10, 2).Should(BeTrue(), "expected to receive a 'not found' error")
}
})
})
15 changes: 10 additions & 5 deletions controllers/exchange_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
// ExchangeReconciler reconciles a Exchange object
type ExchangeReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
KubernetesClusterDomain string
}

// +kubebuilder:rbac:groups=rabbitmq.com,resources=exchanges,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -52,7 +53,7 @@ func (r *ExchangeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, r.Client, exchange.Spec.RabbitmqClusterReference, exchange.Namespace)
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, r.Client, exchange.Spec.RabbitmqClusterReference, exchange.Namespace, r.KubernetesClusterDomain)
if err != nil {
return handleRMQReferenceParseError(ctx, r.Client, r.Recorder, exchange, &exchange.Status.Conditions, err)
}
Expand Down Expand Up @@ -146,6 +147,10 @@ func (r *ExchangeReconciler) deleteExchange(ctx context.Context, client internal
return removeFinalizer(ctx, r.Client, exchange)
}

func (r *ExchangeReconciler) SetInternalDomainName(domainName string) {
r.KubernetesClusterDomain = domainName
}

func (r *ExchangeReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&topology.Exchange{}).
Expand Down
Loading