diff --git a/controllers/selfnoderemediation_controller.go b/controllers/selfnoderemediation_controller.go
index 34d84f72e..f8e22abaf 100644
--- a/controllers/selfnoderemediation_controller.go
+++ b/controllers/selfnoderemediation_controller.go
@@ -463,16 +463,20 @@ func (r *SelfNodeRemediationReconciler) remediateWithResourceRemoval(ctx context
 	var err error
 	switch phase {
 	case fencingStartedPhase:
+		r.logger.Info("remediateWithResourceRemoval: entered fencing started phase")
 		result, err = r.handleFencingStartedPhase(ctx, node, snr)
 	case preRebootCompletedPhase:
+		r.logger.Info("remediateWithResourceRemoval: entered pre-reboot completed phase")
 		result, err = r.handlePreRebootCompletedPhase(node, snr)
 	case rebootCompletedPhase:
+		r.logger.Info("remediateWithResourceRemoval: entered reboot completed phase")
 		result, err = r.handleRebootCompletedPhase(node, snr, rmNodeResources)
 	case fencingCompletedPhase:
+		r.logger.Info("remediateWithResourceRemoval: entered fencing completed phase")
 		result, err = r.handleFencingCompletedPhase(node, snr)
 	default:
 		// this should never happen since we enforce valid values with kubebuilder
-		err = errors.New("unknown phase")
+		err = fmt.Errorf("remediateWithResourceRemoval: unknown phase (%s)", phase)
 		r.logger.Error(err, "Undefined unknown phase", "phase", phase)
 	}
 	return result, err
@@ -506,6 +510,7 @@ func (r *SelfNodeRemediationReconciler) prepareReboot(ctx context.Context, node
 	}
 
 	preRebootCompleted := string(preRebootCompletedPhase)
+	r.logger.Info("pre-reboot completed")
 	snr.Status.Phase = &preRebootCompleted
 
 	return ctrl.Result{}, nil
@@ -636,7 +641,8 @@ func (r *SelfNodeRemediationReconciler) didIRebootMyself(snr *v1alpha1.SelfNodeR
 
 func (r *SelfNodeRemediationReconciler) isNodeRebootCapable(node *v1.Node) bool {
 	//make sure that the unhealthy node has self node remediation pod on it which can reboot it
 	if _, err := utils.GetSelfNodeRemediationAgentPod(node.Name, r.Client); err != nil {
-		r.logger.Error(err, "failed to get self node remediation agent pod resource")
+		r.logger.Error(err, "failed to get self node remediation agent pod resource, "+
+			"node is not reboot capable")
 		return false
 	}
diff --git a/controllers/tests/config/suite_test.go b/controllers/tests/config/suite_test.go
index d6c2132d4..58c2a7061 100644
--- a/controllers/tests/config/suite_test.go
+++ b/controllers/tests/config/suite_test.go
@@ -126,7 +126,7 @@ var _ = BeforeSuite(func() {
 		MyNodeName:             shared.UnhealthyNodeName,
 		CheckInterval:          shared.ApiCheckInterval,
 		MaxErrorsThreshold:     shared.MaxErrorThreshold,
-		MinPeersForRemediation: shared.MinPeersForRemediation,
+		MinPeersForRemediation: shared.MinPeersForRemediationConfigDefaultValue,
 		Peers:                  peers,
 		Cfg:                    cfg,
 		CertReader:             certReader,
diff --git a/controllers/tests/controller/control_plane_isolation_testdata.md b/controllers/tests/controller/control_plane_isolation_testdata.md
new file mode 100644
index 000000000..0104f49d1
--- /dev/null
+++ b/controllers/tests/controller/control_plane_isolation_testdata.md
@@ -0,0 +1,34 @@
+# Control Plane Isolation Test Data
+
+Legacy behaviour:
+- Worker-only feedback; no escalation to control-plane peers.
+- Diagnostics run only after worker peers declare CR.
+- Isolation inferred late, leading to a false healthy outcome.
+
+Redesigned behaviour expectations:
+- Workers report CR -> query control-plane quorum -> treat absence as unhealthy.
+- Isolate after `PeerQuorumTimeout` without waiting for worker chatter.
+
+This file documents test expectations to keep the scenario reproducible.
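+
+An illustrative sketch of the redesigned decision flow above. The names used here
+(`expectedOutcome`, `workerReportsCR`, `controlPlaneResponses`, `silence`, `peerQuorumTimeout`)
+are placeholders for this document only, not the production API.
+
+```go
+package isolationtestdata
+
+import "time"
+
+// expectedOutcome mirrors the bullets above: a CR reported by workers with a silent
+// control plane means unhealthy; silence past the quorum timeout means isolation.
+func expectedOutcome(workerReportsCR bool, controlPlaneResponses int, silence, peerQuorumTimeout time.Duration) string {
+	if workerReportsCR && controlPlaneResponses == 0 {
+		return "unhealthy"
+	}
+	if silence >= peerQuorumTimeout {
+		return "isolated"
+	}
+	return "healthy"
+}
+```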
diff --git a/controllers/tests/controller/selfnoderemediation_controller_test.go b/controllers/tests/controller/selfnoderemediation_controller_test.go index 784419088..1ec80ec88 100644 --- a/controllers/tests/controller/selfnoderemediation_controller_test.go +++ b/controllers/tests/controller/selfnoderemediation_controller_test.go @@ -6,6 +6,7 @@ import ( "reflect" "time" + labels2 "github.com/medik8s/common/pkg/labels" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -18,12 +19,16 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + logf "sigs.k8s.io/controller-runtime/pkg/log" machinev1beta1 "github.com/openshift/api/machine/v1beta1" + "github.com/medik8s/self-node-remediation/api" "github.com/medik8s/self-node-remediation/api/v1alpha1" "github.com/medik8s/self-node-remediation/controllers" "github.com/medik8s/self-node-remediation/controllers/tests/shared" + "github.com/medik8s/self-node-remediation/pkg/controlplane" + "github.com/medik8s/self-node-remediation/pkg/peers" "github.com/medik8s/self-node-remediation/pkg/utils" "github.com/medik8s/self-node-remediation/pkg/watchdog" ) @@ -32,53 +37,63 @@ const ( snrNamespace = "default" ) +var remediationStrategy v1alpha1.RemediationStrategyType + var _ = Describe("SNR Controller", func() { var snr *v1alpha1.SelfNodeRemediation - var remediationStrategy v1alpha1.RemediationStrategyType + var nodeRebootCapable = "true" - var isAdditionalSetupNeeded = false BeforeEach(func() { nodeRebootCapable = "true" - snr = &v1alpha1.SelfNodeRemediation{} - snr.Name = shared.UnhealthyNodeName - snr.Namespace = snrNamespace - snrConfig = shared.GenerateTestConfig() - time.Sleep(time.Second * 2) - // reset watchdog for each test! - dummyDog.Reset() - }) + By("Set default self-node-remediation configuration", func() { + snr = &v1alpha1.SelfNodeRemediation{} + snr.Name = shared.UnhealthyNodeName + snr.Namespace = snrNamespace + + snrConfig = shared.GenerateTestConfig() + time.Sleep(time.Second * 2) + }) - JustBeforeEach(func() { - if isAdditionalSetupNeeded { - createSelfNodeRemediationPod() - verifySelfNodeRemediationPodExist() - } - createConfig() DeferCleanup(func() { - deleteConfig() + deleteRemediations() + + //clear node's state, this is important to remove taints, label etc. 
+ By(fmt.Sprintf("Clear node state for '%s'", shared.UnhealthyNodeName), func() { + live := &v1.Node{} + Expect(k8sClient.Client.Get(context.Background(), unhealthyNodeNamespacedName, live)).To(Succeed()) + clean := getNode(shared.UnhealthyNodeName) + patch := client.MergeFrom(live.DeepCopy()) + live.Spec = clean.Spec + live.Labels = clean.Labels + live.Annotations = clean.Annotations + Expect(k8sClient.Client.Patch(context.Background(), live, patch)).To(Succeed()) + }) + + By(fmt.Sprintf("Clear node state for '%s'", shared.PeerNodeName), func() { + live := &v1.Node{} + Expect(k8sClient.Client.Get(context.Background(), peerNodeNamespacedName, live)).To(Succeed()) + clean := getNode(shared.PeerNodeName) + patch := client.MergeFrom(live.DeepCopy()) + live.Spec = clean.Spec + live.Labels = clean.Labels + live.Annotations = clean.Annotations + Expect(k8sClient.Client.Patch(context.Background(), live, patch)).To(Succeed()) + }) + + time.Sleep(time.Second * 2) + + deleteRemediations() + clearEvents() + verifyCleanState() }) - updateIsRebootCapable(nodeRebootCapable) }) - AfterEach(func() { - k8sClient.ShouldSimulateFailure = false - k8sClient.ShouldSimulatePodDeleteFailure = false - isAdditionalSetupNeeded = false - - By("Restore default settings for api connectivity check") - apiConnectivityCheckConfig.MinPeersForRemediation = shared.MinPeersForRemediation - - deleteRemediations() - deleteSelfNodeRemediationPod() - //clear node's state, this is important to remove taints, label etc. - Expect(k8sClient.Update(context.Background(), getNode(shared.UnhealthyNodeName))) - Expect(k8sClient.Update(context.Background(), getNode(shared.PeerNodeName))) - time.Sleep(time.Second * 2) - deleteRemediations() - clearEvents() - verifyCleanState() + JustBeforeEach(func() { + createTestConfig() + updateIsRebootCapable(nodeRebootCapable) + resetWatchdogTimer() }) It("check nodes exist", func() { @@ -162,8 +177,8 @@ var _ = Describe("SNR Controller", func() { Context("Unhealthy node with api-server access", func() { - BeforeEach(func() { - isAdditionalSetupNeeded = true + JustBeforeEach(func() { + doAdditionalSetup() }) Context("Automatic strategy - ResourceDeletion selected", func() { @@ -502,29 +517,100 @@ var _ = Describe("SNR Controller", func() { }) }) - Context("Unhealthy node without api-server access", func() { - BeforeEach(func() { - By("Simulate api-server failure") - k8sClient.ShouldSimulateFailure = true - remediationStrategy = v1alpha1.ResourceDeletionRemediationStrategy + Context("Control-plane isolation (pre-redesign)", func() { + It("should mark control-plane unhealthy when workers report CR but no control-plane peers respond", func() { + configureUnhealthyNodeAsControlNode() + configureRemediationStrategy(v1alpha1.AutomaticRemediationStrategy) + configureApiServerSimulatedFailures(true) + configureSimulatedPeerResponses(true) + configurePeersOverride(func(role peers.Role) []v1.PodIP { + if role == peers.ControlPlane { + return nil + } + return []v1.PodIP{{IP: "10.0.0.11"}} + }) + createSNR(snr, v1alpha1.AutomaticRemediationStrategy) + apiCheck.AppendSimulatedPeerResponse(api.Unhealthy) + + verifyWatchdogTriggered() }) + }) + + Context("Unhealthy node without api-server access", func() { + + Context("two control node peers found, they tell me I'm unhealthy", func() { + + BeforeEach(func() { + additionalNodes := []newNodeConfig{ + { + nodeName: shared.Peer2NodeName, + labels: map[string]string{ + labels2.MasterRole: "true", + }, + + pods: []newPodConfig{ + { + name: shared.SnrPodName2, + 
simulatedResponse: api.Unhealthy, + }, + }, + }, + { + nodeName: shared.Peer3NodeName, + labels: map[string]string{ + labels2.MasterRole: "true", + }, + pods: []newPodConfig{ + { + name: shared.SnrPodName3, + simulatedResponse: api.Unhealthy, + }, + }, + }, + } + + configureClientWrapperToRandomizePodIpAddresses() + setMinPeersForRemediation(0) + configureUnhealthyNodeAsControlNode() + addNodes(additionalNodes) + + addControlPlaneManager() + resetWatchdogTimer() + configureApiServerSimulatedFailures(true) + configureRemediationStrategy(v1alpha1.ResourceDeletionRemediationStrategy) + configureSimulatedPeerResponses(true) + }) - Context("no peer found", func() { - It("Verify that watchdog is not triggered", func() { - verifyWatchdogNotTriggered() + It("check that we actually get a triggered watchdog reboot", func() { + // It's expected that the next line will fail, even though it shouldn't! + verifyWatchdogTriggered() }) }) - Context("no peer found and MinPeersForRemediation is configured to 0", func() { + Context("api-server should be failing throughout the entire test", func() { BeforeEach(func() { - By("Set MinPeersForRemedation to zero which should trigger the watchdog before the test") - apiConnectivityCheckConfig.MinPeersForRemediation = 0 + configureApiServerSimulatedFailures(true) }) - It("Does not receive peer communication and since configured to need zero peers, initiates a reboot", - func() { + Context("no peer found", func() { + BeforeEach(func() { + clearSimulatedPeerResponses() + }) + It("Verify that watchdog is not triggered", func() { + verifyWatchdogNotTriggered() + }) + }) + + Context("no peer found and MinPeersForRemediation is configured to 0", func() { + BeforeEach(func() { + setMinPeersForRemediation(0) + }) + + It("Does not receive peer communication and since configured to need zero peers, "+ + "initiates a reboot", func() { verifyWatchdogTriggered() }) + }) }) }) @@ -715,26 +801,27 @@ func verifySelfNodeRemediationPodExist() { }, 5*time.Second, 250*time.Millisecond).Should(Equal(1)) } func deleteRemediations() { + By("Delete any existing remediations", func() { + Eventually(func(g Gomega) { + snrs := &v1alpha1.SelfNodeRemediationList{} + g.Expect(k8sClient.List(context.Background(), snrs)).To(Succeed()) + if len(snrs.Items) == 0 { + return + } - Eventually(func(g Gomega) { - snrs := &v1alpha1.SelfNodeRemediationList{} - g.Expect(k8sClient.List(context.Background(), snrs)).To(Succeed()) - if len(snrs.Items) == 0 { - return - } - - for _, snr := range snrs.Items { - tmpSnr := snr - g.Expect(removeFinalizers(&tmpSnr)).To(Succeed()) - g.Expect(k8sClient.Client.Delete(context.Background(), &tmpSnr)).To(Succeed()) + for _, snr := range snrs.Items { + tmpSnr := snr + g.Expect(removeFinalizers(&tmpSnr)).To(Succeed()) + g.Expect(k8sClient.Client.Delete(context.Background(), &tmpSnr)).To(Succeed()) - } + } - expectedEmptySnrs := &v1alpha1.SelfNodeRemediationList{} - g.Expect(k8sClient.List(context.Background(), expectedEmptySnrs)).To(Succeed()) - g.Expect(len(expectedEmptySnrs.Items)).To(Equal(0)) + expectedEmptySnrs := &v1alpha1.SelfNodeRemediationList{} + g.Expect(k8sClient.List(context.Background(), expectedEmptySnrs)).To(Succeed()) + g.Expect(len(expectedEmptySnrs.Items)).To(Equal(0)) - }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) + }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) + }) } func deleteSNR(snr *v1alpha1.SelfNodeRemediation) { @@ -777,42 +864,101 @@ func createSNR(snr *v1alpha1.SelfNodeRemediation, strategy v1alpha1.RemediationS 
ExpectWithOffset(1, k8sClient.Client.Create(context.TODO(), snr)).To(Succeed(), "failed to create snr CR") } -func createSelfNodeRemediationPod() { - pod := &v1.Pod{} - pod.Spec.NodeName = shared.UnhealthyNodeName - pod.Labels = map[string]string{"app.kubernetes.io/name": "self-node-remediation", - "app.kubernetes.io/component": "agent"} +func createGenericSelfNodeRemediationPod(node *v1.Node, podName string) (pod *v1.Pod) { + By(fmt.Sprintf("Create pod '%s' under node '%s'", podName, node.Name), func() { + pod = &v1.Pod{} + pod.Spec.NodeName = node.Name + pod.Labels = map[string]string{"app.kubernetes.io/name": "self-node-remediation", + "app.kubernetes.io/component": "agent"} + + pod.Name = podName + pod.Namespace = shared.Namespace + // Some tests need the containers to exist + container := v1.Container{ + Name: "foo", + Image: "foo", + } + pod.Spec.Containers = []v1.Container{container} - pod.Name = "self-node-remediation" - pod.Namespace = shared.Namespace - container := v1.Container{ - Name: "foo", - Image: "foo", - } - pod.Spec.Containers = []v1.Container{container} - ExpectWithOffset(1, k8sClient.Client.Create(context.Background(), pod)).To(Succeed()) + ExpectWithOffset(1, k8sClient.Client.Create(context.Background(), pod)).To(Succeed(), + "failed to create self-node-remediation pod (%s) for node: '%s'", podName, node.Name) + + time.Sleep(1 * time.Second) + + verifySelfNodeRemediationPodByExistsByName(podName) + + DeferCleanup(func() { + deleteSelfNodeRemediationPod(pod, false) + }) + }) + + return } -func deleteSelfNodeRemediationPod() { - pod := &v1.Pod{} +func verifySelfNodeRemediationPodByExistsByName(name string) { + By(fmt.Sprintf("Verify that pod '%s' exists", name), func() { + podList := &v1.PodList{} + selector := labels.NewSelector() + nameRequirement, _ := labels.NewRequirement("app.kubernetes.io/name", selection.Equals, []string{"self-node-remediation"}) + componentRequirement, _ := labels.NewRequirement("app.kubernetes.io/component", selection.Equals, []string{"agent"}) + selector = selector.Add(*nameRequirement, *componentRequirement) + + EventuallyWithOffset(1, func() (bool, error) { + err := k8sClient.Client.List(context.Background(), podList, &client.ListOptions{LabelSelector: selector}) + for _, item := range podList.Items { + if item.Name == name { + return true, nil + } + } + return false, err + }, 5*time.Second, 250*time.Millisecond).Should(BeTrue(), "expected that we should have"+ + " found the SNR pod") + }) - podKey := client.ObjectKey{ - Namespace: shared.Namespace, - Name: "self-node-remediation", - } + return +} - if err := k8sClient.Get(context.Background(), podKey, pod); err != nil { - Expect(apierrors.IsNotFound(err)).To(BeTrue()) - return +func getSnrPods() (pods *v1.PodList) { + pods = &v1.PodList{} + + By("Listing self-node-remediation pods") + listOptions := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{ + "app.kubernetes.io/name": "self-node-remediation", + "app.kubernetes.io/component": "agent", + }), } + Expect(k8sClient.List(context.Background(), pods, listOptions)).To(Succeed(), + "failed to list self-node-remediation pods") + return +} - var grace client.GracePeriodSeconds = 0 - ExpectWithOffset(1, k8sClient.Client.Delete(context.Background(), pod, grace)).To(Succeed()) +func deleteSelfNodeRemediationPod(pod *v1.Pod, throwErrorIfNotFound bool) { + By(fmt.Sprintf("Attempt to delete pod '%s'", pod.Name), func() { + var grace client.GracePeriodSeconds = 0 + err := k8sClient.Client.Delete(context.Background(), pod, 
grace) + if throwErrorIfNotFound { + ExpectWithOffset(1, err).To(Succeed(), "there should have been no error "+ + "deleting pod '%s'", pod.Name) + } else { + ExpectWithOffset(1, err).To(Or(Succeed(), shared.IsK8sNotFoundError()), + "expected the delete operation to succeed, or for it to have told us that node '%s'"+ + " didn't exist", pod.Name) + } - EventuallyWithOffset(1, func() bool { - err := k8sClient.Client.Get(context.Background(), podKey, pod) - return apierrors.IsNotFound(err) - }, 10*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + + By("Check that pod: '"+pod.Name+"' was actually deleted", func() { + EventuallyWithOffset(1, func() bool { + podTestAfterDelete := &v1.Pod{} + podKey := client.ObjectKey{ + Namespace: shared.Namespace, + Name: pod.Name, + } + err := k8sClient.Client.Get(context.Background(), podKey, podTestAfterDelete) + return apierrors.IsNotFound(err) + }, 10*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) } func createTerminatingPod() { @@ -916,42 +1062,43 @@ func eventuallyUpdateNode(updateFunc func(*v1.Node), isStatusUpdate bool) { } func verifyCleanState() { - //Verify nodes are at a clean state - nodes := &v1.NodeList{} - Expect(k8sClient.List(context.Background(), nodes)).To(Succeed()) - Expect(len(nodes.Items)).To(BeEquivalentTo(2)) - var peerNodeActual, unhealthyNodeActual *v1.Node - if nodes.Items[0].Name == shared.UnhealthyNodeName { - Expect(nodes.Items[1].Name).To(Equal(shared.PeerNodeName)) - peerNodeActual = &nodes.Items[1] - unhealthyNodeActual = &nodes.Items[0] - } else { - Expect(nodes.Items[0].Name).To(Equal(shared.PeerNodeName)) - Expect(nodes.Items[1].Name).To(Equal(shared.UnhealthyNodeName)) - peerNodeActual = &nodes.Items[0] - unhealthyNodeActual = &nodes.Items[1] - } + By("Verifying that test(s) and AfterTest functions properly left us at an expected state", func() { + //Verify nodes are at a clean state + nodes := &v1.NodeList{} + Expect(k8sClient.List(context.Background(), nodes)).To(Succeed()) + Expect(len(nodes.Items)).To(BeEquivalentTo(2)) + var peerNodeActual, unhealthyNodeActual *v1.Node + if nodes.Items[0].Name == shared.UnhealthyNodeName { + Expect(nodes.Items[1].Name).To(Equal(shared.PeerNodeName)) + peerNodeActual = &nodes.Items[1] + unhealthyNodeActual = &nodes.Items[0] + } else { + Expect(nodes.Items[0].Name).To(Equal(shared.PeerNodeName)) + Expect(nodes.Items[1].Name).To(Equal(shared.UnhealthyNodeName)) + peerNodeActual = &nodes.Items[0] + unhealthyNodeActual = &nodes.Items[1] + } - peerNodeExpected, unhealthyNodeExpected := getNode(shared.PeerNodeName), getNode(shared.UnhealthyNodeName) - verifyNodesAreEqual(peerNodeExpected, peerNodeActual) - verifyNodesAreEqual(unhealthyNodeExpected, unhealthyNodeActual) + peerNodeExpected, unhealthyNodeExpected := getNode(shared.PeerNodeName), getNode(shared.UnhealthyNodeName) + verifyNodesAreEqual(peerNodeExpected, peerNodeActual) + verifyNodesAreEqual(unhealthyNodeExpected, unhealthyNodeActual) - //Verify no existing remediations - remediations := &v1alpha1.SelfNodeRemediationList{} - Expect(k8sClient.List(context.Background(), remediations)).To(Succeed()) - Expect(len(remediations.Items)).To(BeEquivalentTo(0)) + //Verify no existing remediations + remediations := &v1alpha1.SelfNodeRemediationList{} + Expect(k8sClient.List(context.Background(), remediations)).To(Succeed()) + Expect(len(remediations.Items)).To(BeEquivalentTo(0)) - //Verify SNR Pod Does not exist - pod := &v1.Pod{} - podKey := client.ObjectKey{ - Namespace: shared.Namespace, - Name: 
"self-node-remediation", - } - err := k8sClient.Get(context.Background(), podKey, pod) - Expect(apierrors.IsNotFound(err)).To(BeTrue()) - - verifyOutOfServiceTaintRemoved() + //Verify SNR Pod Does not exist + pod := &v1.Pod{} + podKey := client.ObjectKey{ + Namespace: shared.Namespace, + Name: shared.SnrPodName1, + } + err := k8sClient.Get(context.Background(), podKey, pod) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + verifyOutOfServiceTaintRemoved() + }) } func verifyNodesAreEqual(expected *v1.Node, actual *v1.Node) { @@ -963,14 +1110,16 @@ func verifyNodesAreEqual(expected *v1.Node, actual *v1.Node) { } func clearEvents() { - for { - select { - case _ = <-fakeRecorder.Events: + By("Clear any events in the channel", func() { + for { + select { + case _ = <-fakeRecorder.Events: - default: - return + default: + return + } } - } + }) } func verifyEvent(eventType, reason, message string) { @@ -1016,19 +1165,20 @@ func isEventOccurred(eventType string, reason string, message string) bool { } func deleteConfig() { - snrConfigTmp := &v1alpha1.SelfNodeRemediationConfig{} - // make sure config is already created - Eventually(func(g Gomega) { - g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(snrConfig), snrConfigTmp)).To(Succeed()) - }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) - - //delete config and verify it's deleted - Expect(k8sClient.Delete(context.Background(), snrConfigTmp)).To(Succeed()) - Eventually(func(g Gomega) { - err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(snrConfig), snrConfigTmp) - g.Expect(apierrors.IsNotFound(err)).To(BeTrue()) - }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) - + By("Delete SelfNodeRemediationConfig", func() { + snrConfigTmp := &v1alpha1.SelfNodeRemediationConfig{} + // make sure config is already created + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(snrConfig), snrConfigTmp)).To(Succeed()) + }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) + + //delete config and verify it's deleted + Expect(k8sClient.Delete(context.Background(), snrConfigTmp)).To(Succeed()) + Eventually(func(g Gomega) { + err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(snrConfig), snrConfigTmp) + g.Expect(apierrors.IsNotFound(err)).To(BeTrue()) + }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) + }) } func createConfig() { @@ -1041,3 +1191,250 @@ func createConfig() { }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) } + +func addControlPlaneManager() { + By("Add a control plane manager", func() { + controlPlaneMgr := controlplane.NewManager(shared.UnhealthyNodeName, k8sClient) + Expect(controlPlaneMgr.Start(context.Background())).To(Succeed(), "we should"+ + "have been able to enable a control plane manager for the current node") + + apiCheck.SetControlPlaneManager(controlPlaneMgr) + + Expect(apiConnectivityCheckConfig.Peers.UpdateControlPlanePeers(context.Background())).To(Succeed()) + + DeferCleanup(func() { + By("Removing the control plane manager", func() { + apiCheck.SetControlPlaneManager(nil) + Expect(apiConnectivityCheckConfig.Peers.UpdateControlPlanePeers(context.Background())).To(Succeed()) + }) + }) + }) +} + +func setMinPeersForRemediation(minimumNumberOfPeers int) { + + By(fmt.Sprintf("Setting MinPeersForRemediation to %d", minimumNumberOfPeers), func() { + orgMinPeersForRemediation := apiConnectivityCheckConfig.MinPeersForRemediation + apiConnectivityCheckConfig.MinPeersForRemediation = 
minimumNumberOfPeers + + time.Sleep(1 * time.Second) + + DeferCleanup(func() { + By("Restore MinPeersForRemediation back to its default value", func() { + apiConnectivityCheckConfig.MinPeersForRemediation = orgMinPeersForRemediation + }) + }) + }) + +} + +func configureClientWrapperToRandomizePodIpAddresses() { + By("Configure k8s client wrapper to return random IP address for pods", func() { + orgValue := k8sClient.ShouldReturnRandomPodIPs + + k8sClient.ShouldReturnRandomPodIPs = true + + DeferCleanup(func() { + By(fmt.Sprintf("Restore k8s client wrapper random pod IP address generation to %t", + orgValue), func() { + k8sClient.ShouldReturnRandomPodIPs = orgValue + }) + + return + }) + }) +} + +func configureUnhealthyNodeAsControlNode() { + var unhealthyNode = &v1.Node{} + By("Getting the existing unhealthy node object", func() { + Expect(k8sClient.Client.Get(context.TODO(), unhealthyNodeNamespacedName, unhealthyNode)). + To(Succeed(), "failed to get the unhealthy node object") + + logf.Log.Info("Successfully retrieved node", "unhealthyNode", + unhealthyNode) + }) + + By("Set the existing unhealthy node as a control node", func() { + previousRole, hadMasterLabel := unhealthyNode.Labels[labels2.MasterRole] + unhealthyNode.Labels[labels2.MasterRole] = "true" + Expect(k8sClient.Update(context.TODO(), unhealthyNode)).To(Succeed(), "failed to update unhealthy node") + + DeferCleanup(func() { + By("Revert the unhealthy node's role to its previous value", func() { + updated := &v1.Node{} + Expect(k8sClient.Client.Get(context.TODO(), unhealthyNodeNamespacedName, updated)).To(Succeed()) + if hadMasterLabel { + updated.Labels[labels2.MasterRole] = previousRole + } else { + delete(updated.Labels, labels2.MasterRole) + } + Expect(k8sClient.Update(context.TODO(), updated)).To(Succeed(), + "failed to restore the unhealthy node label after control-plane override") + }) + }) + }) +} + +func configureSimulatedPeerResponses(simulateResponses bool) { + By("Start simulating peer responses", func() { + orgValue := apiCheck.ShouldSimulatePeerResponses + orgResponses := apiCheck.SnapshotSimulatedPeerResponses() + apiCheck.ShouldSimulatePeerResponses = simulateResponses + if simulateResponses { + apiCheck.ClearSimulatedPeerResponses() + } + + DeferCleanup(func() { + apiCheck.ShouldSimulatePeerResponses = orgValue + apiCheck.RestoreSimulatedPeerResponses(orgResponses) + apiCheck.RememberSimulatedPeerResponses() + if !simulateResponses { + apiCheck.SetPeersOverride(nil) + } + }) + }) +} + +func configurePeersOverride(fn shared.PeersOverrideFunc) { + By("Configure peer override", func() { + apiCheck.SetPeersOverride(fn) + }) +} + +func configureApiServerSimulatedFailures(simulateResponses bool) { + By("Configure k8s client to simulate API server failures", func() { + orgValue := k8sClient.ShouldSimulateFailure + k8sClient.ShouldSimulateFailure = simulateResponses + apiCheck.ResetPeerTimers() + + DeferCleanup(func() { + By(fmt.Sprintf("Restore k8s client config value for API server failure simulation to %t", + orgValue), func() { + k8sClient.ShouldSimulateFailure = orgValue + apiCheck.ResetPeerTimers() + }) + + }) + }) + +} + +func configureRemediationStrategy(newRemediationStrategy v1alpha1.RemediationStrategyType) { + By(fmt.Sprintf("Configure remediation strategy to '%s'", newRemediationStrategy), func() { + orgValue := remediationStrategy + remediationStrategy = newRemediationStrategy + + DeferCleanup(func() { + By(fmt.Sprintf("Restore remediation strategy to '%s'", orgValue), func() { + remediationStrategy = 
orgValue + }) + + }) + }) + +} + +type newPodConfig struct { + name string + simulatedResponse api.HealthCheckResponseCode +} +type newNodeConfig struct { + nodeName string + labels map[string]string + + pods []newPodConfig +} + +func addNodes(nodes []newNodeConfig) { + By("Add peer nodes & pods", func() { + + for _, np := range nodes { + newNode := getNode(np.nodeName) + + for key, value := range np.labels { + newNode.Labels[key] = value + } + + By(fmt.Sprintf("Create node '%s'", np.nodeName), func() { + Expect(k8sClient.Create(context.Background(), newNode)).To(Succeed(), + "failed to create peer node '%s'", np.nodeName) + + DeferCleanup(func() { + By(fmt.Sprintf("Remove node '%s'", np.nodeName), func() { + Expect(k8sClient.Delete(context.Background(), newNode)).To(Or(Succeed(), shared.IsK8sNotFoundError())) + }) + }) + }) + + createdNode := &v1.Node{} + + By("Check that the newly created node actually was created", func() { + namespace := client.ObjectKey{ + Name: np.nodeName, + Namespace: "", + } + Eventually(func() error { + return k8sClient.Client.Get(context.TODO(), namespace, createdNode) + }, 10*time.Second, 250*time.Millisecond).Should(BeNil()) + Expect(createdNode.Name).To(Equal(np.nodeName)) + Expect(createdNode.CreationTimestamp).ToNot(BeZero()) + }) + + for _, pod := range np.pods { + By(fmt.Sprintf("Create pod '%s' under node '%s'", pod.name, np.nodeName), func() { + _ = createGenericSelfNodeRemediationPod(createdNode, pod.name) + apiCheck.AppendSimulatedPeerResponse(pod.simulatedResponse) + }) + + } + + } + + apiCheck.RememberSimulatedPeerResponses() + + }) +} + +func resetWatchdogTimer() { + By("Resetting watchdog timer", func() { + dummyDog.Reset() + apiCheck.ResetPeerTimers() + if apiCheck.ShouldSimulatePeerResponses { + apiCheck.RestoreBaselineSimulatedResponses() + apiCheck.RememberSimulatedPeerResponses() + } else { + apiCheck.ClearBaselineSimulatedResponses() + } + }) +} + +func clearSimulatedPeerResponses() { + By("Clearing simulated peer responses", func() { + apiCheck.ClearSimulatedPeerResponses() + apiCheck.ClearBaselineSimulatedResponses() + }) +} + +func doAdditionalSetup() { + By("Perform additional setups", func() { + + By("Create the default self-node-remediation agent pod", func() { + snrPod := createGenericSelfNodeRemediationPod(unhealthyNode, shared.SnrPodName1) + verifySelfNodeRemediationPodExist() + + DeferCleanup(func() { + deleteSelfNodeRemediationPod(snrPod, false) + }) + }) + }) +} + +func createTestConfig() { + By("Create test configuration", func() { + createConfig() + DeferCleanup(func() { + deleteConfig() + }) + }) +} diff --git a/controllers/tests/controller/suite_test.go b/controllers/tests/controller/suite_test.go index 070b26f54..44852b26f 100644 --- a/controllers/tests/controller/suite_test.go +++ b/controllers/tests/controller/suite_test.go @@ -60,6 +60,7 @@ var ( k8sClient *shared.K8sClientWrapper fakeRecorder *record.FakeRecorder snrConfig *selfnoderemediationv1alpha1.SelfNodeRemediationConfig + apiCheck *shared.ApiConnectivityCheckWrapper apiConnectivityCheckConfig *apicheck.ApiConnectivityCheckConfig ) @@ -149,6 +150,7 @@ var _ = BeforeSuite(func() { peerApiServerTimeout := 5 * time.Second peers := peers.New(shared.UnhealthyNodeName, shared.PeerUpdateInterval, k8sClient, ctrl.Log.WithName("peers"), peerApiServerTimeout) + err = k8sManager.Add(peers) Expect(err).ToNot(HaveOccurred()) @@ -161,9 +163,14 @@ var _ = BeforeSuite(func() { Peers: peers, Rebooter: rebooter, Cfg: cfg, - MinPeersForRemediation: shared.MinPeersForRemediation, + 
MinPeersForRemediation: shared.MinPeersForRemediationConfigDefaultValue, } - apiCheck := apicheck.New(apiConnectivityCheckConfig, nil) + + apiCheck = shared.NewApiConnectivityCheckWrapper(apiConnectivityCheckConfig, nil) + + // default to simulation so the ApiConnectivityCheck doesn't try to dial peers before tests install handlers + apiCheck.ShouldSimulatePeerResponses = true + err = k8sManager.Add(apiCheck) Expect(err).ToNot(HaveOccurred()) diff --git a/controllers/tests/shared/shared.go b/controllers/tests/shared/shared.go index 3e424b879..9bd221b40 100644 --- a/controllers/tests/shared/shared.go +++ b/controllers/tests/shared/shared.go @@ -2,31 +2,49 @@ package shared import ( "context" + "crypto/rand" "errors" + "net" + "sync" "time" . "github.com/onsi/gomega" + "github.com/onsi/gomega/gcustom" + types2 "github.com/onsi/gomega/types" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" + selfNodeRemediation "github.com/medik8s/self-node-remediation/api" selfnoderemediationv1alpha1 "github.com/medik8s/self-node-remediation/api/v1alpha1" + "github.com/medik8s/self-node-remediation/pkg/apicheck" + "github.com/medik8s/self-node-remediation/pkg/controlplane" + "github.com/medik8s/self-node-remediation/pkg/peers" "github.com/medik8s/self-node-remediation/pkg/reboot" ) const ( - PeerUpdateInterval = 30 * time.Second - ApiCheckInterval = 1 * time.Second - MaxErrorThreshold = 1 - MinPeersForRemediation = 1 + PeerUpdateInterval = 30 * time.Second + ApiCheckInterval = 1 * time.Second + MaxErrorThreshold = 1 // CalculatedRebootDuration is the mock calculator's result CalculatedRebootDuration = 3 * time.Second Namespace = "self-node-remediation" UnhealthyNodeName = "node1" PeerNodeName = "node2" - DsDummyImageName = "dummy-image" + Peer2NodeName = "node3" + Peer3NodeName = "node4" + + SnrPodName1 = "self-node-remediation" + SnrPodName2 = "self-node-remediation-2" + SnrPodName3 = "self-node-remediation-3" + DsDummyImageName = "dummy-image" + + MinPeersForRemediationConfigDefaultValue = 1 ) type K8sClientWrapper struct { @@ -35,17 +53,231 @@ type K8sClientWrapper struct { ShouldSimulateFailure bool ShouldSimulatePodDeleteFailure bool SimulatedFailureMessage string + ShouldReturnRandomPodIPs bool +} + +type ApiConnectivityCheckWrapper struct { + *apicheck.ApiConnectivityCheck + ShouldSimulatePeerResponses bool + + responsesMu sync.Mutex + simulatedPeerResponses []selfNodeRemediation.HealthCheckResponseCode + peersOverride PeersOverrideFunc + workerLastResponse time.Time + controlPlaneLastResponse time.Time + baselineResponses []selfNodeRemediation.HealthCheckResponseCode } -func (kcw *K8sClientWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - if kcw.ShouldSimulateFailure { - return errors.New("simulation of client error") - } else if kcw.ShouldSimulatePodDeleteFailure { +// PeersOverrideFunc allows tests to supply synthetic peer address lists without +// altering the production wiring. Phase 0 only stores the function; upcoming +// phases decide how the ApiConnectivityCheck consumes it. 
+type PeersOverrideFunc func(role peers.Role) []corev1.PodIP + +func (kcw *K8sClientWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) { + switch { + case kcw.ShouldSimulateFailure: + err = errors.New("simulation of client error") + return + case kcw.ShouldSimulatePodDeleteFailure: if _, ok := list.(*corev1.NamespaceList); ok { - return errors.New(kcw.SimulatedFailureMessage) + err = errors.New(kcw.SimulatedFailureMessage) + return + } + fallthrough + default: + err = kcw.Client.List(ctx, list, opts...) + } + + if kcw.ShouldReturnRandomPodIPs { + logf.Log.Info("Returning random IP addresses for all the pods because ShouldReturnRandomPodIPs is true") + + if podList, ok := list.(*corev1.PodList); ok { + assignRandomIpAddressesPods(podList) } } - return kcw.Client.List(ctx, list, opts...) + + return +} + +func assignRandomIpAddressesPods(pods *corev1.PodList) { + for i := range pods.Items { + randomIP := GetRandomIpAddress() + pods.Items[i].Status.PodIP = randomIP + pods.Items[i].Status.PodIPs = []corev1.PodIP{{IP: randomIP}} + } + + return +} + +func GetRandomIpAddress() (randomIP string) { + // Generate a Unique Local Address (fd00::/8). This keeps addresses valid + // while avoiding collisions with routable ranges. + bytes := make([]byte, net.IPv6len) + bytes[0] = 0xfd + if _, err := rand.Read(bytes[1:]); err != nil { + logf.Log.Error(err, "random IPv6 generation failed; using fallback") + return "fd00::1" + } + randomIP = net.IP(bytes).String() + + return +} + +func NewApiConnectivityCheckWrapper(ck *apicheck.ApiConnectivityCheckConfig, controlPlaneManager *controlplane.Manager) (ckw *ApiConnectivityCheckWrapper) { + inner := apicheck.New(ck, controlPlaneManager) + ckw = &ApiConnectivityCheckWrapper{ + ApiConnectivityCheck: inner, + ShouldSimulatePeerResponses: false, + simulatedPeerResponses: []selfNodeRemediation.HealthCheckResponseCode{}, + } + + inner.SetHealthStatusFunc(func(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { + if ckw.ShouldSimulatePeerResponses { + // The caller expects exactly one response per peer; the helper enforces + // that contract to keep the bounded channel writes non-blocking. + resp := ckw.nextSimulatedPeerResponse() + results <- resp + return + } + + ckw.GetDefaultPeerHealthCheckFunc()(endpointIp, results) + }) + + return +} + +func (ckw *ApiConnectivityCheckWrapper) nextSimulatedPeerResponse() selfNodeRemediation.HealthCheckResponseCode { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + + if len(ckw.simulatedPeerResponses) == 0 { + return selfNodeRemediation.RequestFailed + } + + code := ckw.simulatedPeerResponses[0] + ckw.simulatedPeerResponses = append([]selfNodeRemediation.HealthCheckResponseCode{}, ckw.simulatedPeerResponses[1:]...) 
+ + return code +} + +func (ckw *ApiConnectivityCheckWrapper) AppendSimulatedPeerResponse(code selfNodeRemediation.HealthCheckResponseCode) { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + ckw.simulatedPeerResponses = append(ckw.simulatedPeerResponses, code) +} + +func (ckw *ApiConnectivityCheckWrapper) ClearSimulatedPeerResponses() { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + ckw.simulatedPeerResponses = nil +} + +func (ckw *ApiConnectivityCheckWrapper) SnapshotSimulatedPeerResponses() []selfNodeRemediation.HealthCheckResponseCode { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + if len(ckw.simulatedPeerResponses) == 0 { + return nil + } + snapshot := make([]selfNodeRemediation.HealthCheckResponseCode, len(ckw.simulatedPeerResponses)) + copy(snapshot, ckw.simulatedPeerResponses) + return snapshot +} + +func (ckw *ApiConnectivityCheckWrapper) RestoreSimulatedPeerResponses(codes []selfNodeRemediation.HealthCheckResponseCode) { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + if len(codes) == 0 { + ckw.simulatedPeerResponses = nil + return + } + ckw.simulatedPeerResponses = make([]selfNodeRemediation.HealthCheckResponseCode, len(codes)) + copy(ckw.simulatedPeerResponses, codes) +} + +func (ckw *ApiConnectivityCheckWrapper) RememberSimulatedPeerResponses() { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + if len(ckw.simulatedPeerResponses) == 0 { + ckw.baselineResponses = nil + return + } + ckw.baselineResponses = make([]selfNodeRemediation.HealthCheckResponseCode, len(ckw.simulatedPeerResponses)) + copy(ckw.baselineResponses, ckw.simulatedPeerResponses) +} + +func (ckw *ApiConnectivityCheckWrapper) RestoreBaselineSimulatedResponses() { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + if len(ckw.baselineResponses) == 0 { + return + } + ckw.simulatedPeerResponses = make([]selfNodeRemediation.HealthCheckResponseCode, len(ckw.baselineResponses)) + copy(ckw.simulatedPeerResponses, ckw.baselineResponses) +} + +func (ckw *ApiConnectivityCheckWrapper) ClearBaselineSimulatedResponses() { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + ckw.baselineResponses = nil +} + +// SetPeersOverride registers a custom provider for peer address lists. +func (ckw *ApiConnectivityCheckWrapper) SetPeersOverride(fn PeersOverrideFunc) { + ckw.responsesMu.Lock() + ckw.peersOverride = fn + ckw.responsesMu.Unlock() + + var wrapped apicheck.PeersOverrideFunc + if fn != nil { + wrapped = func(role peers.Role) []corev1.PodIP { + return fn(role) + } + } + ckw.ApiConnectivityCheck.SetPeersOverride(wrapped) +} + +// ClearPeersOverride removes any previously configured peer provider. +func (ckw *ApiConnectivityCheckWrapper) ClearPeersOverride() { + ckw.SetPeersOverride(nil) +} + +// PeersOverride exposes the currently configured override, if any. +func (ckw *ApiConnectivityCheckWrapper) PeersOverride() PeersOverrideFunc { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + return ckw.peersOverride +} + +// RecordWorkerPeerResponse updates the last time a worker-role peer responded. +func (ckw *ApiConnectivityCheckWrapper) RecordWorkerPeerResponse(t time.Time) { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + ckw.workerLastResponse = t +} + +// WorkerPeerLastResponse returns the timestamp captured via RecordWorkerPeerResponse. 
+func (ckw *ApiConnectivityCheckWrapper) WorkerPeerLastResponse() time.Time { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + return ckw.workerLastResponse +} + +// RecordControlPlanePeerResponse updates the last time a control-plane peer responded. +func (ckw *ApiConnectivityCheckWrapper) RecordControlPlanePeerResponse(t time.Time) { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + ckw.controlPlaneLastResponse = t +} + +// ControlPlanePeerLastResponse returns the timestamp captured via RecordControlPlanePeerResponse. +func (ckw *ApiConnectivityCheckWrapper) ControlPlanePeerLastResponse() time.Time { + ckw.responsesMu.Lock() + defer ckw.responsesMu.Unlock() + return ckw.controlPlaneLastResponse +} + +func (ckw *ApiConnectivityCheckWrapper) ResetPeerTimers() { + ckw.ApiConnectivityCheck.ResetPeerTimers() } func GenerateTestConfig() *selfnoderemediationv1alpha1.SelfNodeRemediationConfig { @@ -81,3 +313,24 @@ func VerifySNRStatusExist(k8sClient client.Client, snr *selfnoderemediationv1alp g.Expect(meta.IsStatusConditionPresentAndEqual(tmpSNR.Status.Conditions, statusType, conditionStatus)).To(BeTrue()) }, 10*time.Second, 250*time.Millisecond).Should(Succeed()) } + +type K8sErrorTestingFunc func(err error) bool + +// Matches one of the k8s error types that the user wants to ignore +func IsK8sNotFoundError() types2.GomegaMatcher { + return gcustom.MakeMatcher(func(errorToTest error) (matches bool, err error) { + if errorToTest == nil { + matches = false + return + } + + if apierrors.IsNotFound(errorToTest) { + matches = true + return + } + + matches = false + + return + }) +} diff --git a/foo.txt b/foo.txt new file mode 100644 index 000000000..e69de29bb diff --git a/go.mod b/go.mod index 552158478..d1c457754 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,8 @@ require ( sigs.k8s.io/controller-runtime v0.15.0 ) +require github.com/google/uuid v1.3.0 + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -41,7 +43,6 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect - github.com/google/uuid v1.3.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/main.go b/main.go index a2785a312..fbca63e71 100644 --- a/main.go +++ b/main.go @@ -251,6 +251,19 @@ func getDurEnvVarOrDie(varName string) time.Duration { return time.Duration(intVar) } +func getOptionalDurEnvVar(varName string, fallback time.Duration) time.Duration { + val, exists := os.LookupEnv(varName) + if !exists || val == "" { + return fallback + } + parsed, err := strconv.Atoi(val) + if err != nil { + setupLog.Error(err, "failed to convert env variable to int", "var name", varName, "var value", val) + os.Exit(1) + } + return time.Duration(parsed) +} + func getIntEnvVarOrDie(varName string) int { // determine safe reboot time varVal := os.Getenv(varName) @@ -316,6 +329,8 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) { peerDialTimeout := getDurEnvVarOrDie("PEER_DIAL_TIMEOUT") //timeout for establishing connection to peer peerRequestTimeout := getDurEnvVarOrDie("PEER_REQUEST_TIMEOUT") //timeout for each peer request peerHealthDefaultPort := getIntEnvVarOrDie("HOST_PORT") + failureWindow := getOptionalDurEnvVar("MAX_API_FAILURE_WINDOW", apiCheckInterval*time.Duration(maxErrorThreshold)) + peerQuorumTimeout := 
getOptionalDurEnvVar("PEER_QUORUM_TIMEOUT", reboot.MaxTimeForNoPeersResponse) // it's fine when the watchdog is nil! rebooter := reboot.NewWatchdogRebooter(wd, ctrl.Log.WithName("rebooter")) @@ -345,6 +360,8 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) { PeerHealthPort: peerHealthDefaultPort, MaxTimeForNoPeersResponse: reboot.MaxTimeForNoPeersResponse, Recorder: mgr.GetEventRecorderFor("ApiConnectivityCheck"), + FailureWindow: failureWindow, + PeerQuorumTimeout: peerQuorumTimeout, } controlPlaneManager := controlplane.NewManager(myNodeName, mgr.GetClient()) diff --git a/pkg/apicheck/check.go b/pkg/apicheck/check.go index 55a708bc3..0e736120c 100644 --- a/pkg/apicheck/check.go +++ b/pkg/apicheck/check.go @@ -33,16 +33,25 @@ const ( eventReasonPeerTimeoutAdjusted = "PeerTimeoutAdjusted" ) +type PeersOverrideFunc func(role peers.Role) []corev1.PodIP + type ApiConnectivityCheck struct { client.Reader - config *ApiConnectivityCheckConfig - errorCount int - timeOfLastPeerResponse time.Time - clientCreds credentials.TransportCredentials - mutex sync.Mutex - controlPlaneManager *controlplane.Manager + config *ApiConnectivityCheckConfig + failureTracker *FailureTracker + workerLastPeerResponse time.Time + controlPlaneLastPeerResponse time.Time + workerPeerSilenceSince time.Time + controlPlanePeerSilenceSince time.Time + clientCreds credentials.TransportCredentials + mutex sync.Mutex + controlPlaneManager *controlplane.Manager + getHealthStatusFromRemoteFunc GetHealthStatusFromRemoteFunc + peerOverride PeersOverrideFunc } +type GetHealthStatusFromRemoteFunc func(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) + type ApiConnectivityCheckConfig struct { Log logr.Logger MyNodeName string @@ -60,15 +69,80 @@ type ApiConnectivityCheckConfig struct { MaxTimeForNoPeersResponse time.Duration MinPeersForRemediation int Recorder record.EventRecorder + FailureWindow time.Duration + PeerQuorumTimeout time.Duration } -func New(config *ApiConnectivityCheckConfig, controlPlaneManager *controlplane.Manager) *ApiConnectivityCheck { - return &ApiConnectivityCheck{ - config: config, - mutex: sync.Mutex{}, - controlPlaneManager: controlPlaneManager, - timeOfLastPeerResponse: time.Now(), +func New(config *ApiConnectivityCheckConfig, controlPlaneManager *controlplane.Manager) (c *ApiConnectivityCheck) { + c = &ApiConnectivityCheck{ + config: config, + mutex: sync.Mutex{}, + controlPlaneManager: controlPlaneManager, + failureTracker: NewFailureTracker(), } + + c.SetHealthStatusFunc(c.GetDefaultPeerHealthCheckFunc()) + + return +} + +func (c *ApiConnectivityCheck) GetDefaultPeerHealthCheckFunc() (fun GetHealthStatusFromRemoteFunc) { + + fun = func(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { + logger := c.config.Log.WithValues("IP", endpointIp.IP) + logger.Info("getting health status from peer") + + if err := c.initClientCreds(); err != nil { + logger.Error(err, "failed to init client credentials") + results <- selfNodeRemediation.RequestFailed + return + } + + // TODO does this work with IPv6? 
+ // MES: Yes it does, we've tested this + phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds) + if err != nil { + logger.Error(err, "failed to init grpc client") + results <- selfNodeRemediation.RequestFailed + return + } + defer phClient.Close() + + effectiveTimeout := c.getEffectivePeerRequestTimeout() + ctx, cancel := context.WithTimeout(context.Background(), effectiveTimeout) + defer cancel() + + resp, err := phClient.IsHealthy(ctx, &peerhealth.HealthRequest{ + NodeName: c.config.MyNodeName, + MachineName: c.config.MyMachineName, + }) + if err != nil { + logger.Error(err, "failed to read health response from peer") + results <- selfNodeRemediation.RequestFailed + return + } + + logger.Info("got response from peer", "status", resp.Status) + + results <- selfNodeRemediation.HealthCheckResponseCode(resp.Status) + return + } + + return +} + +func (c *ApiConnectivityCheck) GetControlPlaneManager() *controlplane.Manager { + return c.controlPlaneManager +} + +func (c *ApiConnectivityCheck) SetControlPlaneManager(manager *controlplane.Manager) { + c.controlPlaneManager = manager +} + +func (c *ApiConnectivityCheck) SetPeersOverride(fn PeersOverrideFunc) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.peerOverride = fn } func (c *ApiConnectivityCheck) Start(ctx context.Context) error { @@ -109,8 +183,10 @@ func (c *ApiConnectivityCheck) Start(ctx context.Context) error { return } - // reset error count after a successful API call - c.errorCount = 0 + // reset failure tracker after a successful API call + if c.failureTracker != nil { + c.failureTracker.Reset() + } }, c.config.CheckInterval) @@ -118,132 +194,354 @@ func (c *ApiConnectivityCheck) Start(ctx context.Context) error { } // isConsideredHealthy keeps track of the number of errors reported, and when a certain amount of error occur within a certain -// time, ask peers if this node is healthy. Returns if the node is considered to be healthy or not. +// time, ask peers if this node is healthy. Returns if the node is considered to be healthy or not. 
It is usable +// whether this is a control plane node or a worker node func (c *ApiConnectivityCheck) isConsideredHealthy() bool { - workerPeersResponse := c.getWorkerPeersResponse() - isWorkerNode := c.controlPlaneManager == nil || !c.controlPlaneManager.IsControlPlane() - if isWorkerNode { - return workerPeersResponse.IsHealthy - } else { - return c.controlPlaneManager.IsControlPlaneHealthy(workerPeersResponse, c.canOtherControlPlanesBeReached()) + now := time.Now() + tracker := c.ensureFailureTracker() + window := c.effectiveFailureWindow() + tracker.RecordFailure(now) + + if !tracker.ShouldEscalate(now, window) { + c.config.Log.V(1).Info("failure threshold not met, remaining healthy", + "now", now, + "window", window) + return true } + if c.isWorkerNode() { + outcome := c.evaluateWorker(now) + c.config.Log.Info("worker evaluation complete", "outcome", outcome) + return c.outcomeIsHealthyForWorker(outcome) + } + + outcome := c.evaluateControlPlane(now) + c.config.Log.Info("control-plane evaluation complete", "outcome", outcome) + if c.controlPlaneManager == nil { + return c.outcomeIsHealthyForControlPlane(outcome) + } + return c.controlPlaneManager.IsControlPlaneHealthy(outcome) +} + +type peerSummary struct { + role peers.Role + addresses []corev1.PodIP + healthy int + unhealthy int + apiErrors int + failures int + responded bool + hadMinimum bool } -func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response { - c.errorCount++ - if c.errorCount < c.config.MaxErrorsThreshold { - c.config.Log.Info("Ignoring api-server error, error count below threshold", "current count", c.errorCount, "threshold", c.config.MaxErrorsThreshold) - return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseErrorsThresholdNotReached} +func (ps peerSummary) totalPeers() int { + return len(ps.addresses) +} + +func (ps peerSummary) responseCount() int { + return ps.healthy + ps.unhealthy + ps.apiErrors +} + +func (ps peerSummary) majorityApiError() bool { + total := ps.totalPeers() + if total == 0 { + return false } + return ps.apiErrors > total/2 +} - c.config.Log.Info("Error count exceeds threshold, trying to ask other nodes if I'm healthy") - peersToAsk := c.config.Peers.GetPeersAddresses(peers.Worker) +func (c *ApiConnectivityCheck) evaluateWorker(now time.Time) controlplane.EvaluationOutcome { + summary := c.gatherPeerResponses(peers.Worker, now) - // We check to see if we have at least the number of peers that the user has configured as required. - // If we don't have this many peers (for instance there are zero peers, and the default value is set - // which requires at least one peer), we don't want to remediate. In this case we have some confusion - // and don't want to remediate a node when we shouldn't. Note: It would be unusual for MinPeersForRemediation - // to be greater than 1 unless the environment has specific requirements. 
- if len(peersToAsk) < c.config.MinPeersForRemediation { - c.config.Log.Info("Ignoring api-server error as we have an insufficient number of peers found, "+ - "so we aren't going to attempt to contact any to check for a SelfNodeRemediation CR"+ - " - we will consider it as if there was no CR present & as healthy.", "minPeersRequired", - c.config.MinPeersForRemediation, "actualNumPeersFound", len(peersToAsk)) + if summary.totalPeers() == 0 { + if c.config.MinPeersForRemediation == 0 { + return controlplane.EvaluationIsolation + } + if c.peerTimeoutExceeded(peers.Worker, now) { + return controlplane.EvaluationIsolation + } + return controlplane.EvaluationAwaitQuorum + } - // TODO: maybe we need to check if this happens too much and reboot - return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseNoPeersWereFound} + if summary.healthy > 0 { + return controlplane.EvaluationHealthy } - // If we make it here and there are no peers, we can't proceed because we need at least one peer - // to check. So it doesn't make sense to continue on - we'll mark as unhealthy and exit fast - if len(peersToAsk) == 0 { - c.config.Log.Info("Marking node as unhealthy due to being isolated. We don't have any peers to ask "+ - "and MinPeersForRemediation must be greater than zero", "minPeersRequired", - c.config.MinPeersForRemediation, "actualNumPeersFound", len(peersToAsk)) - return peers.Response{IsHealthy: false, Reason: peers.UnHealthyBecauseNodeIsIsolated} + if summary.unhealthy > 0 { + outcome := c.escalateToControlPlanes(now) + if outcome == controlplane.EvaluationAwaitQuorum && c.peerTimeoutExceeded(peers.ControlPlane, now) { + return controlplane.EvaluationIsolation + } + return outcome } - apiErrorsResponsesSum := 0 - nrAllPeers := len(peersToAsk) - // peersToAsk is being reduced at every iteration, iterate until no peers left to ask - for i := 0; len(peersToAsk) > 0; i++ { + if summary.majorityApiError() { + return controlplane.EvaluationGlobalOutage + } - batchSize := utils.GetNextBatchSize(nrAllPeers, len(peersToAsk)) - chosenPeersIPs := c.popPeerIPs(&peersToAsk, batchSize) - healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs) - if healthyResponses+unhealthyResponses+apiErrorsResponses > 0 { - c.timeOfLastPeerResponse = time.Now() + if !summary.responded || !summary.hadMinimum { + if !summary.responded && c.config.MinPeersForRemediation == 0 { + return controlplane.EvaluationIsolation } - c.config.Log.Info("Aggregate peer health responses", "healthyResponses", healthyResponses, - "unhealthyResponses", unhealthyResponses, "apiErrorsResponses", apiErrorsResponses) - - if healthyResponses > 0 { - c.config.Log.Info("There is at least one peer who thinks this node healthy, so we'll respond "+ - "with a healthy status", "healthyResponses", healthyResponses, "reason", - "peers.HealthyBecauseCRNotFound") - c.errorCount = 0 - return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseCRNotFound} + if c.peerTimeoutExceeded(peers.Worker, now) { + return controlplane.EvaluationIsolation } + return controlplane.EvaluationAwaitQuorum + } + + return controlplane.EvaluationHealthy +} + +func (c *ApiConnectivityCheck) evaluateControlPlane(now time.Time) controlplane.EvaluationOutcome { + workerSummary := c.gatherPeerResponses(peers.Worker, now) + + if workerSummary.majorityApiError() { + return controlplane.EvaluationGlobalOutage + } - if unhealthyResponses > 0 { - c.config.Log.Info("I got at least one peer who thinks I'm unhealthy, so we'll 
respond "+ - "with unhealthy", "unhealthyResponses", unhealthyResponses, "reason", - "peers.UnHealthyBecausePeersResponse") - return peers.Response{IsHealthy: false, Reason: peers.UnHealthyBecausePeersResponse} + if workerSummary.unhealthy > 0 { + outcome := c.escalateToControlPlanes(now) + if outcome == controlplane.EvaluationAwaitQuorum && c.peerTimeoutExceeded(peers.ControlPlane, now) { + return controlplane.EvaluationIsolation } + return outcome + } - if apiErrorsResponses > 0 { - c.config.Log.Info("If you see this, I didn't get any healthy or unhealthy peer responses, "+ - "instead they told me they can't access the API server either", "apiErrorsResponses", - apiErrorsResponses) - apiErrorsResponsesSum += apiErrorsResponses - // TODO: consider using [m|n]hc.spec.maxUnhealthy instead of 50% - if apiErrorsResponsesSum > nrAllPeers/2 { // already reached more than 50% of the peers and all of them returned api error - // assuming this is a control plane failure as others can't access api-server as well - c.config.Log.Info("More than 50% of the nodes couldn't access the api-server, assuming "+ - "this is a control plane failure, so we are going to return healthy in that case", - "reason", "HealthyBecauseMostPeersCantAccessAPIServer") - return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseMostPeersCantAccessAPIServer} - } + controlSummary := c.gatherPeerResponses(peers.ControlPlane, now) + if controlSummary.totalPeers() == 0 { + return controlplane.EvaluationIsolation + } + + if controlSummary.unhealthy > 0 { + return controlplane.EvaluationRemediate + } + + if controlSummary.majorityApiError() { + return controlplane.EvaluationGlobalOutage + } + + if controlSummary.healthy > 0 { + return controlplane.EvaluationHealthy + } + + if !controlSummary.responded || !controlSummary.hadMinimum { + if c.peerTimeoutExceeded(peers.ControlPlane, now) { + return controlplane.EvaluationIsolation } + return controlplane.EvaluationAwaitQuorum + } + if workerSummary.responded && workerSummary.healthy > 0 { + return controlplane.EvaluationHealthy } - c.config.Log.Info("We have attempted communication with all known peers, and haven't gotten either: " + - "a peer that believes we are healthy, a peer that believes we are unhealthy, or we haven't decided that " + - "there is a control plane failure") + if !workerSummary.responded && c.peerTimeoutExceeded(peers.Worker, now) { + return controlplane.EvaluationIsolation + } - //we asked all peers - now := time.Now() - // MaxTimeForNoPeersResponse check prevents the node from being considered unhealthy in case of short network outages - if now.After(c.timeOfLastPeerResponse.Add(c.config.MaxTimeForNoPeersResponse)) { - c.config.Log.Error(fmt.Errorf("failed health check"), "Failed to get health status peers. 
"+ - "Assuming unhealthy", "reason", "UnHealthyBecauseNodeIsIsolated") - return peers.Response{IsHealthy: false, Reason: peers.UnHealthyBecauseNodeIsIsolated} + return controlplane.EvaluationHealthy +} + +func (c *ApiConnectivityCheck) escalateToControlPlanes(now time.Time) controlplane.EvaluationOutcome { + summary := c.gatherPeerResponses(peers.ControlPlane, now) + + if summary.totalPeers() == 0 { + return controlplane.EvaluationIsolation + } + + if summary.unhealthy > 0 { + return controlplane.EvaluationRemediate + } + + if summary.majorityApiError() { + return controlplane.EvaluationGlobalOutage + } + + if summary.healthy > 0 { + return controlplane.EvaluationHealthy + } + + if !summary.responded || !summary.hadMinimum { + if c.peerTimeoutExceeded(peers.ControlPlane, now) { + return controlplane.EvaluationIsolation + } + return controlplane.EvaluationAwaitQuorum + } + + return controlplane.EvaluationHealthy +} + +func (c *ApiConnectivityCheck) gatherPeerResponses(role peers.Role, now time.Time) peerSummary { + addresses := c.listPeers(role) + summary := peerSummary{ + role: role, + addresses: addresses, + } + + minimum := c.config.MinPeersForRemediation + if minimum <= 0 { + summary.hadMinimum = true + } else { + summary.hadMinimum = len(addresses) >= minimum + } + + if len(addresses) == 0 { + c.recordPeerSilence(role, now) + return summary + } + + peersToAsk := make([]corev1.PodIP, len(addresses)) + copy(peersToAsk, addresses) + allPeers := len(peersToAsk) + + for len(peersToAsk) > 0 { + batchSize := utils.GetNextBatchSize(allPeers, len(peersToAsk)) + chosen := c.popPeerIPs(&peersToAsk, batchSize) + healthy, unhealthy, apiErrors, failures := c.getHealthStatusFromPeers(chosen) + summary.healthy += healthy + summary.unhealthy += unhealthy + summary.apiErrors += apiErrors + summary.failures += failures + if healthy+unhealthy+apiErrors > 0 { + summary.responded = true + } + } + + if summary.responded { + c.recordPeerActivity(role, now) + } else { + c.recordPeerSilence(role, now) + } + + c.config.Log.Info("peer summary", "role", role, "total", summary.totalPeers(), "healthy", summary.healthy, "unhealthy", summary.unhealthy, "apiErrors", summary.apiErrors, "failures", summary.failures, "responded", summary.responded, "hadMinimum", summary.hadMinimum) + + return summary +} + +func (c *ApiConnectivityCheck) listPeers(role peers.Role) []corev1.PodIP { + c.mutex.Lock() + override := c.peerOverride + c.mutex.Unlock() + + if override != nil { + custom := override(role) + if len(custom) == 0 { + return nil + } + copySlice := make([]corev1.PodIP, len(custom)) + copy(copySlice, custom) + return copySlice + } + + if c.config == nil || c.config.Peers == nil { + return nil + } + + return c.config.Peers.GetPeersAddresses(role) +} + +func (c *ApiConnectivityCheck) recordPeerActivity(role peers.Role, now time.Time) { + if role == peers.Worker { + c.workerLastPeerResponse = now + c.workerPeerSilenceSince = time.Time{} } else { - c.config.Log.Info("Ignoring no peers response error, time is below threshold for no peers response", - "time without peers response (seconds)", now.Sub(c.timeOfLastPeerResponse).Seconds(), - "threshold (seconds)", c.config.MaxTimeForNoPeersResponse.Seconds(), - "reason", "HealthyBecauseNoPeersResponseNotReachedTimeout") - return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseNoPeersResponseNotReachedTimeout} + c.controlPlaneLastPeerResponse = now + c.controlPlanePeerSilenceSince = time.Time{} + } +} + +func (c *ApiConnectivityCheck) recordPeerSilence(role peers.Role, 
now time.Time) { + if role == peers.Worker { + if c.workerPeerSilenceSince.IsZero() { + c.workerPeerSilenceSince = now + } + } else { + if c.controlPlanePeerSilenceSince.IsZero() { + c.controlPlanePeerSilenceSince = now + } + } +} + +func (c *ApiConnectivityCheck) ResetPeerTimers() { + c.workerLastPeerResponse = time.Time{} + c.workerPeerSilenceSince = time.Time{} + c.controlPlaneLastPeerResponse = time.Time{} + c.controlPlanePeerSilenceSince = time.Time{} +} + +func (c *ApiConnectivityCheck) outcomeIsHealthyForWorker(outcome controlplane.EvaluationOutcome) bool { + switch outcome { + case controlplane.EvaluationRemediate, controlplane.EvaluationIsolation: + return false + default: + return true + } +} + +func (c *ApiConnectivityCheck) outcomeIsHealthyForControlPlane(outcome controlplane.EvaluationOutcome) bool { + switch outcome { + case controlplane.EvaluationRemediate, controlplane.EvaluationIsolation: + return false + default: + return true } +} +func (c *ApiConnectivityCheck) isWorkerNode() bool { + if c.controlPlaneManager == nil { + return true + } + return !c.controlPlaneManager.IsControlPlane() } -func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool { - peersToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane) - numOfControlPlanePeers := len(peersToAsk) - if numOfControlPlanePeers == 0 { - c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, other control planes can't be reached") +func (c *ApiConnectivityCheck) ensureFailureTracker() *FailureTracker { + if c.failureTracker == nil { + c.failureTracker = NewFailureTracker() + } + return c.failureTracker +} + +func (c *ApiConnectivityCheck) effectiveFailureWindow() time.Duration { + if c.config == nil { + return 0 + } + if c.config.FailureWindow > 0 { + return c.config.FailureWindow + } + if c.config.CheckInterval > 0 && c.config.MaxErrorsThreshold > 0 { + return c.config.CheckInterval * time.Duration(c.config.MaxErrorsThreshold) + } + return 0 +} + +func (c *ApiConnectivityCheck) effectivePeerQuorumTimeout() time.Duration { + if c.config == nil { + return 0 + } + if c.config.PeerQuorumTimeout > 0 { + return c.config.PeerQuorumTimeout + } + return c.config.MaxTimeForNoPeersResponse +} + +func (c *ApiConnectivityCheck) peerTimeoutExceeded(role peers.Role, now time.Time) bool { + deadline := c.effectivePeerQuorumTimeout() + if deadline <= 0 { return false } - chosenPeersIPs := c.popPeerIPs(&peersToAsk, numOfControlPlanePeers) - healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs) + var silenceSince time.Time + if role == peers.Worker { + silenceSince = c.workerPeerSilenceSince + } else { + silenceSince = c.controlPlanePeerSilenceSince + } + + if silenceSince.IsZero() { + return false + } - // Any response is an indication of communication with a peer - return (healthyResponses + unhealthyResponses + apiErrorsResponses) > 0 + return now.Sub(silenceSince) >= deadline } func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]corev1.PodIP, count int) []corev1.PodIP { @@ -277,6 +575,8 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []corev1.PodIP nrAddresses := len(addresses) responsesChan := make(chan selfNodeRemediation.HealthCheckResponseCode, nrAddresses) + c.config.Log.Info("Attempting to get health status from peers", "addresses", addresses) + for _, address := range addresses { go c.getHealthStatusFromPeer(address, responsesChan) } @@ -290,7 +590,6 @@ func (c *ApiConnectivityCheck) 
getEffectivePeerRequestTimeout() time.Duration {
     minimumSafeTimeout := c.config.ApiServerTimeout + v1alpha1.MinimumBuffer
     if c.config.PeerRequestTimeout < minimumSafeTimeout {
-        // Log warning about timeout adjustment
         c.config.Log.Info("PeerRequestTimeout is too low, using adjusted value for safety",
             "configuredTimeout", c.config.PeerRequestTimeout,
             "apiServerTimeout", c.config.ApiServerTimeout,
@@ -303,47 +602,20 @@ func (c *ApiConnectivityCheck) getEffectivePeerRequestTimeout() time.Duration {
     return c.config.PeerRequestTimeout
 }
 
-// getHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel
-func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) {
-
-    logger := c.config.Log.WithValues("IP", endpointIp.IP)
-    logger.Info("getting health status from peer")
-
-    if err := c.initClientCreds(); err != nil {
-        logger.Error(err, "failed to init client credentials")
-        results <- selfNodeRemediation.RequestFailed
-        return
-    }
-
-    // TODO does this work with IPv6?
-    phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds)
-    if err != nil {
-        logger.Error(err, "failed to init grpc client")
-        results <- selfNodeRemediation.RequestFailed
-        return
-    }
-    defer phClient.Close()
-
-    effectiveTimeout := c.getEffectivePeerRequestTimeout()
-    ctx, cancel := context.WithTimeout(context.Background(), effectiveTimeout)
-    defer cancel()
-
-    resp, err := phClient.IsHealthy(ctx, &peerhealth.HealthRequest{
-        NodeName:    c.config.MyNodeName,
-        MachineName: c.config.MyMachineName,
-    })
-    if err != nil {
-        logger.Error(err, "failed to read health response from peer")
-        results <- selfNodeRemediation.RequestFailed
-        return
-    }
-
-    logger.Info("got response from peer", "status", resp.Status)
+func (c *ApiConnectivityCheck) SetHealthStatusFunc(f GetHealthStatusFromRemoteFunc) {
+    c.getHealthStatusFromRemoteFunc = f
+}
 
-    results <- selfNodeRemediation.HealthCheckResponseCode(resp.Status)
+func (c *ApiConnectivityCheck) GetHealthStatusFunc() (f GetHealthStatusFromRemoteFunc) {
+    f = c.getHealthStatusFromRemoteFunc
     return
 }
 
+// getHealthStatusFromPeer asks the peer at the given IP for this node's health status via the configured getHealthStatusFromRemoteFunc and writes the response code into the given channel
+func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) {
+    c.getHealthStatusFromRemoteFunc(endpointIp, results)
+}
+
 func (c *ApiConnectivityCheck) initClientCreds() error {
     c.mutex.Lock()
     defer c.mutex.Unlock()
diff --git a/pkg/apicheck/check_internal_test.go b/pkg/apicheck/check_internal_test.go
new file mode 100644
index 000000000..008826aef
--- /dev/null
+++ b/pkg/apicheck/check_internal_test.go
@@ -0,0 +1,54 @@
+package apicheck
+
+import (
+    "reflect"
+    "testing"
+    "time"
+    "unsafe"
+
+    corev1 "k8s.io/api/core/v1"
+
+    selfnode "github.com/medik8s/self-node-remediation/api"
+    "github.com/medik8s/self-node-remediation/pkg/peers"
+    logf "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+// helper to mutate unexported slice fields in peers.Peers for test purposes.
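+// It uses reflect and unsafe because the address slices are unexported; the field names passed in must
+// match the fields of peers.Peers ("workerPeersAddresses" / "controlPlanePeersAddresses"), so keep them
+// in sync if that struct changes.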
+func setPeerAddresses(p *peers.Peers, field string, addrs []corev1.PodIP) { + val := reflect.ValueOf(p).Elem().FieldByName(field) + ptr := unsafe.Pointer(val.UnsafeAddr()) + slice := (*[]corev1.PodIP)(ptr) + *slice = addrs +} + +func TestWorkerEscalatesWhenControlPlanePeersUnavailable(t *testing.T) { + workerAddr := []corev1.PodIP{{IP: "10.0.0.10"}} + peerStore := &peers.Peers{} + setPeerAddresses(peerStore, "workerPeersAddresses", workerAddr) + setPeerAddresses(peerStore, "controlPlanePeersAddresses", nil) + + cfg := &ApiConnectivityCheckConfig{ + Log: logf.Log.WithName("test"), + MyNodeName: "worker-1", + MyMachineName: "machine-1", + CheckInterval: 50 * time.Millisecond, + FailureWindow: 50 * time.Millisecond, + PeerQuorumTimeout: 100 * time.Millisecond, + MaxErrorsThreshold: 1, + Peers: peerStore, + MinPeersForRemediation: 1, + } + + check := New(cfg, nil) + check.SetHealthStatusFunc(func(_ corev1.PodIP, results chan<- selfnode.HealthCheckResponseCode) { + results <- selfnode.Unhealthy + }) + + tracker := check.ensureFailureTracker() + tracker.RecordFailure(time.Now().Add(-cfg.FailureWindow)) + tracker.RecordFailure(time.Now().Add(-cfg.FailureWindow / 2)) + + if healthy := check.isConsideredHealthy(); healthy { + t.Fatalf("expected worker to treat lack of control-plane peers as unhealthy") + } +} diff --git a/pkg/apicheck/failure_tracker.go b/pkg/apicheck/failure_tracker.go new file mode 100644 index 000000000..8357011c8 --- /dev/null +++ b/pkg/apicheck/failure_tracker.go @@ -0,0 +1,92 @@ +package apicheck + +import "time" + +// FailureTracker keeps lightweight state about recent API failures so callers can +// decide when to escalate to peer queries. It tracks the timestamp of the first +// failure in the current observation window and the spacing between subsequent +// failures so intermittent blips do not trigger escalation. +type FailureTracker struct { + consecutive int + firstFailure time.Time + lastFailure time.Time + lastGap time.Duration +} + +// NewFailureTracker creates an empty tracker. +func NewFailureTracker() *FailureTracker { + return &FailureTracker{} +} + +// RecordFailure notes that a failure occurred at the supplied time. +func (ft *FailureTracker) RecordFailure(at time.Time) { + if ft == nil { + return + } + + if ft.consecutive == 0 { + ft.consecutive = 1 + ft.firstFailure = at + ft.lastFailure = at + ft.lastGap = 0 + return + } + + if ft.lastFailure.IsZero() { + ft.lastFailure = at + } + + gap := at.Sub(ft.lastFailure) + if gap < 0 { + gap = 0 + } + + ft.lastGap = gap + ft.lastFailure = at + ft.consecutive++ +} + +// Reset clears any recorded failures. +func (ft *FailureTracker) Reset() { + if ft == nil { + return + } + + ft.consecutive = 0 + ft.firstFailure = time.Time{} + ft.lastFailure = time.Time{} + ft.lastGap = 0 +} + +// ShouldEscalate reports whether the accumulated failures warrant escalation. +// When the spacing between the most recent failures exceeds the observation window +// the tracker resets itself, treating the latest failure as the start of a new +// window so intermittent blips do not trip escalation. +func (ft *FailureTracker) ShouldEscalate(now time.Time, window time.Duration) bool { + if ft == nil || ft.consecutive == 0 { + return false + } + + if window <= 0 { + return true + } + + // If the most recent failure occurred after a large gap reset the window so + // intermittent failures outside the observation window do not escalate. Allow a + // small buffer above the configured window to tolerate scheduling jitter. 
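+    // Worked example (matching the tests below): with window = 2s, failures at t = 0s and t = 3s are 3s
+    // apart, i.e. at least 1.5x the window, so the window restarts at t = 3s and escalation only triggers
+    // after a further 2s of uninterrupted failures.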
+ threshold := window + window/2 + if threshold <= 0 { + threshold = 0 + } + if threshold > 0 && ft.lastGap >= threshold { + ft.firstFailure = ft.lastFailure + ft.consecutive = 1 + ft.lastGap = 0 + } + + if now.Before(ft.firstFailure) { + return false + } + + return now.Sub(ft.firstFailure) >= window +} diff --git a/pkg/apicheck/failure_tracker_test.go b/pkg/apicheck/failure_tracker_test.go new file mode 100644 index 000000000..724a1c478 --- /dev/null +++ b/pkg/apicheck/failure_tracker_test.go @@ -0,0 +1,63 @@ +package apicheck + +import ( + "testing" + "time" +) + +func TestFailureTrackerEscalatesAfterWindow(t *testing.T) { + tracker := NewFailureTracker() + window := 3 * time.Second + start := time.Unix(0, 0) + + tracker.RecordFailure(start) + if tracker.ShouldEscalate(start, window) { + t.Fatalf("unexpected escalation after initial failure") + } + + tracker.RecordFailure(start.Add(1 * time.Second)) + if tracker.ShouldEscalate(start.Add(1*time.Second), window) { + t.Fatalf("unexpected escalation before window elapsed") + } + + tracker.RecordFailure(start.Add(window)) + if !tracker.ShouldEscalate(start.Add(window), window) { + t.Fatalf("expected escalation after sustained failures") + } +} + +func TestFailureTrackerResetClearsState(t *testing.T) { + tracker := NewFailureTracker() + window := 2 * time.Second + start := time.Unix(0, 0) + + tracker.RecordFailure(start) + tracker.RecordFailure(start.Add(1 * time.Second)) + if tracker.ShouldEscalate(start.Add(1*time.Second), window) { + t.Fatalf("unexpected escalation before reset") + } + + tracker.Reset() + + tracker.RecordFailure(start.Add(3 * time.Second)) + if tracker.ShouldEscalate(start.Add(3*time.Second), window) { + t.Fatalf("reset should clear escalation state") + } +} + +func TestFailureTrackerIntermittentFailuresDoNotEscalate(t *testing.T) { + tracker := NewFailureTracker() + window := 2 * time.Second + start := time.Unix(0, 0) + + tracker.RecordFailure(start) + if tracker.ShouldEscalate(start, window) { + t.Fatalf("unexpected escalation on first intermittent failure") + } + + // next failure occurs after the window, so escalation should not trigger + tracker.RecordFailure(start.Add(3 * time.Second)) + if tracker.ShouldEscalate(start.Add(3*time.Second), window) { + t.Fatalf("intermittent failures should not escalate") + } +} diff --git a/pkg/controlplane/manager.go b/pkg/controlplane/manager.go index f7c3aaba4..cc35f9ad3 100644 --- a/pkg/controlplane/manager.go +++ b/pkg/controlplane/manager.go @@ -25,6 +25,16 @@ const ( kubeletPort = "10250" ) +type EvaluationOutcome int + +const ( + EvaluationHealthy EvaluationOutcome = iota + EvaluationRemediate + EvaluationGlobalOutage + EvaluationIsolation + EvaluationAwaitQuorum +) + // Manager contains logic and info needed to fence and remediate controlplane nodes type Manager struct { nodeName string @@ -54,48 +64,36 @@ func (manager *Manager) Start(_ context.Context) error { } func (manager *Manager) IsControlPlane() bool { + manager.log.Info("Checking to see if node is a control plane node", + "nodeName", manager.nodeName, + "controlPlaneNodeExpectedValue", peers.ControlPlane, + "nodeRole", manager.nodeRole) return manager.nodeRole == peers.ControlPlane } -func (manager *Manager) IsControlPlaneHealthy(workerPeerResponse peers.Response, canOtherControlPlanesBeReached bool) bool { - switch workerPeerResponse.Reason { - //reported unhealthy by worker peers - case peers.UnHealthyBecausePeersResponse: - manager.log.Info("We are deciding the control plane is not healthy because the peer 
response was UnHealthyBecausePeersResponse") +func (manager *Manager) IsControlPlaneHealthy(outcome EvaluationOutcome) bool { + switch outcome { + case EvaluationRemediate: + manager.log.Info("control-plane evaluation requested remediation") return false - case peers.UnHealthyBecauseNodeIsIsolated: - manager.log.Info("While trying to determine if the control plane is healthy, the peer response was "+ - "UnHealthyBecauseNodeIsIsolated, so we are returning true if we could reach other control plane nodes", - "canOtherControlPlanesBeReached", canOtherControlPlanesBeReached) - return canOtherControlPlanesBeReached - //reported healthy by worker peers - case peers.HealthyBecauseErrorsThresholdNotReached, peers.HealthyBecauseCRNotFound, peers.HealthyBecauseNoPeersResponseNotReachedTimeout: - manager.log.Info("We are deciding that the control plane is healthy because either: "+ - "HealthyBecauseErrorsThresholdNotReached "+ - ", HealthyBecauseCRNotFound, or HealthyBecauseNoPeersResponseNotReachedTimeout", - "reason", workerPeerResponse.Reason) - return true - //controlPlane node has connection to most workers, we assume it's not isolated (or at least that the controlPlane node that does not have worker peers quorum will reboot) - case peers.HealthyBecauseMostPeersCantAccessAPIServer: + case EvaluationIsolation: + manager.log.Info("control-plane evaluation detected isolation") + return false + case EvaluationGlobalOutage: didDiagnosticsPass := manager.isDiagnosticsPassed() - manager.log.Info("The peers couldn't access the API server, so we are returning whether "+ - "diagnostics passed", "didDiagnosticsPass", didDiagnosticsPass) + manager.log.Info("peers report API outage, relying on diagnostics", + "diagnosticsPassed", didDiagnosticsPass) return didDiagnosticsPass - case peers.HealthyBecauseNoPeersWereFound: - didDiagnosticsPass := manager.isDiagnosticsPassed() - - manager.log.Info("We couldn't find any peers so we are returning didDiagnosticsPass && "+ - "canOtherControlPlanesBeReached", "didDiagnosticsPass", didDiagnosticsPass, - "canOtherControlPlanesBeReached", canOtherControlPlanesBeReached) - - return didDiagnosticsPass && canOtherControlPlanesBeReached - + case EvaluationAwaitQuorum: + manager.log.Info("peer quorum pending, deferring remediation") + return true + case EvaluationHealthy: + return true default: - errorText := "node is considered unhealthy by worker peers for an unknown reason" - manager.log.Error(errors.New(errorText), errorText, "reason", workerPeerResponse.Reason, "node name", manager.nodeName) + errorText := "unknown evaluation outcome" + manager.log.Error(errors.New(errorText), errorText, "outcome", outcome, "node name", manager.nodeName) return false } - } func (manager *Manager) isDiagnosticsPassed() bool { @@ -131,6 +129,9 @@ func (manager *Manager) initializeManager() error { } func (manager *Manager) setNodeRole(node corev1.Node) { + manager.log.Info("setNodeRole called", + "labels", node.Labels) + if nodes.IsControlPlane(&node) { manager.nodeRole = peers.ControlPlane } else { @@ -173,7 +174,10 @@ func (manager *Manager) isKubeletServiceRunning() bool { MinVersion: certificates.TLSMinVersion, }, } - httpClient := &http.Client{Transport: tr} + httpClient := &http.Client{ + Transport: tr, + Timeout: 5 * time.Second, + } req, err := http.NewRequest("GET", url, nil) if err != nil { @@ -186,6 +190,14 @@ func (manager *Manager) isKubeletServiceRunning() bool { manager.log.Error(err, "kubelet service is down", "node name", manager.nodeName) return false } - defer 
resp.Body.Close() - return true + defer func() { + if cerr := resp.Body.Close(); cerr != nil { + manager.log.Error(cerr, "failed to close kubelet response body", "node name", manager.nodeName) + } + }() + if resp.StatusCode >= 200 && resp.StatusCode < 400 { + return true + } + manager.log.Info("kubelet responded with non-success status", "node name", manager.nodeName, "status", resp.StatusCode) + return false } diff --git a/pkg/controlplane/manager_internal_test.go b/pkg/controlplane/manager_internal_test.go new file mode 100644 index 000000000..4aa79f1d9 --- /dev/null +++ b/pkg/controlplane/manager_internal_test.go @@ -0,0 +1,21 @@ +package controlplane + +import "testing" + +func TestIsControlPlaneHealthyFlagsIsolation(t *testing.T) { + mgr := &Manager{} + mgr.nodeName = "cp-1" + + if healthy := mgr.IsControlPlaneHealthy(EvaluationIsolation); healthy { + t.Fatalf("expected isolation to mark node unhealthy when other control planes unreachable") + } +} + +func TestIsControlPlaneHealthyChecksDiagnosticsWhenPeersReportAPIFailure(t *testing.T) { + mgr := &Manager{} + mgr.nodeName = "cp-1" + + if healthy := mgr.IsControlPlaneHealthy(EvaluationGlobalOutage); healthy { + t.Fatalf("expected diagnostics to influence decision when peers report API outage") + } +} diff --git a/pkg/peerhealth/peerhealth.pb.go b/pkg/peerhealth/peerhealth.pb.go index e97011753..6d86fdbeb 100644 --- a/pkg/peerhealth/peerhealth.pb.go +++ b/pkg/peerhealth/peerhealth.pb.go @@ -7,11 +7,10 @@ package peerhealth import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( diff --git a/pkg/peerhealth/peerhealth_grpc.pb.go b/pkg/peerhealth/peerhealth_grpc.pb.go index 4c1c8a582..cf883900b 100644 --- a/pkg/peerhealth/peerhealth_grpc.pb.go +++ b/pkg/peerhealth/peerhealth_grpc.pb.go @@ -4,7 +4,6 @@ package peerhealth import ( context "context" - grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/pkg/peers/peers.go b/pkg/peers/peers.go index bfdee63d1..3addb2684 100644 --- a/pkg/peers/peers.go +++ b/pkg/peers/peers.go @@ -84,13 +84,13 @@ func (p *Peers) Start(ctx context.Context) error { p.log.Info("peer starting", "name", p.myNodeName) wait.UntilWithContext(ctx, func(ctx context.Context) { updateWorkerPeersError := p.updateWorkerPeers(ctx) - updateControlPlanePeersError := p.updateControlPlanePeers(ctx) + updateControlPlanePeersError := p.UpdateControlPlanePeers(ctx) if updateWorkerPeersError != nil || updateControlPlanePeersError != nil { // the default update interval is quite long, in case of an error we want to retry quicker quickCtx, quickCancel := context.WithCancel(ctx) wait.UntilWithContext(quickCtx, func(ctx context.Context) { quickUpdateWorkerPeersError := p.updateWorkerPeers(ctx) - quickUpdateControlPlanePeersError := p.updateControlPlanePeers(ctx) + quickUpdateControlPlanePeersError := p.UpdateControlPlanePeers(ctx) if quickUpdateWorkerPeersError == nil && quickUpdateControlPlanePeersError == nil { quickCancel() } @@ -102,18 +102,40 @@ func (p *Peers) Start(ctx context.Context) error { } func (p *Peers) updateWorkerPeers(ctx context.Context) error { - setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses } - selectorGetter := func() labels.Selector { return p.workerPeerSelector } - return p.updatePeers(ctx, selectorGetter, setterFunc) + p.log.Info("updateWorkerPeers entered") + 
setterFunc := func(addresses []v1.PodIP) {
+        p.log.Info("updateWorkerPeers setter called", "addresses", addresses)
+        p.workerPeersAddresses = addresses
+    }
+    selectorGetter := func() labels.Selector {
+        p.log.Info("updateWorkerPeers getter called", "workerPeerSelector", p.workerPeerSelector)
+        return p.workerPeerSelector
+    }
+    resetFunc := func() {
+        p.workerPeersAddresses = []v1.PodIP{}
+    }
+    return p.updatePeers(ctx, selectorGetter, setterFunc, resetFunc)
 }
 
-func (p *Peers) updateControlPlanePeers(ctx context.Context) error {
-    setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses }
-    selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector }
-    return p.updatePeers(ctx, selectorGetter, setterFunc)
+func (p *Peers) UpdateControlPlanePeers(ctx context.Context) error {
+    p.log.Info("UpdateControlPlanePeers entered")
+
+    setterFunc := func(addresses []v1.PodIP) {
+        p.log.Info("UpdateControlPlanePeers setter called", "addresses", addresses)
+        p.controlPlanePeersAddresses = addresses
+    }
+    selectorGetter := func() labels.Selector {
+        p.log.Info("UpdateControlPlanePeers getter called", "controlPlanePeerSelector", p.controlPlanePeerSelector)
+        return p.controlPlanePeerSelector
+    }
+    resetFunc := func() {
+        p.controlPlanePeersAddresses = []v1.PodIP{}
+    }
+    return p.updatePeers(ctx, selectorGetter, setterFunc, resetFunc)
 }
 
-func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error {
+func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP),
+    resetPeers func()) error {
     p.mutex.Lock()
     defer p.mutex.Unlock()
 
@@ -121,16 +143,19 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
     defer cancel()
 
     nodes := v1.NodeList{}
+    // list peer nodes; the selector excludes this node
     if err := p.List(readerCtx, &nodes, client.MatchingLabelsSelector{Selector: getSelector()}); err != nil {
         if apierrors.IsNotFound(err) {
             // we are the only node at the moment...
reset peerList - p.workerPeersAddresses = []v1.PodIP{} + resetPeers() } p.log.Error(err, "failed to update peer list") return pkgerrors.Wrap(err, "failed to update peer list") } + p.log.Info("updatePeers", "nodes", nodes) + pods := v1.PodList{} listOptions := &client.ListOptions{ LabelSelector: labels.SelectorFromSet(labels.Set{ @@ -163,6 +188,10 @@ func (p *Peers) mapNodesToPrimaryPodIPs(nodes v1.NodeList, pods v1.PodList) ([]v addresses = append(addresses, pod.Status.PodIPs[0]) } break + } else { + p.log.Info("Skipping current node/pod combo", + "node.Name", node.Name, + "pod.Spec.NodeName", pod.Spec.NodeName) } } if !found { @@ -177,11 +206,16 @@ func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP { p.mutex.Lock() defer p.mutex.Unlock() + p.log.Info("GetPeersAddresses", "workerPeersAddresses", p.workerPeersAddresses, + "controlPlanePeersAddresses", p.controlPlanePeersAddresses) + var addresses []v1.PodIP if role == Worker { addresses = p.workerPeersAddresses + p.log.Info("Got a request to see how many worker peers exist", "workerPeersAddresses", p.workerPeersAddresses) } else { addresses = p.controlPlanePeersAddresses + p.log.Info("Got a request to see how many control plane peers exist", "controlPlanePeersAddresses", p.controlPlanePeersAddresses) } //we don't want the caller to be able to change the addresses //so we create a deep copy and return it diff --git a/pkg/utils/pods.go b/pkg/utils/pods.go index 7cb0ae37b..6b902e5d5 100644 --- a/pkg/utils/pods.go +++ b/pkg/utils/pods.go @@ -2,7 +2,7 @@ package utils import ( "context" - "errors" + "fmt" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -20,6 +20,7 @@ func GetSelfNodeRemediationAgentPod(nodeName string, r client.Reader) (*v1.Pod, err := r.List(context.Background(), podList, &client.ListOptions{LabelSelector: selector}) if err != nil { + err = fmt.Errorf("failed to retrieve the self-node-remediation agent pod: %w", err) return nil, err } @@ -29,5 +30,5 @@ func GetSelfNodeRemediationAgentPod(nodeName string, r client.Reader) (*v1.Pod, } } - return nil, errors.New("failed to find self node remediation pod matching the given node") + return nil, fmt.Errorf("failed to find self node remediation pod matching the given node (%s)", nodeName) } diff --git a/vendor/github.com/onsi/gomega/gcustom/make_matcher.go b/vendor/github.com/onsi/gomega/gcustom/make_matcher.go new file mode 100644 index 000000000..5372fa441 --- /dev/null +++ b/vendor/github.com/onsi/gomega/gcustom/make_matcher.go @@ -0,0 +1,270 @@ +/* +package gcustom provides a simple mechanism for creating custom Gomega matchers +*/ +package gcustom + +import ( + "fmt" + "reflect" + "strings" + "text/template" + + "github.com/onsi/gomega/format" +) + +var interfaceType = reflect.TypeOf((*interface{})(nil)).Elem() +var errInterface = reflect.TypeOf((*error)(nil)).Elem() + +var defaultTemplate = template.Must(ParseTemplate("{{if .Failure}}Custom matcher failed for:{{else}}Custom matcher succeeded (but was expected to fail) for:{{end}}\n{{.FormattedActual}}")) + +func formatObject(object any, indent ...uint) string { + indentation := uint(0) + if len(indent) > 0 { + indentation = indent[0] + } + return format.Object(object, indentation) +} + +/* +ParseTemplate allows you to precompile templates for MakeMatcher's custom matchers. + +Use ParseTemplate if you are concerned about performance and would like to avoid repeatedly parsing failure message templates. The data made available to the template is documented in the WithTemplate() method of CustomGomegaMatcher. 
+ +Once parsed you can pass the template in either as an argument to MakeMatcher(matchFunc,