Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/examples/non-operator-managed-rabbitmq/queue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type: Opaque
stringData:
username: a-user # an existing user
password: a-secure-password
uri: my.rabbit:15672 # uri for the management api
uri: https://my.rabbit:15672 # uri for the management api; when scheme is not provided in uri, operator defalts to 'http'
---
apiVersion: rabbitmq.com/v1beta1
kind: Queue
Expand All @@ -20,4 +20,4 @@ spec:
durable: true
rabbitmqClusterReference:
connectionSecret:
name: my-rabbitmq-creds # provided instead of a rabbitmqcluster name; secret must contain keys username, password and uri, and be in the same namespace as the object
name: my-rabbitmq-creds # provided instead of a rabbitmqcluster name; secret must contain keys username, password and uri, and be in the same namespace as the object
41 changes: 29 additions & 12 deletions internal/cluster_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net/url"
"strings"

rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
Expand Down Expand Up @@ -40,11 +41,7 @@ func ParseRabbitmqClusterReference(ctx context.Context, c client.Client, rmq top
if err := c.Get(ctx, types.NamespacedName{Namespace: requestNamespace, Name: rmq.ConnectionSecret.Name}, secret); err != nil {
return nil, false, err
}
creds, err := readCredentialsFromKubernetesSecret(secret)
if err != nil {
return nil, false, fmt.Errorf("unable to retrieve information from Kubernetes secret %s: %w", secret.Name, err)
}
return creds, false, nil
return readCredentialsFromKubernetesSecret(secret)
}

var namespace string
Expand Down Expand Up @@ -101,7 +98,7 @@ func ParseRabbitmqClusterReference(ctx context.Context, c client.Client, rmq top
return nil, false, err
}

endpoint, err := managementURI(svc)
endpoint, err := managementURI(svc, cluster.TLSEnabled())
if err != nil {
return nil, false, fmt.Errorf("failed to get endpoint from specified rabbitmqcluster: %w", err)
}
Expand Down Expand Up @@ -133,18 +130,34 @@ func AllowedNamespace(rmq topology.RabbitmqClusterReference, requestNamespace st
return true
}

func readCredentialsFromKubernetesSecret(secret *corev1.Secret) (ConnectionCredentials, error) {
func readCredentialsFromKubernetesSecret(secret *corev1.Secret) (ConnectionCredentials, bool, error) {
if secret == nil {
return nil, errors.New("unable to extract data from nil secret")
return nil, false, fmt.Errorf("unable to retrieve information from Kubernetes secret %s: %w", secret.Name, errors.New("nil secret"))
}

uBytes, found := secret.Data["uri"]
if !found {
return nil, false, keyMissingErr("uri")
}

uri := string(uBytes)
if !strings.HasPrefix(uri, "http") {
uri = "http://" + uri // set scheme to http if not provided
}
var tlsEnabled bool
if parsed, err := url.Parse(uri); err != nil {
return nil, false, err
} else if parsed.Scheme == "https" {
tlsEnabled = true
}

return ClusterCredentials{
data: map[string][]byte{
"username": secret.Data["username"],
"password": secret.Data["password"],
"uri": secret.Data["uri"],
"uri": []byte(uri),
},
}, nil
}, tlsEnabled, nil
}

func readUsernamePassword(secret *corev1.Secret) (string, string, error) {
Expand All @@ -155,13 +168,17 @@ func readUsernamePassword(secret *corev1.Secret) (string, string, error) {
return string(secret.Data["username"]), string(secret.Data["password"]), nil
}

func managementURI(svc *corev1.Service) (string, error) {
func managementURI(svc *corev1.Service, tlsEnabled bool) (string, error) {
port := managementPort(svc)
if port == 0 {
return "", fmt.Errorf("failed to find 'management' or 'management-tls' from service %s", svc.Name)
}

return fmt.Sprintf("%s:%d", serviceDNSAddress(svc), port), nil
scheme := "http"
if tlsEnabled {
scheme = "https"
}
return fmt.Sprintf("%s://%s:%d", scheme, serviceDNSAddress(svc), port), nil
}

// serviceDNSAddress returns the cluster-local DNS entry associated
Expand Down
192 changes: 169 additions & 23 deletions internal/cluster_reference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
ctx = context.Background()
namespace = "rabbitmq-system"
)

JustBeforeEach(func() {
s := scheme.Scheme
s.AddKnownTypes(rabbitmqv1beta1.SchemeBuilder.GroupVersion, &rabbitmqv1beta1.RabbitmqCluster{})
Expand Down Expand Up @@ -89,8 +90,10 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
Expect(tlsEnabled).To(BeFalse())
usernameBytes, _ := credsProvider.Data("username")
passwordBytes, _ := credsProvider.Data("password")
uriBytes, _ := credsProvider.Data("uri")
Expect(usernameBytes).To(Equal([]byte(existingRabbitMQUsername)))
Expect(passwordBytes).To(Equal([]byte(existingRabbitMQPassword)))
Expect(uriBytes).To(Equal([]byte("http://rmq.rabbitmq-system.svc:15672")))
})

When("RabbitmqCluster does not have status.defaultUser set", func() {
Expand Down Expand Up @@ -172,44 +175,188 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
It("should return the expected credentials", func() {
usernameBytes, _ := credsProv.Data("username")
passwordBytes, _ := credsProv.Data("password")
uriBytes, _ := credsProv.Data("uri")
Expect(usernameBytes).To(Equal([]byte(existingRabbitMQUsername)))
Expect(passwordBytes).To(Equal([]byte(existingRabbitMQPassword)))
Expect(uriBytes).To(Equal([]byte("http://rmq.rabbitmq-system.svc:15672")))
})
})
})
When("spec.rabbitmqClusterReference.connectionSecret is set instead of cluster name", func() {

When("the RabbitmqCluster is configured with TLS", func() {
BeforeEach(func() {
connectionSecret := &corev1.Secret{
existingRabbitMQCluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
TLS: rabbitmqv1beta1.TLSSpec{
SecretName: "a-tls-secret",
DisableNonTLSListeners: true,
},
},
Status: rabbitmqv1beta1.RabbitmqClusterStatus{
Binding: &corev1.LocalObjectReference{
Name: "rmq-default-user-credentials",
},
DefaultUser: &rabbitmqv1beta1.RabbitmqClusterDefaultUser{
ServiceReference: &rabbitmqv1beta1.RabbitmqClusterServiceReference{
Name: "rmq",
Namespace: namespace,
},
},
},
}
existingCredentialSecret = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-connection-info",
Name: "rmq-default-user-credentials",
Namespace: namespace,
},
Data: map[string][]byte{
"uri": []byte("10.0.0.0:15671"),
"username": []byte("test-user"),
"password": []byte("test-password"),
"username": []byte(existingRabbitMQUsername),
"password": []byte(existingRabbitMQPassword),
},
}
objs = []runtime.Object{connectionSecret}
})

It("returns the expected connection information", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient,
topology.RabbitmqClusterReference{
ConnectionSecret: &corev1.LocalObjectReference{
Name: "rmq-connection-info",
existingService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: "1.2.3.4",
Ports: []corev1.ServicePort{
{
Name: "management-tls",
Port: int32(15671),
},
},
},
namespace)
}
objs = []runtime.Object{existingRabbitMQCluster, existingCredentialSecret, existingService}
})

It("returns correct creds in connectionCredentials", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient, topology.RabbitmqClusterReference{Name: existingRabbitMQCluster.Name}, existingRabbitMQCluster.Namespace)
Expect(err).NotTo(HaveOccurred())

Expect(tlsEnabled).To(BeFalse())
returnedUser, _ := credsProvider.Data("username")
returnedPass, _ := credsProvider.Data("password")
returnedURI, _ := credsProvider.Data("uri")
Expect(string(returnedUser)).To(Equal("test-user"))
Expect(string(returnedPass)).To(Equal("test-password"))
Expect(string(returnedURI)).To(Equal("10.0.0.0:15671"))
Expect(tlsEnabled).To(BeTrue())
usernameBytes, _ := credsProvider.Data("username")
passwordBytes, _ := credsProvider.Data("password")
uriBytes, _ := credsProvider.Data("uri")
Expect(usernameBytes).To(Equal([]byte(existingRabbitMQUsername)))
Expect(passwordBytes).To(Equal([]byte(existingRabbitMQPassword)))
Expect(uriBytes).To(Equal([]byte("https://rmq.rabbitmq-system.svc:15671")))
})
})

Context("spec.rabbitmqClusterReference.connectionSecret is set", func() {
When("uri has no scheme defined", func() {
BeforeEach(func() {
noSchemeSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-connection-info",
Namespace: namespace,
},
Data: map[string][]byte{
"uri": []byte("10.0.0.0:15672"),
"username": []byte("test-user"),
"password": []byte("test-password"),
},
}
objs = []runtime.Object{noSchemeSecret}
})

It("returns the expected connection information", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient,
topology.RabbitmqClusterReference{
ConnectionSecret: &corev1.LocalObjectReference{
Name: "rmq-connection-info",
},
},
namespace)
Expect(err).NotTo(HaveOccurred())

Expect(tlsEnabled).To(BeFalse())
returnedUser, _ := credsProvider.Data("username")
returnedPass, _ := credsProvider.Data("password")
returnedURI, _ := credsProvider.Data("uri")
Expect(string(returnedUser)).To(Equal("test-user"))
Expect(string(returnedPass)).To(Equal("test-password"))
Expect(string(returnedURI)).To(Equal("http://10.0.0.0:15672"))
})
})

When("uri sets http as the scheme", func() {
BeforeEach(func() {
httpSchemeSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-connection-info",
Namespace: namespace,
},
Data: map[string][]byte{
"uri": []byte("http://10.0.0.0:15672"),
"username": []byte("test-user"),
"password": []byte("test-password"),
},
}
objs = []runtime.Object{httpSchemeSecret}
})

It("returns the expected connection information", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient,
topology.RabbitmqClusterReference{
ConnectionSecret: &corev1.LocalObjectReference{
Name: "rmq-connection-info",
},
},
namespace)
Expect(err).NotTo(HaveOccurred())

Expect(tlsEnabled).To(BeFalse())
returnedUser, _ := credsProvider.Data("username")
returnedPass, _ := credsProvider.Data("password")
returnedURI, _ := credsProvider.Data("uri")
Expect(string(returnedUser)).To(Equal("test-user"))
Expect(string(returnedPass)).To(Equal("test-password"))
Expect(string(returnedURI)).To(Equal("http://10.0.0.0:15672"))
})
})

When("uri sets https as the scheme", func() {
BeforeEach(func() {
httpsSchemeSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-connection-info",
Namespace: namespace,
},
Data: map[string][]byte{
"uri": []byte("https://10.0.0.0:15671"),
"username": []byte("test-user"),
"password": []byte("test-password"),
},
}
objs = []runtime.Object{httpsSchemeSecret}
})

It("returns the expected connection information", func() {
credsProvider, tlsEnabled, err := internal.ParseRabbitmqClusterReference(ctx, fakeClient,
topology.RabbitmqClusterReference{
ConnectionSecret: &corev1.LocalObjectReference{
Name: "rmq-connection-info",
},
},
namespace)
Expect(err).NotTo(HaveOccurred())

Expect(tlsEnabled).To(BeTrue())
returnedUser, _ := credsProvider.Data("username")
returnedPass, _ := credsProvider.Data("password")
returnedURI, _ := credsProvider.Data("uri")
Expect(string(returnedUser)).To(Equal("test-user"))
Expect(string(returnedPass)).To(Equal("test-password"))
Expect(string(returnedURI)).To(Equal("https://10.0.0.0:15671"))
})
})
})
})
Expand Down Expand Up @@ -271,5 +418,4 @@ var _ = Describe("AllowedNamespace", func() {
Expect(internal.AllowedNamespace(ref, "whatever", cluster)).To(BeTrue())
})
})

})
6 changes: 3 additions & 3 deletions internal/rabbitmq_client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func generateRabbitholeClient(connectionCreds ConnectionCredentials, tlsEnabled
return nil, keyMissingErr("password")
}

endpoint, found := connectionCreds.Data("uri")
uri, found := connectionCreds.Data("uri")
if !found {
return nil, keyMissingErr("uri")
}
Expand All @@ -76,12 +76,12 @@ func generateRabbitholeClient(connectionCreds ConnectionCredentials, tlsEnabled
cfg.RootCAs = certPool

transport := &http.Transport{TLSClientConfig: cfg}
rabbitmqClient, err = rabbithole.NewTLSClient(fmt.Sprintf("https://%s", string(endpoint)), string(defaultUser), string(defaultUserPass), transport)
rabbitmqClient, err = rabbithole.NewTLSClient(fmt.Sprintf("%s", string(uri)), string(defaultUser), string(defaultUserPass), transport)
if err != nil {
return nil, fmt.Errorf("failed to instantiate rabbit rabbitmqClient: %v", err)
}
} else {
rabbitmqClient, err = rabbithole.NewClient(fmt.Sprintf("http://%s", string(endpoint)), string(defaultUser), string(defaultUserPass))
rabbitmqClient, err = rabbithole.NewClient(fmt.Sprintf("%s", string(uri)), string(defaultUser), string(defaultUserPass))
if err != nil {
return nil, fmt.Errorf("failed to instantiate rabbit rabbitmqClient: %v", err)
}
Expand Down
7 changes: 2 additions & 5 deletions internal/rabbitmq_client_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package internal_test
import (
"crypto/x509"
"errors"
"fmt"
"github.com/rabbitmq/messaging-topology-operator/internal/internalfakes"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -47,9 +46,7 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
FakeConnectionCredentials = &internalfakes.FakeConnectionCredentials{}
FakeConnectionCredentials.DataReturnsOnCall(0, []byte(existingRabbitMQUsername), true)
FakeConnectionCredentials.DataReturnsOnCall(1, []byte(existingRabbitMQPassword), true)
FakeConnectionCredentials.DataReturnsOnCall(2, []byte(fmt.Sprintf("%s:%s",
fakeRabbitMQURL.Hostname(),
fakeRabbitMQURL.Port())), true)
FakeConnectionCredentials.DataReturnsOnCall(2, []byte(fakeRabbitMQURL.String()), true)

fakeRabbitMQServer.RouteToHandler("PUT", "/api/users/example-user", func(w http.ResponseWriter, req *http.Request) {
user, password, ok := req.BasicAuth()
Expand Down Expand Up @@ -219,7 +216,7 @@ var _ = Describe("ParseRabbitmqClusterReference", func() {
FakeConnectionCredentials = &internalfakes.FakeConnectionCredentials{}
FakeConnectionCredentials.DataReturnsOnCall(0, []byte(existingRabbitMQUsername), true)
FakeConnectionCredentials.DataReturnsOnCall(1, []byte(existingRabbitMQPassword), true)
FakeConnectionCredentials.DataReturnsOnCall(2, []byte(fmt.Sprintf("%s:%d", fakeRabbitMQURL.Hostname(), fakeRabbitMQPort)), true)
FakeConnectionCredentials.DataReturnsOnCall(2, []byte(fakeRabbitMQURL.String()), true)

fakeRabbitMQServer.RouteToHandler("PUT", "/api/users/example-user", func(w http.ResponseWriter, req *http.Request) {
user, password, ok := req.BasicAuth()
Expand Down
Loading