Skip to content

Commit d9036b3

Browse files
committed
Add remaining changes to PR
This commit includes all changes for feature #181 that have not been split up into small stacked PRs. It should not be merged and will later be undone and split into smaller logical chunks of work.
1 parent 6993adf commit d9036b3

File tree

4 files changed

+200
-18
lines changed

4 files changed

+200
-18
lines changed

internal/controller/etcdcluster_controller.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
9191
}
9292

9393
state := observables{}
94+
state.instance = instance
9495

9596
// create two services and the pdb
9697
err = r.ensureUnconditionalObjects(ctx, instance)
@@ -112,11 +113,40 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
112113
}
113114
state.endpointsFound = clusterClient != nil && singleClients != nil
114115

116+
if clusterClient != nil {
117+
state.endpoints = clusterClient.Endpoints()
118+
}
119+
120+
// fetch PVCs
121+
state.pvcs, err = factory.PVCs(ctx, instance, r.Client)
122+
if err != nil {
123+
return ctrl.Result{}, err
124+
}
125+
115126
if !state.endpointsFound {
116127
if !state.stsExists {
117-
// TODO: happy path for new cluster creation
118-
log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
128+
return r.createClusterFromScratch(ctx, &state) // TODO: needs implementing
119129
}
130+
// else try reconciling the sts
131+
existingSts := state.statefulSet.DeepCopy()
132+
desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing
133+
existingSts.Spec.Template.Spec = desiredSts.Spec.Template.Spec
134+
err := r.patchOrCreateObject(ctx, existingSts)
135+
if err != nil {
136+
return ctrl.Result{}, err
137+
}
138+
state.statefulSet = *existingSts
139+
if existingSts.Status.ReadyReplicas != *existingSts.Spec.Replicas { // TODO: this check might not be the best to check for a ready sts
140+
return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready")
141+
}
142+
if *existingSts.Spec.Replicas > 0 {
143+
return ctrl.Result{}, fmt.Errorf("reached an impossible state (no endpoints, but active pods)")
144+
}
145+
if *instance.Spec.Replicas == 0 {
146+
// cluster successfully scaled down to zero
147+
return ctrl.Result{}, nil
148+
}
149+
return r.scaleUpFromZero(ctx, &state) // TODO: needs implementing
120150
}
121151

122152
// get status of every endpoint and member list from every endpoint
@@ -661,3 +691,33 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context,
661691
}
662692
return nil
663693
}
694+
695+
func (r *EtcdClusterReconciler) patchOrCreateObject(ctx context.Context, obj client.Object) error {
696+
err := r.Patch(ctx, obj, client.Apply, &client.PatchOptions{FieldManager: "etcd-operator"}, client.ForceOwnership)
697+
if err == nil {
698+
return nil
699+
}
700+
if client.IgnoreNotFound(err) == nil {
701+
err = r.Create(ctx, obj)
702+
}
703+
return err
704+
}
705+
706+
// TODO!
707+
func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, state *observables) (ctrl.Result, error) {
708+
cm := factory.TemplateClusterStateConfigMap(state.instance, "new", state.desiredReplicas())
709+
err := ctrl.SetControllerReference(state.instance, cm, r.Scheme)
710+
if err != nil {
711+
return ctrl.Result{}, err
712+
}
713+
err = r.patchOrCreateObject(ctx, cm)
714+
if err != nil {
715+
return ctrl.Result{}, err
716+
}
717+
panic("not yet implemented")
718+
}
719+
720+
// TODO!
721+
func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) (ctrl.Result, error) {
722+
panic("not yet implemented")
723+
}

internal/controller/factory/pvc.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ import (
3030
"k8s.io/apimachinery/pkg/types"
3131
)
3232

33+
func PVCLabels(cluster *etcdaenixiov1alpha1.EtcdCluster) map[string]string {
34+
labels := PodLabels(cluster)
35+
for key, value := range cluster.Spec.Storage.VolumeClaimTemplate.Labels {
36+
labels[key] = value
37+
}
38+
return labels
39+
}
40+
3341
func GetPVCName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
3442
if len(cluster.Spec.Storage.VolumeClaimTemplate.Name) > 0 {
3543
return cluster.Spec.Storage.VolumeClaimTemplate.Name
@@ -38,6 +46,16 @@ func GetPVCName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
3846
return "data"
3947
}
4048

49+
func PVCs(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, cli client.Client) ([]corev1.PersistentVolumeClaim, error) {
50+
labels := PVCLabels(cluster)
51+
pvcs := corev1.PersistentVolumeClaimList{}
52+
err := cli.List(ctx, &pvcs, client.MatchingLabels(labels))
53+
if err != nil {
54+
return nil, err
55+
}
56+
return pvcs.Items, nil
57+
}
58+
4159
// UpdatePersistentVolumeClaims checks and updates the sizes of PVCs in an EtcdCluster if the specified storage size is larger than the current.
4260
func UpdatePersistentVolumeClaims(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, rclient client.Client) error {
4361
labelSelector := labels.SelectorFromSet(labels.Set{

internal/controller/factory/statefulset.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,30 @@ const (
4141
defaultBackendQuotaBytesFraction = 0.95
4242
)
4343

44+
// TODO!
45+
func TemplateStatefulSet() *appsv1.StatefulSet {
46+
panic("not yet implemented")
47+
}
48+
49+
func PodLabels(cluster *etcdaenixiov1alpha1.EtcdCluster) map[string]string {
50+
labels := NewLabelsBuilder().WithName().WithInstance(cluster.Name).WithManagedBy()
51+
52+
if cluster.Spec.PodTemplate.Labels != nil {
53+
for key, value := range cluster.Spec.PodTemplate.Labels {
54+
labels[key] = value
55+
}
56+
}
57+
58+
return labels
59+
}
60+
4461
func CreateOrUpdateStatefulSet(
4562
ctx context.Context,
4663
cluster *etcdaenixiov1alpha1.EtcdCluster,
4764
rclient client.Client,
4865
) error {
4966
podMetadata := metav1.ObjectMeta{
50-
Labels: NewLabelsBuilder().WithName().WithInstance(cluster.Name).WithManagedBy(),
51-
}
52-
53-
if cluster.Spec.PodTemplate.Labels != nil {
54-
for key, value := range cluster.Spec.PodTemplate.Labels {
55-
podMetadata.Labels[key] = value
56-
}
67+
Labels: PodLabels(cluster),
5768
}
5869

5970
if cluster.Spec.PodTemplate.Annotations != nil {

internal/controller/observables.go

Lines changed: 102 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ package controller
22

33
import (
44
"context"
5+
"strconv"
6+
"strings"
57
"sync"
68

9+
"github.com/aenix-io/etcd-operator/api/v1alpha1"
10+
"github.com/aenix-io/etcd-operator/pkg/set"
711
clientv3 "go.etcd.io/etcd/client/v3"
812
appsv1 "k8s.io/api/apps/v1"
913
corev1 "k8s.io/api/core/v1"
@@ -22,13 +26,14 @@ type etcdStatus struct {
2226
// observables stores observations that the operator can make about
2327
// states of objects in kubernetes
2428
type observables struct {
29+
instance *v1alpha1.EtcdCluster
2530
statefulSet appsv1.StatefulSet
2631
stsExists bool
32+
endpoints []string
2733
endpointsFound bool
2834
etcdStatuses []etcdStatus
2935
clusterID uint64
30-
_ int
31-
_ []corev1.PersistentVolumeClaim
36+
pvcs []corev1.PersistentVolumeClaim
3237
}
3338

3439
// setClusterID populates the clusterID field based on etcdStatuses
@@ -43,15 +48,43 @@ func (o *observables) setClusterID() {
4348

4449
// inSplitbrain compares clusterID field with clusterIDs in etcdStatuses.
4550
// If more than one unique ID is reported, cluster is in splitbrain.
51+
// Also if members have different opinions on the list of members, this is
52+
// also a splitbrain.
4653
func (o *observables) inSplitbrain() bool {
54+
return o.clusterIDsAllEqual() && o.memberListsAllEqual()
55+
}
56+
57+
func (o *observables) clusterIDsAllEqual() bool {
58+
ids := set.New[uint64]()
4759
for i := range o.etcdStatuses {
4860
if o.etcdStatuses[i].endpointStatus != nil {
49-
if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId {
50-
return true
61+
ids.Add(o.etcdStatuses[i].endpointStatus.Header.ClusterId)
62+
}
63+
}
64+
return len(ids) <= 1
65+
}
66+
67+
func (o *observables) memberListsAllEqual() bool {
68+
type m struct {
69+
Name string
70+
ID uint64
71+
}
72+
memberLists := make([]set.Set[m], 0, len(o.etcdStatuses))
73+
for i := range o.etcdStatuses {
74+
if o.etcdStatuses[i].memberList != nil {
75+
memberSet := set.New[m]()
76+
for _, member := range o.etcdStatuses[i].memberList.Members {
77+
memberSet.Add(m{member.Name, member.ID})
5178
}
79+
memberLists = append(memberLists, memberSet)
80+
}
81+
}
82+
for i := range memberLists {
83+
if !memberLists[0].Equals(memberLists[i]) {
84+
return false
5285
}
5386
}
54-
return false
87+
return true
5588
}
5689

5790
// fill takes a single-endpoint client and populates the fields of etcdStatus
@@ -67,14 +100,74 @@ func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
67100
wg.Wait()
68101
}
69102

70-
// TODO: make a real function
71-
func (o *observables) _() int {
103+
func (o *observables) pvcMaxIndex() (max int) {
104+
max = -1
105+
for i := range o.pvcs {
106+
tokens := strings.Split(o.pvcs[i].Name, "-")
107+
index, err := strconv.Atoi(tokens[len(tokens)-1])
108+
if err != nil {
109+
continue
110+
}
111+
if index > max {
112+
max = index
113+
}
114+
}
115+
return max
116+
}
117+
118+
func (o *observables) endpointMaxIndex() (max int) {
119+
for i := range o.endpoints {
120+
tokens := strings.Split(o.endpoints[i], ":")
121+
if len(tokens) < 2 {
122+
continue
123+
}
124+
tokens = strings.Split(tokens[len(tokens)-2], "-")
125+
index, err := strconv.Atoi(tokens[len(tokens)-1])
126+
if err != nil {
127+
continue
128+
}
129+
if index > max {
130+
max = index
131+
}
132+
}
133+
return max
134+
}
135+
136+
// TODO: make a real function to determine the right number of replicas.
137+
// Hint: if ClientURL in the member list is absent, the member has not yet
138+
// started, but if the name field is populated, this is a member of the
139+
// initial cluster. If the name field is empty, this member has just been
140+
// added with etcdctl member add (or equivalent API call).
141+
func (o *observables) desiredReplicas() (max int) {
142+
max = -1
72143
if o.etcdStatuses != nil {
73144
for i := range o.etcdStatuses {
74145
if o.etcdStatuses[i].memberList != nil {
75-
return len(o.etcdStatuses[i].memberList.Members)
146+
for j := range o.etcdStatuses[i].memberList.Members {
147+
tokens := strings.Split(o.etcdStatuses[i].memberList.Members[j].Name, "-")
148+
index, err := strconv.Atoi(tokens[len(tokens)-1])
149+
if err != nil {
150+
continue
151+
}
152+
if index > max {
153+
max = index
154+
}
155+
}
76156
}
77157
}
78158
}
79-
return 0
159+
if max > -1 {
160+
return max + 1
161+
}
162+
163+
if epMax := o.endpointMaxIndex(); epMax > max {
164+
max = epMax
165+
}
166+
if pvcMax := o.pvcMaxIndex(); pvcMax > max {
167+
max = pvcMax
168+
}
169+
if max == -1 {
170+
return int(*o.instance.Spec.Replicas)
171+
}
172+
return max + 1
80173
}

0 commit comments

Comments
 (0)