4 changes: 4 additions & 0 deletions pkg/apis/m3dboperator/v1alpha1/cluster.go
@@ -319,6 +319,10 @@ type ClusterSpec struct {
// SidecarVolumes is used to add any volumes that are required by sidecar
// containers.
SidecarVolumes []corev1.Volume `json:"sidecarVolumes,omitempty"`

// OnDeleteUpdateStrategy sets StatefulSets created by the operator to
// have OnDelete as the update strategy instead of RollingUpdate.
OnDeleteUpdateStrategy bool `json:"onDeleteUpdateStrategy,omitempty"`
}

// ExternalCoordinatorConfig defines parameters for using an external
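For context, a minimal sketch of how a user might opt into the new strategy in an M3DBCluster manifest. Only the onDeleteUpdateStrategy key follows from this diff's JSON tag; the apiVersion and the rest of the spec are illustrative assumptions:

```yaml
apiVersion: operator.m3db.io/v1alpha1  # assumed CRD group/version
kind: M3DBCluster
metadata:
  name: example-cluster
spec:
  # From this diff: StatefulSets created by the operator use the OnDelete
  # update strategy instead of RollingUpdate.
  onDeleteUpdateStrategy: true
```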
297 changes: 252 additions & 45 deletions pkg/controller/controller.go
@@ -26,6 +26,7 @@ import (
"fmt"
"reflect"
"sort"
"strconv"
"sync"

"github.com/m3db/m3db-operator/pkg/apis/m3dboperator"
@@ -57,12 +58,12 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/pointer"

jsonpatch "github.com/evanphx/json-patch"
pkgerrors "github.com/pkg/errors"
"github.com/uber-go/tally"
"go.uber.org/zap"
"k8s.io/utils/pointer"
)

const (
@@ -315,7 +316,6 @@ func (c *M3DBController) processClusterQueueItem() bool {

return nil
}(obj)

if err != nil {
runtime.HandleError(err)
}
@@ -560,29 +560,23 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
return err
}

if !update {
continue
var processNext bool
// The expectation is that these update methods will return false when they have updated
// actual pods in the statefulset. This will stop any further processing of pods until
// the updated ones are back healthy.
if cluster.Spec.OnDeleteUpdateStrategy {
Collaborator: Do we know, if we were to update a StatefulSet's template and its updateStrategy, whether the controller will use the new updateStrategy to update the set or the old one? Just wondering about a potential edge case where we update both, and the StatefulSet controller uses the old strategy for the update, but because we're checking cluster.Spec.OnDeleteUpdateStrategy here we expect the new one to be used. I would think the controller would use the new strategy that's in the update, but Kube has surprised me before 😄

Collaborator (Author): Yeah, I just tested this out and it does respect the new updateStrategy. So when you change the template and set OnDelete, k8s (correctly, imo) waits for you to roll the statefulset pods to pick up the update.

processNext, err = c.updateWithOnDeleteStrategy(cluster, actual, expected, update)
} else {
processNext, err = c.updateWithRollingUpdateStrategy(cluster, actual, expected, update)
}

_, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Update(expected)
if err != nil {
c.logger.Error(err.Error())
return err
}

c.logger.Info("updated statefulset",
zap.String("name", expected.Name),
zap.Int32("actual_readyReplicas", actual.Status.ReadyReplicas),
zap.Int32("actual_updatedReplicas", actual.Status.UpdatedReplicas),
zap.String("actual_currentRevision", actual.Status.CurrentRevision),
zap.String("actual_updateRevision", actual.Status.UpdateRevision),
zap.Int32("expected_readyReplicas", expected.Status.ReadyReplicas),
zap.Int32("expected_updatedReplicas", expected.Status.UpdatedReplicas),
zap.String("expected_currentRevision", expected.Status.CurrentRevision),
zap.String("expected_updateRevision", expected.Status.UpdateRevision),
zap.Int64("generation", expected.Generation),
zap.Int64("observed", expected.Status.ObservedGeneration),
)
if processNext {
continue
}

return nil
}
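A side note on the thread above: both strategy types are constants in k8s.io/api/apps/v1. Here is a minimal sketch, assuming the operator maps the new spec flag to the set's update strategy when generating it (the helper name is hypothetical; the real wiring presumably lives in m3db.GenerateStatefulSet):

```go
package controller

import appsv1 "k8s.io/api/apps/v1"

// updateStrategyFor maps the new spec flag to a StatefulSet update strategy
// (hypothetical helper). With OnDelete, pods only pick up a new revision when
// something deletes them, which is what updateWithOnDeleteStrategy relies on.
func updateStrategyFor(onDelete bool) appsv1.StatefulSetUpdateStrategy {
	if onDelete {
		return appsv1.StatefulSetUpdateStrategy{Type: appsv1.OnDeleteStatefulSetStrategyType}
	}
	return appsv1.StatefulSetUpdateStrategy{Type: appsv1.RollingUpdateStatefulSetStrategyType}
}
```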
@@ -715,31 +709,13 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
}
setLogger.Info("resizing set, desired != current", zap.Int32("newSize", newCount))

setBytes, err := json.Marshal(set)
if err != nil {
return err
}

set.Spec.Replicas = pointer.Int32Ptr(newCount)

setModifiedBytes, err := json.Marshal(set)
if err != nil {
return err
}

patchBytes, err := jsonpatch.CreateMergePatch(setBytes, setModifiedBytes)
if err != nil {
if err = c.patchStatefulSet(set, func(set *appsv1.StatefulSet) {
set.Spec.Replicas = pointer.Int32Ptr(newCount)
}); err != nil {
c.logger.Error("error patching statefulset", zap.Error(err))
return err
}

set, err = c.kubeClient.
AppsV1().
StatefulSets(set.Namespace).
Patch(set.Name, types.MergePatchType, patchBytes)
if err != nil {
return fmt.Errorf("error updating statefulset %s: %v", set.Name, err)
}

return nil
}

@@ -765,6 +741,220 @@ func (c *M3DBController) handleClusterUpdate(cluster *myspec.M3DBCluster) error
return nil
}

func (c *M3DBController) patchStatefulSet(
set *appsv1.StatefulSet,
action func(set *appsv1.StatefulSet),
) error {
setBytes, err := json.Marshal(set)
if err != nil {
return err
}

action(set)

setModifiedBytes, err := json.Marshal(set)
if err != nil {
return err
}

patchBytes, err := jsonpatch.CreateMergePatch(setBytes, setModifiedBytes)
if err != nil {
return err
}

set, err = c.kubeClient.
AppsV1().
StatefulSets(set.Namespace).
Patch(set.Name, types.MergePatchType, patchBytes)
if err != nil {
return fmt.Errorf("error updating statefulset %s: %v", set.Name, err)
}

return nil
}

func (c *M3DBController) updateWithRollingUpdateStrategy(
cluster *myspec.M3DBCluster,
actual *appsv1.StatefulSet,
expected *appsv1.StatefulSet,
update bool,
) (bool, error) {
if !update {
Collaborator: nit: would it make sense to move the logic for checking the update bool to where we call updateWithRollingUpdateStrategy or updateWithOnDeleteStrategy? It seems that if update is false both functions become no-ops, so just wondering if it would be simpler to only call them when update is true.

Collaborator (Author): We actually still want to call updateWithOnDeleteStrategy even if we're not updating the statefulset, as we may still need to roll nodes in the set. Simplified the logic to make this a little more clear.

return true, nil
}

_, err := c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Update(expected)
if err != nil {
c.logger.Error(err.Error())
return false, err
}

c.logger.Info("updated statefulset",
zap.String("name", expected.Name),
zap.Int32("actual_readyReplicas", actual.Status.ReadyReplicas),
zap.Int32("actual_updatedReplicas", actual.Status.UpdatedReplicas),
zap.String("actual_currentRevision", actual.Status.CurrentRevision),
zap.String("actual_updateRevision", actual.Status.UpdateRevision),
zap.Int32("expected_readyReplicas", expected.Status.ReadyReplicas),
zap.Int32("expected_updatedReplicas", expected.Status.UpdatedReplicas),
zap.String("expected_currentRevision", expected.Status.CurrentRevision),
zap.String("expected_updateRevision", expected.Status.UpdateRevision),
zap.Int64("generation", expected.Generation),
zap.Int64("observed", expected.Status.ObservedGeneration),
)

return false, err
}

func (c *M3DBController) updateWithOnDeleteStrategy(
cluster *myspec.M3DBCluster,
actual *appsv1.StatefulSet,
expected *appsv1.StatefulSet,
update bool,
) (bool, error) {
logger := c.logger.With(
zap.String("namespace", cluster.Namespace), zap.String("name", actual.Name),
)

if update {
updated, err := c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Update(expected)
if err != nil {
logger.Error("error updating statefulset", zap.Error(err))
return false, err
}
actual = updated

c.logger.Info("updated statefulset",
zap.String("name", expected.Name),
zap.Int32("actual_readyReplicas", actual.Status.ReadyReplicas),
zap.Int32("actual_updatedReplicas", actual.Status.UpdatedReplicas),
zap.String("actual_currentRevision", actual.Status.CurrentRevision),
zap.String("actual_updateRevision", actual.Status.UpdateRevision),
zap.Int32("expected_readyReplicas", expected.Status.ReadyReplicas),
zap.Int32("expected_updatedReplicas", expected.Status.UpdatedReplicas),
zap.String("expected_currentRevision", expected.Status.CurrentRevision),
zap.String("expected_updateRevision", expected.Status.UpdateRevision),
zap.Int64("generation", expected.Generation),
zap.Int64("observed", expected.Status.ObservedGeneration),
)
} else if _, ok := actual.Annotations[annotations.ParallelUpdateInProgress]; !ok {
Collaborator: It seems like this method is doing a lot with several different flows. Is there any way we can break this up? Updating the sts looks the same between the two rollout strategies, so that's probably something we could break out.

Collaborator (Author): Yeah, simplified this to make it a little more clear.

logger.Debug("no update and no rollout in progress so move to next statefulset")
return true, nil
}

numPods, err := c.getMaxPodsToUpdate(actual)
if err != nil {
logger.Error("error retrieving number of pods to update in parallel", zap.Error(err))
return false, err
}

if numPods == 0 {
err = errors.New("parallel update annotation set to 0. will not perform pod updates")
logger.Error(err.Error())
return false, err
}

pods, err := c.podsToUpdate(cluster.Namespace, actual, numPods)
if err != nil {
logger.Error("error retrieving pods to update", zap.Error(err))
return false, err
}

if len(pods) > 0 {
names := make([]string, 0, len(pods))
for _, pod := range pods {
if err := c.kubeClient.CoreV1().
Pods(pod.Namespace).
Delete(pod.Name, &metav1.DeleteOptions{}); err != nil {
logger.Error("error deleting pod", zap.Error(err))
return false, err
}
names = append(names, pod.Name)
}
logger.Info("restarting pods", zap.Any("pods", names))
Collaborator: Nit: you can use zap.Strings here since names is a []string.

Collaborator (Author): Nice, TIL!

return false, nil
} else {
// If there are no pods to update, we're fully rolled out so remove
// the update annotation.
if err = c.patchStatefulSet(actual, func(set *appsv1.StatefulSet) {
delete(set.Annotations, annotations.ParallelUpdateInProgress)

// NB(nate): K8s handles this for you when using the RollingUpdate update strategy.
// However, since OnDelete removes k8s from the pod update process, it's our
// responsibility to set this once done rolling out.
set.Status.CurrentReplicas = set.Status.UpdatedReplicas
set.Status.CurrentRevision = set.Status.UpdateRevision
}); err != nil {
logger.Error("error patching statefulset", zap.Error(err))
return false, err
}
logger.Info("update complete")
}

return true, nil
}

func (c *M3DBController) getMaxPodsToUpdate(actual *appsv1.StatefulSet) (int, error) {
Collaborator: Can this be a free function rather than a method on *M3DBController?

var (
rawVal string
ok bool
)
if rawVal, ok = actual.Annotations[annotations.ParallelUpdateInProgress]; !ok {
return 0, errors.New("parallel update annotation missing during statefulset update")
}

var (
maxPodsPerUpdate int
err error
)
if maxPodsPerUpdate, err = strconv.Atoi(rawVal); err != nil {
return 0, fmt.Errorf("failed to parse parallel update annotation: %v", rawVal)
}

return maxPodsPerUpdate, nil
}

func (c *M3DBController) podsToUpdate(
namespace string,
sts *appsv1.StatefulSet,
numPods int,
) ([]*corev1.Pod, error) {
currRev := sts.Status.CurrentRevision
if currRev == "" {
return nil, errors.New("currentRevision empty")
} else if currRev == sts.Status.UpdateRevision {
// No pods to update because current and update revision are the same
return nil, nil
}

label := map[string]string{
"controller-revision-hash": sts.Status.CurrentRevision,
}
pods, err := c.podLister.Pods(namespace).List(klabels.Set(label).AsSelector())
if err != nil {
return nil, err
} else if len(pods) == 0 {
return nil, nil
}

// NB(nate): Sort here so updates are always done in a consistent order.
// Statefulset 0 -> N: Pod 0 -> N
sortedPods, err := sortPods(pods)
if err != nil {
return nil, err
}

var toUpdate []*corev1.Pod
for _, pod := range sortedPods {
toUpdate = append(toUpdate, pod.pod)
if len(toUpdate) == numPods {
break
}
}

return toUpdate, nil
}
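sortPods and its element type aren't part of this diff. A plausible sketch of what they do, assuming pods are ordered by StatefulSet ordinal as the NB comment above describes (the type and field names are assumptions chosen to match the pod.pod access in podsToUpdate):

```go
package controller

import (
	"fmt"
	"sort"
	"strconv"
	"strings"

	corev1 "k8s.io/api/core/v1"
)

// podWithOrdinal pairs a pod with the ordinal parsed from its name
// (hypothetical type matching the pod.pod access above).
type podWithOrdinal struct {
	pod     *corev1.Pod
	ordinal int
}

// sortPods orders pods by StatefulSet ordinal (names end in -<ordinal>),
// so updates always proceed pod 0 -> N.
func sortPods(pods []*corev1.Pod) ([]podWithOrdinal, error) {
	sorted := make([]podWithOrdinal, 0, len(pods))
	for _, p := range pods {
		idx := strings.LastIndex(p.Name, "-")
		if idx < 0 {
			return nil, fmt.Errorf("pod name %q has no ordinal suffix", p.Name)
		}
		ord, err := strconv.Atoi(p.Name[idx+1:])
		if err != nil {
			return nil, err
		}
		sorted = append(sorted, podWithOrdinal{pod: p, ordinal: ord})
	}
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].ordinal < sorted[j].ordinal })
	return sorted, nil
}
```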

func instancesInIsoGroup(pl m3placement.Placement, isoGroup string) []m3placement.Instance {
insts := []m3placement.Instance{}
for _, inst := range pl.Instances() {
@@ -876,7 +1066,6 @@ func (c *M3DBController) processPodQueueItem() bool {

return nil
}(obj)

if err != nil {
runtime.HandleError(err)
}
@@ -1039,7 +1228,16 @@ func updatedStatefulSet(
// The operator will only perform an update if the current StatefulSet has been
// annotated to indicate that it is okay to update it.
if val, ok := actual.Annotations[annotations.Update]; !ok || val != annotations.EnabledVal {
return nil, false, nil
str, ok := actual.Annotations[annotations.ParallelUpdate]
if !ok {
return nil, false, nil
}

if parallelVal, err := strconv.Atoi(str); err != nil {
return nil, false, err
} else if parallelVal < 1 {
return nil, false, fmt.Errorf("parallel update value invalid: %v", str)
}
}

expected, err := m3db.GenerateStatefulSet(cluster, isoGroup.Name, isoGroup.NumInstances)
@@ -1067,11 +1265,12 @@ func updatedStatefulSet(
}

// If we don't need to perform an update to the StatefulSet's spec, but the StatefulSet
// has the update annotation, we'll still update the StatefulSet to remove the update
// annotation. This ensures that users can always set the update annotation and then
// has an update annotation, we'll still update the StatefulSet to remove the update
// annotation. This ensures that users can always set an update annotation and then
// wait for it to be removed to know if the operator has processed a StatefulSet.
if !update {
delete(actual.Annotations, annotations.Update)
delete(actual.Annotations, annotations.ParallelUpdate)
return actual, true, nil
}

@@ -1098,6 +1297,14 @@ func copyAnnotations(expected, actual *appsv1.StatefulSet) {
continue
}

// For parallel updates, remove the initial annotation added by the client and add a
// rollout-in-progress annotation. This ensures that in future passes we don't attempt
// to update the statefulset again unnecessarily, and instead simply roll pods that
// need to pick up updates.
if k == annotations.ParallelUpdate {
expected.Annotations[annotations.ParallelUpdateInProgress] = v
}

if _, ok := expected.Annotations[k]; !ok {
expected.Annotations[k] = v
}
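To make the annotation handoff concrete, a hypothetical before/after view of the StatefulSet's metadata. The literal key strings here are assumptions; the real values are whatever the annotations.ParallelUpdate and annotations.ParallelUpdateInProgress constants hold:

```yaml
# Before: the client requests an update of up to 3 pods per pass.
metadata:
  annotations:
    operator.m3db.io/parallel-update: "3"    # hypothetical key string
---
# After copyAnnotations: the operator tracks the rollout itself, so later
# reconcile passes roll pods rather than re-updating the StatefulSet spec.
metadata:
  annotations:
    operator.m3db.io/parallel-update-in-progress: "3"    # hypothetical key string
```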