Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/onsi/gomega v1.20.0
github.com/rabbitmq/cluster-operator v1.14.0
github.com/sclevine/yj v0.0.0-20200815061347-554173e71934
gopkg.in/ini.v1 v1.66.6
k8s.io/api v0.24.3
k8s.io/apimachinery v0.24.3
k8s.io/client-go v0.24.3
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1966,6 +1966,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI=
gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
Expand Down
29 changes: 26 additions & 3 deletions rabbitmqclient/cluster_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
"gopkg.in/ini.v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -99,7 +100,12 @@ func ParseReference(ctx context.Context, c client.Client, rmq topology.RabbitmqC
return nil, false, err
}

endpoint, err := managementURI(svc, cluster.TLSEnabled(), clusterDomain)
additionalConfig, err := readClusterAdditionalConfig(cluster)
if err != nil {
return nil, false, fmt.Errorf("unable to parse additionconfig setting from the rabbitmqcluster resource: %w", err)
}

endpoint, err := managementURI(svc, cluster.TLSEnabled(), clusterDomain, additionalConfig["management.path_prefix"])
if err != nil {
return nil, false, fmt.Errorf("failed to get endpoint from specified rabbitmqcluster: %w", err)
}
Expand Down Expand Up @@ -161,6 +167,18 @@ func readCredentialsFromKubernetesSecret(secret *corev1.Secret) (ConnectionCrede
}, tlsEnabled, nil
}

func readClusterAdditionalConfig(cluster *rabbitmqv1beta1.RabbitmqCluster) (additionalConfig map[string]string, err error) {
cfg, err := ini.Load([]byte(cluster.Spec.Rabbitmq.AdditionalConfig))
if err != nil {
return
}

// Additional config has only a default section
additionalConfig = cfg.Section("").KeysHash()

return
}

func readUsernamePassword(secret *corev1.Secret) (string, string, error) {
if secret == nil {
return "", "", errors.New("unable to extract data from nil secret")
Expand All @@ -169,7 +187,7 @@ func readUsernamePassword(secret *corev1.Secret) (string, string, error) {
return string(secret.Data["username"]), string(secret.Data["password"]), nil
}

func managementURI(svc *corev1.Service, tlsEnabled bool, clusterDomain string) (string, error) {
func managementURI(svc *corev1.Service, tlsEnabled bool, clusterDomain string, pathPrefix string) (string, error) {
var managementUiPort int
for _, port := range svc.Spec.Ports {
if port.Name == "management-tls" {
Expand All @@ -190,5 +208,10 @@ func managementURI(svc *corev1.Service, tlsEnabled bool, clusterDomain string) (
if tlsEnabled {
scheme = "https"
}
return fmt.Sprintf("%s://%s.%s.svc%s:%d", scheme, svc.Name, svc.Namespace, clusterDomain, managementUiPort), nil
url := url.URL{
Scheme: scheme,
Host: fmt.Sprintf("%s.%s.svc%s:%d", svc.Name, svc.Namespace, clusterDomain, managementUiPort),
Path: pathPrefix,
}
return url.String(), nil
}
71 changes: 71 additions & 0 deletions rabbitmqclient/cluster_reference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rabbitmqclient_test
import (
"context"
"fmt"

"github.com/rabbitmq/messaging-topology-operator/rabbitmqclient"
"github.com/rabbitmq/messaging-topology-operator/rabbitmqclient/rabbitmqclientfakes"

Expand Down Expand Up @@ -283,6 +284,76 @@ var _ = Describe("ParseReference", func() {
})
})

When("the RabbitmqCluster is configured with management path_prefix", func() {
BeforeEach(func() {
existingRabbitMQCluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Rabbitmq: rabbitmqv1beta1.RabbitmqClusterConfigurationSpec{
AdditionalConfig: `
management.path_prefix = /my/prefix
`,
},
},
Status: rabbitmqv1beta1.RabbitmqClusterStatus{
Binding: &corev1.LocalObjectReference{
Name: "rmq-default-user-credentials",
},
DefaultUser: &rabbitmqv1beta1.RabbitmqClusterDefaultUser{
ServiceReference: &rabbitmqv1beta1.RabbitmqClusterServiceReference{
Name: "rmq",
Namespace: namespace,
},
},
},
}
existingCredentialSecret = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq-default-user-credentials",
Namespace: namespace,
},
Data: map[string][]byte{
"username": []byte(existingRabbitMQUsername),
"password": []byte(existingRabbitMQPassword),
},
}
existingService = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "rmq",
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: "1.2.3.4",
Ports: []corev1.ServicePort{
{
Name: "management-tls",
Port: int32(15671),
},
},
},
}
objs = []runtime.Object{existingRabbitMQCluster, existingCredentialSecret, existingService}
})

It("returns correct creds in connectionCredentials", func() {
credsProvider, _, err := rabbitmqclient.ParseReference(ctx, fakeClient,
topology.RabbitmqClusterReference{Name: existingRabbitMQCluster.Name},
existingRabbitMQCluster.Namespace,
"")
Expect(err).NotTo(HaveOccurred())

usernameBytes, _ := credsProvider.Data("username")
passwordBytes, _ := credsProvider.Data("password")
uriBytes, _ := credsProvider.Data("uri")
Expect(usernameBytes).To(Equal([]byte(existingRabbitMQUsername)))
Expect(passwordBytes).To(Equal([]byte(existingRabbitMQPassword)))
Expect(uriBytes).To(Equal([]byte("http://rmq.rabbitmq-system.svc:15671/my/prefix")))
})
})

Context("spec.rabbitmqClusterReference.connectionSecret is set", func() {
When("uri has no scheme defined", func() {
BeforeEach(func() {
Expand Down