diff --git a/Dockerfile b/Dockerfile
index ab23db9f..390ecef7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -13,6 +13,7 @@ RUN go mod download
# Copy the go source
COPY cmd/ ./cmd/
COPY api/ ./api/
+COPY pkg/ ./pkg/
COPY internal/ ./internal/
# Build
diff --git a/cmd/app/commandline.go b/cmd/app/commandline.go
index b98bc011..2d32b69e 100644
--- a/cmd/app/commandline.go
+++ b/cmd/app/commandline.go
@@ -29,6 +29,7 @@ import (
type Flags struct {
Kubeconfig string
+ ClusterDomain string
MetricsAddress string
ProbeAddress string
LeaderElection bool
@@ -47,6 +48,7 @@ func ParseCmdLine() Flags {
pflag.CommandLine = pflag.NewFlagSet(os.Args[0], pflag.ContinueOnError)
pflag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
+ pflag.String("cluster-domain", "cluster.local", "The cluster domain configured in kube-dns")
pflag.String("metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
pflag.String("health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
pflag.Bool("leader-elect", false, "Enable leader election for controller manager. "+
@@ -78,6 +80,7 @@ func ParseCmdLine() Flags {
return Flags{
Kubeconfig: viper.GetString("kubeconfig"),
+ ClusterDomain: viper.GetString("cluster-domain"),
MetricsAddress: viper.GetString("metrics-bind-address"),
ProbeAddress: viper.GetString("health-probe-bind-address"),
LeaderElection: viper.GetBool("leader-elect"),
diff --git a/go.mod b/go.mod
index 66839769..a420aa97 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
go.etcd.io/etcd/client/v3 v3.5.14
+ golang.org/x/sync v0.7.0
k8s.io/api v0.30.2
k8s.io/apimachinery v0.30.2
k8s.io/client-go v0.30.2
diff --git a/go.sum b/go.sum
index d60eb6e0..386a34cb 100644
--- a/go.sum
+++ b/go.sum
@@ -190,6 +190,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go
index d9e47ad7..334878ff 100644
--- a/internal/controller/etcdcluster_controller.go
+++ b/internal/controller/etcdcluster_controller.go
@@ -75,8 +75,9 @@ type EtcdClusterReconciler struct {
// Reconcile checks CR and current cluster state and performs actions to transform current state to desired.
func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.Debug(ctx, "reconciling object")
- instance := &etcdaenixiov1alpha1.EtcdCluster{}
- err := r.Get(ctx, req.NamespacedName, instance)
+ state := &observables{}
+ state.instance = &etcdaenixiov1alpha1.EtcdCluster{}
+ err := r.Get(ctx, req.NamespacedName, state.instance)
if err != nil {
if errors.IsNotFound(err) {
log.Debug(ctx, "object not found")
@@ -86,15 +87,12 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, err
}
// If object is being deleted, skipping reconciliation
- if !instance.DeletionTimestamp.IsZero() {
+ if !state.instance.DeletionTimestamp.IsZero() {
return reconcile.Result{}, nil
}
- state := observables{}
- state.instance = instance
-
// create two services and the pdb
- err = r.ensureUnconditionalObjects(ctx, instance)
+ err = r.ensureUnconditionalObjects(ctx, state)
if err != nil {
return ctrl.Result{}, err
}
@@ -104,20 +102,53 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err)
}
- state.stsExists = state.statefulSet.UID != ""
+ // state.stsExists = state.statefulSet.UID != ""
// fetch endpoints
- clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
+ clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, state.instance, r.Client)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+ // state.endpointsFound = clusterClient != nil && singleClients != nil
+
+ // if clusterClient != nil {
+ // state.endpoints = clusterClient.Endpoints()
+ // }
+ state.clusterClient = clusterClient
+ state.singleClients = singleClients
+
+ // fetch PVCs
+ state.pvcs, err = factory.PVCs(ctx, state.instance, r.Client)
if err != nil {
return ctrl.Result{}, err
}
- state.endpointsFound = clusterClient != nil && singleClients != nil
- if !state.endpointsFound {
- if !state.stsExists {
- // TODO: happy path for new cluster creation
- log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
+ if !state.endpointsFound() {
+ if !state.statefulSetExists() {
+ return r.createClusterFromScratch(ctx, state) // TODO: needs implementing
}
+
+ // update sts pod template (and only pod template) if it doesn't match desired state
+ if !state.statefulSetPodSpecCorrect() { // TODO: needs implementing
+ desiredSts := factory.TemplateStatefulSet(state.instance) // TODO: needs implementing
+ state.statefulSet.Spec.Template.Spec = desiredSts.Spec.Template.Spec
+ return ctrl.Result{}, r.patchOrCreateObject(ctx, &state.statefulSet)
+ }
+
+ if !state.statefulSetReady() { // TODO: needs improved implementation?
+ return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready")
+ }
+
+ if *state.statefulSet.Spec.Replicas > 0 {
+ return ctrl.Result{}, fmt.Errorf("reached an impossible state (no endpoints, but active pods)")
+ }
+
+ if *state.instance.Spec.Replicas == 0 {
+ // cluster successfully scaled down to zero
+ return ctrl.Result{}, nil
+ }
+
+ return ctrl.Result{}, r.scaleUpFromZero(ctx) // TODO: needs implementing
}
// get status of every endpoint and member list from every endpoint
@@ -135,11 +166,24 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
wg.Wait()
cancel()
}
+
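+ // the cluster counts as reachable if at least one endpoint returned a status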
+ memberReached := false
+ for i := range state.etcdStatuses {
+ if state.etcdStatuses[i].endpointStatus != nil {
+ memberReached = true
+ break
+ }
+ }
+
+ if !memberReached {
+ return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
+ }
+
state.setClusterID()
if state.inSplitbrain() {
log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue")
meta.SetStatusCondition(
- &instance.Status.Conditions,
+ &state.instance.Status.Conditions,
metav1.Condition{
Type: etcdaenixiov1alpha1.EtcdConditionError,
Status: metav1.ConditionTrue,
@@ -147,43 +191,35 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Message: string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage),
},
)
- return r.updateStatus(ctx, instance)
+ return r.updateStatus(ctx, state)
}
- // fill conditions
- if len(instance.Status.Conditions) == 0 {
- meta.SetStatusCondition(
- &instance.Status.Conditions,
- metav1.Condition{
- Type: etcdaenixiov1alpha1.EtcdConditionInitialized,
- Status: metav1.ConditionFalse,
- Reason: string(etcdaenixiov1alpha1.EtcdCondTypeInitStarted),
- Message: string(etcdaenixiov1alpha1.EtcdInitCondNegMessage),
- },
- )
- meta.SetStatusCondition(
- &instance.Status.Conditions,
- metav1.Condition{
- Type: etcdaenixiov1alpha1.EtcdConditionReady,
- Status: metav1.ConditionFalse,
- Reason: string(etcdaenixiov1alpha1.EtcdCondTypeWaitingForFirstQuorum),
- Message: string(etcdaenixiov1alpha1.EtcdReadyCondNegWaitingForQuorum),
- },
- )
+
+ if !state.clusterHasQuorum() {
+ // we can't do anything about this, but we still return an error so the cluster is re-checked from time to time
+ return ctrl.Result{}, fmt.Errorf("cluster has lost quorum")
}
- // if size is different we have to remove statefulset it will be recreated in the next step
- if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, &state, instance); err != nil {
+ if state.hasLearners() {
+ return ctrl.Result{}, r.promoteLearners(ctx)
+ }
+
+ if err := r.createOrUpdateClusterStateConfigMap(ctx); err != nil {
return ctrl.Result{}, err
}
- // ensure managed resources
- if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
- return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
+ if !state.statefulSetPodSpecCorrect() {
+ return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx)
+ }
+
+ // if the storage size differs, we have to delete the statefulset; it will be recreated in the next step
+ if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, state); err != nil {
+ return ctrl.Result{}, err
}
+ /* Saved as an example
// set cluster initialization condition
meta.SetStatusCondition(
- &instance.Status.Conditions,
+ &state.instance.Status.Conditions,
metav1.Condition{
Type: etcdaenixiov1alpha1.EtcdConditionInitialized,
Status: metav1.ConditionTrue,
@@ -191,62 +227,18 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Message: string(etcdaenixiov1alpha1.EtcdInitCondPosMessage),
},
)
-
- // check sts condition
- clusterReady, err := r.isStatefulSetReady(ctx, instance)
- if err != nil {
- log.Error(ctx, err, "failed to check etcd cluster state")
- return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot check Cluster readiness: %w", err))
- }
-
- if clusterReady && *instance.Spec.Replicas != int32(0) {
- err := r.configureAuth(ctx, instance)
- if err != nil {
- return ctrl.Result{}, err
- }
- }
-
- // set cluster readiness condition
- existingCondition := meta.FindStatusCondition(instance.Status.Conditions, etcdaenixiov1alpha1.EtcdConditionReady)
- if existingCondition != nil &&
- existingCondition.Reason == string(etcdaenixiov1alpha1.EtcdCondTypeWaitingForFirstQuorum) &&
- !clusterReady {
- // if we are still "waiting for first quorum establishment" and the StatefulSet
- // isn't ready yet, don't update the EtcdConditionReady, but circuit-break.
- return r.updateStatus(ctx, instance)
- }
-
- // otherwise, EtcdConditionReady is set to true/false with the reason that the
- // StatefulSet is or isn't ready.
- reason := etcdaenixiov1alpha1.EtcdCondTypeStatefulSetNotReady
- message := etcdaenixiov1alpha1.EtcdReadyCondNegMessage
- ready := metav1.ConditionFalse
- if clusterReady {
- reason = etcdaenixiov1alpha1.EtcdCondTypeStatefulSetReady
- message = etcdaenixiov1alpha1.EtcdReadyCondPosMessage
- ready = metav1.ConditionTrue
- }
-
- meta.SetStatusCondition(
- &instance.Status.Conditions,
- metav1.Condition{
- Type: etcdaenixiov1alpha1.EtcdConditionReady,
- Status: ready,
- Reason: string(reason),
- Message: string(message),
- },
- )
- return r.updateStatus(ctx, instance)
+ */
+ return r.updateStatus(ctx, state)
}
// checkAndDeleteStatefulSetIfNecessary deletes the StatefulSet if the specified storage size has changed.
-func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context, state *observables, instance *etcdaenixiov1alpha1.EtcdCluster) error {
+func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context, state *observables) error {
for _, volumeClaimTemplate := range state.statefulSet.Spec.VolumeClaimTemplates {
if volumeClaimTemplate.Name != "data" {
continue
}
currentStorage := volumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
- desiredStorage := instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
+ desiredStorage := state.instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
if desiredStorage.Cmp(currentStorage) != 0 {
deletePolicy := metav1.DeletePropagationOrphan
log.Info(ctx, "Deleting StatefulSet due to storage change", "statefulSet", state.statefulSet.Name)
@@ -262,21 +254,20 @@ func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context
}
// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
-func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
- ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
+func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(ctx context.Context, state *observables) error {
- if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
+ if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, state.instance, r.Client); err != nil {
log.Error(ctx, err, "reconcile cluster state configmap failed")
return err
}
log.Debug(ctx, "cluster state configmap reconciled")
- if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
+ if err := factory.CreateOrUpdateStatefulSet(ctx, state.instance, r.Client); err != nil {
log.Error(ctx, err, "reconcile statefulset failed")
return err
}
- if err := factory.UpdatePersistentVolumeClaims(ctx, cluster, r.Client); err != nil {
+ if err := factory.UpdatePersistentVolumeClaims(ctx, state.instance, r.Client); err != nil {
log.Error(ctx, err, "reconcile persistentVolumeClaims failed")
return err
}
@@ -286,11 +277,11 @@ func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
}
// updateStatusOnErr wraps error and updates EtcdCluster status
-func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, err error) (ctrl.Result, error) {
+func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, state *observables, err error) (ctrl.Result, error) {
// The function 'updateStatusOnErr' will always return non-nil error. Hence, the ctrl.Result will always be ignored.
// Therefore, the ctrl.Result returned by 'updateStatus' function can be discarded.
// REF: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile@v0.17.3#Reconciler
- _, statusErr := r.updateStatus(ctx, cluster)
+ _, statusErr := r.updateStatus(ctx, state)
if statusErr != nil {
return ctrl.Result{}, goerrors.Join(statusErr, err)
}
@@ -298,8 +289,8 @@ func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *
}
// updateStatus updates EtcdCluster status and returns error and requeue in case status could not be updated due to conflict
-func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (ctrl.Result, error) {
- err := r.Status().Update(ctx, cluster)
+func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, state *observables) (ctrl.Result, error) {
+ err := r.Status().Update(ctx, state.instance)
if err == nil {
return ctrl.Result{}, nil
}
@@ -312,9 +303,9 @@ func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcda
}
// isStatefulSetReady gets managed StatefulSet and checks its readiness.
-func (r *EtcdClusterReconciler) isStatefulSetReady(ctx context.Context, c *etcdaenixiov1alpha1.EtcdCluster) (bool, error) {
+func (r *EtcdClusterReconciler) isStatefulSetReady(ctx context.Context, state *observables) (bool, error) {
sts := &appsv1.StatefulSet{}
- err := r.Get(ctx, client.ObjectKeyFromObject(c), sts)
+ err := r.Get(ctx, client.ObjectKeyFromObject(state.instance), sts)
if err == nil {
return sts.Status.ReadyReplicas == *sts.Spec.Replicas, nil
}
@@ -332,11 +323,11 @@ func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}
-func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
+func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, state *observables) error {
var err error
- cli, err := r.GetEtcdClient(ctx, cluster)
+ cli, err := r.GetEtcdClient(ctx, state.instance)
if err != nil {
return err
}
@@ -352,7 +343,7 @@ func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, cluster *etcd
auth := clientv3.NewAuth(cli)
- if cluster.Spec.Security != nil && cluster.Spec.Security.EnableAuth {
+ if state.instance.Spec.Security != nil && state.instance.Spec.Security.EnableAuth {
if err := r.createRoleIfNotExists(ctx, auth, "root"); err != nil {
return err
@@ -403,12 +394,12 @@ func testMemberList(ctx context.Context, cli *clientv3.Client) error {
return err
}
-func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (*clientv3.Client, error) {
+func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) (*clientv3.Client, error) {
- endpoints := getEndpointsSlice(cluster)
+ endpoints := getEndpointsSlice(instance)
log.Debug(ctx, "endpoints built", "endpoints", endpoints)
- tlsConfig, err := r.getTLSConfig(ctx, cluster)
+ tlsConfig, err := r.getTLSConfig(ctx, instance)
if err != nil {
log.Error(ctx, err, "failed to build tls config")
return nil, err
@@ -431,17 +422,17 @@ func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, cluster *etcd
}
-func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (*tls.Config, error) {
+func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) (*tls.Config, error) {
var err error
caCertPool := &x509.CertPool{}
- if cluster.IsServerTrustedCADefined() {
+ if instance.IsServerTrustedCADefined() {
serverCASecret := &corev1.Secret{}
- if err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Spec.Security.TLS.ServerTrustedCASecret}, serverCASecret); err != nil {
+ if err = r.Get(ctx, client.ObjectKey{Namespace: instance.Namespace, Name: instance.Spec.Security.TLS.ServerTrustedCASecret}, serverCASecret); err != nil {
log.Error(ctx, err, "failed to get server trusted CA secret")
return nil, err
}
@@ -458,10 +449,10 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda
cert := tls.Certificate{}
- if cluster.IsClientSecurityEnabled() {
+ if instance.IsClientSecurityEnabled() {
rootSecret := &corev1.Secret{}
- if err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Spec.Security.TLS.ClientSecret}, rootSecret); err != nil {
+ if err = r.Get(ctx, client.ObjectKey{Namespace: instance.Namespace, Name: instance.Spec.Security.TLS.ClientSecret}, rootSecret); err != nil {
log.Error(ctx, err, "failed to get root client secret")
return nil, err
}
@@ -475,7 +466,7 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda
}
tlsConfig := &tls.Config{
- InsecureSkipVerify: !cluster.IsServerTrustedCADefined(),
+ InsecureSkipVerify: !instance.IsServerTrustedCADefined(),
RootCAs: caCertPool,
Certificates: []tls.Certificate{
cert,
@@ -612,7 +603,7 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie
// ensureUnconditionalObjects creates the two services and the PDB
// which can be created at the start of the reconciliation loop
// without any risk of disrupting the etcd cluster
-func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error {
+func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, state *observables) error {
const concurrentOperations = 3
c := make(chan error)
defer close(c)
@@ -630,7 +621,7 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context,
defer wg.Done()
select {
case <-ctx.Done():
- case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client),
+ case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, state.instance, r.Client),
"couldn't ensure client service"):
}
}(c)
@@ -638,7 +629,7 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context,
defer wg.Done()
select {
case <-ctx.Done():
- case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client),
+ case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, state.instance, r.Client),
"couldn't ensure headless service"):
}
}(c)
@@ -646,7 +637,7 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context,
defer wg.Done()
select {
case <-ctx.Done():
- case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client),
+ case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, state.instance, r.Client),
"couldn't ensure pod disruption budget"):
}
}(c)
@@ -688,11 +679,52 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st
if err != nil {
return ctrl.Result{}, err
}
+ meta.SetStatusCondition(
+ &state.instance.Status.Conditions,
+ metav1.Condition{
+ Type: etcdaenixiov1alpha1.EtcdConditionInitialized,
+ Status: metav1.ConditionFalse,
+ Reason: string(etcdaenixiov1alpha1.EtcdCondTypeInitStarted),
+ Message: string(etcdaenixiov1alpha1.EtcdInitCondNegMessage),
+ },
+ )
+ meta.SetStatusCondition(
+ &state.instance.Status.Conditions,
+ metav1.Condition{
+ Type: etcdaenixiov1alpha1.EtcdConditionReady,
+ Status: metav1.ConditionFalse,
+ Reason: string(etcdaenixiov1alpha1.EtcdCondTypeWaitingForFirstQuorum),
+ Message: string(etcdaenixiov1alpha1.EtcdReadyCondNegWaitingForQuorum),
+ },
+ )
+
+ // ensure managed resources
+ if err = r.ensureConditionalClusterObjects(ctx, state); err != nil {
+ return r.updateStatusOnErr(ctx, state, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
+ }
panic("not yet implemented")
}
// TODO!
// nolint:unused
-func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) (ctrl.Result, error) {
- panic("not yet implemented")
+func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context) error {
+ return fmt.Errorf("not yet implemented")
+}
+
+// TODO!
+// nolint:unused
+func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context) error {
+ return fmt.Errorf("not yet implemented")
+}
+
+// TODO!
+// nolint:unused
+func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context) error {
+ return fmt.Errorf("not yet implemented")
+}
+
+// TODO!
+// nolint:unused
+func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context) error {
+ return fmt.Errorf("not yet implemented")
}
diff --git a/internal/controller/etcdcluster_controller_new.go b/internal/controller/etcdcluster_controller_new.go
new file mode 100644
index 00000000..e081c2e2
--- /dev/null
+++ b/internal/controller/etcdcluster_controller_new.go
@@ -0,0 +1,377 @@
+package controller
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
+ "github.com/aenix-io/etcd-operator/internal/controller/factory"
+ "github.com/aenix-io/etcd-operator/internal/log"
+ clientv3 "go.etcd.io/etcd/client/v3"
+ "golang.org/x/sync/errgroup"
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ policyv1 "k8s.io/api/policy/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/builder"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/predicate"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+)
+
+type ClusterReconciler struct {
+ client.Client
+ Scheme *runtime.Scheme
+
+ ClusterDomain string
+}
+
+// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
+// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
+// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
+// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
+// +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
+// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
+// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;create;delete;update;patch;list;watch
+// +kubebuilder:rbac:groups="policy",resources=poddisruptionbudgets,verbs=get;create;delete;update;patch;list;watch
+// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;patch;watch
+// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch
+
+// Reconcile checks CR and current cluster state and performs actions to transform current state to desired.
+func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (reconcile.Result, error) {
+ log.Info(ctx, "Reconciling object")
+ cluster := &etcdaenixiov1alpha1.EtcdCluster{}
+ err := r.Get(ctx, req.NamespacedName, cluster)
+ if errors.IsNotFound(err) {
+ log.Info(ctx, "resource not found")
+ return reconcile.Result{}, nil
+ }
+ if err != nil {
+ return reconcile.Result{}, err
+ }
+ state := &observables{instance: cluster}
+ return r.reconcile(ctx, state)
+}
+
+// reconcile performs reconciliation of the cluster.
+func (r *ClusterReconciler) reconcile(ctx context.Context, state *observables) (reconcile.Result, error) {
+ if !state.instance.DeletionTimestamp.IsZero() {
+ log.Debug(ctx, "resource is being deleted")
+ return reconcile.Result{}, nil
+ }
+ if err := r.ensureUnconditionalObjects(ctx, state.instance); err != nil {
+ return reconcile.Result{}, err
+ }
+ clusterClient, singleClients, err := r.etcdClientSet(ctx, state.instance)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+
+ state.clusterClient = clusterClient
+ state.singleClients = singleClients
+
+ // checking whether any endpoints exist.
+ if !state.endpointsFound() {
+ // no endpoints found: right branch in flowchart
+ return r.reconcileEndpointsAbsent(ctx, state)
+ }
+ // endpoints found: left branch in flowchart
+ return r.reconcileEndpointsPresent(ctx, state)
+}
+
+// reconcileEndpointsAbsent is called in case there are no endpoints observed.
+// It checks whether the statefulset exists and creates the cluster from scratch if it does not.
+func (r *ClusterReconciler) reconcileEndpointsAbsent(ctx context.Context, state *observables) (reconcile.Result, error) {
+ err := r.Get(ctx, client.ObjectKeyFromObject(state.instance), &state.statefulSet)
+ if client.IgnoreNotFound(err) != nil {
+ return reconcile.Result{}, err
+ }
+
+ if !state.statefulSetExists() {
+ return reconcile.Result{}, r.createClusterFromScratch(ctx, state) // todo: not implemented yet
+ }
+ if !state.statefulSetPodSpecCorrect() { // todo: not implemented yet
+ return reconcile.Result{}, r.patchStatefulSetPodSpec(ctx, state) // todo: not implemented yet
+ }
+ if !state.statefulSetReady() { // todo: not implemented yet
+ log.Debug(ctx, "waiting etcd cluster statefulset to become ready")
+ return reconcile.Result{}, nil
+ }
+ if !state.statefulSetReplicasIsZero() {
+ log.Error(ctx, fmt.Errorf("invalid statefulset replicas with no endpoints: %d", state.statefulSet.Spec.Replicas),
+ "cluster is in invalid state, dropping from reconciliation queue")
+ return reconcile.Result{}, nil
+ }
+ if state.etcdClusterReplicasIsZero() {
+ return reconcile.Result{}, nil
+ }
+ return reconcile.Result{}, r.scaleUpFromZero(ctx, state) // todo: not implemented yet
+}
+
+// reconcileEndpointsPresent is called in case there are endpoints observed.
+func (r *ClusterReconciler) reconcileEndpointsPresent(ctx context.Context, state *observables) (reconcile.Result, error) {
+ memberReached, err := r.collectEtcdStatuses(ctx, state)
+ if err != nil {
+ return reconcile.Result{}, err
+ }
+
+ // checking whether members are reachable
+ if !memberReached {
+ // no members reachable: right branch in flowchart
+ return r.reconcileMembersUnreachable(ctx, state)
+ }
+ // at least one member reachable: left branch in flowchart
+ return r.reconcileMembersReachable(ctx, state)
+}
+
+// reconcileMembersUnreachable is called in case there are endpoints observed but none of the members are reachable.
+func (r *ClusterReconciler) reconcileMembersUnreachable(ctx context.Context, state *observables) (reconcile.Result, error) {
+ err := r.Get(ctx, client.ObjectKeyFromObject(state.instance), &state.statefulSet)
+ if client.IgnoreNotFound(err) != nil {
+ return reconcile.Result{}, err
+ }
+
+ if !state.statefulSetExists() {
+ return reconcile.Result{}, r.createOrUpdateStatefulSet(ctx, state) // todo: not implemented yet
+ }
+ if !state.statefulSetPodSpecCorrect() { // todo: not implemented yet
+ return reconcile.Result{}, r.patchStatefulSetPodSpec(ctx, state) // todo: not implemented yet
+ }
+ return reconcile.Result{}, nil
+}
+
+// reconcileMembersReachable is called in case there are endpoints observed and some (or all) members are reachable.
+func (r *ClusterReconciler) reconcileMembersReachable(ctx context.Context, state *observables) (reconcile.Result, error) {
+ state.setClusterID()
+ if state.inSplitbrain() {
+ log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in split-brain, dropping from reconciliation queue")
+ baseEtcdCluster := state.instance.DeepCopy()
+ meta.SetStatusCondition(
+ &state.instance.Status.Conditions,
+ metav1.Condition{
+ Type: etcdaenixiov1alpha1.EtcdConditionError,
+ Status: metav1.ConditionTrue,
+ Reason: string(etcdaenixiov1alpha1.EtcdCondTypeSplitbrain),
+ Message: string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage),
+ },
+ )
+ return reconcile.Result{}, r.Status().Patch(ctx, state.instance, client.MergeFrom(baseEtcdCluster))
+ }
+ if !state.clusterHasQuorum() {
+ log.Error(ctx, fmt.Errorf("cluster has lost quorum"), "cluster has lost quorum, dropping from reconciliation queue")
+ return reconcile.Result{}, nil
+ }
+ if !state.allMembersAreManaged() { // todo: not implemented yet
+ log.Error(ctx, fmt.Errorf("not all members are managed"), "not all members are managed, dropping from reconciliation queue")
+ return reconcile.Result{}, nil
+ }
+ if state.hasLearners() {
+ return reconcile.Result{}, r.promoteLearners(ctx, state) // todo: not implemented yet
+ }
+ if !state.allMembersAreHealthy() { // todo: not implemented yet
+ // todo: enqueue unhealthy member(s) eviction
+ // then delete pod, pvc & update config map respectively
+ return reconcile.Result{}, nil
+ }
+ if err := r.createOrUpdateClusterStateConfigMap(ctx, state); err != nil { // todo: not implemented yet
+ return reconcile.Result{}, err
+ }
+ if !state.statefulSetPodSpecCorrect() { // todo: not implemented yet
+ return reconcile.Result{}, r.patchStatefulSetPodSpec(ctx, state) // todo: not implemented yet
+ }
+ if !state.podsPresentInMembersList() { // todo: not implemented yet
+ // todo: delete pod, pvc & update config map respectively
+ return reconcile.Result{}, nil
+ }
+ return r.reconcileReplicas(ctx, state)
+}
+
+// reconcileReplicas is called in case there are endpoints observed and all members are reachable,
+// healthy and present in members list.
+func (r *ClusterReconciler) reconcileReplicas(ctx context.Context, state *observables) (reconcile.Result, error) {
+ if *state.instance.Spec.Replicas == 0 && *state.statefulSet.Spec.Replicas == 1 {
+ return reconcile.Result{}, r.scaleDownToZero(ctx, state) // todo: not implemented yet
+ }
+ if *state.instance.Spec.Replicas < *state.statefulSet.Spec.Replicas {
+ return reconcile.Result{}, r.scaleDown(ctx, state) // todo: not implemented yet
+ }
+ if *state.instance.Spec.Replicas > *state.statefulSet.Spec.Replicas {
+ return reconcile.Result{}, r.scaleUp(ctx, state) // todo: not implemented yet
+ }
+
+ baseEtcdCluster := state.instance.DeepCopy()
+ meta.SetStatusCondition(
+ &state.instance.Status.Conditions,
+ metav1.Condition{
+ Type: etcdaenixiov1alpha1.EtcdConditionReady,
+ Status: metav1.ConditionTrue,
+ Reason: string(etcdaenixiov1alpha1.EtcdCondTypeInitComplete),
+ },
+ )
+
+ return reconcile.Result{}, r.Status().Patch(ctx, state.instance, client.MergeFrom(baseEtcdCluster))
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
+ return ctrl.NewControllerManagedBy(mgr).
+ For(&etcdaenixiov1alpha1.EtcdCluster{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
+ Owns(&appsv1.StatefulSet{}).
+ Owns(&corev1.ConfigMap{}).
+ Owns(&corev1.Service{}).
+ Owns(&policyv1.PodDisruptionBudget{}).
+ Complete(r)
+}
+
+// ensureUnconditionalObjects ensures that objects that should always exist are created.
+func (r *ClusterReconciler) ensureUnconditionalObjects(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
+ g, ctx := errgroup.WithContext(ctx)
+ wrapWithMessage := func(err error, msg string) error {
+ if err != nil {
+ return fmt.Errorf("%s: %w", msg, err)
+ }
+ return nil
+ }
+ g.Go(func() error {
+ return wrapWithMessage(factory.CreateOrUpdateClientService(ctx, cluster, r.Client), "failed to ensure client service")
+ })
+ g.Go(func() error {
+ return wrapWithMessage(factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client), "failed to ensure headless service")
+ })
+ g.Go(func() error {
+ return wrapWithMessage(factory.CreateOrUpdatePdb(ctx, cluster, r.Client), "failed to ensure pod disruption budget")
+ })
+ return g.Wait()
+}
+
+// etcdClientSet returns a cluster-wide etcd client and one single-endpoint client per member for the given cluster.
+func (r *ClusterReconciler) etcdClientSet(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (*clientv3.Client, []*clientv3.Client, error) {
+ cfg, err := r.etcdClusterConfig(ctx, cluster)
+ if err != nil {
+ return nil, nil, err
+ }
+ if len(cfg.Endpoints) == 0 {
+ return nil, nil, nil
+ }
+ eps := cfg.Endpoints
+ clusterClient, err := clientv3.New(cfg)
+ if err != nil {
+ return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err)
+ }
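+ // additionally build one client per endpoint so that each member can be queried individually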
+ membersClients := make([]*clientv3.Client, len(eps))
+ for i, ep := range eps {
+ cfg.Endpoints = []string{ep}
+ membersClients[i], err = clientv3.New(cfg)
+ if err != nil {
+ return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err)
+ }
+ }
+ return clusterClient, membersClients, nil
+}
+
+// collectEtcdStatuses concurrently collects the status of every etcd member and reports whether at least one member was reachable.
+func (r *ClusterReconciler) collectEtcdStatuses(ctx context.Context, state *observables) (bool, error) {
+ state.etcdStatuses = make([]etcdStatus, len(state.singleClients))
+ {
+ var wg sync.WaitGroup
+ ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
+ for i := range state.singleClients {
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ state.etcdStatuses[i].fill(ctx, state.singleClients[i])
+ }(i)
+ }
+ wg.Wait()
+ cancel()
+ }
+
+ memberReached := false
+ for i := range state.etcdStatuses {
+ if state.etcdStatuses[i].endpointStatus != nil {
+ memberReached = true
+ break
+ }
+ }
+ return memberReached, nil
+}
+
+// etcdClusterConfig builds an etcd client config from the endpoints of the cluster's headless service.
+func (r *ClusterReconciler) etcdClusterConfig(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (clientv3.Config, error) {
+ ep := corev1.Endpoints{}
+ err := r.Get(ctx, types.NamespacedName{Name: factory.GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
+ if client.IgnoreNotFound(err) != nil {
+ return clientv3.Config{}, err
+ }
+ if err != nil {
+ return clientv3.Config{Endpoints: []string{}}, nil
+ }
+
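+ // collect hostnames from both ready and not-ready addresses so that members which have not yet passed readiness checks are still reachable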
+ names := map[string]struct{}{}
+ urls := make([]string, 0, 8)
+ for _, v := range ep.Subsets {
+ for _, addr := range v.Addresses {
+ names[addr.Hostname] = struct{}{}
+ }
+ for _, addr := range v.NotReadyAddresses {
+ names[addr.Hostname] = struct{}{}
+ }
+ }
+ for name := range names {
+ urls = append(urls, fmt.Sprintf("%s.%s.%s.svc.%s:%s", name, ep.Name, cluster.Namespace, r.ClusterDomain, "2379"))
+ }
+
+ return clientv3.Config{Endpoints: urls}, nil
+}
+
+// todo: implement this
+func (r *ClusterReconciler) createClusterFromScratch(ctx context.Context, state *observables) error {
+ panic("not implemented")
+}
+
+// todo: implement this
+func (r *ClusterReconciler) patchStatefulSetPodSpec(ctx context.Context, state *observables) error {
+ panic("not implemented")
+}
+
+// todo: implement this
+func (r *ClusterReconciler) scaleUp(ctx context.Context, state *observables) error {
+ panic("not implemented")
+}
+
+// todo: implement this
+func (r *ClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) error {
+ panic("not implemented")
+}
+
+// todo: implement this
+func (r *ClusterReconciler) scaleDown(ctx context.Context, state *observables) error {
+ panic("not implemented")
+}
+
+// todo: implement this
+func (r *ClusterReconciler) scaleDownToZero(ctx context.Context, state *observables) error {
+ panic("not implemented")
+}
+
+// todo: implement this
+func (r *ClusterReconciler) createOrUpdateStatefulSet(ctx context.Context, state *observables) error {
+ panic("not implemented")
+}
+
+// todo: implement this
+func (r *ClusterReconciler) promoteLearners(ctx context.Context, state *observables) error {
+ panic("not implemented")
+}
+
+// todo: implement this
+func (r *ClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context, state *observables) error {
+ panic("not implemented")
+}
diff --git a/internal/controller/factory/etcd_client.go b/internal/controller/factory/etcd_client.go
index 4725171b..9b15dfa7 100644
--- a/internal/controller/factory/etcd_client.go
+++ b/internal/controller/factory/etcd_client.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/aenix-io/etcd-operator/api/v1alpha1"
+ "github.com/spf13/viper"
clientv3 "go.etcd.io/etcd/client/v3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
@@ -56,7 +57,7 @@ func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli c
}
}
for name := range names {
- urls = append(urls, fmt.Sprintf("%s:%s", name, "2379"))
+ urls = append(urls, fmt.Sprintf("%s.%s.%s.svc.%s:%s", name, ep.Name, cluster.Namespace, viper.GetString("cluster-domain"), "2379"))
}
return clientv3.Config{Endpoints: urls}, nil
diff --git a/internal/controller/factory/statefulset.go b/internal/controller/factory/statefulset.go
index 202419b2..54bfcd90 100644
--- a/internal/controller/factory/statefulset.go
+++ b/internal/controller/factory/statefulset.go
@@ -31,8 +31,8 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
+ "github.com/aenix-io/etcd-operator/api/v1alpha1"
etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
- "github.com/aenix-io/etcd-operator/internal/k8sutils"
"github.com/aenix-io/etcd-operator/internal/log"
)
@@ -42,8 +42,90 @@ const (
)
// TODO!
-func TemplateStatefulSet() *appsv1.StatefulSet {
- panic("not yet implemented")
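+// TemplateStatefulSet builds the desired StatefulSet object for the given EtcdCluster; it only renders the object and does not apply it.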
+func TemplateStatefulSet(c *v1alpha1.EtcdCluster) *appsv1.StatefulSet {
+ labels := func(c *v1alpha1.EtcdCluster) map[string]string {
+ return map[string]string{
+ "app.kubernetes.io/name": "etcd",
+ "app.kubernetes.io/instance": c.Name,
+ "app.kubernetes.io/managed-by": "etcd-operator",
+ }
+ }
+ volumeClaimTemplates := func(c *v1alpha1.EtcdCluster) []corev1.PersistentVolumeClaim {
+ if c.Spec.Storage.EmptyDir != nil {
+ return nil
+ }
+ return []corev1.PersistentVolumeClaim{
+ {
+ ObjectMeta: metav1.ObjectMeta{Name: "data"},
+ Spec: c.Spec.Storage.VolumeClaimTemplate.Spec,
+ },
+ }
+ }
+ s := &appsv1.StatefulSet{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: c.Name,
+ Namespace: c.Namespace,
+ },
+ Spec: appsv1.StatefulSetSpec{
+ Replicas: c.Spec.Replicas,
+ PodManagementPolicy: appsv1.ParallelPodManagement,
+ ServiceName: fmt.Sprintf("%s-headless", c.Name),
+ Selector: &metav1.LabelSelector{
+ MatchLabels: labels(c),
+ },
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Labels: labels(c),
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{{
+ Name: etcdContainerName,
+ Image: etcdaenixiov1alpha1.DefaultEtcdImage,
+ Args: generateEtcdArgs(c),
+ Ports: []corev1.ContainerPort{
+ {Name: "metrics", ContainerPort: 2381},
+ {Name: "peer", ContainerPort: 2380},
+ {Name: "client", ContainerPort: 2379},
+ },
+ EnvFrom: []corev1.EnvFromSource{
+ {
+ ConfigMapRef: &corev1.ConfigMapEnvSource{
+ LocalObjectReference: corev1.LocalObjectReference{
+ Name: fmt.Sprintf("%s-cluster-state", c.Name),
+ },
+ },
+ },
+ },
+ Env: []corev1.EnvVar{
+ {
+ Name: "POD_NAME",
+ ValueFrom: &corev1.EnvVarSource{
+ FieldRef: &corev1.ObjectFieldSelector{
+ FieldPath: "metadata.name",
+ },
+ },
+ },
+ {
+ Name: "POD_NAMESPACE",
+ ValueFrom: &corev1.EnvVarSource{
+ FieldRef: &corev1.ObjectFieldSelector{
+ FieldPath: "metadata.namespace",
+ },
+ },
+ },
+ },
+ StartupProbe: getStartupProbe(),
+ LivenessProbe: getLivenessProbe(),
+ ReadinessProbe: getReadinessProbe(),
+ VolumeMounts: generateVolumeMounts(c),
+ }},
+ Volumes: generateVolumes(c),
+ },
+ },
+ VolumeClaimTemplates: volumeClaimTemplates(c),
+ },
+ }
+ return s
}
func PodLabels(cluster *etcdaenixiov1alpha1.EtcdCluster) map[string]string {
@@ -63,62 +145,27 @@ func CreateOrUpdateStatefulSet(
cluster *etcdaenixiov1alpha1.EtcdCluster,
rclient client.Client,
) error {
- podMetadata := metav1.ObjectMeta{
- Labels: PodLabels(cluster),
- }
-
- if cluster.Spec.PodTemplate.Annotations != nil {
- podMetadata.Annotations = cluster.Spec.PodTemplate.Annotations
- }
-
- volumeClaimTemplates := make([]corev1.PersistentVolumeClaim, 0)
- if cluster.Spec.Storage.EmptyDir == nil {
- volumeClaimTemplates = append(volumeClaimTemplates, corev1.PersistentVolumeClaim{
- ObjectMeta: metav1.ObjectMeta{
- Name: GetPVCName(cluster),
- Labels: cluster.Spec.Storage.VolumeClaimTemplate.Labels,
- Annotations: cluster.Spec.Storage.VolumeClaimTemplate.Annotations,
- },
- Spec: cluster.Spec.Storage.VolumeClaimTemplate.Spec,
- Status: cluster.Spec.Storage.VolumeClaimTemplate.Status,
- })
- }
+ /*
+ // This is kept as an example of how the override pod spec is merged with the base pod spec,
+ // for the future, when we get round to restoring this feature.
+ basePodSpec := corev1.PodSpec{
+ Containers: []corev1.Container{generateContainer(cluster)},
+ Volumes: volumes,
+ }
+ if cluster.Spec.PodTemplate.Spec.Containers == nil {
+ cluster.Spec.PodTemplate.Spec.Containers = make([]corev1.Container, 0)
+ }
+ finalPodSpec, err := k8sutils.StrategicMerge(basePodSpec, cluster.Spec.PodTemplate.Spec)
+ if err != nil {
+ return fmt.Errorf("cannot strategic-merge base podspec with podTemplate.spec: %w", err)
+ }
+ */
- volumes := generateVolumes(cluster)
+ statefulSet := TemplateStatefulSet(cluster)
- basePodSpec := corev1.PodSpec{
- Containers: []corev1.Container{generateContainer(cluster)},
- Volumes: volumes,
- }
- if cluster.Spec.PodTemplate.Spec.Containers == nil {
- cluster.Spec.PodTemplate.Spec.Containers = make([]corev1.Container, 0)
- }
- finalPodSpec, err := k8sutils.StrategicMerge(basePodSpec, cluster.Spec.PodTemplate.Spec)
- if err != nil {
- return fmt.Errorf("cannot strategic-merge base podspec with podTemplate.spec: %w", err)
- }
-
- statefulSet := &appsv1.StatefulSet{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: cluster.Namespace,
- Name: cluster.Name,
- },
- Spec: appsv1.StatefulSetSpec{
- // initialize static fields that cannot be changed across updates.
- Replicas: cluster.Spec.Replicas,
- ServiceName: GetHeadlessServiceName(cluster),
- PodManagementPolicy: appsv1.ParallelPodManagement,
- Selector: &metav1.LabelSelector{
- MatchLabels: NewLabelsBuilder().WithName().WithInstance(cluster.Name).WithManagedBy(),
- },
- Template: corev1.PodTemplateSpec{
- ObjectMeta: podMetadata,
- Spec: finalPodSpec,
- },
- VolumeClaimTemplates: volumeClaimTemplates,
- },
- }
- ctx, err = contextWithGVK(ctx, statefulSet, rclient.Scheme())
+ // This line used to be an assignment rather than a declaration; redefining the
+ // surrounding function's arguments looked wrong, hence the switch to ":=".
+ ctx, err := contextWithGVK(ctx, statefulSet, rclient.Scheme())
if err != nil {
return err
}
@@ -134,31 +181,17 @@ func CreateOrUpdateStatefulSet(
func generateVolumes(cluster *etcdaenixiov1alpha1.EtcdCluster) []corev1.Volume {
volumes := []corev1.Volume{}
- var dataVolumeSource corev1.VolumeSource
-
if cluster.Spec.Storage.EmptyDir != nil {
- dataVolumeSource = corev1.VolumeSource{EmptyDir: cluster.Spec.Storage.EmptyDir}
- } else {
- dataVolumeSource = corev1.VolumeSource{
- PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
- ClaimName: GetPVCName(cluster),
- },
- }
- }
-
- volumes = append(
- volumes,
-
- corev1.Volume{
+ volumes = append(volumes, corev1.Volume{
Name: "data",
- VolumeSource: dataVolumeSource,
- },
- )
+ VolumeSource: corev1.VolumeSource{EmptyDir: cluster.Spec.Storage.EmptyDir},
+ })
+ }
- if cluster.Spec.Security != nil && cluster.Spec.Security.TLS.PeerSecret != "" {
- volumes = append(volumes,
- []corev1.Volume{
- {
+ if cluster.Spec.Security != nil {
+ if cluster.Spec.Security.TLS.PeerSecret != "" {
+ volumes = append(volumes,
+ corev1.Volume{
Name: "peer-trusted-ca-certificate",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
@@ -166,7 +199,7 @@ func generateVolumes(cluster *etcdaenixiov1alpha1.EtcdCluster) []corev1.Volume {
},
},
},
- {
+ corev1.Volume{
Name: "peer-certificate",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
@@ -174,35 +207,30 @@ func generateVolumes(cluster *etcdaenixiov1alpha1.EtcdCluster) []corev1.Volume {
},
},
},
- }...)
- }
+ )
+ }
- if cluster.Spec.Security != nil && cluster.Spec.Security.TLS.ServerSecret != "" {
- volumes = append(volumes,
- []corev1.Volume{
- {
- Name: "server-certificate",
- VolumeSource: corev1.VolumeSource{
- Secret: &corev1.SecretVolumeSource{
- SecretName: cluster.Spec.Security.TLS.ServerSecret,
- },
+ if cluster.Spec.Security.TLS.ServerSecret != "" {
+ volumes = append(volumes, corev1.Volume{
+ Name: "server-certificate",
+ VolumeSource: corev1.VolumeSource{
+ Secret: &corev1.SecretVolumeSource{
+ SecretName: cluster.Spec.Security.TLS.ServerSecret,
},
},
- }...)
- }
+ })
+ }
- if cluster.Spec.Security != nil && cluster.Spec.Security.TLS.ClientSecret != "" {
- volumes = append(volumes,
- []corev1.Volume{
- {
- Name: "client-trusted-ca-certificate",
- VolumeSource: corev1.VolumeSource{
- Secret: &corev1.SecretVolumeSource{
- SecretName: cluster.Spec.Security.TLS.ClientTrustedCASecret,
- },
+ if cluster.Spec.Security.TLS.ClientSecret != "" {
+ volumes = append(volumes, corev1.Volume{
+ Name: "client-trusted-ca-certificate",
+ VolumeSource: corev1.VolumeSource{
+ Secret: &corev1.SecretVolumeSource{
+ SecretName: cluster.Spec.Security.TLS.ClientTrustedCASecret,
},
},
- }...)
+ })
+ }
}
return volumes
@@ -219,51 +247,42 @@ func generateVolumeMounts(cluster *etcdaenixiov1alpha1.EtcdCluster) []corev1.Vol
MountPath: "/var/run/etcd",
})
- if cluster.Spec.Security != nil && cluster.Spec.Security.TLS.PeerSecret != "" {
- volumeMounts = append(volumeMounts, []corev1.VolumeMount{
- {
- Name: "peer-trusted-ca-certificate",
- ReadOnly: true,
- MountPath: "/etc/etcd/pki/peer/ca",
- },
- {
- Name: "peer-certificate",
- ReadOnly: true,
- MountPath: "/etc/etcd/pki/peer/cert",
- },
- }...)
- }
+ if cluster.Spec.Security != nil {
+ if cluster.Spec.Security.TLS.PeerSecret != "" {
+ volumeMounts = append(volumeMounts,
+ corev1.VolumeMount{
+ Name: "peer-trusted-ca-certificate",
+ ReadOnly: true,
+ MountPath: "/etc/etcd/pki/peer/ca",
+ },
+ corev1.VolumeMount{
+ Name: "peer-certificate",
+ ReadOnly: true,
+ MountPath: "/etc/etcd/pki/peer/cert",
+ },
+ )
+ }
- if cluster.Spec.Security != nil && cluster.Spec.Security.TLS.ServerSecret != "" {
- volumeMounts = append(volumeMounts, []corev1.VolumeMount{
- {
+ if cluster.Spec.Security.TLS.ServerSecret != "" {
+ volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: "server-certificate",
ReadOnly: true,
MountPath: "/etc/etcd/pki/server/cert",
- },
- }...)
- }
-
- if cluster.Spec.Security != nil && cluster.Spec.Security.TLS.ClientSecret != "" {
+ })
+ }
- volumeMounts = append(volumeMounts, []corev1.VolumeMount{
- {
+ if cluster.Spec.Security.TLS.ClientSecret != "" {
+ volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: "client-trusted-ca-certificate",
ReadOnly: true,
MountPath: "/etc/etcd/pki/client/ca",
- },
- }...)
+ })
+ }
}
return volumeMounts
}
-func generateEtcdCommand() []string {
- return []string{
- "etcd",
- }
-}
-
func generateEtcdArgs(cluster *etcdaenixiov1alpha1.EtcdCluster) []string {
args := []string{}
@@ -355,54 +374,6 @@ func generateEtcdArgs(cluster *etcdaenixiov1alpha1.EtcdCluster) []string {
return args
}
-func generateContainer(cluster *etcdaenixiov1alpha1.EtcdCluster) corev1.Container {
- podEnv := []corev1.EnvVar{
- {
- Name: "POD_NAME",
- ValueFrom: &corev1.EnvVarSource{
- FieldRef: &corev1.ObjectFieldSelector{
- FieldPath: "metadata.name",
- },
- },
- },
- {
- Name: "POD_NAMESPACE",
- ValueFrom: &corev1.EnvVarSource{
- FieldRef: &corev1.ObjectFieldSelector{
- FieldPath: "metadata.namespace",
- },
- },
- },
- }
-
- c := corev1.Container{}
- c.Name = etcdContainerName
- c.Image = etcdaenixiov1alpha1.DefaultEtcdImage
- c.Command = generateEtcdCommand()
- c.Args = generateEtcdArgs(cluster)
- c.Ports = []corev1.ContainerPort{
- {Name: "peer", ContainerPort: 2380},
- {Name: "client", ContainerPort: 2379},
- }
- clusterStateConfigMapName := GetClusterStateConfigMapName(cluster)
- c.EnvFrom = []corev1.EnvFromSource{
- {
- ConfigMapRef: &corev1.ConfigMapEnvSource{
- LocalObjectReference: corev1.LocalObjectReference{
- Name: clusterStateConfigMapName,
- },
- },
- },
- }
- c.StartupProbe = getStartupProbe()
- c.LivenessProbe = getLivenessProbe()
- c.ReadinessProbe = getReadinessProbe()
- c.Env = podEnv
- c.VolumeMounts = generateVolumeMounts(cluster)
-
- return c
-}
-
func getStartupProbe() *corev1.Probe {
return &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
diff --git a/internal/controller/factory/statefulset_test.go b/internal/controller/factory/statefulset_test.go
index 64d4660e..9d0ad9ce 100644
--- a/internal/controller/factory/statefulset_test.go
+++ b/internal/controller/factory/statefulset_test.go
@@ -179,10 +179,6 @@ var _ = Describe("CreateOrUpdateStatefulSet handler", func() {
Expect(statefulSet.Spec.Template.ObjectMeta.Annotations).To(Equal(etcdcluster.Spec.PodTemplate.Annotations))
})
- By("Checking the command", func() {
- Expect(statefulSet.Spec.Template.Spec.Containers[0].Command).To(Equal(generateEtcdCommand()))
- })
-
By("Checking the extraArgs", func() {
Expect(statefulSet.Spec.Template.Spec.Containers[0].Args).To(Equal(generateEtcdArgs(&etcdcluster)))
By("Checking args are sorted", func() {
diff --git a/internal/controller/observables.go b/internal/controller/observables.go
index 4d080f4c..18cd10e9 100644
--- a/internal/controller/observables.go
+++ b/internal/controller/observables.go
@@ -2,12 +2,12 @@ package controller
import (
"context"
- // "strconv"
- // "strings"
+ "strconv"
+ "strings"
"sync"
"github.com/aenix-io/etcd-operator/api/v1alpha1"
- // "github.com/aenix-io/etcd-operator/pkg/set"
+ "github.com/aenix-io/etcd-operator/pkg/set"
clientv3 "go.etcd.io/etcd/client/v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -27,14 +27,13 @@ type etcdStatus struct {
// observables stores observations that the operator can make about
// states of objects in kubernetes
type observables struct {
- instance *v1alpha1.EtcdCluster
- statefulSet appsv1.StatefulSet
- stsExists bool
- endpoints []string //nolint:unused
- endpointsFound bool
- etcdStatuses []etcdStatus
- clusterID uint64
- pvcs []corev1.PersistentVolumeClaim //nolint:unused
+ instance *v1alpha1.EtcdCluster
+ statefulSet appsv1.StatefulSet
+ etcdStatuses []etcdStatus
+ clusterID uint64
+ pvcs []corev1.PersistentVolumeClaim //nolint:unused
+ clusterClient *clientv3.Client
+ singleClients []*clientv3.Client
}
// setClusterID populates the clusterID field based on etcdStatuses
@@ -49,15 +48,43 @@ func (o *observables) setClusterID() {
// inSplitbrain compares clusterID field with clusterIDs in etcdStatuses.
// If more than one unique ID is reported, cluster is in splitbrain.
+// If members disagree on the list of members, that also counts as a splitbrain.
func (o *observables) inSplitbrain() bool {
+ return !o.clusterIDsAllEqual() || !o.memberListsAllEqual()
+}
+
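+// clusterIDsAllEqual reports whether all reachable members report the same cluster ID.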
+func (o *observables) clusterIDsAllEqual() bool {
+ ids := set.New[uint64]()
for i := range o.etcdStatuses {
if o.etcdStatuses[i].endpointStatus != nil {
- if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId {
- return true
+ ids.Add(o.etcdStatuses[i].endpointStatus.Header.ClusterId)
+ }
+ }
+ return len(ids) <= 1
+}
+
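+// memberListsAllEqual reports whether every member that returned a member list sees the same set of (name, ID) pairs.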
+func (o *observables) memberListsAllEqual() bool {
+ type m struct {
+ Name string
+ ID uint64
+ }
+ memberLists := make([]set.Set[m], 0, len(o.etcdStatuses))
+ for i := range o.etcdStatuses {
+ if o.etcdStatuses[i].memberList != nil {
+ memberSet := set.New[m]()
+ for _, member := range o.etcdStatuses[i].memberList.Members {
+ memberSet.Add(m{member.Name, member.ID})
}
+ memberLists = append(memberLists, memberSet)
}
}
- return false
+ for i := range memberLists {
+ if !memberLists[0].Equals(memberLists[i]) {
+ return false
+ }
+ }
+ return true
}
// fill takes a single-endpoint client and populates the fields of etcdStatus
@@ -73,15 +100,140 @@ func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
wg.Wait()
}
-// TODO: make a real function
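+// pvcMaxIndex returns the highest ordinal parsed from the names of the observed PVCs, or -1 if none can be parsed.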
+func (o *observables) pvcMaxIndex() (max int) {
+ max = -1
+ for i := range o.pvcs {
+ tokens := strings.Split(o.pvcs[i].Name, "-")
+ index, err := strconv.Atoi(tokens[len(tokens)-1])
+ if err != nil {
+ continue
+ }
+ if index > max {
+ max = index
+ }
+ }
+ return max
+}
+
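+// endpointMaxIndex returns the highest pod ordinal parsed from the observed endpoint addresses.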
+func (o *observables) endpointMaxIndex() (max int) {
+ for i := range o.endpoints() {
+ tokens := strings.Split(o.endpoints()[i], ":")
+ if len(tokens) < 2 {
+ continue
+ }
+ tokens = strings.Split(tokens[len(tokens)-2], "-")
+ index, err := strconv.Atoi(tokens[len(tokens)-1])
+ if err != nil {
+ continue
+ }
+ if index > max {
+ max = index
+ }
+ }
+ return max
+}
+
+// TODO: make a real function to determine the right number of replicas.
+// Hint: if ClientURL in the member list is absent, the member has not yet
+// started, but if the name field is populated, this is a member of the
+// initial cluster. If the name field is empty, this member has just been
+// added with etcdctl member add (or equivalent API call).
// nolint:unused
-func (o *observables) desiredReplicas() int {
+func (o *observables) desiredReplicas() (max int) {
+ max = -1
if o.etcdStatuses != nil {
for i := range o.etcdStatuses {
if o.etcdStatuses[i].memberList != nil {
- return len(o.etcdStatuses[i].memberList.Members)
+ for j := range o.etcdStatuses[i].memberList.Members {
+ tokens := strings.Split(o.etcdStatuses[i].memberList.Members[j].Name, "-")
+ index, err := strconv.Atoi(tokens[len(tokens)-1])
+ if err != nil {
+ continue
+ }
+ if index > max {
+ max = index
+ }
+ }
}
}
}
- return 0
+ if max > -1 {
+ return max + 1
+ }
+
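+ // no member names could be parsed; fall back to the highest ordinal seen among endpoints and PVCs, and finally to the spec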
+ if epMax := o.endpointMaxIndex(); epMax > max {
+ max = epMax
+ }
+ if pvcMax := o.pvcMaxIndex(); pvcMax > max {
+ max = pvcMax
+ }
+ if max == -1 {
+ return int(*o.instance.Spec.Replicas)
+ }
+ return max + 1
+}
+
+func (o *observables) statefulSetExists() bool {
+ return o.statefulSet.UID != ""
+}
+
+func (o *observables) endpoints() []string {
+ return o.clusterClient.Endpoints()
+}
+
+func (o *observables) endpointsFound() bool {
+ return o.clusterClient != nil && o.singleClients != nil
+}
+
+// TODO: compare the desired sts with what exists
+func (o *observables) statefulSetPodSpecCorrect() bool {
+ return true
+}
+
+// TODO: also use updated replicas field?
+func (o *observables) statefulSetReady() bool {
+ return o.statefulSet.Status.ReadyReplicas == *o.statefulSet.Spec.Replicas
+}
+
+func (o *observables) statefulSetReplicasIsZero() bool {
+ return *o.statefulSet.Spec.Replicas == 0
+}
+
+func (o *observables) etcdClusterReplicasIsZero() bool {
+ return *o.instance.Spec.Replicas == 0
+}
+
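+// clusterHasQuorum reports whether a strict majority of members responded and can see a leader.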
+func (o *observables) clusterHasQuorum() bool {
+ size := len(o.etcdStatuses)
+ membersInQuorum := size
+ for i := range o.etcdStatuses {
+ if o.etcdStatuses[i].endpointStatus == nil || o.etcdStatuses[i].endpointStatus.Leader == 0 {
+ membersInQuorum--
+ }
+ }
+ return membersInQuorum*2 > size
+}
+
+func (o *observables) hasLearners() bool {
+ for i := range o.etcdStatuses {
+ if stat := o.etcdStatuses[i].endpointStatus; stat != nil && stat.IsLearner {
+ return true
+ }
+ }
+ return false
+}
+
+// TODO: check if the pods are in the member list
+func (o *observables) podsPresentInMembersList() bool {
+ return true
+}
+
+// TODO: check whether all members are healthy
+func (o *observables) allMembersAreHealthy() bool {
+ return true
+}
+
+// TODO: check whether all members are managed
+func (o *observables) allMembersAreManaged() bool {
+ return true
}
diff --git a/site/content/en/docs/v0.4/reference/api.md b/site/content/en/docs/v0.4/reference/api.md
index 4ce45aa5..7c972d28 100644
--- a/site/content/en/docs/v0.4/reference/api.md
+++ b/site/content/en/docs/v0.4/reference/api.md
@@ -167,8 +167,8 @@ _Appears in:_
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
-| `minAvailable` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30.0/#intorstring-intstr-util)_ | MinAvailable describes minimum ready replicas. If both are empty, controller will implicitly calculate MaxUnavailable based on number of replicas Mutually exclusive with MaxUnavailable. | | |
-| `maxUnavailable` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30.0/#intorstring-intstr-util)_ | MinAvailable describes maximum not ready replicas. If both are empty, controller will implicitly calculate MaxUnavailable based on number of replicas Mutually exclusive with MinAvailable | | |
+| `minAvailable` _[IntOrString](#intorstring)_ | MinAvailable describes minimum ready replicas. If both are empty, controller will implicitly calculate MaxUnavailable based on number of replicas Mutually exclusive with MaxUnavailable. | | |
+| `maxUnavailable` _[IntOrString](#intorstring)_ | MinAvailable describes maximum not ready replicas. If both are empty, controller will implicitly calculate MaxUnavailable based on number of replicas Mutually exclusive with MinAvailable | | |
#### PodTemplate