Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/examples/non-operator-managed-rabbitmq/queue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type: Opaque
stringData:
username: a-user # an existing user
password: a-secure-password
uri: my.rabbit:15672 # uri for the management api
uri: https://my.rabbit:15672 # uri for the management api; when scheme is not provided in uri, operator defalts to 'http'
---
apiVersion: rabbitmq.com/v1beta1
kind: Queue
Expand All @@ -20,4 +20,4 @@ spec:
durable: true
rabbitmqClusterReference:
connectionSecret:
name: my-rabbitmq-creds # provided instead of a rabbitmqcluster name; secret must contain keys username, password and uri, and be in the same namespace as the object
name: my-rabbitmq-creds # provided instead of a rabbitmqcluster name; secret must contain keys username, password and uri, and be in the same namespace as the object
41 changes: 29 additions & 12 deletions internal/cluster_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net/url"
"strings"

rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
Expand Down Expand Up @@ -40,11 +41,7 @@ func ParseRabbitmqClusterReference(ctx context.Context, c client.Client, rmq top
if err := c.Get(ctx, types.NamespacedName{Namespace: requestNamespace, Name: rmq.ConnectionSecret.Name}, secret); err != nil {
return nil, false, err
}
creds, err := readCredentialsFromKubernetesSecret(secret)
if err != nil {
return nil, false, fmt.Errorf("unable to retrieve information from Kubernetes secret %s: %w", secret.Name, err)
}
return creds, false, nil
return readCredentialsFromKubernetesSecret(secret)
}

var namespace string
Expand Down Expand Up @@ -101,7 +98,7 @@ func ParseRabbitmqClusterReference(ctx context.Context, c client.Client, rmq top
return nil, false, err
}

endpoint, err := managementURI(svc)
endpoint, err := managementURI(svc, cluster.TLSEnabled())
if err != nil {
return nil, false, fmt.Errorf("failed to get endpoint from specified rabbitmqcluster: %w", err)
}
Expand Down Expand Up @@ -133,18 +130,34 @@ func AllowedNamespace(rmq topology.RabbitmqClusterReference, requestNamespace st
return true
}

func readCredentialsFromKubernetesSecret(secret *corev1.Secret) (ConnectionCredentials, error) {
func readCredentialsFromKubernetesSecret(secret *corev1.Secret) (ConnectionCredentials, bool, error) {
if secret == nil {
return nil, errors.New("unable to extract data from nil secret")
return nil, false, fmt.Errorf("unable to retrieve information from Kubernetes secret %s: %w", secret.Name, errors.New("nil secret"))
}

uBytes, found := secret.Data["uri"]
if !found {
return nil, false, keyMissingErr("uri")
}

uri := string(uBytes)
if string(uBytes[0:4]) != "http" {
uri = "http://" + uri // set scheme to http if not provided
}
var tlsEnabled bool
if parsed, err := url.Parse(uri); err != nil {
return nil, false, err
} else if parsed.Scheme == "https" {
tlsEnabled = true
}

return ClusterCredentials{
data: map[string][]byte{
"username": secret.Data["username"],
"password": secret.Data["password"],
"uri": secret.Data["uri"],
"uri": []byte(uri),
},
}, nil
}, tlsEnabled, nil
}

func readUsernamePassword(secret *corev1.Secret) (string, string, error) {
Expand All @@ -155,13 +168,17 @@ func readUsernamePassword(secret *corev1.Secret) (string, string, error) {
return string(secret.Data["username"]), string(secret.Data["password"]), nil
}

func managementURI(svc *corev1.Service) (string, error) {
func managementURI(svc *corev1.Service, tlsEnabled bool) (string, error) {
port := managementPort(svc)
if port == 0 {
return "", fmt.Errorf("failed to find 'management' or 'management-tls' from service %s", svc.Name)
}

return fmt.Sprintf("%s:%d", serviceDNSAddress(svc), port), nil
scheme := "http"
if tlsEnabled {
scheme = "https"
}
return fmt.Sprintf("%s://%s:%d", scheme, serviceDNSAddress(svc), port), nil
}

// serviceDNSAddress returns the cluster-local DNS entry associated
Expand Down
192 changes: 169 additions & 23 deletions internal/cluster_reference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
ctx = context.Background()
namespace = "rabbitmq-system"
)

JustBeforeEach(func() {
s := scheme.Scheme
s.AddKnownTypes(rabbitmqv1beta1.SchemeBuilder.GroupVersion, &rabbitmqv1beta1.RabbitmqCluster{})
Expand Down Expand Up @@ -89,8 +90,10 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
Expect(tlsEnabled).To(BeFalse())
usernameBytes, _ := credsProvider.Data("username")
passwordBytes, _ := credsProvider.Data("password")
uriBytes, _ := credsProvider.Data("uri")
Expect(usernameBytes).To(Equal([]byte(existingRabbitMQUsername)))
Expect(passwordBytes).To(Equal([]byte(existingRabbitMQPassword)))
Expect(uriBytes).To(Equal([]byte("http://rmq.rabbitmq-system.svc:15672")))
})

When("RabbitmqCluster does not have status.defaultUser set", func() {
Expand Down Expand Up @@ -172,44 +175,188 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
It("should return the expected credentials", func() {
usernameBytes, _ := credsProv.Data("username")
passwordBytes, _ := credsProv.Data("password")
uriBytes, _ := credsProv.Data("uri")
Expect(usernameBytes).To(Equal([]byte(existingRabbitMQUsername)))
Expect(passwordBytes).To(Equal([]byte(existingRabbitMQPassword)))
Expect(uriBytes).To(Equal([]byte("http://rmq.rabbitmq-system.svc:15672")))
})
})
})
When("spec.rabbitmqClusterReference.connectionSecret is set instead of cluster name", func() {

When("the RabbitmqCluster is configured with TLS", func() {
BeforeEach(func() {
connectionSecret := &corev1.Secret{
existingRabbitMQCluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
TLS: rabbitmqv1beta1.TLSSpec{
SecretName: "a-tls-secret",
DisableNonTLSListeners: true,
},
},
Status: rabbitmqv1beta1.RabbitmqClusterStatus{
Binding: &corev1.LocalObjectReference{
Name: "rmq-default-user-credentials",
},
DefaultUser: &rabbitmqv1beta1.RabbitmqClusterDefaultUser{
ServiceReference: &rabbitmqv1beta1.RabbitmqClusterServiceReference{
Name: "rmq",
Namespace: namespace,
},
},
},
}
existingCredentialSecret = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-connection-info",
Name: "rmq-default-user-credentials",
Namespace: namespace,
},
Data: map[string][]byte{
"uri": []byte("10.0.0.0:15671"),
"username": []byte("test-user"),
"password": []byte("test-password"),
"username": []byte(existingRabbitMQUsername),
"password": []byte(existingRabbitMQPassword),
},
}
objs = []runtime.Object{connectionSecret}
})

It("returns the expected connection information", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient,
topology.RabbitmqClusterReference{
ConnectionSecret: &corev1.LocalObjectReference{
Name: "rmq-connection-info",
existingService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: "1.2.3.4",
Ports: []corev1.ServicePort{
{
Name: "management-tls",
Port: int32(15671),
},
},
},
namespace)
}
objs = []runtime.Object{existingRabbitMQCluster, existingCredentialSecret, existingService}
})

It("returns correct creds in connectionCredentials", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient, topology.RabbitmqClusterReference{Name: existingRabbitMQCluster.Name}, existingRabbitMQCluster.Namespace)
Expect(err).NotTo(HaveOccurred())

Expect(tlsEnabled).To(BeFalse())
returnedUser, _ := credsProvider.Data("username")
returnedPass, _ := credsProvider.Data("password")
returnedURI, _ := credsProvider.Data("uri")
Expect(string(returnedUser)).To(Equal("test-user"))
Expect(string(returnedPass)).To(Equal("test-password"))
Expect(string(returnedURI)).To(Equal("10.0.0.0:15671"))
Expect(tlsEnabled).To(BeTrue())
usernameBytes, _ := credsProvider.Data("username")
passwordBytes, _ := credsProvider.Data("password")
uriBytes, _ := credsProvider.Data("uri")
Expect(usernameBytes).To(Equal([]byte(existingRabbitMQUsername)))
Expect(passwordBytes).To(Equal([]byte(existingRabbitMQPassword)))
Expect(uriBytes).To(Equal([]byte("https://rmq.rabbitmq-system.svc:15671")))
})
})

Context("spec.rabbitmqClusterReference.connectionSecret is set", func() {
When("uri has no scheme defined", func() {
BeforeEach(func() {
noSchemeSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-connection-info",
Namespace: namespace,
},
Data: map[string][]byte{
"uri": []byte("10.0.0.0:15672"),
"username": []byte("test-user"),
"password": []byte("test-password"),
},
}
objs = []runtime.Object{noSchemeSecret}
})

It("returns the expected connection information", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient,
topology.RabbitmqClusterReference{
ConnectionSecret: &corev1.LocalObjectReference{
Name: "rmq-connection-info",
},
},
namespace)
Expect(err).NotTo(HaveOccurred())

Expect(tlsEnabled).To(BeFalse())
returnedUser, _ := credsProvider.Data("username")
returnedPass, _ := credsProvider.Data("password")
returnedURI, _ := credsProvider.Data("uri")
Expect(string(returnedUser)).To(Equal("test-user"))
Expect(string(returnedPass)).To(Equal("test-password"))
Expect(string(returnedURI)).To(Equal("http://10.0.0.0:15672"))
})
})

When("uri sets http as the scheme", func() {
BeforeEach(func() {
httpSchemeSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-connection-info",
Namespace: namespace,
},
Data: map[string][]byte{
"uri": []byte("http://10.0.0.0:15672"),
"username": []byte("test-user"),
"password": []byte("test-password"),
},
}
objs = []runtime.Object{httpSchemeSecret}
})

It("returns the expected connection information", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient,
topology.RabbitmqClusterReference{
ConnectionSecret: &corev1.LocalObjectReference{
Name: "rmq-connection-info",
},
},
namespace)
Expect(err).NotTo(HaveOccurred())

Expect(tlsEnabled).To(BeFalse())
returnedUser, _ := credsProvider.Data("username")
returnedPass, _ := credsProvider.Data("password")
returnedURI, _ := credsProvider.Data("uri")
Expect(string(returnedUser)).To(Equal("test-user"))
Expect(string(returnedPass)).To(Equal("test-password"))
Expect(string(returnedURI)).To(Equal("http://10.0.0.0:15672"))
})
})

When("uri sets https as the scheme", func() {
BeforeEach(func() {
httpsSchemeSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-connection-info",
Namespace: namespace,
},
Data: map[string][]byte{
"uri": []byte("https://10.0.0.0:15671"),
"username": []byte("test-user"),
"password": []byte("test-password"),
},
}
objs = []runtime.Object{httpsSchemeSecret}
})

It("returns the expected connection information", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient,
topology.RabbitmqClusterReference{
ConnectionSecret: &corev1.LocalObjectReference{
Name: "rmq-connection-info",
},
},
namespace)
Expect(err).NotTo(HaveOccurred())

Expect(tlsEnabled).To(BeTrue())
returnedUser, _ := credsProvider.Data("username")
returnedPass, _ := credsProvider.Data("password")
returnedURI, _ := credsProvider.Data("uri")
Expect(string(returnedUser)).To(Equal("test-user"))
Expect(string(returnedPass)).To(Equal("test-password"))
Expect(string(returnedURI)).To(Equal("https://10.0.0.0:15671"))
})
})
})
})
Expand Down Expand Up @@ -271,5 +418,4 @@ var _ = Describe("AllowedNamespace", func() {
Expect(internal.AllowedNamespace(ref, "whatever", cluster)).To(BeTrue())
})
})

})
6 changes: 3 additions & 3 deletions internal/rabbitmq_client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func generateRabbitholeClient(connectionCreds ConnectionCredentials, tlsEnabled
return nil, keyMissingErr("password")
}

endpoint, found := connectionCreds.Data("uri")
uri, found := connectionCreds.Data("uri")
if !found {
return nil, keyMissingErr("uri")
}
Expand All @@ -76,12 +76,12 @@ func generateRabbitholeClient(connectionCreds ConnectionCredentials, tlsEnabled
cfg.RootCAs = certPool

transport := &http.Transport{TLSClientConfig: cfg}
rabbitmqClient, err = rabbithole.NewTLSClient(fmt.Sprintf("https://%s", string(endpoint)), string(defaultUser), string(defaultUserPass), transport)
rabbitmqClient, err = rabbithole.NewTLSClient(fmt.Sprintf("%s", string(uri)), string(defaultUser), string(defaultUserPass), transport)
if err != nil {
return nil, fmt.Errorf("failed to instantiate rabbit rabbitmqClient: %v", err)
}
} else {
rabbitmqClient, err = rabbithole.NewClient(fmt.Sprintf("http://%s", string(endpoint)), string(defaultUser), string(defaultUserPass))
rabbitmqClient, err = rabbithole.NewClient(fmt.Sprintf("%s", string(uri)), string(defaultUser), string(defaultUserPass))
if err != nil {
return nil, fmt.Errorf("failed to instantiate rabbit rabbitmqClient: %v", err)
}
Expand Down
7 changes: 2 additions & 5 deletions internal/rabbitmq_client_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package internal_test
import (
"crypto/x509"
"errors"
"fmt"
"github.com/rabbitmq/messaging-topology-operator/internal/internalfakes"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -47,9 +46,7 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
FakeConnectionCredentials = &internalfakes.FakeConnectionCredentials{}
FakeConnectionCredentials.DataReturnsOnCall(0, []byte(existingRabbitMQUsername), true)
FakeConnectionCredentials.DataReturnsOnCall(1, []byte(existingRabbitMQPassword), true)
FakeConnectionCredentials.DataReturnsOnCall(2, []byte(fmt.Sprintf("%s:%s",
fakeRabbitMQURL.Hostname(),
fakeRabbitMQURL.Port())), true)
FakeConnectionCredentials.DataReturnsOnCall(2, []byte(fakeRabbitMQURL.String()), true)

fakeRabbitMQServer.RouteToHandler("PUT", "/api/users/example-user", func(w http.ResponseWriter, req *http.Request) {
user, password, ok := req.BasicAuth()
Expand Down Expand Up @@ -219,7 +216,7 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
FakeConnectionCredentials = &internalfakes.FakeConnectionCredentials{}
FakeConnectionCredentials.DataReturnsOnCall(0, []byte(existingRabbitMQUsername), true)
FakeConnectionCredentials.DataReturnsOnCall(1, []byte(existingRabbitMQPassword), true)
FakeConnectionCredentials.DataReturnsOnCall(2, []byte(fmt.Sprintf("%s:%d", fakeRabbitMQURL.Hostname(), fakeRabbitMQPort)), true)
FakeConnectionCredentials.DataReturnsOnCall(2, []byte(fakeRabbitMQURL.String()), true)

fakeRabbitMQServer.RouteToHandler("PUT", "/api/users/example-user", func(w http.ResponseWriter, req *http.Request) {
user, password, ok := req.BasicAuth()
Expand Down
Loading