From 2b68d286ef82e569913eb841f5cdbccf1864c17b Mon Sep 17 00:00:00 2001 From: "Mark E. Scott, Jr." Date: Tue, 25 Feb 2025 17:24:15 -0600 Subject: [PATCH 01/16] Added new failing test to showcase bug ticket --- controllers/selfnoderemediation_controller.go | 10 +- controllers/tests/config/suite_test.go | 2 +- .../selfnoderemediation_controller_test.go | 645 ++++++++++++++---- controllers/tests/controller/suite_test.go | 8 +- controllers/tests/shared/shared.go | 145 +++- go.mod | 3 +- pkg/apicheck/check.go | 156 +++-- pkg/controlplane/manager.go | 7 + pkg/peers/peers.go | 56 +- pkg/utils/pods.go | 5 +- .../onsi/gomega/gcustom/make_matcher.go | 270 ++++++++ vendor/modules.txt | 1 + 12 files changed, 1088 insertions(+), 220 deletions(-) create mode 100644 vendor/github.com/onsi/gomega/gcustom/make_matcher.go diff --git a/controllers/selfnoderemediation_controller.go b/controllers/selfnoderemediation_controller.go index 34d84f72e..f8e22abaf 100644 --- a/controllers/selfnoderemediation_controller.go +++ b/controllers/selfnoderemediation_controller.go @@ -463,16 +463,20 @@ func (r *SelfNodeRemediationReconciler) remediateWithResourceRemoval(ctx context var err error switch phase { case fencingStartedPhase: + r.logger.Info("remediateWithResourceRemoval: entered fencing start phase") result, err = r.handleFencingStartedPhase(ctx, node, snr) case preRebootCompletedPhase: + r.logger.Info("remediateWithResourceRemoval: entered pre reboot completed phase") result, err = r.handlePreRebootCompletedPhase(node, snr) case rebootCompletedPhase: + r.logger.Info("remediateWithResourceRemoval: entered reboot completed phase") result, err = r.handleRebootCompletedPhase(node, snr, rmNodeResources) case fencingCompletedPhase: + r.logger.Info("remediateWithResourceRemoval: entered fencing complete phase") result, err = r.handleFencingCompletedPhase(node, snr) default: // this should never happen since we enforce valid values with kubebuilder - err = errors.New("unknown phase") + err = fmt.Errorf("remediateWithResourceRemoval: unknown phase (%s)", phase) r.logger.Error(err, "Undefined unknown phase", "phase", phase) } return result, err @@ -506,6 +510,7 @@ func (r *SelfNodeRemediationReconciler) prepareReboot(ctx context.Context, node } preRebootCompleted := string(preRebootCompletedPhase) + r.logger.Info("pre-reboot completed") snr.Status.Phase = &preRebootCompleted return ctrl.Result{}, nil @@ -636,7 +641,8 @@ func (r *SelfNodeRemediationReconciler) didIRebootMyself(snr *v1alpha1.SelfNodeR func (r *SelfNodeRemediationReconciler) isNodeRebootCapable(node *v1.Node) bool { //make sure that the unhealthy node has self node remediation pod on it which can reboot it if _, err := utils.GetSelfNodeRemediationAgentPod(node.Name, r.Client); err != nil { - r.logger.Error(err, "failed to get self node remediation agent pod resource") + r.logger.Error(err, "failed to get self node remediation agent pod resource, so that makes this node "+ + "not reboot capable") return false } diff --git a/controllers/tests/config/suite_test.go b/controllers/tests/config/suite_test.go index d6c2132d4..58c2a7061 100644 --- a/controllers/tests/config/suite_test.go +++ b/controllers/tests/config/suite_test.go @@ -126,7 +126,7 @@ var _ = BeforeSuite(func() { MyNodeName: shared.UnhealthyNodeName, CheckInterval: shared.ApiCheckInterval, MaxErrorsThreshold: shared.MaxErrorThreshold, - MinPeersForRemediation: shared.MinPeersForRemediation, + MinPeersForRemediation: shared.MinPeersForRemediationConfigDefaultValue, Peers: peers, Cfg: cfg, CertReader: certReader, diff --git a/controllers/tests/controller/selfnoderemediation_controller_test.go b/controllers/tests/controller/selfnoderemediation_controller_test.go index 784419088..6cdfdd22f 100644 --- a/controllers/tests/controller/selfnoderemediation_controller_test.go +++ b/controllers/tests/controller/selfnoderemediation_controller_test.go @@ -6,6 +6,7 @@ import ( "reflect" "time" + labels2 "github.com/medik8s/common/pkg/labels" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -18,12 +19,15 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + logf "sigs.k8s.io/controller-runtime/pkg/log" machinev1beta1 "github.com/openshift/api/machine/v1beta1" + "github.com/medik8s/self-node-remediation/api" "github.com/medik8s/self-node-remediation/api/v1alpha1" "github.com/medik8s/self-node-remediation/controllers" "github.com/medik8s/self-node-remediation/controllers/tests/shared" + "github.com/medik8s/self-node-remediation/pkg/controlplane" "github.com/medik8s/self-node-remediation/pkg/utils" "github.com/medik8s/self-node-remediation/pkg/watchdog" ) @@ -32,53 +36,49 @@ const ( snrNamespace = "default" ) +var remediationStrategy v1alpha1.RemediationStrategyType + var _ = Describe("SNR Controller", func() { var snr *v1alpha1.SelfNodeRemediation - var remediationStrategy v1alpha1.RemediationStrategyType + var nodeRebootCapable = "true" - var isAdditionalSetupNeeded = false BeforeEach(func() { nodeRebootCapable = "true" - snr = &v1alpha1.SelfNodeRemediation{} - snr.Name = shared.UnhealthyNodeName - snr.Namespace = snrNamespace - snrConfig = shared.GenerateTestConfig() - time.Sleep(time.Second * 2) - // reset watchdog for each test! - dummyDog.Reset() - }) + By("Set default self-node-remediation configuration", func() { + snr = &v1alpha1.SelfNodeRemediation{} + snr.Name = shared.UnhealthyNodeName + snr.Namespace = snrNamespace + + snrConfig = shared.GenerateTestConfig() + time.Sleep(time.Second * 2) + }) - JustBeforeEach(func() { - if isAdditionalSetupNeeded { - createSelfNodeRemediationPod() - verifySelfNodeRemediationPodExist() - } - createConfig() DeferCleanup(func() { - deleteConfig() + deleteRemediations() + + //clear node's state, this is important to remove taints, label etc. + By(fmt.Sprintf("Clear node state for '%s'", shared.UnhealthyNodeName), func() { + Expect(k8sClient.Update(context.Background(), getNode(shared.UnhealthyNodeName))) + }) + + By(fmt.Sprintf("Clear node state for '%s'", shared.PeerNodeName), func() { + Expect(k8sClient.Update(context.Background(), getNode(shared.PeerNodeName))) + }) + + time.Sleep(time.Second * 2) + + deleteRemediations() + clearEvents() + verifyCleanState() }) - updateIsRebootCapable(nodeRebootCapable) }) - AfterEach(func() { - k8sClient.ShouldSimulateFailure = false - k8sClient.ShouldSimulatePodDeleteFailure = false - isAdditionalSetupNeeded = false - - By("Restore default settings for api connectivity check") - apiConnectivityCheckConfig.MinPeersForRemediation = shared.MinPeersForRemediation - - deleteRemediations() - deleteSelfNodeRemediationPod() - //clear node's state, this is important to remove taints, label etc. - Expect(k8sClient.Update(context.Background(), getNode(shared.UnhealthyNodeName))) - Expect(k8sClient.Update(context.Background(), getNode(shared.PeerNodeName))) - time.Sleep(time.Second * 2) - deleteRemediations() - clearEvents() - verifyCleanState() + JustBeforeEach(func() { + createTestConfig() + updateIsRebootCapable(nodeRebootCapable) + resetWatchdogTimer() }) It("check nodes exist", func() { @@ -162,8 +162,8 @@ var _ = Describe("SNR Controller", func() { Context("Unhealthy node with api-server access", func() { - BeforeEach(func() { - isAdditionalSetupNeeded = true + JustBeforeEach(func() { + doAdditionalSetup() }) Context("Automatic strategy - ResourceDeletion selected", func() { @@ -503,28 +503,78 @@ var _ = Describe("SNR Controller", func() { }) Context("Unhealthy node without api-server access", func() { - BeforeEach(func() { - By("Simulate api-server failure") - k8sClient.ShouldSimulateFailure = true - remediationStrategy = v1alpha1.ResourceDeletionRemediationStrategy - }) - Context("no peer found", func() { - It("Verify that watchdog is not triggered", func() { + Context("two control node peers found, they tell me I'm unhealthy", func() { + + BeforeEach(func() { + additionalNodes := []newNodeConfig{ + { + nodeName: shared.Peer2NodeName, + labels: map[string]string{ + labels2.MasterRole: "true", + }, + + pods: []newPodConfig{ + { + name: shared.SnrPodName2, + simulatedResponse: api.Unhealthy, + }, + }, + }, + { + nodeName: shared.Peer3NodeName, + labels: map[string]string{ + labels2.MasterRole: "true", + }, + pods: []newPodConfig{ + { + name: shared.SnrPodName3, + simulatedResponse: api.Unhealthy, + }, + }, + }, + } + + configureClientWrapperToRandomizePodIpAddresses() + setMinPeersForRemediation(0) + configureUnhealthyNodeAsControlNode() + addNodes(additionalNodes) + + addControlPlaneManager() + resetWatchdogTimer() + configureApiServerSimulatedFailures(true) + configureRemediationStrategy(v1alpha1.ResourceDeletionRemediationStrategy) + configureSimulatedPeerResponses(true) + }) + + It("check that we actually get a triggered watchdog reboot", func() { + // it actually should be verifyWatchdogTriggered() but this is currently proving + // that the code is broken! verifyWatchdogNotTriggered() }) }) - Context("no peer found and MinPeersForRemediation is configured to 0", func() { + Context("api-server should be failing throughout the entire test", func() { BeforeEach(func() { - By("Set MinPeersForRemedation to zero which should trigger the watchdog before the test") - apiConnectivityCheckConfig.MinPeersForRemediation = 0 + configureApiServerSimulatedFailures(true) + }) + + Context("no peer found", func() { + It("Verify that watchdog is not triggered", func() { + verifyWatchdogNotTriggered() + }) }) - It("Does not receive peer communication and since configured to need zero peers, initiates a reboot", - func() { + Context("no peer found and MinPeersForRemediation is configured to 0", func() { + BeforeEach(func() { + setMinPeersForRemediation(0) + }) + + It("Does not receive peer communication and since configured to need zero peers, "+ + "initiates a reboot", func() { verifyWatchdogTriggered() }) + }) }) }) @@ -715,26 +765,27 @@ func verifySelfNodeRemediationPodExist() { }, 5*time.Second, 250*time.Millisecond).Should(Equal(1)) } func deleteRemediations() { + By("Delete any existing remediations", func() { + Eventually(func(g Gomega) { + snrs := &v1alpha1.SelfNodeRemediationList{} + g.Expect(k8sClient.List(context.Background(), snrs)).To(Succeed()) + if len(snrs.Items) == 0 { + return + } - Eventually(func(g Gomega) { - snrs := &v1alpha1.SelfNodeRemediationList{} - g.Expect(k8sClient.List(context.Background(), snrs)).To(Succeed()) - if len(snrs.Items) == 0 { - return - } - - for _, snr := range snrs.Items { - tmpSnr := snr - g.Expect(removeFinalizers(&tmpSnr)).To(Succeed()) - g.Expect(k8sClient.Client.Delete(context.Background(), &tmpSnr)).To(Succeed()) + for _, snr := range snrs.Items { + tmpSnr := snr + g.Expect(removeFinalizers(&tmpSnr)).To(Succeed()) + g.Expect(k8sClient.Client.Delete(context.Background(), &tmpSnr)).To(Succeed()) - } + } - expectedEmptySnrs := &v1alpha1.SelfNodeRemediationList{} - g.Expect(k8sClient.List(context.Background(), expectedEmptySnrs)).To(Succeed()) - g.Expect(len(expectedEmptySnrs.Items)).To(Equal(0)) + expectedEmptySnrs := &v1alpha1.SelfNodeRemediationList{} + g.Expect(k8sClient.List(context.Background(), expectedEmptySnrs)).To(Succeed()) + g.Expect(len(expectedEmptySnrs.Items)).To(Equal(0)) - }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) + }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) + }) } func deleteSNR(snr *v1alpha1.SelfNodeRemediation) { @@ -777,42 +828,136 @@ func createSNR(snr *v1alpha1.SelfNodeRemediation, strategy v1alpha1.RemediationS ExpectWithOffset(1, k8sClient.Client.Create(context.TODO(), snr)).To(Succeed(), "failed to create snr CR") } -func createSelfNodeRemediationPod() { - pod := &v1.Pod{} - pod.Spec.NodeName = shared.UnhealthyNodeName - pod.Labels = map[string]string{"app.kubernetes.io/name": "self-node-remediation", - "app.kubernetes.io/component": "agent"} +func createGenericSelfNodeRemediationPod(node *v1.Node, podName string) (pod *v1.Pod) { + By(fmt.Sprintf("Create pod '%s' under node '%s'", podName, node.Name), func() { + pod = &v1.Pod{} + pod.Spec.NodeName = node.Name + pod.Labels = map[string]string{"app.kubernetes.io/name": "self-node-remediation", + "app.kubernetes.io/component": "agent"} + + pod.Name = podName + pod.Namespace = shared.Namespace + // Some tests need the containers to exist + container := v1.Container{ + Name: "foo", + Image: "foo", + } + pod.Spec.Containers = []v1.Container{container} - pod.Name = "self-node-remediation" - pod.Namespace = shared.Namespace - container := v1.Container{ - Name: "foo", - Image: "foo", + ExpectWithOffset(1, k8sClient.Client.Create(context.Background(), pod)).To(Succeed(), + "failed to create self-node-remediation pod (%s) for node: '%s'", podName, node.Name) + + time.Sleep(1 * time.Second) + + verifySelfNodeRemediationPodByExistsByName(podName) + + DeferCleanup(func() { + deleteSelfNodeRemediationPod(pod, false) + }) + }) + + return +} + +func verifySelfNodeRemediationPodByExistsByName(name string) { + By(fmt.Sprintf("Verify that pod '%s' exists", name), func() { + podList := &v1.PodList{} + selector := labels.NewSelector() + nameRequirement, _ := labels.NewRequirement("app.kubernetes.io/name", selection.Equals, []string{"self-node-remediation"}) + componentRequirement, _ := labels.NewRequirement("app.kubernetes.io/component", selection.Equals, []string{"agent"}) + selector = selector.Add(*nameRequirement, *componentRequirement) + + EventuallyWithOffset(1, func() (bool, error) { + err := k8sClient.Client.List(context.Background(), podList, &client.ListOptions{LabelSelector: selector}) + for _, item := range podList.Items { + if item.Name == name { + return true, nil + } + } + return false, err + }, 5*time.Second, 250*time.Millisecond).Should(BeTrue(), "expected that we should have"+ + " found the SNR pod") + }) + + return +} + +func getSnrPods() (pods *v1.PodList) { + pods = &v1.PodList{} + + By("Listing self-node-remediation pods") + listOptions := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{ + "app.kubernetes.io/name": "self-node-remediation", + "app.kubernetes.io/component": "agent", + }), } - pod.Spec.Containers = []v1.Container{container} - ExpectWithOffset(1, k8sClient.Client.Create(context.Background(), pod)).To(Succeed()) + Expect(k8sClient.List(context.Background(), pods, listOptions)).To(Succeed(), + "failed to list self-node-remediation pods") + return } -func deleteSelfNodeRemediationPod() { - pod := &v1.Pod{} +func deleteSelfNodeRemediationPod(pod *v1.Pod, throwErrorIfNotFound bool) { + By(fmt.Sprintf("Attempt to delete pod '%s'", pod.Name), func() { + var grace client.GracePeriodSeconds = 0 + err := k8sClient.Client.Delete(context.Background(), pod, grace) + if throwErrorIfNotFound { + ExpectWithOffset(1, err).To(Succeed(), "there should have been no error "+ + "deleting pod '%s'", pod.Name) + } else { + ExpectWithOffset(1, err).To(Or(Succeed(), shared.IsK8sNotFoundError()), + "expected the delete operation to succeed, or for it to have told us that node '%s'"+ + " didn't exist", pod.Name) + } + + }) + + By("Check that pod: '"+pod.Name+"' was actually deleted", func() { + EventuallyWithOffset(1, func() bool { + podTestAfterDelete := &v1.Pod{} + podKey := client.ObjectKey{ + Namespace: shared.Namespace, + Name: pod.Name, + } + err := k8sClient.Client.Get(context.Background(), podKey, podTestAfterDelete) + return apierrors.IsNotFound(err) + }, 10*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) +} +func deleteSelfNodeRemediationPodByName(podName string, throwErrorIfNotFound bool) (err error) { + pod := &v1.Pod{} podKey := client.ObjectKey{ Namespace: shared.Namespace, - Name: "self-node-remediation", + Name: podName, } - if err := k8sClient.Get(context.Background(), podKey, pod); err != nil { - Expect(apierrors.IsNotFound(err)).To(BeTrue()) - return - } + By(fmt.Sprintf("Attempting to get pod '%s' before deleting it", podName), func() { + if err := k8sClient.Client.Get(context.Background(), podKey, pod); err != nil { + if apierrors.IsNotFound(err) && !throwErrorIfNotFound { + logf.Log.Info("pod with name '%s' not found, we're not going to do anything", podName) + err = nil + return + } - var grace client.GracePeriodSeconds = 0 - ExpectWithOffset(1, k8sClient.Client.Delete(context.Background(), pod, grace)).To(Succeed()) + err = fmt.Errorf("unable to get pod with name '%s' in order to delete it", err) + return + } + }) - EventuallyWithOffset(1, func() bool { - err := k8sClient.Client.Get(context.Background(), podKey, pod) - return apierrors.IsNotFound(err) - }, 10*time.Second, 100*time.Millisecond).Should(BeTrue()) + deleteSelfNodeRemediationPod(pod, throwErrorIfNotFound) + + return +} + +func deleteAllSelfNodeRemediationPods() { + pods := getSnrPods() + + By("Deleting self-node-remediation pods") + + for _, pod := range pods.Items { + deleteSelfNodeRemediationPod(&pod, false) + } } func createTerminatingPod() { @@ -916,42 +1061,43 @@ func eventuallyUpdateNode(updateFunc func(*v1.Node), isStatusUpdate bool) { } func verifyCleanState() { - //Verify nodes are at a clean state - nodes := &v1.NodeList{} - Expect(k8sClient.List(context.Background(), nodes)).To(Succeed()) - Expect(len(nodes.Items)).To(BeEquivalentTo(2)) - var peerNodeActual, unhealthyNodeActual *v1.Node - if nodes.Items[0].Name == shared.UnhealthyNodeName { - Expect(nodes.Items[1].Name).To(Equal(shared.PeerNodeName)) - peerNodeActual = &nodes.Items[1] - unhealthyNodeActual = &nodes.Items[0] - } else { - Expect(nodes.Items[0].Name).To(Equal(shared.PeerNodeName)) - Expect(nodes.Items[1].Name).To(Equal(shared.UnhealthyNodeName)) - peerNodeActual = &nodes.Items[0] - unhealthyNodeActual = &nodes.Items[1] - } - - peerNodeExpected, unhealthyNodeExpected := getNode(shared.PeerNodeName), getNode(shared.UnhealthyNodeName) - verifyNodesAreEqual(peerNodeExpected, peerNodeActual) - verifyNodesAreEqual(unhealthyNodeExpected, unhealthyNodeActual) + By("Verifying that test(s) and AfterTest functions properly left us at an expected state", func() { + //Verify nodes are at a clean state + nodes := &v1.NodeList{} + Expect(k8sClient.List(context.Background(), nodes)).To(Succeed()) + Expect(len(nodes.Items)).To(BeEquivalentTo(2)) + var peerNodeActual, unhealthyNodeActual *v1.Node + if nodes.Items[0].Name == shared.UnhealthyNodeName { + Expect(nodes.Items[1].Name).To(Equal(shared.PeerNodeName)) + peerNodeActual = &nodes.Items[1] + unhealthyNodeActual = &nodes.Items[0] + } else { + Expect(nodes.Items[0].Name).To(Equal(shared.PeerNodeName)) + Expect(nodes.Items[1].Name).To(Equal(shared.UnhealthyNodeName)) + peerNodeActual = &nodes.Items[0] + unhealthyNodeActual = &nodes.Items[1] + } - //Verify no existing remediations - remediations := &v1alpha1.SelfNodeRemediationList{} - Expect(k8sClient.List(context.Background(), remediations)).To(Succeed()) - Expect(len(remediations.Items)).To(BeEquivalentTo(0)) + peerNodeExpected, unhealthyNodeExpected := getNode(shared.PeerNodeName), getNode(shared.UnhealthyNodeName) + verifyNodesAreEqual(peerNodeExpected, peerNodeActual) + verifyNodesAreEqual(unhealthyNodeExpected, unhealthyNodeActual) - //Verify SNR Pod Does not exist - pod := &v1.Pod{} - podKey := client.ObjectKey{ - Namespace: shared.Namespace, - Name: "self-node-remediation", - } - err := k8sClient.Get(context.Background(), podKey, pod) - Expect(apierrors.IsNotFound(err)).To(BeTrue()) + //Verify no existing remediations + remediations := &v1alpha1.SelfNodeRemediationList{} + Expect(k8sClient.List(context.Background(), remediations)).To(Succeed()) + Expect(len(remediations.Items)).To(BeEquivalentTo(0)) - verifyOutOfServiceTaintRemoved() + //Verify SNR Pod Does not exist + pod := &v1.Pod{} + podKey := client.ObjectKey{ + Namespace: shared.Namespace, + Name: shared.SnrPodName1, + } + err := k8sClient.Get(context.Background(), podKey, pod) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + verifyOutOfServiceTaintRemoved() + }) } func verifyNodesAreEqual(expected *v1.Node, actual *v1.Node) { @@ -963,14 +1109,16 @@ func verifyNodesAreEqual(expected *v1.Node, actual *v1.Node) { } func clearEvents() { - for { - select { - case _ = <-fakeRecorder.Events: + By("Clear any events in the channel", func() { + for { + select { + case _ = <-fakeRecorder.Events: - default: - return + default: + return + } } - } + }) } func verifyEvent(eventType, reason, message string) { @@ -1016,19 +1164,20 @@ func isEventOccurred(eventType string, reason string, message string) bool { } func deleteConfig() { - snrConfigTmp := &v1alpha1.SelfNodeRemediationConfig{} - // make sure config is already created - Eventually(func(g Gomega) { - g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(snrConfig), snrConfigTmp)).To(Succeed()) - }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) - - //delete config and verify it's deleted - Expect(k8sClient.Delete(context.Background(), snrConfigTmp)).To(Succeed()) - Eventually(func(g Gomega) { - err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(snrConfig), snrConfigTmp) - g.Expect(apierrors.IsNotFound(err)).To(BeTrue()) - }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) - + By("Delete SelfNodeRemediationConfig", func() { + snrConfigTmp := &v1alpha1.SelfNodeRemediationConfig{} + // make sure config is already created + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(snrConfig), snrConfigTmp)).To(Succeed()) + }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) + + //delete config and verify it's deleted + Expect(k8sClient.Delete(context.Background(), snrConfigTmp)).To(Succeed()) + Eventually(func(g Gomega) { + err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(snrConfig), snrConfigTmp) + g.Expect(apierrors.IsNotFound(err)).To(BeTrue()) + }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) + }) } func createConfig() { @@ -1041,3 +1190,217 @@ func createConfig() { }, 10*time.Second, 100*time.Millisecond).Should(Succeed()) } + +func addControlPlaneManager() { + By("Add a control plane manager", func() { + controlPlaneMgr := controlplane.NewManager(shared.UnhealthyNodeName, k8sClient) + Expect(controlPlaneMgr.Start(context.Background())).To(Succeed(), "we should"+ + "have been able to enable a control plane manager for the current node") + + apiCheck.SetControlPlaneManager(controlPlaneMgr) + + Expect(apiConnectivityCheckConfig.Peers.UpdateControlPlanePeers(context.Background())).To(Succeed()) + + DeferCleanup(func() { + By("Removing the control plane manager", func() { + apiCheck.SetControlPlaneManager(nil) + Expect(apiConnectivityCheckConfig.Peers.UpdateControlPlanePeers(context.Background())).To(Succeed()) + }) + }) + }) +} + +func setMinPeersForRemediation(minimumNumberOfPeers int) { + + By(fmt.Sprintf("Setting MinPeersForRemediation to %d", minimumNumberOfPeers), func() { + orgMinPeersForRemediation := apiConnectivityCheckConfig.MinPeersForRemediation + apiConnectivityCheckConfig.MinPeersForRemediation = minimumNumberOfPeers + + time.Sleep(1 * time.Second) + + DeferCleanup(func() { + By("Restore MinPeersForRemediation back to its default value", func() { + apiConnectivityCheckConfig.MinPeersForRemediation = orgMinPeersForRemediation + }) + }) + }) + +} + +func configureClientWrapperToRandomizePodIpAddresses() { + By("Configure k8s client wrapper to return random IP address for pods", func() { + orgValue := k8sClient.ShouldReturnRandomPodIPs + + k8sClient.ShouldReturnRandomPodIPs = true + + DeferCleanup(func() { + By(fmt.Sprintf("Restore k8s client wrapper random pod IP address generation to %t", + orgValue), func() { + k8sClient.ShouldReturnRandomPodIPs = orgValue + }) + + return + }) + }) +} + +func configureUnhealthyNodeAsControlNode() { + var unhealthyNode = &v1.Node{} + By("Getting the existing unhealthy node object", func() { + Expect(k8sClient.Client.Get(context.TODO(), unhealthyNodeNamespacedName, unhealthyNode)). + To(Succeed(), "failed to get the unhealthy node object") + + logf.Log.Info("Successfully retrieved node", "unhealthyNode", + unhealthyNode) + }) + + By("Set the existing unhealthy node as a control node", func() { + previousRole := unhealthyNode.Labels[labels2.MasterRole] + unhealthyNode.Labels[labels2.MasterRole] = "true" + Expect(k8sClient.Update(context.TODO(), unhealthyNode)).To(Succeed(), "failed to update unhealthy node") + + DeferCleanup(func() { + By("Revert the unhealthy node's role to its previous value", func() { + unhealthyNode.Labels[labels2.MasterRole] = previousRole + }) + }) + }) +} + +func configureSimulatedPeerResponses(simulateResponses bool) { + By("Start simulating peer responses", func() { + orgValue := apiCheck.ShouldSimulatePeerResponses + apiCheck.ShouldSimulatePeerResponses = simulateResponses + + DeferCleanup(func() { + apiCheck.ShouldSimulatePeerResponses = orgValue + }) + }) +} + +func configureApiServerSimulatedFailures(simulateResponses bool) { + By("Configure k8s client to simulate API server failures", func() { + orgValue := k8sClient.ShouldSimulateFailure + k8sClient.ShouldSimulateFailure = simulateResponses + + DeferCleanup(func() { + By(fmt.Sprintf("Restore k8s client config value for API server failure simulation to %t", + orgValue), func() { + k8sClient.ShouldSimulateFailure = orgValue + }) + + }) + }) + +} + +func configureRemediationStrategy(newRemediationStrategy v1alpha1.RemediationStrategyType) { + By(fmt.Sprintf("Configure remediation strategy to '%s'", newRemediationStrategy), func() { + orgValue := remediationStrategy + remediationStrategy = newRemediationStrategy + + DeferCleanup(func() { + By(fmt.Sprintf("Restore remediation strategy to '%s'", orgValue), func() { + remediationStrategy = orgValue + }) + + }) + }) + +} + +type newPodConfig struct { + name string + simulatedResponse api.HealthCheckResponseCode +} +type newNodeConfig struct { + nodeName string + labels map[string]string + + pods []newPodConfig +} + +func addNodes(nodes []newNodeConfig) { + By("Add peer nodes & pods", func() { + + for _, np := range nodes { + newNode := getNode(np.nodeName) + + for key, value := range np.labels { + newNode.Labels[key] = value + } + + By(fmt.Sprintf("Create node '%s'", np.nodeName), func() { + Expect(k8sClient.Create(context.Background(), newNode)).To(Succeed(), + "failed to create peer node '%s'", np.nodeName) + + DeferCleanup(func() { + By(fmt.Sprintf("Remove node '%s'", np.nodeName), func() { + Expect(k8sClient.Delete(context.Background(), newNode)).To(Or(Succeed(), shared.IsK8sNotFoundError())) + }) + }) + }) + + createdNode := &v1.Node{} + + By("Check that the newly created node actually was created", func() { + namespace := client.ObjectKey{ + Name: np.nodeName, + Namespace: "", + } + Eventually(func() error { + return k8sClient.Client.Get(context.TODO(), namespace, createdNode) + }, 10*time.Second, 250*time.Millisecond).Should(BeNil()) + Expect(createdNode.Name).To(Equal(np.nodeName)) + Expect(createdNode.CreationTimestamp).ToNot(BeZero()) + }) + + for _, pod := range np.pods { + By(fmt.Sprintf("Create pod '%s' under node '%s'", pod.name, np.nodeName), func() { + _ = createGenericSelfNodeRemediationPod(createdNode, pod.name) + apiCheck.SimulatePeerResponses = append(apiCheck.SimulatePeerResponses, pod.simulatedResponse) + }) + + } + + } + + DeferCleanup(func() { + By("Removing the additional peer nodes when all relevant tests are complete", func() { + Expect(k8sClient.Delete(context.Background(), getNode(shared.Peer2NodeName))).To(Succeed()) + Expect(k8sClient.Delete(context.Background(), getNode(shared.Peer3NodeName))).To(Succeed()) + }) + return + }) + + }) +} + +func resetWatchdogTimer() { + By("Resetting watchdog timer", func() { + dummyDog.Reset() + }) +} + +func doAdditionalSetup() { + By("Perform additional setups", func() { + + By("Create the default self-node-remediation agent pod", func() { + snrPod := createGenericSelfNodeRemediationPod(unhealthyNode, shared.SnrPodName1) + verifySelfNodeRemediationPodExist() + + DeferCleanup(func() { + deleteSelfNodeRemediationPod(snrPod, false) + }) + }) + }) +} + +func createTestConfig() { + By("Create test configuration", func() { + createConfig() + DeferCleanup(func() { + deleteConfig() + }) + }) +} diff --git a/controllers/tests/controller/suite_test.go b/controllers/tests/controller/suite_test.go index 070b26f54..d806c420e 100644 --- a/controllers/tests/controller/suite_test.go +++ b/controllers/tests/controller/suite_test.go @@ -60,6 +60,7 @@ var ( k8sClient *shared.K8sClientWrapper fakeRecorder *record.FakeRecorder snrConfig *selfnoderemediationv1alpha1.SelfNodeRemediationConfig + apiCheck *shared.ApiConnectivityCheckWrapper apiConnectivityCheckConfig *apicheck.ApiConnectivityCheckConfig ) @@ -149,6 +150,7 @@ var _ = BeforeSuite(func() { peerApiServerTimeout := 5 * time.Second peers := peers.New(shared.UnhealthyNodeName, shared.PeerUpdateInterval, k8sClient, ctrl.Log.WithName("peers"), peerApiServerTimeout) + err = k8sManager.Add(peers) Expect(err).ToNot(HaveOccurred()) @@ -161,9 +163,11 @@ var _ = BeforeSuite(func() { Peers: peers, Rebooter: rebooter, Cfg: cfg, - MinPeersForRemediation: shared.MinPeersForRemediation, + MinPeersForRemediation: shared.MinPeersForRemediationConfigDefaultValue, } - apiCheck := apicheck.New(apiConnectivityCheckConfig, nil) + + apiCheck = shared.NewApiConnectivityCheckWrapper(apiConnectivityCheckConfig, nil) + err = k8sManager.Add(apiCheck) Expect(err).ToNot(HaveOccurred()) diff --git a/controllers/tests/shared/shared.go b/controllers/tests/shared/shared.go index 3e424b879..1b889f5f3 100644 --- a/controllers/tests/shared/shared.go +++ b/controllers/tests/shared/shared.go @@ -3,30 +3,48 @@ package shared import ( "context" "errors" + "net" + logf "sigs.k8s.io/controller-runtime/pkg/log" "time" + "github.com/google/uuid" . "github.com/onsi/gomega" + "github.com/onsi/gomega/gcustom" + types2 "github.com/onsi/gomega/types" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" + selfNodeRemediation "github.com/medik8s/self-node-remediation/api" selfnoderemediationv1alpha1 "github.com/medik8s/self-node-remediation/api/v1alpha1" + "github.com/medik8s/self-node-remediation/pkg/apicheck" + "github.com/medik8s/self-node-remediation/pkg/controlplane" "github.com/medik8s/self-node-remediation/pkg/reboot" ) const ( - PeerUpdateInterval = 30 * time.Second - ApiCheckInterval = 1 * time.Second - MaxErrorThreshold = 1 - MinPeersForRemediation = 1 + PeerUpdateInterval = 30 * time.Second + ApiCheckInterval = 1 * time.Second + MaxErrorThreshold = 1 // CalculatedRebootDuration is the mock calculator's result CalculatedRebootDuration = 3 * time.Second Namespace = "self-node-remediation" UnhealthyNodeName = "node1" PeerNodeName = "node2" - DsDummyImageName = "dummy-image" + Peer2NodeName = "node3" + Peer3NodeName = "node4" + + SnrPodName1 = "self-node-remediation" + SnrPodName2 = "self-node-remediation-2" + SnrPodName3 = "self-node-remediation-3" + DsDummyImageName = "dummy-image" + + K8sClientReturnRandomPodIPAddressesByDefault = false + + MinPeersForRemediationConfigDefaultValue = 1 ) type K8sClientWrapper struct { @@ -35,17 +53,81 @@ type K8sClientWrapper struct { ShouldSimulateFailure bool ShouldSimulatePodDeleteFailure bool SimulatedFailureMessage string + ShouldReturnRandomPodIPs bool +} + +type ApiConnectivityCheckWrapper struct { + apicheck.ApiConnectivityCheck + ShouldSimulatePeerResponses bool + + // store responses that we should override for any peer responses + SimulatePeerResponses []selfNodeRemediation.HealthCheckResponseCode } -func (kcw *K8sClientWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - if kcw.ShouldSimulateFailure { - return errors.New("simulation of client error") - } else if kcw.ShouldSimulatePodDeleteFailure { +func (kcw *K8sClientWrapper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) { + switch { + case kcw.ShouldSimulateFailure: + err = errors.New("simulation of client error") + return + case kcw.ShouldSimulatePodDeleteFailure: if _, ok := list.(*corev1.NamespaceList); ok { - return errors.New(kcw.SimulatedFailureMessage) + err = errors.New(kcw.SimulatedFailureMessage) + return + } + fallthrough + default: + err = kcw.Client.List(ctx, list, opts...) + } + + if kcw.ShouldReturnRandomPodIPs { + logf.Log.Info("Returning random IP addresses for all the pods because ShouldReturnRandomPodIPs is true") + + if podList, ok := list.(*corev1.PodList); ok { + assignRandomIpAddressesPods(podList) } } - return kcw.Client.List(ctx, list, opts...) + + return +} + +func assignRandomIpAddressesPods(pods *corev1.PodList) { + for i := range pods.Items { + pods.Items[i].Status.PodIPs = []corev1.PodIP{{IP: GetRandomIpAddress()}} + } + + return +} + +func GetRandomIpAddress() (randomIP string) { + u := uuid.New() + ip := net.IP(u[:net.IPv6len]) + randomIP = ip.String() + + return +} + +func NewApiConnectivityCheckWrapper(ck *apicheck.ApiConnectivityCheckConfig, controlPlaneManager *controlplane.Manager) (ckw *ApiConnectivityCheckWrapper) { + ckw = &ApiConnectivityCheckWrapper{ + ApiConnectivityCheck: *apicheck.New(ck, controlPlaneManager), + ShouldSimulatePeerResponses: false, + SimulatePeerResponses: []selfNodeRemediation.HealthCheckResponseCode{}, + } + + ckw.ApiConnectivityCheck.SetHealthStatusFunc(func(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { + switch { + case ckw.ShouldSimulatePeerResponses: + for _, code := range ckw.SimulatePeerResponses { + results <- code + } + + return + default: + ckw.ApiConnectivityCheck.GetDefaultPeerHealthCheckFunc()(endpointIp, results) + break + } + }) + + return } func GenerateTestConfig() *selfnoderemediationv1alpha1.SelfNodeRemediationConfig { @@ -81,3 +163,44 @@ func VerifySNRStatusExist(k8sClient client.Client, snr *selfnoderemediationv1alp g.Expect(meta.IsStatusConditionPresentAndEqual(tmpSNR.Status.Conditions, statusType, conditionStatus)).To(BeTrue()) }, 10*time.Second, 250*time.Millisecond).Should(Succeed()) } + +type K8sErrorTestingFunc func(err error) bool + +// Matches one of the k8s error types that the user wants to ignore +func IsIgnoredK8sError(k8sErrorsToIgnore []K8sErrorTestingFunc) types2.GomegaMatcher { + return gcustom.MakeMatcher(func(errorToTest error) (matches bool, err error) { + if errorToTest == nil { + matches = false + return + } + + for _, testingFunc := range k8sErrorsToIgnore { + if testingFunc(errorToTest) { + matches = true + return + } + } + + matches = false + + return + }) +} + +func IsK8sNotFoundError() types2.GomegaMatcher { + return gcustom.MakeMatcher(func(errorToTest error) (matches bool, err error) { + if errorToTest == nil { + matches = false + return + } + + if apierrors.IsNotFound(errorToTest) { + matches = true + return + } + + matches = false + + return + }) +} diff --git a/go.mod b/go.mod index 552158478..d1c457754 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,8 @@ require ( sigs.k8s.io/controller-runtime v0.15.0 ) +require github.com/google/uuid v1.3.0 + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -41,7 +43,6 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect - github.com/google/uuid v1.3.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/pkg/apicheck/check.go b/pkg/apicheck/check.go index 55a708bc3..9151a2e41 100644 --- a/pkg/apicheck/check.go +++ b/pkg/apicheck/check.go @@ -35,14 +35,17 @@ const ( type ApiConnectivityCheck struct { client.Reader - config *ApiConnectivityCheckConfig - errorCount int - timeOfLastPeerResponse time.Time - clientCreds credentials.TransportCredentials - mutex sync.Mutex - controlPlaneManager *controlplane.Manager + config *ApiConnectivityCheckConfig + errorCount int + timeOfLastPeerResponse time.Time + clientCreds credentials.TransportCredentials + mutex sync.Mutex + controlPlaneManager *controlplane.Manager + getHealthStatusFromRemoteFunc GetHealthStatusFromRemoteFunc } +type GetHealthStatusFromRemoteFunc func(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) + type ApiConnectivityCheckConfig struct { Log logr.Logger MyNodeName string @@ -62,13 +65,70 @@ type ApiConnectivityCheckConfig struct { Recorder record.EventRecorder } -func New(config *ApiConnectivityCheckConfig, controlPlaneManager *controlplane.Manager) *ApiConnectivityCheck { - return &ApiConnectivityCheck{ +func New(config *ApiConnectivityCheckConfig, controlPlaneManager *controlplane.Manager) (c *ApiConnectivityCheck) { + c = &ApiConnectivityCheck{ config: config, mutex: sync.Mutex{}, controlPlaneManager: controlPlaneManager, timeOfLastPeerResponse: time.Now(), } + + c.SetHealthStatusFunc(c.GetDefaultPeerHealthCheckFunc()) + + return +} + +func (c *ApiConnectivityCheck) GetDefaultPeerHealthCheckFunc() (fun GetHealthStatusFromRemoteFunc) { + + fun = func(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { + logger := c.config.Log.WithValues("IP", endpointIp.IP) + logger.Info("getting health status from peer") + + if err := c.initClientCreds(); err != nil { + logger.Error(err, "failed to init client credentials") + results <- selfNodeRemediation.RequestFailed + return + } + + // TODO does this work with IPv6? + // MES: Yes it does, we've tested this + phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds) + if err != nil { + logger.Error(err, "failed to init grpc client") + results <- selfNodeRemediation.RequestFailed + return + } + defer phClient.Close() + + effectiveTimeout := c.getEffectivePeerRequestTimeout() + ctx, cancel := context.WithTimeout(context.Background(), effectiveTimeout) + defer cancel() + + resp, err := phClient.IsHealthy(ctx, &peerhealth.HealthRequest{ + NodeName: c.config.MyNodeName, + MachineName: c.config.MyMachineName, + }) + if err != nil { + logger.Error(err, "failed to read health response from peer") + results <- selfNodeRemediation.RequestFailed + return + } + + logger.Info("got response from peer", "status", resp.Status) + + results <- selfNodeRemediation.HealthCheckResponseCode(resp.Status) + return + } + + return +} + +func (c *ApiConnectivityCheck) GetControlPlaneManager() *controlplane.Manager { + return c.controlPlaneManager +} + +func (c *ApiConnectivityCheck) SetControlPlaneManager(manager *controlplane.Manager) { + c.controlPlaneManager = manager } func (c *ApiConnectivityCheck) Start(ctx context.Context) error { @@ -120,12 +180,27 @@ func (c *ApiConnectivityCheck) Start(ctx context.Context) error { // isConsideredHealthy keeps track of the number of errors reported, and when a certain amount of error occur within a certain // time, ask peers if this node is healthy. Returns if the node is considered to be healthy or not. func (c *ApiConnectivityCheck) isConsideredHealthy() bool { + isControlPlaneManagerNil := c.controlPlaneManager == nil + + isWorkerNode := isControlPlaneManagerNil || !c.controlPlaneManager.IsControlPlane() + + c.config.Log.Info("isConsideredHealthy called", + "isControlPlaneManagerNil", isControlPlaneManagerNil, + "isWorkerNode", isWorkerNode) + workerPeersResponse := c.getWorkerPeersResponse() - isWorkerNode := c.controlPlaneManager == nil || !c.controlPlaneManager.IsControlPlane() + if isWorkerNode { + c.config.Log.Info("isConsideredHealthy: returning result from getWorkerPeersResponse", + "workerPeersResponse.IsHealthy", workerPeersResponse.IsHealthy) return workerPeersResponse.IsHealthy } else { - return c.controlPlaneManager.IsControlPlaneHealthy(workerPeersResponse, c.canOtherControlPlanesBeReached()) + canOtherControlPlanesBeReached := c.canOtherControlPlanesBeReached() + isControlPlaneHealthy := c.controlPlaneManager.IsControlPlaneHealthy(workerPeersResponse, canOtherControlPlanesBeReached) + c.config.Log.Info("isConsideredHealthy: returning result from IsControlPlaneHealthy", + "c.canOtherControlPlanesBeReached()", canOtherControlPlanesBeReached, + "c.controlPlaneManager.IsControlPlaneHealthy", isControlPlaneHealthy) + return isControlPlaneHealthy } } @@ -137,9 +212,11 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response { return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseErrorsThresholdNotReached} } - c.config.Log.Info("Error count exceeds threshold, trying to ask other nodes if I'm healthy") peersToAsk := c.config.Peers.GetPeersAddresses(peers.Worker) + c.config.Log.Info("Error count exceeds threshold, trying to ask other peer nodes if I'm healthy", + "minPeersRequired", c.config.MinPeersForRemediation, "actualNumPeersFound", len(peersToAsk)) + // We check to see if we have at least the number of peers that the user has configured as required. // If we don't have this many peers (for instance there are zero peers, and the default value is set // which requires at least one peer), we don't want to remediate. In this case we have some confusion @@ -232,8 +309,15 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response { } func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool { + c.config.Log.Info("canOtherControlPlanesBeReached", "c.config.Peers", + c.config.Peers) + peersToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane) numOfControlPlanePeers := len(peersToAsk) + + c.config.Log.Info("Getting peer control plane addresses", "peersToAsk", + peersToAsk, "numOfControlPlanePeers", numOfControlPlanePeers) + if numOfControlPlanePeers == 0 { c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, other control planes can't be reached") return false @@ -277,6 +361,8 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []corev1.PodIP nrAddresses := len(addresses) responsesChan := make(chan selfNodeRemediation.HealthCheckResponseCode, nrAddresses) + c.config.Log.Info("Attempting to get health status from peers", "addresses", addresses) + for _, address := range addresses { go c.getHealthStatusFromPeer(address, responsesChan) } @@ -290,7 +376,6 @@ func (c *ApiConnectivityCheck) getEffectivePeerRequestTimeout() time.Duration { minimumSafeTimeout := c.config.ApiServerTimeout + v1alpha1.MinimumBuffer if c.config.PeerRequestTimeout < minimumSafeTimeout { - // Log warning about timeout adjustment c.config.Log.Info("PeerRequestTimeout is too low, using adjusted value for safety", "configuredTimeout", c.config.PeerRequestTimeout, "apiServerTimeout", c.config.ApiServerTimeout, @@ -303,47 +388,20 @@ func (c *ApiConnectivityCheck) getEffectivePeerRequestTimeout() time.Duration { return c.config.PeerRequestTimeout } -// getHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel -func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { - - logger := c.config.Log.WithValues("IP", endpointIp.IP) - logger.Info("getting health status from peer") - - if err := c.initClientCreds(); err != nil { - logger.Error(err, "failed to init client credentials") - results <- selfNodeRemediation.RequestFailed - return - } - - // TODO does this work with IPv6? - phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds) - if err != nil { - logger.Error(err, "failed to init grpc client") - results <- selfNodeRemediation.RequestFailed - return - } - defer phClient.Close() - - effectiveTimeout := c.getEffectivePeerRequestTimeout() - ctx, cancel := context.WithTimeout(context.Background(), effectiveTimeout) - defer cancel() - - resp, err := phClient.IsHealthy(ctx, &peerhealth.HealthRequest{ - NodeName: c.config.MyNodeName, - MachineName: c.config.MyMachineName, - }) - if err != nil { - logger.Error(err, "failed to read health response from peer") - results <- selfNodeRemediation.RequestFailed - return - } - - logger.Info("got response from peer", "status", resp.Status) +func (c *ApiConnectivityCheck) SetHealthStatusFunc(f GetHealthStatusFromRemoteFunc) { + c.getHealthStatusFromRemoteFunc = f +} - results <- selfNodeRemediation.HealthCheckResponseCode(resp.Status) +func (c *ApiConnectivityCheck) GetHealthStatusFunc() (f GetHealthStatusFromRemoteFunc) { + f = c.getHealthStatusFromRemoteFunc return } +// GetHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel +func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) { + c.getHealthStatusFromRemoteFunc(endpointIp, results) +} + func (c *ApiConnectivityCheck) initClientCreds() error { c.mutex.Lock() defer c.mutex.Unlock() diff --git a/pkg/controlplane/manager.go b/pkg/controlplane/manager.go index f7c3aaba4..a39703e14 100644 --- a/pkg/controlplane/manager.go +++ b/pkg/controlplane/manager.go @@ -54,6 +54,10 @@ func (manager *Manager) Start(_ context.Context) error { } func (manager *Manager) IsControlPlane() bool { + manager.log.Info("Checking to see if node is a control plane node", + "nodeName", manager.nodeName, + "controlPlaneNodeExpectedValue", peers.ControlPlane, + "nodeRole", manager.nodeRole) return manager.nodeRole == peers.ControlPlane } @@ -131,6 +135,9 @@ func (manager *Manager) initializeManager() error { } func (manager *Manager) setNodeRole(node corev1.Node) { + manager.log.Info("setNodeRole called", + "labels", node.Labels) + if nodes.IsControlPlane(&node) { manager.nodeRole = peers.ControlPlane } else { diff --git a/pkg/peers/peers.go b/pkg/peers/peers.go index bfdee63d1..3addb2684 100644 --- a/pkg/peers/peers.go +++ b/pkg/peers/peers.go @@ -84,13 +84,13 @@ func (p *Peers) Start(ctx context.Context) error { p.log.Info("peer starting", "name", p.myNodeName) wait.UntilWithContext(ctx, func(ctx context.Context) { updateWorkerPeersError := p.updateWorkerPeers(ctx) - updateControlPlanePeersError := p.updateControlPlanePeers(ctx) + updateControlPlanePeersError := p.UpdateControlPlanePeers(ctx) if updateWorkerPeersError != nil || updateControlPlanePeersError != nil { // the default update interval is quite long, in case of an error we want to retry quicker quickCtx, quickCancel := context.WithCancel(ctx) wait.UntilWithContext(quickCtx, func(ctx context.Context) { quickUpdateWorkerPeersError := p.updateWorkerPeers(ctx) - quickUpdateControlPlanePeersError := p.updateControlPlanePeers(ctx) + quickUpdateControlPlanePeersError := p.UpdateControlPlanePeers(ctx) if quickUpdateWorkerPeersError == nil && quickUpdateControlPlanePeersError == nil { quickCancel() } @@ -102,18 +102,40 @@ func (p *Peers) Start(ctx context.Context) error { } func (p *Peers) updateWorkerPeers(ctx context.Context) error { - setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses } - selectorGetter := func() labels.Selector { return p.workerPeerSelector } - return p.updatePeers(ctx, selectorGetter, setterFunc) + p.log.Info("updateWorkerPeers entered") + setterFunc := func(addresses []v1.PodIP) { + p.log.Info("updateWorkerPeers setter called", "addresses", addresses) + p.workerPeersAddresses = addresses + } + selectorGetter := func() labels.Selector { + p.log.Info("updateWorkerPeers getter called", "workerPeerSelector", p.workerPeerSelector) + return p.workerPeerSelector + } + resetFunc := func() { + p.workerPeersAddresses = []v1.PodIP{} + } + return p.updatePeers(ctx, selectorGetter, setterFunc, resetFunc) } -func (p *Peers) updateControlPlanePeers(ctx context.Context) error { - setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses } - selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector } - return p.updatePeers(ctx, selectorGetter, setterFunc) +func (p *Peers) UpdateControlPlanePeers(ctx context.Context) error { + p.log.Info("UpdateControlPlanePeers entered") + + setterFunc := func(addresses []v1.PodIP) { + p.log.Info("UpdateControlPlanePeers setter called", "addresses", addresses) + p.controlPlanePeersAddresses = addresses + } + selectorGetter := func() labels.Selector { + p.log.Info("UpdateControlPlanePeers getter called", "workerPeerSelector", p.workerPeerSelector) + return p.controlPlanePeerSelector + } + resetFunc := func() { + p.controlPlanePeersAddresses = []v1.PodIP{} + } + return p.updatePeers(ctx, selectorGetter, setterFunc, resetFunc) } -func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error { +func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP), + resetPeers func()) error { p.mutex.Lock() defer p.mutex.Unlock() @@ -121,16 +143,19 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec defer cancel() nodes := v1.NodeList{} + // get some nodes, but not ourself if err := p.List(readerCtx, &nodes, client.MatchingLabelsSelector{Selector: getSelector()}); err != nil { if apierrors.IsNotFound(err) { // we are the only node at the moment... reset peerList - p.workerPeersAddresses = []v1.PodIP{} + resetPeers() } p.log.Error(err, "failed to update peer list") return pkgerrors.Wrap(err, "failed to update peer list") } + p.log.Info("updatePeers", "nodes", nodes) + pods := v1.PodList{} listOptions := &client.ListOptions{ LabelSelector: labels.SelectorFromSet(labels.Set{ @@ -163,6 +188,10 @@ func (p *Peers) mapNodesToPrimaryPodIPs(nodes v1.NodeList, pods v1.PodList) ([]v addresses = append(addresses, pod.Status.PodIPs[0]) } break + } else { + p.log.Info("Skipping current node/pod combo", + "node.Name", node.Name, + "pod.Spec.NodeName", pod.Spec.NodeName) } } if !found { @@ -177,11 +206,16 @@ func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP { p.mutex.Lock() defer p.mutex.Unlock() + p.log.Info("GetPeersAddresses", "workerPeersAddresses", p.workerPeersAddresses, + "controlPlanePeersAddresses", p.controlPlanePeersAddresses) + var addresses []v1.PodIP if role == Worker { addresses = p.workerPeersAddresses + p.log.Info("Got a request to see how many worker peers exist", "workerPeersAddresses", p.workerPeersAddresses) } else { addresses = p.controlPlanePeersAddresses + p.log.Info("Got a request to see how many control plane peers exist", "controlPlanePeersAddresses", p.controlPlanePeersAddresses) } //we don't want the caller to be able to change the addresses //so we create a deep copy and return it diff --git a/pkg/utils/pods.go b/pkg/utils/pods.go index 7cb0ae37b..6b902e5d5 100644 --- a/pkg/utils/pods.go +++ b/pkg/utils/pods.go @@ -2,7 +2,7 @@ package utils import ( "context" - "errors" + "fmt" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -20,6 +20,7 @@ func GetSelfNodeRemediationAgentPod(nodeName string, r client.Reader) (*v1.Pod, err := r.List(context.Background(), podList, &client.ListOptions{LabelSelector: selector}) if err != nil { + err = fmt.Errorf("failed to retrieve the self-node-remediation agent pod: %w", err) return nil, err } @@ -29,5 +30,5 @@ func GetSelfNodeRemediationAgentPod(nodeName string, r client.Reader) (*v1.Pod, } } - return nil, errors.New("failed to find self node remediation pod matching the given node") + return nil, fmt.Errorf("failed to find self node remediation pod matching the given node (%s)", nodeName) } diff --git a/vendor/github.com/onsi/gomega/gcustom/make_matcher.go b/vendor/github.com/onsi/gomega/gcustom/make_matcher.go new file mode 100644 index 000000000..5372fa441 --- /dev/null +++ b/vendor/github.com/onsi/gomega/gcustom/make_matcher.go @@ -0,0 +1,270 @@ +/* +package gcustom provides a simple mechanism for creating custom Gomega matchers +*/ +package gcustom + +import ( + "fmt" + "reflect" + "strings" + "text/template" + + "github.com/onsi/gomega/format" +) + +var interfaceType = reflect.TypeOf((*interface{})(nil)).Elem() +var errInterface = reflect.TypeOf((*error)(nil)).Elem() + +var defaultTemplate = template.Must(ParseTemplate("{{if .Failure}}Custom matcher failed for:{{else}}Custom matcher succeeded (but was expected to fail) for:{{end}}\n{{.FormattedActual}}")) + +func formatObject(object any, indent ...uint) string { + indentation := uint(0) + if len(indent) > 0 { + indentation = indent[0] + } + return format.Object(object, indentation) +} + +/* +ParseTemplate allows you to precompile templates for MakeMatcher's custom matchers. + +Use ParseTemplate if you are concerned about performance and would like to avoid repeatedly parsing failure message templates. The data made available to the template is documented in the WithTemplate() method of CustomGomegaMatcher. + +Once parsed you can pass the template in either as an argument to MakeMatcher(matchFunc,