Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions Tests/kaas/k8s-node-distribution/check_nodes_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""
Unit tests for node distribution check functions.

(c) Martin Morgenstern <[email protected]>, 4/2024
(c) Hannes Baum <[email protected]>, 5/2024
SPDX-License-Identifier: CC-BY-SA-4.0
"""

from pathlib import Path
import yaml

import pytest

from k8s_node_distribution_check import check_nodes


HERE = Path(__file__).parent


@pytest.fixture
def load_testdata():
with open(Path(HERE, "testdata", "scenarios.yaml")) as stream:
return yaml.safe_load(stream)


@pytest.mark.parametrize("yaml_key", ["success-1", "success-2"])
def test_success_single_region_warning(yaml_key, caplog, load_testdata):
data = load_testdata[yaml_key]
assert check_nodes(data.values()) == 0
assert len(caplog.records) == 2
for record in caplog.records:
assert "no distribution across multiple regions" in record.message
assert record.levelname == "WARNING"


def test_not_enough_nodes(caplog, load_testdata):
data = load_testdata["not-enough-nodes"]
assert check_nodes(data.values()) == 2
assert len(caplog.records) == 1
assert "cluster only contains a single node" in caplog.records[0].message
assert caplog.records[0].levelname == "ERROR"


@pytest.mark.parametrize("yaml_key", ["no-distribution-1", "no-distribution-2"])
def test_no_distribution(yaml_key, caplog, load_testdata):
data = load_testdata[yaml_key]
with caplog.at_level("ERROR"):
assert check_nodes(data.values()) == 2
assert len(caplog.records) == 1
record = caplog.records[0]
assert "distribution of nodes described in the standard couldn't be detected" in record.message
assert record.levelname == "ERROR"


def test_missing_label(caplog, load_testdata):
data = load_testdata["missing-labels"]
assert check_nodes(data.values()) == 2
hostid_missing_records = [
record for record in caplog.records
if "label for host-ids" in record.message
]
assert len(hostid_missing_records) == 1
assert hostid_missing_records[0].levelname == "ERROR"
24 changes: 0 additions & 24 deletions Tests/kaas/k8s-node-distribution/config.yaml.template

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
node-role.kubernetes.io/control-plane

(c) Hannes Baum <[email protected]>, 6/2023
(c) Martin Morgenstern <[email protected]>, 4/2024
License: CC-BY-SA 4.0
"""

Expand All @@ -37,29 +38,17 @@
import logging
import logging.config
import sys
import yaml


logging_config = {
"level": "INFO",
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"k8s-node-distribution-check": {
"format": "%(levelname)s: %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "k8s-node-distribution-check",
"stream": "ext://sys.stdout"
}
},
"root": {
"handlers": ["console"]
}
}

# It is important to note, that the order of these labels matters for this test.
# Since we want to check if nodes are distributed, we want to do this from bigger
# infrastructure parts to smaller ones. So we first look if nodes are distributed
# across regions, then zones and then hosts. If one of these requirements is fulfilled,
# we don't need to check anymore, since a distribution was already detected.
LABELS = (
"topology.kubernetes.io/region",
"topology.kubernetes.io/zone",
"topology.scs.community/host-id",
)

logger = logging.getLogger(__name__)

Expand All @@ -76,10 +65,12 @@ class DistributionException(BaseException):
"""Exception raised if the distribution seems to be not enough"""


class LabelException(BaseException):
"""Exception raised if a label isn't set"""


class Config:
config_path = "./config.yaml"
kubeconfig = None
logging = None


def print_usage():
Expand All @@ -97,7 +88,6 @@ def print_usage():
2 - No distribution according to the standard could be detected for the nodes available.

The following arguments can be set:
-c/--config PATH/TO/CONFIG - Path to the config file of the test script
-k/--kubeconfig PATH/TO/KUBECONFIG - Path to the kubeconfig of the server we want to check
-h - Output help
""")
Expand All @@ -108,51 +98,27 @@ def parse_arguments(argv):
config = Config()

try:
opts, args = getopt.gnu_getopt(argv, "c:k:h", ["config", "kubeconfig", "help"])
opts, args = getopt.gnu_getopt(argv, "k:t:h", ["kubeconfig=", "test=", "help"])
except getopt.GetoptError:
raise ConfigException

for opt in opts:
if opt[0] == "-h" or opt[0] == "--help":
raise HelpException
if opt[0] == "-c" or opt[0] == "--config":
config.config_path = opt[1]
if opt[0] == "-k" or opt[0] == "--kubeconfig":
config.kubeconfig = opt[1]

return config


def setup_logging(config_log):

logging.config.dictConfig(config_log)
loggers = [
logging.getLogger(name)
for name in logging.root.manager.loggerDict
if not logging.getLogger(name).level
]

for log in loggers:
log.setLevel(config_log['level'])


def initialize_config(config):
"""Initialize the configuration for the test script"""

try:
with open(config.config_path, "r") as f:
config.logging = yaml.safe_load(f)['logging']
except OSError:
logger.warning(f"The config file under {config.config_path} couldn't be found, "
f"falling back to the default config.")
finally:
# Setup logging if the config file with the relevant information could be loaded before
# Otherwise, we initialize logging with the included literal
setup_logging(config.logging or logging_config)

if config.kubeconfig is None:
raise ConfigException("A kubeconfig needs to be set in order to test a k8s cluster version.")

logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)

return config


Expand All @@ -176,77 +142,74 @@ async def get_k8s_cluster_labelled_nodes(kubeconfig, interesting_labels):
return nodes


def compare_labels(node_list, labels, node_type="master"):
def compare_labels(node_list, node_type="control"):

label_data = {key: list() for key in labels}
label_data = {key: list() for key in LABELS}

for node in node_list:
for key in labels:
for key in LABELS:
try:
label_data[key].append(node[key])
except KeyError:
logger.warning(f"The label for {key.split('/')[1]}s don't seem to be set for all nodes.")
raise LabelException(f"The label for {key.split('/')[1]}s doesn't seem to be set for all nodes.")

for label in labels:
if len(label_data[label]) < len(node_list):
logger.warning(f"The label for {label.split('/')[1]}s doesn't seem to be set for all nodes.")
for label in LABELS:
if len(set(label_data[label])) <= 1:
logger.warning(f"There seems to be no distribution across multiple {label.split('/')[1]}s "
"or labels aren't set correctly across nodes.")
else:
logger.info(f"The nodes are distributed across {str(len(set(label_data[label])))} {label.split('/')[1]}s.")
logger.info(
f"The {node_type} nodes are distributed across "
f"{str(len(set(label_data[label])))} {label.split('/')[1]}s."
)
return

if node_type == "master":
if node_type == "control":
raise DistributionException("The distribution of nodes described in the standard couldn't be detected.")
elif node_type == "worker":
logger.warning("No node distribution could be detected for the worker nodes. "
"This produces only a warning, since it is just a recommendation.")
return


async def main(argv):
try:
config = initialize_config(parse_arguments(argv))
except (OSError, ConfigException, HelpException) as e:
if hasattr(e, 'message'):
logger.error(e.message)
print_usage()
return 1

# It is important to note, that the order of these labels matters for this test.
# Since we want to check if nodes are distributed, we want to do this from bigger
# infrastructure parts to smaller ones. So we first look if nodes are distributed
# across regions, then zones and then hosts. If one of these requirements is fulfilled,
# we don't need to check anymore, since a distribution was already detected.
labels = (
"topology.kubernetes.io/region",
"topology.kubernetes.io/zone",
"topology.scs.community/host-id",
)

nodes = await get_k8s_cluster_labelled_nodes(config.kubeconfig, labels + ("node-role.kubernetes.io/control-plane", ))

def check_nodes(nodes):
if len(nodes) < 2:
logger.error("The tested cluster only contains a single node, which can't comply with the standard.")
return 2

labelled_master_nodes = [node for node in nodes if "node-role.kubernetes.io/control-plane" in node]
labelled_control_nodes = [node for node in nodes if "node-role.kubernetes.io/control-plane" in node]
try:
if len(labelled_master_nodes) >= 1:
if len(labelled_control_nodes) >= 1:
worker_nodes = [node for node in nodes if "node-role.kubernetes.io/control-plane" not in node]
# Compare the labels of both types, since we have enough of them with labels
compare_labels(labelled_master_nodes, labels, "master")
compare_labels(worker_nodes, labels, "worker")
compare_labels(labelled_control_nodes, "control")
compare_labels(worker_nodes, "worker")
else:
compare_labels(nodes, labels)
except DistributionException as e:
compare_labels(nodes)
except (DistributionException, LabelException) as e:
logger.error(str(e))
return 2

return 0


async def main(argv):
try:
config = initialize_config(parse_arguments(argv))
except (OSError, ConfigException, HelpException) as e:
if hasattr(e, 'message'):
logger.error(e.message)
print_usage()
return 1

nodes = await get_k8s_cluster_labelled_nodes(
config.kubeconfig,
LABELS + ("node-role.kubernetes.io/control-plane", )
)

return check_nodes(nodes)


if __name__ == "__main__":
return_code = asyncio.run(main(sys.argv[1:]))
sys.exit(return_code)
Loading