diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index c338038..ec2573a 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -684,6 +684,17 @@ func (c *M3DBController) handleClusterUpdate( return fmt.Errorf("set %s has unset spec replica", set.Name) } + // NB(cerkauskas): if statefulset is managed using on delete strategy, then operator + // should not expand nor shrink the cluster without the annotation + if set.Spec.UpdateStrategy.Type == appsv1.OnDeleteStatefulSetStrategyType { + _, inProgressAnnotationExists := set.Annotations[annotations.ParallelUpdateInProgress] + if !inProgressAnnotationExists { + c.logger.Warn("skipping statefulset resize because it does not have progress annotation", + zap.String("sts", set.Name)) + continue + } + } + // Number of pods we want in the group. desired := group.NumInstances // Number of pods currently in the group. @@ -755,6 +766,11 @@ func (c *M3DBController) handleClusterUpdate( return fmt.Errorf("error reconciling bootstrap status: %v", err) } + err = c.cleanupAnnotations(ctx, childrenSets) + if err != nil { + return fmt.Errorf("error cleaning up annotations for on delete strategy: %w", err) + } + c.logger.Info("nothing to do", zap.Int("childrensets", len(childrenSets)), zap.Int("zones", len(isoGroups)), @@ -764,6 +780,36 @@ func (c *M3DBController) handleClusterUpdate( return nil } +func (c *M3DBController) cleanupAnnotations( + ctx context.Context, childrenSets []*appsv1.StatefulSet, +) error { + c.logger.Debug("cleaning up progress annotations") + for _, set := range childrenSets { + // NB(cerkauskas): we want to delete annotation no matter the strategy of cluster. + // Strategy could be changed while update is happening and we still want to remove + // the annotation since there is nothing more to do for operator. 
+ if _, ok := set.Annotations[annotations.ParallelUpdateInProgress]; !ok { + c.logger.Debug("skipping set because it does not have progress annotation", + zap.String("sts", set.Name)) + continue + } + + c.logger.Info("removing update annotation for statefulset", + zap.String("sts", set.Name)) + + if err := c.patchStatefulSet(ctx, set, func(set *appsv1.StatefulSet) { + delete(set.Annotations, annotations.ParallelUpdateInProgress) + }); err != nil { + c.logger.Error("failed to remove annotation", + zap.String("sts", set.Name), + zap.Error(err)) + return err + } + } + c.logger.Info("cleaned up progress annotations") + return nil +} + func (c *M3DBController) patchStatefulSet( ctx context.Context, set *appsv1.StatefulSet, @@ -878,17 +924,12 @@ func (c *M3DBController) updateStatefulSetPods( sts.Status.CurrentReplicas = sts.Status.UpdatedReplicas sts.Status.CurrentRevision = sts.Status.UpdateRevision - if sts, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace). + if _, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace). 
UpdateStatus(ctx, sts, metav1.UpdateOptions{}); err != nil { return false, err } - if err = c.patchStatefulSet(ctx, sts, func(set *appsv1.StatefulSet) { - delete(set.Annotations, annotations.ParallelUpdateInProgress) - }); err != nil { - return false, err - } - logger.Info("update complete") + logger.Info("update of existing pods complete") return false, nil } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 6f75d85..c7008a3 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -22,6 +22,7 @@ package controller import ( "context" + "encoding/json" "fmt" "math" "sort" @@ -38,7 +39,9 @@ import ( "github.com/m3db/m3db-operator/pkg/k8sops/m3db" "github.com/m3db/m3db-operator/pkg/k8sops/podidentity" + "github.com/m3db/m3/src/cluster/generated/proto/placementpb" "github.com/m3db/m3/src/cluster/placement" + "github.com/m3db/m3/src/cluster/shard" namespacepb "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/query/generated/proto/admin" @@ -84,6 +87,7 @@ func setupTestCluster( replicationFactor int, numInstances int32, onDeleteUpdateStrategy bool, + addRevision bool, ) (*myspec.M3DBCluster, *testDeps) { cfgMapName := defaultConfigMapName cluster := &myspec.M3DBCluster{ @@ -122,6 +126,11 @@ func setupTestCluster( set.Annotations[k] = v } + if addRevision { + set.Status.CurrentRevision = "current-revision" + set.Status.UpdateRevision = "current-revision" + } + statefulSets[i] = set objects[i] = set set.OwnerReferences = []metav1.OwnerReference{ @@ -725,7 +734,7 @@ func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { cluster, deps := setupTestCluster( - t, *test.cluster, test.sets, nil, test.replicationFactor, 1, false, + t, *test.cluster, test.sets, nil, test.replicationFactor, 1, false, false, ) defer deps.cleanup() c := deps.newController(t) @@ -902,7 +911,7 @@ func 
TestHandleUpdateClusterUpdatesStatefulSets(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { const replicas = 3 - cluster, deps := setupTestCluster(t, *test.cluster, test.sets, nil, replicas, 1, false) + cluster, deps := setupTestCluster(t, *test.cluster, test.sets, nil, replicas, 1, false, false) defer deps.cleanup() c := deps.newController(t) @@ -1057,7 +1066,7 @@ func TestHandleUpdateClusterOnDeleteStrategy(t *testing.T) { t.Run(test.name, func(t *testing.T) { nodes := int32(len(test.pods) / len(test.sets)) cluster, deps := setupTestCluster( - t, *rawCluster, test.sets, test.pods, int(replicas), nodes, true, + t, *rawCluster, test.sets, test.pods, int(replicas), nodes, true, false, ) defer deps.cleanup() c := deps.newController(t) @@ -1077,6 +1086,148 @@ func TestHandleUpdateClusterOnDeleteStrategy(t *testing.T) { } } +func TestHandleResizeClusterOnDeleteStrategy(t *testing.T) { + var ( + clusterMeta = newMeta("cluster1", map[string]string{ + "operator.m3db.io/app": "m3db", + "operator.m3db.io/cluster": "cluster1", + }, nil) + sets = []*metav1.ObjectMeta{ + newMeta("cluster1-rep0", nil, nil), + newMeta("cluster1-rep1", nil, map[string]string{ + annotations.ParallelUpdateInProgress: "1", + }), + newMeta("cluster1-rep2", nil, nil), + } + + pods = generatePods("cluster1", 3, 1, "current-revision") + ) + cluster, deps := setupTestCluster(t, *clusterMeta, sets, pods, 3, 1, true, true) + defer deps.cleanup() + c := deps.newController(t) + + pl := placement.NewPlacement() + insts := []placement.Instance{} + for _, pod := range pods { + podIdentity := &myspec.PodIdentity{Name: pod.Name} + deps.idProvider.EXPECT(). + Identity(pod, gomock.Any()). + AnyTimes(). 
+ Return(podIdentity, nil) + + podID, err := podidentity.IdentityJSON(podIdentity) + require.NoError(t, err) + + statefulSetName, ok := pod.Labels[labels.StatefulSet] + require.True(t, ok) + sts, err := deps.statefulSetLister.StatefulSets("namespace").Get(statefulSetName) + require.NoError(t, err) + + instMock := placement.NewMockInstance(deps.mockController) + instMock.EXPECT().IsAvailable().AnyTimes().Return(true) + instMock.EXPECT().ID().AnyTimes().Return(podID) + instMock.EXPECT().Shards().AnyTimes().Return(shard.NewShards(nil)) + instMock.EXPECT().Hostname().AnyTimes().Return(pod.Name) + instMock.EXPECT().IsolationGroup().AnyTimes().Return(sts.Labels[labels.IsolationGroup]) + insts = append(insts, instMock) + } + pl.SetInstances(insts) + + deps.namespaceClient.EXPECT().List().AnyTimes().Return(&admin.NamespaceGetResponse{ + Registry: &namespacepb.Registry{}, + }, nil) + deps.placementClient.EXPECT().Get().AnyTimes().Return(pl, nil) + deps.placementClient.EXPECT().Add(gomock.Any()).AnyTimes().Do( + func(instsToAdd []*placementpb.Instance) { + insts := pl.Instances() + for _, instProto := range instsToAdd { + inst, err := placement.NewInstanceFromProto(instProto) + require.NoError(t, err) + instMock := placement.NewMockInstance(deps.mockController) + instMock.EXPECT().IsAvailable().AnyTimes().Return(true) + instMock.EXPECT().ID().AnyTimes().Return(inst.ID()) + instMock.EXPECT().Shards().AnyTimes().Return(inst.Shards()) + instMock.EXPECT().Hostname().AnyTimes().Return(inst.Hostname()) + instMock.EXPECT().IsolationGroup().AnyTimes().Return(inst.IsolationGroup()) + insts = append(insts, instMock) + } + + pl.SetInstances(insts) + }).Return(nil) + + // Scale up all isolation groups + for i := 0; i < len(cluster.Spec.IsolationGroups); i++ { + cluster.Spec.IsolationGroups[i].NumInstances = 3 + } + + //nolint:forcetypeassert + c.kubeClient.(*kubefake.Clientset). 
+ PrependReactor("patch", "statefulsets", + func(action ktesting.Action) (bool, runtime.Object, error) { + // We are expecting patch only for Spec.Replicas as this test tests scaling + patchAction := action.(kubetesting.PatchActionImpl) + var patched appsv1.StatefulSet + require.NoError(t, json.Unmarshal(patchAction.Patch, &patched)) + + // If replicas are nil, then it must be deletion of annotations + if patched.Spec.Replicas == nil { + return false, nil, nil + } + + numReplicas := *patched.Spec.Replicas + + // Pretending to be stateful set controller. We want to simulate creation of pod because + // scaling is happening one pod at a time. + sts, err := deps.statefulSetLister.StatefulSets("namespace").Get(patchAction.Name) + require.NoError(t, err) + sts.Spec.Replicas = &numReplicas + sts.Status.UpdatedReplicas = numReplicas + sts.Status.ReadyReplicas = numReplicas + + pod := generatePodForStatefulSet("cluster1", sts.Name, int(numReplicas), "current-revision") + // Need to copy labels for pod to be found in placement update + for k, v := range sts.Labels { + pod.Labels[k] = v + } + require.NoError(t, deps.kubeClient.Tracker().Add(pod)) + deps.idProvider.EXPECT(). + Identity(pod, gomock.Any()). + AnyTimes(). 
+ Return(&myspec.PodIdentity{Name: pod.Name}, nil) + + return true, sts, nil + }) + + for i := 0; i < 10; i++ { + require.NoError(t, c.handleClusterUpdate(context.Background(), cluster)) + } + + requireStatefulSetReplicas(t, deps, "cluster1-rep0", 1) + requireStatefulSetReplicas(t, deps, "cluster1-rep1", 3) + requireStatefulSetReplicas(t, deps, "cluster1-rep2", 1) + + sts, err := deps.statefulSetLister.StatefulSets("namespace").Get("cluster1-rep1") + require.NoError(t, err) + + // Let's make sure that annotation is removed when everything is done + _, ok := sts.Annotations[annotations.ParallelUpdateInProgress] + require.False(t, ok) +} + +func requireStatefulSetReplicas( + t *testing.T, + deps *testDeps, + statefulSetName string, + expectedReplicas int32, +) { + t.Helper() + + sts, err := deps.statefulSetLister.StatefulSets("namespace").Get(statefulSetName) + require.NoError(t, err) + require.NotNil(t, sts.Spec.Replicas) + require.Equal(t, expectedReplicas, *sts.Spec.Replicas) +} + func TestHandleUpdateClusterFrozen(t *testing.T) { var ( clusterMeta = newMeta("cluster1", map[string]string{ @@ -1088,7 +1239,7 @@ func TestHandleUpdateClusterFrozen(t *testing.T) { newMeta("cluster1-rep0", nil, nil), } ) - cluster, deps := setupTestCluster(t, *clusterMeta, sets, nil, 3, 1, false) + cluster, deps := setupTestCluster(t, *clusterMeta, sets, nil, 3, 1, false, false) defer deps.cleanup() controller := deps.newController(t) @@ -1137,18 +1288,37 @@ func generatePods(clusterName string, rf int32, nodes int32, revision string) [] var pods []*corev1.Pod for i := 0; i < int(rf); i++ { for j := 0; j < int(nodes); j++ { - pods = append(pods, &corev1.Pod{ - ObjectMeta: *newMeta(fmt.Sprintf("%s-rep%d-%d", clusterName, i, j), map[string]string{ - "controller-revision-hash": fmt.Sprintf("%s-rep%d-%s", clusterName, i, revision), - "operator.m3db.io/stateful-set": fmt.Sprintf("%s-rep%d", clusterName, i), - }, nil), - }) + pods = append(pods, generatePod(clusterName, i, j, revision)) }
} return pods } +func generatePod(clusterName string, rep, node int, revision string) *corev1.Pod { + statefulSetName := fmt.Sprintf("%s-rep%d", clusterName, rep) + + return generatePodForStatefulSet(clusterName, statefulSetName, node, revision) +} + +func generatePodForStatefulSet( + clusterName string, + statefulSetName string, + node int, + revision string, +) *corev1.Pod { + l := labels.BaseLabels(&myspec.M3DBCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + }, + }) + l["controller-revision-hash"] = fmt.Sprintf("%s-%s", statefulSetName, revision) + l["operator.m3db.io/stateful-set"] = statefulSetName + return &corev1.Pod{ + ObjectMeta: *newMeta(fmt.Sprintf("%s-%d", statefulSetName, node), l, nil), + } +} + func mockPlacement(deps *testDeps, replicas int32) { deps.namespaceClient.EXPECT().List().AnyTimes().Return(&admin.NamespaceGetResponse{ Registry: &namespacepb.Registry{},