Skip to content
55 changes: 48 additions & 7 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,17 @@ func (c *M3DBController) handleClusterUpdate(
return fmt.Errorf("set %s has unset spec replica", set.Name)
}

// NB(cerkauskas): if the statefulset is managed using the on delete strategy, then the operator
// should neither expand nor shrink the cluster without the in-progress annotation
if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType {
_, inProgressAnnotationExists := set.Annotations[annotations.ParallelUpdateInProgress]
if !inProgressAnnotationExists {
c.logger.Warn("skipping statefulset resize because it does not have progress annotation",
zap.String("sts", set.Name))
continue
}
}

// Number of pods we want in the group.
desired := group.NumInstances
// Number of pods currently in the group.
Expand Down Expand Up @@ -755,6 +766,11 @@ func (c *M3DBController) handleClusterUpdate(
return fmt.Errorf("error reconciling bootstrap status: %v", err)
}

err = c.cleanupAnnotations(ctx, childrenSets)
if err != nil {
return fmt.Errorf("error cleaning up annotations for on delete strategy: %w", err)
}

c.logger.Info("nothing to do",
zap.Int("childrensets", len(childrenSets)),
zap.Int("zones", len(isoGroups)),
Expand All @@ -764,6 +780,36 @@ func (c *M3DBController) handleClusterUpdate(
return nil
}

// cleanupAnnotations strips the ParallelUpdateInProgress annotation from every
// statefulset in childrenSets that still carries it. Sets without the annotation
// are skipped. The first patch failure is logged and returned, leaving any
// remaining sets untouched.
func (c *M3DBController) cleanupAnnotations(
	ctx context.Context, childrenSets []*appsv1.StatefulSet,
) error {
	c.logger.Debug("cleaning up progress annotations")

	// Mutation applied through patchStatefulSet for each annotated set.
	dropProgressAnnotation := func(sts *appsv1.StatefulSet) {
		delete(sts.Annotations, annotations.ParallelUpdateInProgress)
	}

	for _, sts := range childrenSets {
		// NB(cerkauskas): the update strategy is deliberately not consulted here.
		// It may have been changed while an update was in flight, and once the
		// operator has nothing left to do the annotation should go regardless.
		_, annotated := sts.Annotations[annotations.ParallelUpdateInProgress]
		if !annotated {
			c.logger.Debug("skipping set because it does not have progress annotation",
				zap.String("sts", sts.Name))
			continue
		}

		c.logger.Info("removing update annotation for statefulset",
			zap.String("sts", sts.Name))

		err := c.patchStatefulSet(ctx, sts, dropProgressAnnotation)
		if err == nil {
			continue
		}

		c.logger.Error("failed to remove annotation",
			zap.String("sts", sts.Name),
			zap.Error(err))
		return err
	}

	c.logger.Info("cleaned up progress annotations")
	return nil
}

func (c *M3DBController) patchStatefulSet(
ctx context.Context,
set *appsv1.StatefulSet,
Expand Down Expand Up @@ -878,17 +924,12 @@ func (c *M3DBController) updateStatefulSetPods(
sts.Status.CurrentReplicas = sts.Status.UpdatedReplicas
sts.Status.CurrentRevision = sts.Status.UpdateRevision

if sts, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).
if _, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).
UpdateStatus(ctx, sts, metav1.UpdateOptions{}); err != nil {
return false, err
}

if err = c.patchStatefulSet(ctx, sts, func(set *appsv1.StatefulSet) {
delete(set.Annotations, annotations.ParallelUpdateInProgress)
}); err != nil {
return false, err
}
logger.Info("update complete")
logger.Info("update of existing pods complete")

return false, nil
}
Expand Down
190 changes: 180 additions & 10 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"math"
"sort"
Expand All @@ -38,7 +39,9 @@ import (
"github.com/m3db/m3db-operator/pkg/k8sops/m3db"
"github.com/m3db/m3db-operator/pkg/k8sops/podidentity"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/cluster/shard"
namespacepb "github.com/m3db/m3/src/dbnode/generated/proto/namespace"
"github.com/m3db/m3/src/query/generated/proto/admin"

Expand Down Expand Up @@ -84,6 +87,7 @@ func setupTestCluster(
replicationFactor int,
numInstances int32,
onDeleteUpdateStrategy bool,
addRevision bool,
) (*myspec.M3DBCluster, *testDeps) {
cfgMapName := defaultConfigMapName
cluster := &myspec.M3DBCluster{
Expand Down Expand Up @@ -122,6 +126,11 @@ func setupTestCluster(
set.Annotations[k] = v
}

if addRevision {
set.Status.CurrentRevision = "current-revision"
set.Status.UpdateRevision = "current-revision"
}

statefulSets[i] = set
objects[i] = set
set.OwnerReferences = []metav1.OwnerReference{
Expand Down Expand Up @@ -725,7 +734,7 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cluster, deps := setupTestCluster(
t, *test.cluster, test.sets, nil, test.replicationFactor, 1, false,
t, *test.cluster, test.sets, nil, test.replicationFactor, 1, false, false,
)
defer deps.cleanup()
c := deps.newController(t)
Expand Down Expand Up @@ -902,7 +911,7 @@ func TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
const replicas = 3
cluster, deps := setupTestCluster(t, *test.cluster, test.sets, nil, replicas, 1, false)
cluster, deps := setupTestCluster(t, *test.cluster, test.sets, nil, replicas, 1, false, false)
defer deps.cleanup()
c := deps.newController(t)

Expand Down Expand Up @@ -1057,7 +1066,7 @@ func TestHandleUpdateClusterOnDeleteStrategy(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
nodes := int32(len(test.pods) / len(test.sets))
cluster, deps := setupTestCluster(
t, *rawCluster, test.sets, test.pods, int(replicas), nodes, true,
t, *rawCluster, test.sets, test.pods, int(replicas), nodes, true, false,
)
defer deps.cleanup()
c := deps.newController(t)
Expand All @@ -1077,6 +1086,148 @@ func TestHandleUpdateClusterOnDeleteStrategy(t *testing.T) {
}
}

// TestHandleResizeClusterOnDeleteStrategy checks how handleClusterUpdate resizes
// statefulsets when the test cluster is created with the on-delete update
// strategy flag. Three statefulsets start with one pod each and only
// cluster1-rep1 is created with the ParallelUpdateInProgress key. Every
// isolation group is then asked for 3 instances and the cluster is reconciled
// repeatedly. The test expects only cluster1-rep1 to reach 3 replicas, the
// other two sets to stay at 1, and the annotation on cluster1-rep1 to be gone
// at the end.
func TestHandleResizeClusterOnDeleteStrategy(t *testing.T) {
var (
clusterMeta = newMeta("cluster1", map[string]string{
"operator.m3db.io/app": "m3db",
"operator.m3db.io/cluster": "cluster1",
}, nil)
// Only the middle set carries the in-progress marker.
sets = []*metav1.ObjectMeta{
newMeta("cluster1-rep0", nil, nil),
newMeta("cluster1-rep1", nil, map[string]string{
annotations.ParallelUpdateInProgress: "1",
}),
newMeta("cluster1-rep2", nil, nil),
}

// One pod per statefulset (rf=3, nodes=1), all at "current-revision".
pods = generatePods("cluster1", 3, 1, "current-revision")
)
// The last two arguments enable onDeleteUpdateStrategy and addRevision.
cluster, deps := setupTestCluster(t, *clusterMeta, sets, pods, 3, 1, true, true)
defer deps.cleanup()
c := deps.newController(t)

// Build the initial placement: one mocked, available instance per existing pod.
pl := placement.NewPlacement()
insts := []placement.Instance{}
for _, pod := range pods {
// Any identity lookup for this pod yields an identity named after the pod.
podIdentity := &myspec.PodIdentity{Name: pod.Name}
deps.idProvider.EXPECT().
Identity(pod, gomock.Any()).
AnyTimes().
Return(podIdentity, nil)

// The mocked instance ID is whatever podidentity.IdentityJSON produces for
// that identity.
podID, err := podidentity.IdentityJSON(podIdentity)
require.NoError(t, err)

// Resolve the owning statefulset so the instance can report that set's
// isolation group label.
statefulSetName, ok := pod.Labels[labels.StatefulSet]
require.True(t, ok)
sts, err := deps.statefulSetLister.StatefulSets("namespace").Get(statefulSetName)
require.NoError(t, err)

instMock := placement.NewMockInstance(deps.mockController)
instMock.EXPECT().IsAvailable().AnyTimes().Return(true)
instMock.EXPECT().ID().AnyTimes().Return(podID)
instMock.EXPECT().Shards().AnyTimes().Return(shard.NewShards(nil))
instMock.EXPECT().Hostname().AnyTimes().Return(pod.Name)
instMock.EXPECT().IsolationGroup().AnyTimes().Return(sts.Labels[labels.IsolationGroup])
insts = append(insts, instMock)
}
pl.SetInstances(insts)

deps.namespaceClient.EXPECT().List().AnyTimes().Return(&admin.NamespaceGetResponse{
Registry: &namespacepb.Registry{},
}, nil)
// Get always hands back pl. Add appends one available mock per requested
// instance to pl, so later Get calls see the grown placement.
// NOTE(review): this relies on SetInstances updating pl in place, since its
// return value is ignored here — confirm against the placement package.
deps.placementClient.EXPECT().Get().AnyTimes().Return(pl, nil)
deps.placementClient.EXPECT().Add(gomock.Any()).AnyTimes().Do(
func(instsToAdd []*placementpb.Instance) {
insts := pl.Instances()
for _, instProto := range instsToAdd {
// Mirror the proto instance's ID, shards, hostname and isolation group
// on a mock that always reports itself as available.
inst, err := placement.NewInstanceFromProto(instProto)
require.NoError(t, err)
instMock := placement.NewMockInstance(deps.mockController)
instMock.EXPECT().IsAvailable().AnyTimes().Return(true)
instMock.EXPECT().ID().AnyTimes().Return(inst.ID())
instMock.EXPECT().Shards().AnyTimes().Return(inst.Shards())
instMock.EXPECT().Hostname().AnyTimes().Return(inst.Hostname())
instMock.EXPECT().IsolationGroup().AnyTimes().Return(inst.IsolationGroup())
insts = append(insts, instMock)
}

pl.SetInstances(insts)
}).Return(nil)

// Scale up all isolation groups
for i := 0; i < len(cluster.Spec.IsolationGroups); i++ {
cluster.Spec.IsolationGroups[i].NumInstances = 3
}

// Intercept statefulset patches ahead of the fake clientset's other reactors.
//nolint:forcetypeassert
c.kubeClient.(*kubefake.Clientset).
PrependReactor("patch", "statefulsets",
func(action ktesting.Action) (bool, runtime.Object, error) {
// Only patches that set Spec.Replicas are handled here, since this test
// is about scaling.
patchAction := action.(kubetesting.PatchActionImpl)
var patched appsv1.StatefulSet
require.NoError(t, json.Unmarshal(patchAction.Patch, &patched))

// If replicas are nil, then it must be deletion of annotations.
// Returning false leaves the action to the remaining reactors.
if patched.Spec.Replicas == nil {
return false, nil, nil
}

numReplicas := *patched.Spec.Replicas

// Pretending to be stateful set controller. We want to simulate creation of pod because
// scaling is happening one pod at a time.
sts, err := deps.statefulSetLister.StatefulSets("namespace").Get(patchAction.Name)
require.NoError(t, err)
sts.Spec.Replicas = &numReplicas
sts.Status.UpdatedReplicas = numReplicas
sts.Status.ReadyReplicas = numReplicas

// NOTE(review): the new pod is named with the new replica count rather
// than count-1, so its ordinal is not zero-based — confirm this is
// intentional.
pod := generatePodForStatefulSet("cluster1", sts.Name, int(numReplicas), "current-revision")
// Need to copy labels for pod to be found in placement update
for k, v := range sts.Labels {
pod.Labels[k] = v
}
// Register the pod with the fake tracker and give it an identity, as was
// done for the initial pods.
require.NoError(t, deps.kubeClient.Tracker().Add(pod))
deps.idProvider.EXPECT().
Identity(pod, gomock.Any()).
AnyTimes().
Return(&myspec.PodIdentity{Name: pod.Name}, nil)

// Report the patch as handled and return the mutated statefulset.
return true, sts, nil
})

// Reconcile several times: per the reactor comment above, scaling happens one
// pod at a time, so a single pass cannot reach the target.
for i := 0; i < 10; i++ {
require.NoError(t, c.handleClusterUpdate(context.Background(), cluster))
}

// Only the annotated set may have been resized.
requireStatefulSetReplicas(t, deps, "cluster1-rep0", 1)
requireStatefulSetReplicas(t, deps, "cluster1-rep1", 3)
requireStatefulSetReplicas(t, deps, "cluster1-rep2", 1)

sts, err := deps.statefulSetLister.StatefulSets("namespace").Get("cluster1-rep1")
require.NoError(t, err)

// Let's make sure that annotation is removed when everything is done
_, ok := sts.Annotations[annotations.ParallelUpdateInProgress]
require.False(t, ok)
}

// requireStatefulSetReplicas fails the test unless the statefulset named
// statefulSetName can be fetched from the lister in namespace "namespace",
// has a non-nil Spec.Replicas, and that value equals expectedReplicas.
func requireStatefulSetReplicas(
	t *testing.T,
	deps *testDeps,
	statefulSetName string,
	expectedReplicas int32,
) {
	t.Helper()

	sts, err := deps.statefulSetLister.StatefulSets("namespace").Get(statefulSetName)
	require.NoError(t, err)
	// Assert on the Replicas pointer itself. The previous call passed t as the
	// object under test, which is never nil, so a nil Replicas slipped through
	// and panicked on the dereference below.
	require.NotNil(t, sts.Spec.Replicas)
	require.Equal(t, expectedReplicas, *sts.Spec.Replicas)
}

func TestHandleUpdateClusterFrozen(t *testing.T) {
var (
clusterMeta = newMeta("cluster1", map[string]string{
Expand All @@ -1088,7 +1239,7 @@ func TestHandleUpdateClusterFrozen(t *testing.T) {
newMeta("cluster1-rep0", nil, nil),
}
)
cluster, deps := setupTestCluster(t, *clusterMeta, sets, nil, 3, 1, false)
cluster, deps := setupTestCluster(t, *clusterMeta, sets, nil, 3, 1, false, false)
defer deps.cleanup()
controller := deps.newController(t)

Expand Down Expand Up @@ -1137,18 +1288,37 @@ func generatePods(clusterName string, rf int32, nodes int32, revision string) []
var pods []*corev1.Pod
for i := 0; i < int(rf); i++ {
for j := 0; j < int(nodes); j++ {
pods = append(pods, &corev1.Pod{
ObjectMeta: *newMeta(fmt.Sprintf("%s-rep%d-%d", clusterName, i, j), map[string]string{
"controller-revision-hash": fmt.Sprintf("%s-rep%d-%s", clusterName, i, revision),
"operator.m3db.io/stateful-set": fmt.Sprintf("%s-rep%d", clusterName, i),
}, nil),
})
pods = append(pods, generatePod(clusterName, i, j, revision))
}
}

return pods
}

// generatePod builds a test pod for the statefulset "<clusterName>-rep<rep>",
// delegating to generatePodForStatefulSet with the given node and revision.
func generatePod(clusterName string, rep, node int, revision string) *corev1.Pod {
	return generatePodForStatefulSet(
		clusterName,
		fmt.Sprintf("%s-rep%d", clusterName, rep),
		node,
		revision,
	)
}

// generatePodForStatefulSet builds a test pod named "<statefulSetName>-<node>".
// Its labels start from labels.BaseLabels for a cluster named clusterName, plus
// the owning statefulset name and a controller revision hash of the form
// "<statefulSetName>-<revision>".
func generatePodForStatefulSet(
	clusterName string,
	statefulSetName string,
	node int,
	revision string,
) *corev1.Pod {
	owner := &myspec.M3DBCluster{
		ObjectMeta: metav1.ObjectMeta{Name: clusterName},
	}

	podLabels := labels.BaseLabels(owner)
	podLabels["operator.m3db.io/stateful-set"] = statefulSetName
	podLabels["controller-revision-hash"] = fmt.Sprintf("%s-%s", statefulSetName, revision)

	podName := fmt.Sprintf("%s-%d", statefulSetName, node)
	meta := newMeta(podName, podLabels, nil)

	return &corev1.Pod{ObjectMeta: *meta}
}

func mockPlacement(deps *testDeps, replicas int32) {
deps.namespaceClient.EXPECT().List().AnyTimes().Return(&admin.NamespaceGetResponse{
Registry: &namespacepb.Registry{},
Expand Down