diff --git a/bin/bootstrap-python b/bin/bootstrap-python index fe9c5d2e0..c4a6b1d9a 100755 --- a/bin/bootstrap-python +++ b/bin/bootstrap-python @@ -51,7 +51,7 @@ then if [[ -n "${PIP_WHL_PATH}" ]] then # Bootstrap wheels are bundled in the buildpack - python3 ${PIP_WHL_PATH}/pip install $PIP_VERBOSITY_FLAGS --user --no-warn-script-location --no-index --find-links=$PIP_WHEELDIR pip setuptools wheel + python3 ${PIP_WHL_PATH}/pip install $PIP_VERBOSITY_FLAGS --user --no-warn-script-location --no-index --no-build-isolation --find-links=$PIP_WHEELDIR --use-pep517 pip setuptools wheel else # Bootstrap wheels are not bundled in the buildpack curl -sS https://bootstrap.pypa.io/pip/get-pip.py -o get-pip.py @@ -63,7 +63,7 @@ else if [ "$CF_STACK" == "cflinuxfs4" ]; then PIP_CMD="python3 $(dirname "$(command -v python3)")/pip3" fi - $PIP_CMD install --upgrade --user --no-warn-script-location pip setuptools wheel + $PIP_CMD install --upgrade --user --no-warn-script-location --use-pep517 pip setuptools wheel fi # Only download Python dependencies if they are not bundled @@ -72,9 +72,9 @@ then echo " ---> Using bundled Python dependencies" else echo " ---> Downloading Python dependencies..." - $PIP_CMD download $PIP_VERBOSITY_FLAGS -r $REQUIREMENTS_PATH --prefer-binary -d $PIP_WHEELDIR + $PIP_CMD download $PIP_VERBOSITY_FLAGS --use-pep517 -r $REQUIREMENTS_PATH --prefer-binary -d $PIP_WHEELDIR fi echo " ---> Installing Python dependencies to ${SITE_PACKAGES_PATH}..." -$PIP_CMD install $PIP_VERBOSITY_FLAGS --target $SITE_PACKAGES_PATH --prefer-binary --no-warn-script-location --no-index --find-links=$PIP_WHEELDIR -r $REQUIREMENTS_PATH +$PIP_CMD install $PIP_VERBOSITY_FLAGS --target $SITE_PACKAGES_PATH --prefer-binary --no-warn-script-location --no-index --no-build-isolation --find-links=$PIP_WHEELDIR --use-pep517 -r $REQUIREMENTS_PATH echo " ---> Finished installing Python dependencies" diff --git a/buildpack/core/nginx.py b/buildpack/core/nginx.py index 315610c5a..40d011476 100644 --- a/buildpack/core/nginx.py +++ b/buildpack/core/nginx.py @@ -23,6 +23,10 @@ "X-Permitted-Cross-Domain-Policies": r"(?i)(^all$|^none$|^master-only$|^by-content-type$|^by-ftp-filename$)", # noqa: C0301 "Origin-Trial": r"[a-zA-Z0-9:;/''\"\*_\- \.\n?=%&+]+", "X-XSS-Protection": r"(?i)(^0$|^1$|^1; mode=block$|^1; report=https?://([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9]))*(:\d+)?$)", # noqa: C0301 + "Cross-Origin-Resource-Policy": r"(?i)(^same-origin$|^same-site$|^cross-origin$)", + "Cross-Origin-Opener-Policy": r"(?i)(^unsafe-none$|^same-origin$|^same-origin-allow-popups$|^noopener-allow-popups$)", + "Cross-Origin-Embedder-Policy": r"(?i)(^unsafe-none$|^require-corp$|^credentialless$)", + "Clear-Site-Data": r"(?i)(^cache$|^cookies$|^storage$|^executionContexts$|^prefetchCache$|^prerenderCache$)", } CONFIG_FILE = "nginx/conf/nginx.conf" diff --git a/buildpack/databroker/__init__.py b/buildpack/databroker/__init__.py deleted file mode 100644 index cf50643e7..000000000 --- a/buildpack/databroker/__init__.py +++ /dev/null @@ -1,203 +0,0 @@ -""" -[EXPERIMENTAL] -Add databroker components to an mx app container -""" - -import atexit -import json -import logging -import os -import sys - -import backoff -from buildpack import util -from buildpack.databroker import connect, streams -from buildpack.databroker.config_generator.scripts.configloader import ( - configinitializer, -) -from buildpack.databroker.config_generator.scripts.generators import ( - jmx as 
jmx_cfg_generator, -) -from buildpack.databroker.config_generator.scripts.utils import write_file -from buildpack.databroker.config_generator.templates.jmx import consumer - -DATABROKER_ENABLED_FLAG = "DATABROKER_ENABLED" -RUNTIME_DATABROKER_FLAG = "DATABROKER.ENABLED" - -APP_MODEL_HOME = "/home/vcap/app/model" -METADATA_FILE = os.path.join(APP_MODEL_HOME, "metadata.json") -DEP_FILE = os.path.join(APP_MODEL_HOME, "dependencies.json") - -MAX_DATABROKER_COMPONENT_RESTART_RETRIES = 4 - - -def is_enabled(): - if os.environ.get(DATABROKER_ENABLED_FLAG) == "true": - logging.debug("Databroker is enabled") - return True - else: - return False - - -def is_producer_app(): - if not is_enabled(): - return False - - with open(METADATA_FILE) as f: - metadata_json = json.load(f) - - db_config = metadata_json.get("DataBrokerConfiguration") - - return bool( - db_config is not None - and db_config.get("publishedServices") is not None - and len(db_config.get("publishedServices")) > 0 - ) - - -def should_run_kafka_connect(): - try: - return ( - os.environ["CF_INSTANCE_INDEX"] is not None - and int(os.environ["CF_INSTANCE_INDEX"]) == 0 - ) - except Exception: - return False - - -def stage(buildpack_dir, install_path, cache_dir): - if not is_enabled(): - return - - connect.stage(buildpack_dir, install_path, cache_dir) - streams.stage(buildpack_dir, install_path, cache_dir) - - -def update_config(m2ee): - if is_enabled(): - util.upsert_custom_runtime_setting( - m2ee, RUNTIME_DATABROKER_FLAG, "true", overwrite=True - ) - - -class Databroker: - def __init__(self): - self.kafka_connect = None - self.kafka_streams = None - self.restart_retries = 0 - self.is_producer_app = is_producer_app() - - atexit.register(self.stop) - - def __setup_configs(self, database_config): - metadata = open(METADATA_FILE, "rt") - dep = open(DEP_FILE, "rt") - complete_conf = configinitializer.unify_configs( - [metadata, dep], database_config - ) - metadata.close() - dep.close() - return complete_conf - - def get_datadog_config(self, user_checks_dir): - extra_jmx_instance_config = None - jmx_config_files = [] - if is_enabled(): - if self.is_producer_app: - # kafka connect cfg - os.makedirs( - os.path.join(user_checks_dir, "jmx_2.d"), - exist_ok=True, - ) - kafka_connect_cfg = ( - jmx_cfg_generator.generate_kafka_connect_jmx_config() - ) - kafka_connect_cfg_path = os.path.join( - user_checks_dir, "jmx_2.d", "conf.yaml" - ) - write_file( - kafka_connect_cfg_path, - kafka_connect_cfg, - ) - - # kafka streams cfg - os.makedirs( - os.path.join(user_checks_dir, "jmx_3.d"), - exist_ok=True, - ) - kafka_streams_cfg = ( - jmx_cfg_generator.generate_kafka_streams_jmx_config() - ) - kafka_streams_cfg_path = os.path.join( - user_checks_dir, "jmx_3.d", "conf.yaml" - ) - write_file( - kafka_streams_cfg_path, - kafka_streams_cfg, - ) - jmx_config_files = [ - kafka_connect_cfg_path, - kafka_streams_cfg_path, - ] - else: - # consumer metrics setup - extra_jmx_instance_config = consumer.jmx_metrics - return (extra_jmx_instance_config, jmx_config_files) - - def run(self, database_config): - if not self.is_producer_app: - return - logging.info("Databroker: Initializing components") - - try: - complete_conf = self.__setup_configs(database_config) - if should_run_kafka_connect(): - self.kafka_connect = connect.run(complete_conf) - self.kafka_streams = streams.run(complete_conf) - logging.info("Databroker: Initialization complete") - except Exception as ex: - logging.error("Databroker: Initialization failed due to %s", ex) - raise Exception("Databroker 
initailization failed") from ex - - if not self.restart_if_any_component_not_healthy(): - logging.error( - "Databroker: component restart retries exhaused. Stopping the app" - ) - sys.exit(0) - - def stop(self): - if not self.is_producer_app: - return - - if self.kafka_connect: - self.kafka_connect.stop() - - if self.kafka_streams: - self.kafka_streams.stop() - - def kill(self): - if not self.is_producer_app: - return - if self.kafka_connect: - self.kafka_connect.kill() - if self.kafka_streams: - self.kafka_streams.kill() - - @backoff.on_predicate( - backoff.constant, - interval=10, - max_tries=MAX_DATABROKER_COMPONENT_RESTART_RETRIES, - ) - def restart_if_any_component_not_healthy(self): - if not self.is_producer_app: - return True - - result = True - if self.kafka_connect and not self.kafka_connect.is_alive(): - self.kafka_connect.restart() - result = False - if self.kafka_streams and not self.kafka_streams.is_alive(): - self.kafka_streams.restart() - result = False - - return result diff --git a/buildpack/databroker/config_generator/scripts/__init__.py b/buildpack/databroker/config_generator/scripts/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/buildpack/databroker/config_generator/scripts/config_env_whitelist.py b/buildpack/databroker/config_generator/scripts/config_env_whitelist.py deleted file mode 100644 index b1f89b3ed..000000000 --- a/buildpack/databroker/config_generator/scripts/config_env_whitelist.py +++ /dev/null @@ -1,14 +0,0 @@ -# This is list of allowed ENV VARS that will be converted into configuration -from buildpack.databroker.config_generator.scripts.constants import ( - ENV_VAR_BROKER_PREFIX, - NODE_COUNT_KEY, -) - -whitelist = [ - "MXRUNTIME_DatabaseType", - "MXRUNTIME_DatabaseHost", - "MXRUNTIME_DatabaseName", - "MXRUNTIME_DatabaseUserName", - "MXRUNTIME_DatabasePassword", - ENV_VAR_BROKER_PREFIX + NODE_COUNT_KEY, -] diff --git a/buildpack/databroker/config_generator/scripts/configloader/__init__.py b/buildpack/databroker/config_generator/scripts/configloader/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/buildpack/databroker/config_generator/scripts/configloader/configinitializer.py b/buildpack/databroker/config_generator/scripts/configloader/configinitializer.py deleted file mode 100644 index 485821c84..000000000 --- a/buildpack/databroker/config_generator/scripts/configloader/configinitializer.py +++ /dev/null @@ -1,168 +0,0 @@ -import json -import os -import logging -from omegaconf import OmegaConf - -from buildpack import util -from buildpack.databroker.config_generator.scripts.utils import ( - convert_dot_field_to_dict, - get_value_for_constant, -) -from buildpack.databroker.config_generator.scripts.constants import ( - ENV_VAR_RUNTIME_PREFIX, - ENV_VAR_BROKER_PREFIX, - CONSTANTS_ENV_VAR_PREFIX, - BOOTSTRAP_SERVERS_KEY, - SUPPORTED_DBS, - POSTGRESQL_MAX_TABLE_LENGTH, - NODE_COUNT_KEY, -) -from buildpack.databroker.config_generator.scripts.config_env_whitelist import ( - whitelist, -) - - -# variables generated with this method are going to have a single key of type "a.b.c" -# they are not nested -# this will be possible in a future version of OmegaConf -def __curate_key(key, prefix, replace_underscores=True): - new_key = key.replace(prefix, "", 1) - if replace_underscores: - new_key = new_key.replace("_", ".") - return new_key - - -def __generate_source_topic_names(config): - for service in config.DataBrokerConfiguration.publishedServices: - for entity in service.entities: - entity.rawTopic = 
"{}.{}.{}.{}".format( - config.DatabaseName, - "public", - entity.originalEntityName.replace(".", "_").lower(), - "private", - ) - - -def validate_config(complete_conf): - # check supported dbs - if complete_conf.DatabaseType.lower() not in [db.lower() for db in SUPPORTED_DBS]: - raise Exception( - f"{complete_conf.DatabaseType} is not supported." - f"Supported dbs: {SUPPORTED_DBS}" - ) - - # validate objectname length & constants - for published_service in complete_conf.DataBrokerConfiguration.publishedServices: - if not get_value_for_constant(complete_conf, published_service.brokerUrl): - raise Exception(f"No Constants found for {published_service.brokerUrl}") - for entity in published_service.entities: - if len(entity.publicEntityName) > POSTGRESQL_MAX_TABLE_LENGTH: - raise Exception( - f"Entity {entity.publicEntityName}'s name is too long. " - f"Max length of {POSTGRESQL_MAX_TABLE_LENGTH} supported" - ) - - # check if bootstrap server is empty - if not complete_conf.bootstrap_servers: - raise Exception("Broker URL not specified") - - -def unify_configs(configs, database_config, parameters_replacement=None): - if parameters_replacement is None: - parameters_replacement = {} - complete_conf = load_config(configs, database_config, parameters_replacement) - validate_config(complete_conf) - return complete_conf - - -def load_config(configs, database_config, parameters_replacement): - loaded_json = [] - - for config in configs: - try: - tmp_json = json.loads(config.read()) - except Exception as exception: - raise ( - f"Error loading input file called {config.name}." - f"Reason: '{exception}'" - ) - # Special check for metadata files, if they exist the idea is to replace the - # non existent constants with their default values - if ( - config.name.endswith("metadata.json") - and tmp_json["Constants"] - and isinstance(tmp_json["Constants"], list) - ): - tmp_json["Constants"] = dict( - map( - lambda constant: ( - constant["Name"], - constant["DefaultValue"], - ), - tmp_json["Constants"], - ) - ) - - loaded_json.append(convert_dot_field_to_dict(tmp_json)) - - modified_env_vars = OmegaConf.create() - if database_config: - modified_env_vars.update(database_config) - - for prefix in [ENV_VAR_RUNTIME_PREFIX, ENV_VAR_BROKER_PREFIX]: - env_vars = dict( - filter( - lambda key: key[0].startswith(prefix) and key[0] in whitelist, - dict(os.environ).items(), - ) - ) - for key, value in env_vars.items(): - new_key = __curate_key(key, prefix) - OmegaConf.update(modified_env_vars, new_key, value) - - # Fetch and update any constants passed as env var - const_env_vars = dict( - filter( - lambda key: key[0].startswith(CONSTANTS_ENV_VAR_PREFIX), - dict(os.environ).items(), - ) - ) - modified_constants = OmegaConf.create({"Constants": {}}) - for key, value in const_env_vars.items(): - new_key = key.replace(CONSTANTS_ENV_VAR_PREFIX, "", 1) - new_key = new_key.replace("_", ".", 1) - OmegaConf.update(modified_constants.Constants, new_key, value) - - parameters_replacement_dict = OmegaConf.create() - for key, value in parameters_replacement: - OmegaConf.update(parameters_replacement_dict, key, value) - - try: - complete_conf = OmegaConf.merge( - *loaded_json, - modified_env_vars, - modified_constants, - parameters_replacement_dict, - ) - bootstrap_servers = get_value_for_constant( - complete_conf, - complete_conf.DataBrokerConfiguration.publishedServices[0].brokerUrl, - ) - OmegaConf.update(complete_conf, BOOTSTRAP_SERVERS_KEY, bootstrap_servers) - - if not OmegaConf.select(complete_conf, NODE_COUNT_KEY): - 
complete_conf[NODE_COUNT_KEY] = 1 - - __generate_source_topic_names(complete_conf) - - OmegaConf.update( - complete_conf, - "log_level", - "DEBUG" if util.get_buildpack_loglevel() == logging.DEBUG else "INFO", - ) - - return complete_conf - except Exception as exception: - raise Exception( - "Error while reading input config files. " f"Reason: '{exception}'" - ) from exception diff --git a/buildpack/databroker/config_generator/scripts/constants.py b/buildpack/databroker/config_generator/scripts/constants.py deleted file mode 100644 index 0fae61191..000000000 --- a/buildpack/databroker/config_generator/scripts/constants.py +++ /dev/null @@ -1,13 +0,0 @@ -POSTGRESQL_NAME = "postgresql" -POSTGRESQL_DEFAULT_PORT = 5432 -POSTGRESQL_MAX_TABLE_LENGTH = 60 -ENV_BROKER_USERNAME = "broker.username" -ENV_BROKER_PASSWORD = "broker.password" -ENV_VAR_RUNTIME_PREFIX = "MXRUNTIME_" -ENV_VAR_BROKER_PREFIX = "DATABROKER_" -NODE_COUNT_KEY = "NodeCount" -METADATA_CONSTANTS = "Constants" -CONSTANTS_ENV_VAR_PREFIX = "MX_" -BOOTSTRAP_SERVERS_KEY = "bootstrap_servers" -SUPPORTED_DBS = [POSTGRESQL_NAME] -JAVA_BIN_PATH = "/home/vcap/app/.local/bin/java" diff --git a/buildpack/databroker/config_generator/scripts/generators/__init__.py b/buildpack/databroker/config_generator/scripts/generators/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/buildpack/databroker/config_generator/scripts/generators/azkarra.py b/buildpack/databroker/config_generator/scripts/generators/azkarra.py deleted file mode 100644 index aed8cad28..000000000 --- a/buildpack/databroker/config_generator/scripts/generators/azkarra.py +++ /dev/null @@ -1,24 +0,0 @@ -from buildpack.databroker import streams -from buildpack.databroker.config_generator.scripts.utils import ( - template_engine_instance, -) -from omegaconf import OmegaConf - - -def generate_config(config): - env = template_engine_instance() - template = env.get_template("azkarra.conf.j2") - azkarra_component_paths = ( - OmegaConf.select(config, "azkarra.component.paths") or None - ) - azkarra_home = OmegaConf.select(config, "azkarra.home") or None - broker_password = OmegaConf.select(config, "broker.password") or None - broker_username = OmegaConf.select(config, "broker.username") or None - return template.render( - config, - azkarra_component_paths=azkarra_component_paths, - azkarra_home=azkarra_home, - broker_password=broker_password, - broker_username=broker_username, - stream_version=streams.get_pdr_stream_version(), - ) diff --git a/buildpack/databroker/config_generator/scripts/generators/debezium.py b/buildpack/databroker/config_generator/scripts/generators/debezium.py deleted file mode 100644 index 60f55cbc6..000000000 --- a/buildpack/databroker/config_generator/scripts/generators/debezium.py +++ /dev/null @@ -1,22 +0,0 @@ -from buildpack.databroker.config_generator.scripts.generators.debezium_configs.debezium_interface import ( # noqa: C0301 - DebeziumInterface, -) -from buildpack.databroker.config_generator.scripts.generators.debezium_configs.debezium_default import ( # noqa: C0301 - DebeziumDefault, -) - -# Do not remove this import, it allows automatic class load -from buildpack.databroker.config_generator.scripts.generators.debezium_configs import * # noqa: C0301, F403 - - -def generate_config(config): - subclasses = [ - cls(config) - for cls in filter( - lambda x: x != DebeziumDefault, DebeziumInterface.__subclasses__() - ) - ] - config_generator = DebeziumDefault(subclasses) - debezium_config = config_generator.generate_config() - - return 
debezium_config diff --git a/buildpack/databroker/config_generator/scripts/generators/debezium_configs/__init__.py b/buildpack/databroker/config_generator/scripts/generators/debezium_configs/__init__.py deleted file mode 100644 index cba3ce02a..000000000 --- a/buildpack/databroker/config_generator/scripts/generators/debezium_configs/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -from inspect import isclass -from pkgutil import iter_modules -from pathlib import Path -from importlib import import_module -from .debezium_interface import DebeziumInterface - -# iterate through the modules in the current package -package_dir = Path(__file__).resolve().parent -for (_, module_name, _) in iter_modules([str(package_dir)]): - - # import the module and iterate through its attributes - module = import_module(f"{__name__}.{module_name}") - for attribute_name in dir(module): - attribute = getattr(module, attribute_name) - - if isclass(attribute) and issubclass(attribute, DebeziumInterface): - # Add the class to this package's variables - globals()[attribute_name] = attribute diff --git a/buildpack/databroker/config_generator/scripts/generators/debezium_configs/debezium_default.py b/buildpack/databroker/config_generator/scripts/generators/debezium_configs/debezium_default.py deleted file mode 100644 index db2c9414f..000000000 --- a/buildpack/databroker/config_generator/scripts/generators/debezium_configs/debezium_default.py +++ /dev/null @@ -1,23 +0,0 @@ -from buildpack.databroker.config_generator.scripts.generators.debezium_configs.debezium_interface import ( # noqa: C0301 - DebeziumInterface, -) - - -class DebeziumDefault(DebeziumInterface): - def __init__(self, dbz_generators): - config_generator = next( - (idx for idx in dbz_generators if idx.is_generator()), None - ) - if config_generator is not None: - self.config_generator = config_generator - else: - self.config_generator = self - - def is_generator(self) -> bool: - return True - - def generate_config(self) -> dict: - if self.config_generator != self: - return self.config_generator.generate_config() - else: - return {} diff --git a/buildpack/databroker/config_generator/scripts/generators/debezium_configs/debezium_interface.py b/buildpack/databroker/config_generator/scripts/generators/debezium_configs/debezium_interface.py deleted file mode 100644 index 567c8a550..000000000 --- a/buildpack/databroker/config_generator/scripts/generators/debezium_configs/debezium_interface.py +++ /dev/null @@ -1,11 +0,0 @@ -import abc - - -class DebeziumInterface(object, metaclass=abc.ABCMeta): - @abc.abstractmethod - def is_generator(self) -> bool: - pass - - @abc.abstractmethod - def generate_config(self) -> dict: - pass diff --git a/buildpack/databroker/config_generator/scripts/generators/debezium_configs/postgres.py b/buildpack/databroker/config_generator/scripts/generators/debezium_configs/postgres.py deleted file mode 100644 index 56e4bb477..000000000 --- a/buildpack/databroker/config_generator/scripts/generators/debezium_configs/postgres.py +++ /dev/null @@ -1,56 +0,0 @@ -from buildpack.databroker.config_generator.scripts.generators.debezium_configs.debezium_interface import ( # noqa: C0301 - DebeziumInterface, -) -from buildpack.databroker.config_generator.scripts.constants import ( - POSTGRESQL_NAME, - POSTGRESQL_DEFAULT_PORT, -) -from buildpack.databroker.config_generator.scripts.utils import ( - template_engine_instance, -) -from functools import reduce - - -class PostgresConfig(DebeziumInterface): - def __init__(self, config): - self.config = config - - def 
is_generator(self): - return str(self.config.DatabaseType).lower() == POSTGRESQL_NAME - - def __parse_whitelist(self, entities): - def create_whitelist_strings(whitelist, entity): - t = ".*{}.*,".format(entity["originalEntityName"]) - whitelist["table"] += t - whitelist["column"] += t + entity["originalEntityName"] + ".id," - return whitelist - - whitelist = reduce( - create_whitelist_strings, entities, {"table": "", "column": ""} - ) - whitelist["table"] = whitelist["table"][:-1] - whitelist["column"] = whitelist["column"][:-1] - - return whitelist - - def generate_config(self): - env = template_engine_instance() - template = env.get_template("debezium/postgres.json.j2") - whitelist = self.__parse_whitelist( - self.config.DataBrokerConfiguration.publishedServices[0].entities - ) - database_data = self.config.DatabaseHost.split(":") - database_hostname = database_data[0] - if len(database_data) > 1: - # custom port provided - database_port = database_data[1] - else: - # use default port - database_port = POSTGRESQL_DEFAULT_PORT - return template.render( - self.config, - table_whitelist=whitelist["table"], - column_whitelist=whitelist["column"], - database_hostname=database_hostname, - database_port=database_port, - ) diff --git a/buildpack/databroker/config_generator/scripts/generators/jmx.py b/buildpack/databroker/config_generator/scripts/generators/jmx.py deleted file mode 100644 index 6a4c5ff43..000000000 --- a/buildpack/databroker/config_generator/scripts/generators/jmx.py +++ /dev/null @@ -1,18 +0,0 @@ -from buildpack.databroker.config_generator.scripts.utils import ( - template_engine_instance, -) -from buildpack.databroker.config_generator.scripts.constants import ( - JAVA_BIN_PATH, -) - - -def generate_kafka_connect_jmx_config(): - env = template_engine_instance() - template = env.get_template("jmx/kafka-connect.yaml.j2") - return template.render(java_path=JAVA_BIN_PATH) - - -def generate_kafka_streams_jmx_config(): - env = template_engine_instance() - template = env.get_template("jmx/kafka-streams.yaml.j2") - return template.render(java_path=JAVA_BIN_PATH) diff --git a/buildpack/databroker/config_generator/scripts/generators/kafka_connect.py b/buildpack/databroker/config_generator/scripts/generators/kafka_connect.py deleted file mode 100644 index a8273aa2b..000000000 --- a/buildpack/databroker/config_generator/scripts/generators/kafka_connect.py +++ /dev/null @@ -1,9 +0,0 @@ -from buildpack.databroker.config_generator.scripts.utils import ( - template_engine_instance, -) - - -def generate_config(config): - env = template_engine_instance() - template = env.get_template("kafka-connect.properties.j2") - return template.render(config) diff --git a/buildpack/databroker/config_generator/scripts/generators/loggers.py b/buildpack/databroker/config_generator/scripts/generators/loggers.py deleted file mode 100644 index 34709bf7b..000000000 --- a/buildpack/databroker/config_generator/scripts/generators/loggers.py +++ /dev/null @@ -1,9 +0,0 @@ -from buildpack.databroker.config_generator.scripts.utils import ( - template_engine_instance, -) - - -def generate_kafka_connect_logging_config(config): - env = template_engine_instance() - template = env.get_template("logging/debezium-log4j.properties") - return template.render(config) diff --git a/buildpack/databroker/config_generator/scripts/generators/stream.py b/buildpack/databroker/config_generator/scripts/generators/stream.py deleted file mode 100644 index 28b3501d8..000000000 --- 
a/buildpack/databroker/config_generator/scripts/generators/stream.py +++ /dev/null @@ -1,17 +0,0 @@ -import json -from buildpack.databroker.config_generator.scripts.utils import ( - template_engine_instance, -) - - -def generate_config(config): - topologies = {"topologies": []} - env = template_engine_instance() - template = env.get_template("streaming_producer.json.j2") - for service in config.DataBrokerConfiguration.publishedServices: - for entity in service.entities: - renderedTemplate = template.render(entity=entity) - renderedTemplateAsJson = json.loads(renderedTemplate) - topologies["topologies"].append(renderedTemplateAsJson) - - return json.dumps(topologies) diff --git a/buildpack/databroker/config_generator/scripts/utils.py b/buildpack/databroker/config_generator/scripts/utils.py deleted file mode 100644 index d90673eb6..000000000 --- a/buildpack/databroker/config_generator/scripts/utils.py +++ /dev/null @@ -1,41 +0,0 @@ -from functools import reduce -from jinja2 import Environment, FileSystemLoader -from omegaconf import OmegaConf -from buildpack.databroker.config_generator.scripts.constants import ( - METADATA_CONSTANTS, -) - - -def write_file(output_file_path, content): - if output_file_path is None: - print(content) - else: - try: - with open(output_file_path, "w") as f: - f.write(str(content)) - except Exception as exception: - raise Exception( - "Error while trying to write the configuration to a file. " - f"Reason: '{exception}'" - ) from exception - - -def template_engine_instance( - path="buildpack/databroker/config_generator/templates", -): - return Environment(loader=FileSystemLoader(path)) - - -def convert_dot_field_to_dict(field): - output = {} - if isinstance(field, dict): - for key, value in field.items(): - path = key.split(".") - target = reduce(lambda d, k: d.setdefault(k, {}), path[:-1], output) - target[path[-1]] = convert_dot_field_to_dict(value) - return output - return field - - -def get_value_for_constant(conf, key): - return OmegaConf.select(conf, f"{METADATA_CONSTANTS}.{key}") diff --git a/buildpack/databroker/config_generator/templates/__init__.py b/buildpack/databroker/config_generator/templates/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/buildpack/databroker/config_generator/templates/azkarra.conf.j2 b/buildpack/databroker/config_generator/templates/azkarra.conf.j2 deleted file mode 100644 index a7e1a6736..000000000 --- a/buildpack/databroker/config_generator/templates/azkarra.conf.j2 +++ /dev/null @@ -1,13 +0,0 @@ -{ - auto.create.topics.enable = true - enable.wait.for.topics = true - azkarra { - home = "/home/vcap/app/.local/databroker/azkarra" - component.paths = "/home/vcap/app/.local/databroker/producer-streams/stream-sidecar-{{ stream_version }}" - } - streams { - bootstrap.servers = "{{ bootstrap_servers }}" - state.dir = "/var/lib/kafka-streams/" - metrics.recording.level = "INFO" - } -} diff --git a/buildpack/databroker/config_generator/templates/debezium/postgres.json.j2 b/buildpack/databroker/config_generator/templates/debezium/postgres.json.j2 deleted file mode 100644 index 0399bdd5f..000000000 --- a/buildpack/databroker/config_generator/templates/debezium/postgres.json.j2 +++ /dev/null @@ -1,33 +0,0 @@ -{ - "name": "mx-databroker-{{ DatabaseType }}-source-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "tasks.max": 1, - "database.hostname": "{{ database_hostname }}", - "database.port": {{ database_port }}, - "database.user": "{{ DatabaseUserName }}", - 
"database.password": "{{ DatabasePassword }}", - "database.dbname": "{{ DatabaseName }}", - "database.server.name": "{{ DatabaseName }}", - "table.whitelist": "{{ table_whitelist }}", - "column.whitelist": "{{ column_whitelist }}", - "slot.name": "mx_databroker", - "transforms": "route,unwrap", - "transforms.unwrap.add.fields": "table,lsn,txId,ts_ms", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.operation.header": true, - "transforms.unwrap.drop.tombstones": true, - "transforms.unwrap.delete.handling.mode": "rewrite", - "transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter", - "transforms.route.regex":"([\\w\\W]+)", - "transforms.route.replacement":"$1.private", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "heartbeat.interval.ms": 5000, - "database.initial.statements": "CREATE TABLE IF NOT EXISTS heartbeat (id SERIAL PRIMARY KEY, ts TIMESTAMP WITH TIME ZONE)", - "heartbeat.action.query": "INSERT INTO heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts", - "schema.refresh.mode": "columns_diff_exclude_unchanged_toast", - "sanitize.field.names": true, - "snapshot.mode": "always", - "plugin.name": "wal2json" - } -} diff --git a/buildpack/databroker/config_generator/templates/jmx/__init__.py b/buildpack/databroker/config_generator/templates/jmx/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/buildpack/databroker/config_generator/templates/jmx/consumer.py b/buildpack/databroker/config_generator/templates/jmx/consumer.py deleted file mode 100644 index 86f81629e..000000000 --- a/buildpack/databroker/config_generator/templates/jmx/consumer.py +++ /dev/null @@ -1,209 +0,0 @@ -jmx_metrics = [ - { - "include": { - "bean_regex": "kafka\.consumer:type=consumer-metrics,client-id=.*", - "attribute": { - "time-between-poll-avg": { - "alias": "kafka.consumer.time-between-poll-avg" - }, - "time-between-poll-max": { - "alias": "kafka.consumer.time-between-poll-max" - }, - "last-poll-seconds-ago": { - "alias": "kafka.consumer.last-poll-seconds-ago" - }, - "poll-idle-ratio-avg": {"alias": "kafka.consumer.poll-idle-ratio-avg"}, - }, - } - }, - { - "include": { - "bean_regex": "kafka\.consumer:type=consumer-coordinator-metrics,client-id=.*", # noqa: C0301 - "attribute": { - "commit-latency-avg": { - "alias": "kafka.consumer.coordinator.commit-latency-avg" - }, - "commit-latency-max": { - "alias": "kafka.consumer.coordinator.commit-latency-max" - }, - "commit-rate": {"alias": "kafka.consumer.coordinator.commit-rate"}, - "commit-total": {"alias": "kafka.consumer.coordinator.commit-total"}, - "assigned-partitions": { - "alias": "kafka.consumer.coordinator.assigned-partitions" - }, - "heartbeat-response-time-max": { - "alias": "kafka.consumer.coordinator.heartbeat-response-time-max" - }, - "heartbeat-rate": { - "alias": "kafka.consumer.coordinator.heartbeat-rate" - }, - "heartbeat-total": { - "alias": "kafka.consumer.coordinator.heartbeat-total" - }, - "join-time-avg": {"alias": "kafka.consumer.coordinator.join-time-avg"}, - "join-time-max": {"alias": "kafka.consumer.coordinator.join-time-max"}, - "join-rate": {"alias": "kafka.consumer.coordinator.join-rate"}, - "join-total": {"alias": "kafka.consumer.coordinator.join-total"}, - "sync-time-avg": {"alias": "kafka.consumer.coordinator.sync-time-avg"}, - "sync-time-max": {"alias": "kafka.consumer.coordinator.sync-time-max"}, - "sync-rate": {"alias": "kafka.consumer.coordinator.sync-rate"}, - "sync-total": {"alias": 
"kafka.consumer.coordinator.sync-total"}, - "rebalance-latency-avg": { - "alias": "kafka.consumer.coordinator.rebalance-latency-avg" - }, - "rebalance-latency-max": { - "alias": "kafka.consumer.coordinator.rebalance-latency-max" - }, - "rebalance-latency-total": { - "alias": "kafka.consumer.coordinator.rebalance-latency-total" - }, - "rebalance-total": { - "alias": "kafka.consumer.coordinator.rebalance-total" - }, - "rebalance-rate-per-hour": { - "alias": "kafka.consumer.coordinator.rebalance-rate-per-hour" - }, - "failed-rebalance-total": { - "alias": "kafka.consumer.coordinator.failed-rebalance-total" - }, - "failed-rebalance-rate-per-hour": { - "alias": "kafka.consumer.coordinator.failed-rebalance-rate-per-hour" - }, - "last-rebalance-seconds-ago": { - "alias": "kafka.consumer.coordinator.last-rebalance-seconds-ago" - }, - "last-heartbeat-seconds-ago": { - "alias": "kafka.consumer.coordinator.last-heartbeat-seconds-ago" - }, - "partitions-revoked-latency-avg": { - "alias": "kafka.consumer.coordinator.partitions-revoked-latency-avg" - }, - "partitions-revoked-latency-max": { - "alias": "kafka.consumer.coordinator.partitions-revoked-latency-max" - }, - "partitions-assigned-latency-avg": { - "alias": "kafka.consumer.coordinator.partitions-assigned-latency-avg" # noqa: C0301 - }, - "partitions-assigned-latency-max": { - "alias": "kafka.consumer.coordinator.partitions-assigned-latency-max" # noqa: C0301 - }, - "partitions-lost-latency-avg": { - "alias": "kafka.consumer.coordinator.partitions-lost-latency-avg" - }, - "partitions-lost-latency-max": { - "alias": "kafka.consumer.coordinator.partitions-lost-latency-max" - }, - }, - } - }, - { - "include": { - "bean_regex": "kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*", # noqa: C0301 - "attribute": { - "bytes-consumed-rate": { - "alias": "kafka.consumer.fetch.manager.bytes-consumed-rate" - }, - "bytes-consumed-total": { - "alias": "kafka.consumer.fetch.manager.bytes-consumed-total" - }, - "fetch-latency-avg": { - "alias": "kafka.consumer.fetch.manager.fetch-latency-avg" - }, - "fetch-latency-max": { - "alias": "kafka.consumer.fetch.manager.fetch-latency-max" - }, - "fetch-rate": {"alias": "kafka.consumer.fetch.manager.fetch-rate"}, - "fetch-size-avg": { - "alias": "kafka.consumer.fetch.manager.fetch-size-avg" - }, - "fetch-size-max": { - "alias": "kafka.consumer.fetch.manager.fetch-size-max" - }, - "fetch-throttle-time-avg": { - "alias": "kafka.consumer.fetch.manager.fetch-throttle-time-avg" - }, - "fetch-throttle-time-max": { - "alias": "kafka.consumer.fetch.manager.fetch-throttle-time-max" - }, - "fetch-total": {"alias": "kafka.consumer.fetch.manager.fetch-total"}, - "records-consumed-rate": { - "alias": "kafka.consumer.fetch.manager.records-consumed-rate" - }, - "records-consumed-total": { - "alias": "kafka.consumer.fetch.manager.records-consumed-total" - }, - "records-lag-max": { - "alias": "kafka.consumer.fetch.manager.records-lag-max" - }, - "records-lead-min": { - "alias": "kafka.consumer.fetch.manager.records-lead-min" - }, - "records-per-request-avg": { - "alias": "kafka.consumer.fetch.manager.records-per-request-avg" - }, - }, - } - }, - { - "include": { - "bean_regex": "kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*,topic=.*", # noqa: C0301 - "attribute": { - "bytes-consumed-rate": { - "alias": "kafka.consumer.fetch.manager.bytes-consumed-rate" - }, - "bytes-consumed-total": { - "alias": "kafka.consumer.fetch.manager.bytes-consumed-total" - }, - "fetch-size-avg": { - "alias": 
"kafka.consumer.fetch.manager.fetch-size-avg" - }, - "fetch-size-max": { - "alias": "kafka.consumer.fetch.manager.fetch-size-max" - }, - "records-consumed-rate": { - "alias": "kafka.consumer.fetch.manager.records-consumed-rate" - }, - "records-consumed-total": { - "alias": "kafka.consumer.fetch.manager.records-consumed-total" - }, - "records-per-request-avg": { - "alias": "kafka.consumer.fetch.manager.records-per-request-avg" - }, - }, - } - }, - { - "include": { - "bean_regex": "kafka\.consumer:type=consumer-fetch-manager-metrics,partition=.*,topic=.*,client-id=.*", # noqa: C0301 - "attribute": { - "preferred-read-replica": { - "alias": "kafka.consumer.fetch.manager.preferred-read-replica" - }, - "records-lag": {"alias": "kafka.consumer.fetch.manager.records-lag"}, - "records-lag-avg": { - "alias": "kafka.consumer.fetch.manager.records-lag-avg" - }, - "records-lag-max": { - "alias": "kafka.consumer.fetch.manager.records-lag-max" - }, - "records-lead": {"alias": "kafka.consumer.fetch.manager.records-lead"}, - "records-lead-avg": { - "alias": "kafka.consumer.fetch.manager.records-lead-avg" - }, - "records-lead-min": { - "alias": "kafka.consumer.fetch.manager.records-lead-min" - }, - }, - } - }, - { - "include": { - "bean": "com.mendix:type=DataBroker", - "attribute": { - "EntitiesCreatedCount": {"alias": "com.mendix.EntitiesCreatedCount"}, - "EntitiesUpdatedCount": {"alias": "com.mendix.EntitiesUpdatedCount"}, - "EntitiesDeletedCount": {"alias": "com.mendix.EntitiesDeletedCount"}, - }, - } - }, -] diff --git a/buildpack/databroker/config_generator/templates/jmx/kafka-connect.yaml.j2 b/buildpack/databroker/config_generator/templates/jmx/kafka-connect.yaml.j2 deleted file mode 100644 index 3b10a9c56..000000000 --- a/buildpack/databroker/config_generator/templates/jmx/kafka-connect.yaml.j2 +++ /dev/null @@ -1,209 +0,0 @@ -init_config: - collect_default_metrics: true - is_jmx: true -instances: -- conf: - - include: - attribute: - connector-count: - alias: kafka.connect.worker.connector-count - connector-startup-attempts-total: - alias: kafka.connect.worker.connector-startup-attempts-total - connector-startup-failure-percentage: - alias: kafka.connect.worker.connector-startup-failure-percentage - connector-startup-failure-total: - alias: kafka.connect.worker.connector-startup-failure-total - connector-startup-success-percentage: - alias: kafka.connect.worker.connector-startup-success-percentage - connector-startup-success-total: - alias: kafka.connect.worker.connector-startup-success-total - task-count: - alias: kafka.connect.worker.task-count - task-startup-failure-percentage: - alias: kafka.connect.worker.task-startup-failure-percentage - task-startup-failure-total: - alias: kafka.connect.worker.task-startup-failure-total - task-startup-success-percentage: - alias: kafka.connect.worker.task-startup-success-percentage - task-startup-success-total: - alias: kafka.connect.worker.task-startup-success-total - bean: kafka.connect:type=connect-worker-metrics - - include: - attribute: - connector-destroyed-task-count: - alias: kafka.connect.worker.connector-destroyed-task-count - connector-failed-task-count: - alias: kafka.connect.worker.connector-failed-task-count - connector-paused-task-count: - alias: kafka.connect.worker.connector-paused-task-count - connector-running-task-count: - alias: kafka.connect.worker.connector-running-task-count - connector-total-task-count: - alias: kafka.connect.worker.connector-total-task-count - connector-unassigned-task-count: - alias: 
kafka.connect.worker.connector-unassigned-task-count - bean_regex: 'kafka\.connect:type=connect-worker-metrics,connector=.*' - - include: - attribute: - completed-rebalances-total: - alias: kafka.connect.worker.rebalance.completed-rebalances-total - connect-protocol: - alias: kafka.connect.worker.rebalance.connect-protocol - epoch: - alias: kafka.connect.worker.rebalance.epoch - leader-name: - alias: kafka.connect.worker.rebalance.leader-name - rebalance-avg-time-ms: - alias: kafka.connect.worker.rebalance.rebalance-avg-time-ms - rebalance-max-time-ms: - alias: kafka.connect.worker.rebalance.rebalance-max-time-ms - rebalancing: - alias: kafka.connect.worker.rebalance.rebalancing - time-since-last-rebalance-ms: - alias: kafka.connect.worker.rebalance.time-since-last-rebalance-ms - bean: kafka.connect:type=connect-worker-rebalance-metrics - - include: - attribute: - connector-class: - alias: kafka.connect.connector-class - connector-type: - alias: kafka.connect.connector-type - connector-version: - alias: kafka.connect.connector-version - status: - alias: kafka.connect.status - bean_regex: 'kafka\.connect:type=connector-metrics,connector=.*' - - include: - attribute: - batch-size-avg: - alias: kafka.connect.task.batch-size-avg - batch-size-max: - alias: kafka.connect.task.batch-size-max - offset-commit-avg-time-ms: - alias: kafka.connect.task.offset-commit-avg-time-ms - offset-commit-failure-percentage: - alias: kafka.connect.task.offset-commit-failure-percentage - offset-commit-max-time-ms: - alias: kafka.connect.task.offset-commit-max-time-ms - offset-commit-success-percentage: - alias: kafka.connect.task.offset-commit-success-percentage - pause-ratio: - alias: kafka.connect.task.pause-ratio - running-ratio: - alias: kafka.connect.task.running-ratio - status: - alias: kafka.connect.task.status - tags: - connector: $1 - task: $2 - bean_regex: 'kafka\.connect:type=connector-task-metrics,connector=(.*?),task=(.*?)(?:,|$)' - - include: - attribute: - poll-batch-avg-time-ms: - alias: kafka.connect.source.task.poll-batch-avg-time-ms - poll-batch-max-time-ms: - alias: kafka.connect.source.task.poll-batch-max-time-ms - source-record-active-count: - alias: kafka.connect.source.task.source-record-active-count - source-record-active-count-avg: - alias: kafka.connect.source.task.source-record-active-count-avg - source-record-active-count-max: - alias: kafka.connect.source.task.source-record-active-count-max - source-record-poll-rate: - alias: kafka.connect.source.task.source-record-poll-rate - source-record-poll-total: - alias: kafka.connect.source.task.source-record-poll-total - source-record-write-rate: - alias: kafka.connect.source.task.source-record-write-rate - source-record-write-total: - alias: kafka.connect.source.task.source-record-write-total - tags: - connector: $1 - task: $2 - bean_regex: 'kafka\.connect:type=source-task-metrics,connector=(.*?),task=(.*?)(?:,|$)' - - include: - attribute: - deadletterqueue-produce-failures: - alias: kafka.connect.task.error.deadletterqueue-produce-failures - deadletterqueue-produce-requests: - alias: kafka.connect.task.error.deadletterqueue-produce-requests - last-error-timestamp: - alias: kafka.connect.task.error.last-error-timestamp - total-errors-logged: - alias: kafka.connect.task.error.total-errors-logged - total-record-errors: - alias: kafka.connect.task.error.total-record-errors - total-record-failures: - alias: kafka.connect.task.error.total-record-failures - total-records-skipped: - alias: kafka.connect.task.error.total-records-skipped - 
total-retries: - alias: kafka.connect.task.error.total-retries - tags: - connector: $1 - task: $2 - bean_regex: 'kafka\.connect:type=task-error-metrics,connector=(.*?),task=(.*?)(?:,|$)' - - include: - attribute: - LastEvent: - alias: debezium.postgres.snapshot.last-event - MilliSecondsSinceLastEvent: - alias: debezium.postgres.snapshot.milliseconds-since-last-event - MonitoredTables: - alias: debezium.postgres.snapshot.monitored-tables - NumberOfEventsFiltered: - alias: debezium.postgres.snapshot.number-of-events-filtered - QueueRemainingCapacity: - alias: debezium.postgres.snapshot.queue-remaining-capacity - QueueTotalCapacity: - alias: debezium.postgres.snapshot.queue-total-capacity - RemainingTableCount: - alias: debezium.postgres.snapshot.remaining-table-count - RowsScanned: - alias: debezium.postgres.snapshot.rows-scanned - SnapshotAborted: - alias: debezium.postgres.snapshot.snapshot-aborted - SnapshotCompleted: - alias: debezium.postgres.snapshot.snapshot-completed - SnapshotDurationInSeconds: - alias: debezium.postgres.snapshot.snapshot-duration-in-seconds - SnapshotRunning: - alias: debezium.postgres.snapshot.snapshot-running - TotalNumberOfEventsSeen: - alias: debezium.postgres.snapshot.total-number-of-events-seen - TotalTableCount: - alias: debezium.postgres.snapshot.total-table-count - bean_regex: 'debezium\.postgres:type=connector-metrics,context=snapshot,server=.*' - - include: - attribute: - Connected: - alias: debezium.postgres.streaming.connected - LastEvent: - alias: debezium.postgres.streaming.last-event - LastTransactionId: - alias: debezium.postgres.streaming.last-transaction-id - MilliSecondsBehindSource: - alias: debezium.postgres.streaming.milliseconds-behind-source - MilliSecondsSinceLastEvent: - alias: debezium.postgres.streaming.milliseconds-since-last-event - MonitoredTables: - alias: debezium.postgres.streaming.monitored-tables - NumberOfCommittedTransactions: - alias: debezium.postgres.streaming.number-of-committed-transactions - NumberOfEventsFiltered: - alias: debezium.postgres.streaming.number-of-events-filtered - QueueRemainingCapacity: - alias: debezium.postgres.streaming.queue-remaining-capacity - QueueTotalCapacity: - alias: debezium.postgres.streaming.queue-total-capacity - SourceEventPosition: - alias: debezium.postgres.streaming.source-event-position - TotalNumberOfEventsSeen: - alias: debezium.postgres.streaming.total-number-of-events-seen - bean_regex: 'debezium\.postgres:type=connector-metrics,context=streaming,server=.*' - host: localhost - port: 11003 - jvm_direct: true - java_bin_path: {{ java_bin_path }} - java_options: -Xmx50m -Xms15m - refresh_beans: 120 diff --git a/buildpack/databroker/config_generator/templates/jmx/kafka-streams.yaml.j2 b/buildpack/databroker/config_generator/templates/jmx/kafka-streams.yaml.j2 deleted file mode 100644 index 7b096437c..000000000 --- a/buildpack/databroker/config_generator/templates/jmx/kafka-streams.yaml.j2 +++ /dev/null @@ -1,110 +0,0 @@ -init_config: - collect_default_metrics: true - is_jmx: true -instances: -- conf: - - include: - attribute: - version: - alias: kafka.streams.version - bean_regex: 'kafka\.streams:type=stream-metrics,client-id=.*' - - include: - attribute: - commit-latency-avg: - alias: kafka.streams.thread.commit-latency-avg - commit-latency-max: - alias: kafka.streams.thread.commit-latency-max - commit-rate: - alias: kafka.streams.thread.commit-rate - commit-total: - alias: kafka.streams.thread.commit-total - poll-latency-avg: - alias: kafka.streams.thread.poll-latency-avg - 
poll-latency-max: - alias: kafka.streams.thread.poll-latency-max - poll-rate: - alias: kafka.streams.thread.poll-rate - poll-total: - alias: kafka.streams.thread.poll-total - process-latency-avg: - alias: kafka.streams.thread.process-latency-avg - process-latency-max: - alias: kafka.streams.thread.process-latency-max - process-rate: - alias: kafka.streams.thread.process-rate - process-total: - alias: kafka.streams.thread.process-total - punctuate-latency-avg: - alias: kafka.streams.thread.punctuate-latency-avg - punctuate-latency-max: - alias: kafka.streams.thread.punctuate-latency-max - punctuate-rate: - alias: kafka.streams.thread.punctuate-rate - punctuate-total: - alias: kafka.streams.thread.punctuate-total - task-closed-rate: - alias: kafka.streams.thread.task-closed-rate - task-closed-total: - alias: kafka.streams.thread.task-closed-total - task-created-rate: - alias: kafka.streams.thread.task-created-rate - task-created-total: - alias: kafka.streams.thread.task-created-total - tags: - thread-id: $1 - bean_regex: 'kafka\.streams:type=stream-thread-metrics,thread-id=(.*?)(?:,|$)' - - include: - attribute: - commit-latency-avg: - alias: kafka.streams.task.commit-latency-avg - commit-latency-max: - alias: kafka.streams.task.commit-latency-max - commit-rate: - alias: kafka.streams.task.commit-rate - commit-total: - alias: kafka.streams.task.commit-total - dropped-records-rate: - alias: kafka.streams.task.dropped-records-rate - dropped-records-total: - alias: kafka.streams.task.dropped-records-total - enforced-processing-rate: - alias: kafka.streams.task.enforced-processing-rate - enforced-processing-total: - alias: kafka.streams.task.enforced-processing-total - process-latency-avg: - alias: kafka.streams.task.process-latency-avg - process-latency-max: - alias: kafka.streams.task.process-latency-max - process-rate: - alias: kafka.streams.task.process-rate - process-total: - alias: kafka.streams.task.process-total - record-lateness-avg: - alias: kafka.streams.task.record-lateness-avg - record-lateness-max: - alias: kafka.streams.task.record-lateness-max - tags: - thread-id: $1 - task: $2 - bean_regex: 'kafka\.streams:type=stream-task-metrics,thread-id=(.*?),task-id=(.*?)(?:,|$)' - - include: - attribute: - process-rate: - alias: kafka.streams.processor.process-rate - process-total: - alias: kafka.streams.processor.process-total - suppression-emit-rate: - alias: kafka.streams.processor.suppression-emit-rate - suppression-emit-total: - alias: kafka.streams.processor.suppression-emit-total - tags: - thread-id: $1 - task: $2 - processor-node-id: $3 - bean_regex: 'kafka\.streams:type=stream-processor-node-metrics,thread-id=(.*?),task-id=(.*?),processor-node-id=(.*?)(?:,|$)' - host: localhost - port: 11004 - jvm_direct: true - java_bin_path: {{ java_bin_path }} - java_options: -Xmx50m -Xms15m - refresh_beans: 120 diff --git a/buildpack/databroker/config_generator/templates/kafka-connect.properties.j2 b/buildpack/databroker/config_generator/templates/kafka-connect.properties.j2 deleted file mode 100644 index f20f6cad4..000000000 --- a/buildpack/databroker/config_generator/templates/kafka-connect.properties.j2 +++ /dev/null @@ -1,16 +0,0 @@ -bootstrap.servers={{ bootstrap_servers }} -group.id=mx-databroker-consumer -key.converter.schemas.enable=true -value.converter.schemas.enable=true -key.converter=org.apache.kafka.connect.json.JsonConverter -value.converter=org.apache.kafka.connect.json.JsonConverter -config.storage.topic=mx-databroker-connect-configs -config.storage.replication.factor={{ 
NodeCount }} -offset.storage.topic=mx-databroker-connect-offsets -offset.storage.replication.factor={{ NodeCount }} -offset.storage.file.filename=/tmp/connect.offsets -status.storage.replication.factor={{ NodeCount }} -status.storage.topic=mx-databroker-connect-status -wait.interval.time=10 -plugin.path=/home/vcap/app/.local/databroker/debezium -rest.port=8083 diff --git a/buildpack/databroker/config_generator/templates/logging/debezium-log4j.properties b/buildpack/databroker/config_generator/templates/logging/debezium-log4j.properties deleted file mode 100644 index 655c3997d..000000000 --- a/buildpack/databroker/config_generator/templates/logging/debezium-log4j.properties +++ /dev/null @@ -1,13 +0,0 @@ -log4j.rootLogger={{ log_level }}, stdout - -# Disable excessive reflection warnings - KAFKA-5229 -log4j.logger.org.reflections=ERROR - -# Avoid excesive ZooKeeper logging -log4j.logger.org.apache.zookeeper=ERROR -log4j.logger.org.I0Itec.zkclient=ERROR -log4j.logger.org.reflections=ERROR - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n diff --git a/buildpack/databroker/config_generator/templates/streaming_producer.json.j2 b/buildpack/databroker/config_generator/templates/streaming_producer.json.j2 deleted file mode 100644 index 7cd840e55..000000000 --- a/buildpack/databroker/config_generator/templates/streaming_producer.json.j2 +++ /dev/null @@ -1,14 +0,0 @@ -{ - "name": "{{ entity.publicEntityName }} topology", - "source": "{{ entity.rawTopic }}", - "sink": "{{ entity.topicName }}", - "filter": "", - "join": "", - "originalEntityName": "{{ entity.originalEntityName }}", - "publicEntityName": "{{ entity.publicEntityName }}", - "attributeMapping": { - {%- for attributeKey, attributeValue in entity.attributeMapping.items() %} - "{{ attributeKey }}": "{{ attributeValue }}"{% if not loop.last %},{% endif %} - {%- endfor %} - } -} diff --git a/buildpack/databroker/connect.py b/buildpack/databroker/connect.py deleted file mode 100644 index 94711a59d..000000000 --- a/buildpack/databroker/connect.py +++ /dev/null @@ -1,146 +0,0 @@ -""" -[EXPERIMENTAL] - -Add Debezium to an app container to collect data from the DB for the Data Broker. 
-""" - -import os -import time -import logging -import json - -import backoff -import requests - -from buildpack import util -from buildpack.databroker.process_supervisor import DataBrokerProcess -from buildpack.databroker.config_generator.scripts.generators import ( - debezium as debezium_generator, - kafka_connect as connect_generator, - loggers as loggers_generator, -) -from buildpack.databroker.config_generator.scripts.utils import write_file - -# Compile constants -NAMESPACE = BASE_DIR = "databroker" -DBZ_DIR = "debezium" -PROCESS_NAME = "kafka-connect" -KAFKA_CONNECT_DIR = PROCESS_NAME -DBZ_CFG_NAME = "debezium-connector.json" -KAFKA_CONNECT_CFG_NAME = "connect.properties" -LOG4J_DEBEZIUM_CFG_NAME = "debezium-log4j.properties" - -# Run constants -LOCAL = ".local" -KAFKA_CONNECT_START_PATH = os.path.join( - LOCAL, BASE_DIR, KAFKA_CONNECT_DIR, "bin", "connect-distributed.sh" -) -KAFKA_CONNECT_CFG_PATH = os.path.join( - LOCAL, BASE_DIR, KAFKA_CONNECT_DIR, KAFKA_CONNECT_CFG_NAME -) -LOG4J_CFG_PATH = os.path.join( - LOCAL, BASE_DIR, KAFKA_CONNECT_DIR, LOG4J_DEBEZIUM_CFG_NAME -) -DBZ_HOME_DIR = os.path.join(LOCAL, BASE_DIR, DBZ_DIR) -CONNECT_URL = "http://localhost:8083/connectors" -INITIAL_WAIT = 15 -MAX_RETRIES = 8 -BACKOFF_TIME = 5 -KAFKA_CONNECT_JMX_PORT = "11003" - - -def _download_pkgs(buildpack_dir, install_path, cache_dir): - # Download kafka connect and debezium - util.resolve_dependency( - f"{NAMESPACE}.kafka-connect", - os.path.join(install_path, BASE_DIR, KAFKA_CONNECT_DIR), - buildpack_dir=buildpack_dir, - cache_dir=cache_dir, - ) - overrides = {} - version = os.getenv("DATABROKER_DBZ_VERSION") - if version: - overrides = {"version": version} - util.resolve_dependency( - f"{NAMESPACE}.debezium", - os.path.join(install_path, BASE_DIR, DBZ_DIR), - buildpack_dir=buildpack_dir, - cache_dir=cache_dir, - overrides=overrides, - ) - - -def stage(buildpack_dir, install_path, cache_dir): - _download_pkgs(buildpack_dir, install_path, cache_dir) - - -def setup_configs(complete_conf): - connect_config = connect_generator.generate_config(complete_conf) - write_file(KAFKA_CONNECT_CFG_PATH, connect_config) - - connect_logging = loggers_generator.generate_kafka_connect_logging_config( - complete_conf - ) - write_file(LOG4J_CFG_PATH, connect_logging) - - -def run(complete_conf): - setup_configs(complete_conf) - java_path = os.path.join(os.getcwd(), LOCAL, "bin") - os.environ["PATH"] += os.pathsep + java_path - os.environ["JMX_PORT"] = KAFKA_CONNECT_JMX_PORT - os.environ["KAFKA_LOG4J_OPTS"] = "-Dlog4j.configuration=file:" + os.path.join( - os.getcwd(), LOG4J_CFG_PATH - ) - kafka_connect_heap_opts = os.environ.get( - "DATABROKER_KAFKA_CONNECT_HEAP_OPTS", "-Xms512M -Xmx2G" - ) - os.environ["KAFKA_HEAP_OPTS"] = kafka_connect_heap_opts - env = dict(os.environ) - - kafka_connect_process = DataBrokerProcess( - PROCESS_NAME, - (KAFKA_CONNECT_START_PATH, KAFKA_CONNECT_CFG_PATH), - env, - ) - - # Wait for kafka connect to initialize - # and then issue a request for debezium connector - time.sleep(INITIAL_WAIT) - debezium_config = json.loads(debezium_generator.generate_config(complete_conf)) - - def backoff_hdlr(details): - logging.warning( - "Databroker: Failed to receive successful response from connect. 
" - "Retrying...(%d/%d)", - details["tries"], - MAX_RETRIES, - ) - - def giveup_hdlr(): - logging.error("Databroker: Kafka Connect wait retries exhaused") - raise Exception("Databroker: Kafka Connect failed to start") - - @backoff.on_predicate( - backoff.constant, - interval=BACKOFF_TIME, - max_tries=MAX_RETRIES, - on_backoff=backoff_hdlr, - on_giveup=giveup_hdlr, - ) - @backoff.on_exception( - backoff.constant, - Exception, - interval=BACKOFF_TIME, - max_tries=MAX_RETRIES, - on_backoff=backoff_hdlr, - on_giveup=giveup_hdlr, - ) - def start_debezium_connector(): - return requests.put( - f"{CONNECT_URL}/{debezium_config['name']}/config", - json=debezium_config["config"], - ) - - start_debezium_connector() - return kafka_connect_process diff --git a/buildpack/databroker/process_supervisor.py b/buildpack/databroker/process_supervisor.py deleted file mode 100644 index c4fa804dd..000000000 --- a/buildpack/databroker/process_supervisor.py +++ /dev/null @@ -1,52 +0,0 @@ -import subprocess -import logging - - -class DataBrokerProcess: - def __init__(self, name, cmd, env): - self.name = name - self.cmd = cmd - self.env = env - self.__start() - - def __start(self): - logging.debug("Starting process %s", self.name) - self.__process_handle = subprocess.Popen(self.cmd, env=self.env) - logging.debug( - "Started process %s via command %s. Process Id: %d", - self.name, - self.cmd, - self.__process_handle.pid, - ) - - def stop(self, timeout=30): - logging.debug("Stopping process %s", self.name) - try: - self.__process_handle.terminate() - self.__process_handle.wait(timeout=timeout) - except ProcessLookupError: - logging.debug("%s is already terminated", self.name) - except subprocess.TimeoutExpired: - logging.warning( - "Timed out while waiting for process %s to terminate. 
Initiating kill", - self.name, - ) - self.__process_handle.kill() - except ProcessLookupError: - logging.debug("%s is already terminated", self.name) - except Exception as ex: - logging.error("Stop failed for %s process due to error %s", self.name, ex) - - def kill(self): - logging.debug("Killing process %s", self.name) - self.__process_handle.kill() - - def is_alive(self): - if self.__process_handle.poll() is None: - return True - return False - - def restart(self): - if self.is_alive(): - self.kill() - self.__start() diff --git a/buildpack/databroker/streams.py b/buildpack/databroker/streams.py deleted file mode 100644 index be8674619..000000000 --- a/buildpack/databroker/streams.py +++ /dev/null @@ -1,113 +0,0 @@ -""" -[EXPERIMENTAL] - -Add streams to an app container to collect and optionally filter data -""" -import os -import logging - -from buildpack import util -from buildpack.databroker.process_supervisor import DataBrokerProcess -from buildpack.databroker.config_generator.scripts.generators import ( - stream as stream_generator, - azkarra as azkarra_generator, -) -from buildpack.databroker.config_generator.scripts.utils import write_file - -# Constants -NAMESPACE = BASE_DIR = "databroker" -DEPENDENCY = f"{NAMESPACE}.stream-sidecar" -AZKARRA_TPLY_CONF_NAME = "topology.conf" -PDR_STREAMS_FILENAME = "stream-sidecar" -PDR_STREAMS_DIR = os.path.join(BASE_DIR, "producer-streams") -PROCESS_NAME = "kafka-streams" -KAFKA_STREAMS_JMX_PORT = "11004" -LOCAL = ".local" -LOG_LEVEL = "DEBUG" if util.get_buildpack_loglevel() == logging.DEBUG else "INFO" - - -def get_pdr_stream_version(): - streams_version = os.getenv("DATABROKER_STREAMS_VERSION") - if not streams_version: - streams_version = util.get_dependency(DEPENDENCY)["version"] - return streams_version - - -def _get_pdr_streams_home(version): - return os.path.join(PDR_STREAMS_DIR, f"{PDR_STREAMS_FILENAME}-{version}") - - -def _get_azkarra_conf_path(version): - return os.path.join( - os.getcwd(), LOCAL, _get_pdr_streams_home(version), "azkarra.conf" - ) - - -def _get_pdr_streams_jar(version): - return os.path.join( - os.getcwd(), - LOCAL, - _get_pdr_streams_home(version), - "lib", - f"{PDR_STREAMS_FILENAME}-{get_pdr_stream_version()}.jar", - ) - - -def _download_pkgs(buildpack_dir, install_path, cache_dir): - # Download producer streams artifact - overrides = {} - version = os.getenv("DATABROKER_STREAMS_VERSION") - if version: - overrides = {"version": version} - util.resolve_dependency( - DEPENDENCY, - os.path.join(install_path, PDR_STREAMS_DIR), - buildpack_dir=buildpack_dir, - cache_dir=cache_dir, - overrides=overrides, - ) - - -def stage(buildpack_dir, install_path, cache_dir): - _download_pkgs(buildpack_dir, install_path, cache_dir) - - -def setup_configs(complete_conf, version): - TPLY_CONF_PATH = os.path.join( - os.getcwd(), - LOCAL, - _get_pdr_streams_home(version), - AZKARRA_TPLY_CONF_NAME, - ) - topologies_config = stream_generator.generate_config(complete_conf) - write_file(TPLY_CONF_PATH, topologies_config) - os.environ["TOPOLOGY_CONFIGURATION_PATH"] = TPLY_CONF_PATH - - azkarra_config = azkarra_generator.generate_config(complete_conf) - write_file(_get_azkarra_conf_path(version), azkarra_config) - - -def run(complete_conf): - version = get_pdr_stream_version() - setup_configs(complete_conf, version) - java_path = os.path.join(os.getcwd(), LOCAL, "bin") - os.environ["PATH"] += os.pathsep + java_path - os.environ["JMX_PORT"] = KAFKA_STREAMS_JMX_PORT - os.environ["LOG_LEVEL"] = LOG_LEVEL - env = dict(os.environ) - - 
kafka_streams_process = DataBrokerProcess( - PROCESS_NAME, - ( - "java", - "-Dconfig.file=" + _get_azkarra_conf_path(version), - "-Dcom.sun.management.jmxremote", - "-Dcom.sun.management.jmxremote.authenticate=false", - "-Dcom.sun.management.jmxremote.ssl=false", - "-Dcom.sun.management.jmxremote.port=" + KAFKA_STREAMS_JMX_PORT, - "-jar", - _get_pdr_streams_jar(version), - ), - env, - ) - return kafka_streams_process diff --git a/buildpack/stage.py b/buildpack/stage.py index 47d4f2d79..d1ce2f442 100755 --- a/buildpack/stage.py +++ b/buildpack/stage.py @@ -4,7 +4,7 @@ import shutil import sys -from buildpack import databroker, util +from buildpack import util from buildpack.core import java, mxbuild, nginx, runtime from buildpack.infrastructure import database from buildpack.telemetry import ( @@ -210,7 +210,6 @@ def cleanup_dependency_cache(cached_dir, dependency_list): database.stage(BUILDPACK_DIR, BUILD_DIR) runtime.stage(BUILDPACK_DIR, BUILD_DIR, CACHE_DIR) logs.stage(BUILDPACK_DIR, BUILD_DIR, CACHE_DIR) - databroker.stage(BUILDPACK_DIR, DOT_LOCAL_LOCATION, CACHE_DIR) nginx.stage(BUILDPACK_DIR, BUILD_DIR, CACHE_DIR) logging.info("Mendix Cloud Foundry Buildpack staging completed") diff --git a/buildpack/start.py b/buildpack/start.py index 03621c775..9330990bf 100755 --- a/buildpack/start.py +++ b/buildpack/start.py @@ -7,7 +7,7 @@ import traceback from http.server import BaseHTTPRequestHandler, HTTPServer -from buildpack import databroker, util +from buildpack import util from buildpack.databroker import business_events from buildpack.core import java, nginx, runtime from buildpack.infrastructure import database, storage @@ -125,7 +125,6 @@ def _register_signal_handlers(): if __name__ == "__main__": m2ee = None nginx_process = None - databroker_processes = databroker.Databroker() _register_signal_handlers() @@ -169,21 +168,13 @@ def _register_signal_handlers(): fluentbit.update_config(m2ee) mx_java_agent.update_config(m2ee) telegraf.update_config(m2ee, application_name) - ( - databroker_jmx_instance_cfg, - databroker_jmx_config_files, - ) = databroker_processes.get_datadog_config(datadog._get_user_checks_dir()) datadog.update_config( m2ee, model_version=model_version, runtime_version=runtime_version, - extra_jmx_instance_config=databroker_jmx_instance_cfg, - jmx_config_files=databroker_jmx_config_files, ) nginx.update_config() - logging.debug(dir(databroker)) logging.debug(dir(business_events)) - databroker.update_config(m2ee) business_events.update_config(m2ee, util.get_vcap_services_data()) # Start components and runtime @@ -197,11 +188,9 @@ def _register_signal_handlers(): nginx.run() # Block of code where the order is important - # Wait for the Runtime to be ready before starting Databroker and User-metering Sidecar to not block the Runtime from start + # Wait for the Runtime to be ready before starting User-metering Sidecar to not block the Runtime from start runtime.await_database_ready(m2ee) metering.run() - if databroker.is_enabled(): - databroker_processes.run(database.get_config()) # End of the block where order is important except RuntimeError as re: diff --git a/dependencies.yml b/dependencies.yml index c586a7839..48a6ce8bf 100644 --- a/dependencies.yml +++ b/dependencies.yml @@ -7,16 +7,6 @@ dependencies: machine-agent: artifact: appdynamics/machineagent-bundle-64bit-linux-{{ version }}.zip version: 24.12.0.4485 - databroker: - debezium: - artifact: experimental/databroker/debezium-{{ version }}.tar.gz - version: 1.2.0 - kafka-connect: - artifact: 
experimental/databroker/kafka-connect-{{ version }}.tar.gz - version: 2.13-2.5.1-v2 - stream-sidecar: - artifact: experimental/databroker/stream-sidecar-{{ version }}.tar - version: 0.23.0-9 datadog: buildpack: alias: cf-datadog-sidecar diff --git a/pyproject.toml b/pyproject.toml index a9a12f25e..f187a3179 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,12 +4,6 @@ src = ["lib"] # Same as Black. line-length = 88 -# These files contain long lines in testdata that trigger the linter -exclude = [ - "tests/unit/test_databroker_business_events.py", - "tests/unit/test_databroker.py" -] - [tool.ruff.lint.mccabe] # Unlike Flake8, default to a complexity level of 10. max-complexity = 10 diff --git a/tests/integration/test_databroker.py b/tests/integration/test_databroker.py deleted file mode 100644 index 5c79caf39..000000000 --- a/tests/integration/test_databroker.py +++ /dev/null @@ -1,158 +0,0 @@ -import socket -import re - -import backoff - -from . import basetest -from .runner import CfLocalRunnerWithPostgreSQL - -# Constants -KAFKA_CLUSTER_IMAGE_NAME = "johnnypark/kafka-zookeeper" -KAFKA_CLUSTER_IMAGE_VERSION = "2.4.0" -KAFKA_CLUSTER_NAME = "kafka-cluster" -KAFKA_CONNECT_URL = "http://localhost:8083" -KAFKA_PG_CONNECTOR_NAME = "mx-databroker-PostgreSQL-source-connector" -KAFKA_PG_CONNECTOR_STATUS_API = "{}/connectors/{}/status".format( - KAFKA_CONNECT_URL, - KAFKA_PG_CONNECTOR_NAME, -) -KAFKA_BROKER_PORT = 9092 -KAFKA_ZOOKEEPER_PORT = 2181 -DATABROKER_TOPIC_FORMAT_VERSION = "1_0_0" -POSTGRES_DB_DOCKER_IMAGE = "debezium/postgres" -POSTGRES_DB_VERSION = "9.6-alpine" -MAX_RETRY_COUNT = 8 -BACKOFF_TIME = 10 - - -class CfLocalRunnerWithKafka(CfLocalRunnerWithPostgreSQL): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - self._database_postgres_image = POSTGRES_DB_DOCKER_IMAGE - self._database_postgres_version = POSTGRES_DB_VERSION - - self._kafka_container_name = "{}-{}".format(self._app_name, KAFKA_CLUSTER_NAME) - - def _get_environment(self, env_vars): - environment = super()._get_environment(env_vars) - - environment.update( - { - "MX_MyFirstModule_broker_url": "{}:{}".format( - self.get_host(), - KAFKA_BROKER_PORT, - ) - } - ) - - return environment - - def _start_kafka_cluster(self): - result = self._cmd( - ( - "docker", - "run", - "--name", - self._kafka_container_name, - "-p", - "{}:{}".format(KAFKA_BROKER_PORT, KAFKA_BROKER_PORT), - "-e", - "ADVERTISED_HOST={}".format(self._host), - "-e", - "NUM_PARTITIONS={}".format(3), - "-d", - "{}:{}".format( - KAFKA_CLUSTER_IMAGE_NAME, - KAFKA_CLUSTER_IMAGE_VERSION, - ), - ) - ) - - if not result[1]: - raise RuntimeError( - "Cannot create {} container: {}".format( - KAFKA_CLUSTER_NAME, - result[0], - ) - ) - - def stage(self, *args, **kwargs): - result = super().stage(*args, **kwargs) - - self._start_kafka_cluster() - - @backoff.on_predicate(backoff.expo, lambda x: x > 0, max_time=30) - def _await_kafka_cluster(): - return socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect_ex( - ("localhost", KAFKA_BROKER_PORT) - ) - - _await_kafka_cluster() - - return result - - def is_debezium_running(self): - return self.run_on_container("curl " + KAFKA_PG_CONNECTOR_STATUS_API) - - def is_azkarra_running(self): - topics = self.run_on_container( - f"./opt/kafka_2.12-{KAFKA_CLUSTER_IMAGE_VERSION}/bin/kafka-topics.sh " - f"--list --zookeeper localhost:{KAFKA_ZOOKEEPER_PORT}", - target_container=self._kafka_container_name, - ) - - expect_public_topic_pattern = rf".*?\.{DATABROKER_TOPIC_FORMAT_VERSION}" - - return ( - len( - 
re.findall( - r"(mx-databroker-connect-(?:configs|offsets|status))", - topics, - ) - ) - == 3 - and len(re.findall(expect_public_topic_pattern, topics)) > 0 - ) - - -class TestCaseDataBroker(basetest.BaseTestWithPostgreSQL): - def _init_cflocal_runner(self, *args, **kwargs): - return CfLocalRunnerWithKafka(*args, **kwargs) - - def test_databroker_running(self): - # os.environ[ - # "PACKAGE_URL" - # ] = "https://dghq119eo3niv.cloudfront.net/test-app/MyProducer902.mda" - self.stage_container( - package="https://dghq119eo3niv.cloudfront.net/test-app/MyProducer902.mda", - env_vars={ - "DATABROKER_ENABLED": "true", - "FORCED_MXRUNTIME_URL": "https://dghq119eo3niv.cloudfront.net/", - }, - ) - - self.start_container() - - # check app is running - self.assert_app_running() - - @backoff.on_exception( - backoff.constant, - Exception, - interval=BACKOFF_TIME, - max_tries=MAX_RETRY_COUNT, - ) - def check_if_dbz_running(): - return self._runner.is_debezium_running() - - response = check_if_dbz_running() - assert str(response).find('"state":"RUNNING"') > 0 - - # check azkarra is running by verify expected topics have been created - assert self._runner.is_azkarra_running() - - # check streaming service - output = self.get_recent_logs() - assert output is not None - assert str(output).find("State transition from REBALANCING to RUNNING") >= 0 diff --git a/tests/unit/test_custom_headers.py b/tests/unit/test_custom_headers.py index 98ff47bdb..13555fa79 100644 --- a/tests/unit/test_custom_headers.py +++ b/tests/unit/test_custom_headers.py @@ -192,3 +192,91 @@ def test_inValid_header_originTrial(self): ) header_config = nginx._get_http_headers() self.assertEqual([], header_config) + + def test_valid_header_cross_origin_resource_policy(self): + os.environ["HTTP_RESPONSE_HEADERS"] = json.dumps( + { + "Cross-Origin-Resource-Policy": "same-origin" + } + ) + header_config = nginx._get_http_headers() + self.assertIn( + ("Cross-Origin-Resource-Policy", + "same-origin", + ), + header_config, + ) + def test_invalid_header_cross_origin_resource_policy(self): + os.environ["HTTP_RESPONSE_HEADERS"] = json.dumps( + { + "Cross-Origin-Resource-Policy": "#####" + } + ) + header_config = nginx._get_http_headers() + self.assertEqual([], header_config) + + def test_valid_header_cross_origin_opener_policy(self): + os.environ["HTTP_RESPONSE_HEADERS"] = json.dumps( + { + "Cross-Origin-Opener-Policy": "same-origin" + } + ) + header_config = nginx._get_http_headers() + self.assertIn( + ("Cross-Origin-Opener-Policy", + "same-origin", + ), + header_config, + ) + def test_invalid_header_cross_origin_opener_policy(self): + os.environ["HTTP_RESPONSE_HEADERS"] = json.dumps( + { + "Cross-Origin-Opener-Policy": "&^%$#" + } + ) + header_config = nginx._get_http_headers() + self.assertEqual([], header_config) + + def test_valid_header_cross_origin_embedder_policy(self): + os.environ["HTTP_RESPONSE_HEADERS"] = json.dumps( + { + "Cross-Origin-Embedder-Policy": "require-corp" + } + ) + header_config = nginx._get_http_headers() + self.assertIn( + ("Cross-Origin-Embedder-Policy", + "require-corp", + ), + header_config, + ) + def test_invalid_header_cross_origin_embedder_policy(self): + os.environ["HTTP_RESPONSE_HEADERS"] = json.dumps( + { + "Cross-Origin-Embedder-Policy": "&^as%$#" + } + ) + header_config = nginx._get_http_headers() + self.assertEqual([], header_config) + + def test_valid_header_clear_site_data_policy(self): + os.environ["HTTP_RESPONSE_HEADERS"] = json.dumps( + { + "Clear-Site-Data": "executionContexts" + } + ) + header_config = 
nginx._get_http_headers() + self.assertIn( + ("Clear-Site-Data", + "executionContexts", + ), + header_config, + ) + def test_invalid_header_clear_site_data_policy(self): + os.environ["HTTP_RESPONSE_HEADERS"] = json.dumps( + { + "Clear-Site-Data": "&^as%$#" + } + ) + header_config = nginx._get_http_headers() + self.assertEqual([], header_config) diff --git a/tests/unit/test_databroker.py b/tests/unit/test_databroker.py deleted file mode 100644 index 2813f9543..000000000 --- a/tests/unit/test_databroker.py +++ /dev/null @@ -1,380 +0,0 @@ -import json -import os -import unittest -from pathlib import Path - -from buildpack.databroker import connect, streams -from buildpack.databroker.config_generator.scripts.configloader import ( - configinitializer, -) -from buildpack.databroker.config_generator.scripts.generators import ( - debezium as debezium_generator, -) -from buildpack.databroker.config_generator.scripts.utils import write_file -from buildpack.util import get_dependency - -# Constants -TEST_METADATA_FILE_PATH = "/tmp/metadata.json" -TEST_DEPENDENCIES_FILE_PATH = "/tmp/dependencies.json" -TEST_BROKER_URL = "localhost:9092" - -LOCAL_DATABROKER_FOLDER = "{}/.local/databroker".format(os.getcwd()) -KAFKA_CONNECT_DIR = "{}/kafka-connect".format(LOCAL_DATABROKER_FOLDER) -KAFKA_CONNECT_CFG_NAME = "connect.properties" -KAFKA_CONNECT_CFG_PATH = "{}/{}".format(KAFKA_CONNECT_DIR, KAFKA_CONNECT_CFG_NAME) -LOG4J_DEBEZIUM_CFG_PATH = "{}/{}".format(KAFKA_CONNECT_DIR, "debezium-log4j.properties") - -STREAM_SIDECAR_DIR = "{}/producer-streams/stream-sidecar-{}".format( - LOCAL_DATABROKER_FOLDER, streams.get_pdr_stream_version() -) -STREAM_TOPOLOGY_CFG_NAME = "topology.conf" -STREAM_TOPOLOGY_CFG_PATH = "{}/{}".format(STREAM_SIDECAR_DIR, STREAM_TOPOLOGY_CFG_NAME) -STREAM_AZKARRA_CFG_NAME = "azkarra.conf" -STREAM_AZKARRA_CFG_PATH = "{}/{}".format(STREAM_SIDECAR_DIR, STREAM_AZKARRA_CFG_NAME) - - -class TestDataBrokerConfigs(unittest.TestCase): - complete_producer_conf = None - - # define metadata config - metadata_config = """ -{ - "Constants": [ - { - "Name": "MyFirstModule.Kafka_broker_url", - "Type": "String", - "Description": "", - "DefaultValue": "localhost:9092" - }, - { - "Name": "Atlas_UI_Resources.Atlas_UI_Resources_Version", - "Type": "String", - "Description": "", - "DefaultValue": " 2.5.4" - } - ], - "ScheduledEvents": [], - "DataBrokerConfiguration": { - "publishedServices": [ - { - "brokerUrl": "MyFirstModule.Kafka_broker_url", - "entities": [ - { - "originalEntityName": "MyFirstModule.company", - "publicEntityName": "MyFirstModule.company", - "topicName": "bde821e1-f8cf-43c3-9c49-8af49bebb084.16747dc6-b6b7-42ae-aabf-255dca2aeeaf.56f74de7-32c5-48c9-8157-7df3670896db.1_0_0", - "attributeMapping": { - "INT_CompanyName": "CompanyName", - "INT_CompanyId": "CompanyId", - "INT_CompanyAddress": "INT_CompanyAddress" - } - }, - { - "originalEntityName": "MyFirstModule.project", - "publicEntityName": "MyFirstModule.projectPublic", - "topicName": "bde821ec-f8cf-43cc-9c4c-8af49bebb08c.16747dcc-b6bc-42ac-aabc-255dca2aeeac.56f74dec-32cc-48cc-8157-7df3670896dc.1_0_0", - "attributeMapping": { - "INT_ProjectName": "ProjectName", - "INT_ProjectId": "ProjectId", - "INT_ProjectAddress": "INT_ProjectAddress" - } - } - ] - }, - { - "brokerUrl": "MyFirstModule.Kafka_broker_url", - "entities": [ - { - "originalEntityName": "MyFirstModule.companyint", - "publicEntityName": "MyFirstModule.companypub", - "topicName": 
"bde821ed-f8cd-43c3-9c4d-8af49bebb08d.16747dcd-b6bd-42ad-aabd-255dca2aeead.56f74ded-32cd-48cd-815d-7df3670896dd.1_0_0", - "attributeMapping": { - "INT_CompanyPubName": "CompanyPubName", - "INT_CompanyPubId": "CompanyPubId", - "INT_CompanyPubAddress": "INT_CompanyPubAddress" - } - }, - { - "originalEntityName": "MyFirstModule.member", - "publicEntityName": "MyFirstModule.memberpub", - "topicName": "bde821ee-f8ce-43ce-9c4e-8af49bebb08e.16747dce-b6be-42ae-aabe-255dca2aeeae.56f74dee-32ce-48ce-815e-7df3670896de.1_0_0", - "attributeMapping": { - "INT_MemberPubName": "MemberPubName", - "INT_MemberPubId": "MemberPubId", - "INT_MemberPubAddress": "INT_MemberPubAddress" - } - } - ] - } - ] - } -} -""" - # define dependencies config - dependencies_config = """ -{ - "schemaVersion": "0.2", - "appName": "Simple-Producer-App", - "published": [], - "consumed": [] -} -""" - - def setUp(self): - - # transform string to file mode - write_file(TEST_METADATA_FILE_PATH, self.metadata_config) - write_file(TEST_DEPENDENCIES_FILE_PATH, self.dependencies_config) - - # define environment variables - os.environ["MXRUNTIME_DatabaseType"] = "PostgreSQL" - os.environ["MXRUNTIME_DatabaseHost"] = "localhost:5432" - os.environ["MXRUNTIME_DatabaseUserName"] = "mx-app" - os.environ["MXRUNTIME_DatabaseName"] = "mendix" - os.environ["MXRUNTIME_DatabasePassword"] = "mx-app-password" - # environment variable will overwrite the defautl constant value - os.environ["MX_MyFirstModule.Kafka_broker_url"] = TEST_BROKER_URL - - metadata_file = open(TEST_METADATA_FILE_PATH, "rt") - dependencies_file = open(TEST_DEPENDENCIES_FILE_PATH, "rt") - - database_config = {} - self.complete_producer_conf = configinitializer.unify_configs( - [metadata_file, dependencies_file], database_config - ) - - metadata_file.close() - dependencies_file.close() - - def tearDown(self): - os.unlink(TEST_METADATA_FILE_PATH) - os.unlink(TEST_DEPENDENCIES_FILE_PATH) - - def _check_folder_exist(self, folder_path): - os.makedirs(folder_path, exist_ok=True) - - def test_kafka_connect_config(self): - - self._check_folder_exist(KAFKA_CONNECT_DIR) - - # check config has been generated - connect.setup_configs(self.complete_producer_conf) - - assert os.path.isfile(KAFKA_CONNECT_CFG_PATH) - - actual_config = {} - with open(KAFKA_CONNECT_CFG_PATH, "r") as f: - for line in f.readlines(): - tmp_line = line.strip().split("=") - actual_config[tmp_line[0]] = tmp_line[1] - - assert actual_config["bootstrap.servers"] == os.environ.get( - "MX_MyFirstModule.Kafka_broker_url" - ) - - # verify postgres whitelists - debezium_config = json.loads( - debezium_generator.generate_config(self.complete_producer_conf) - ) - assert ( - debezium_config["config"]["table.whitelist"] - == ".*MyFirstModule.company.*,.*MyFirstModule.project.*" - ) - assert ( - debezium_config["config"]["column.whitelist"] - == ".*MyFirstModule.company.*,MyFirstModule.company.id,.*MyFirstModule.project.*,MyFirstModule.project.id" - ) - - def test_streams_override(self): - os.environ["DATABROKER_STREAMS_VERSION"] = "0.99999" - assert streams.get_pdr_stream_version() == "0.99999" - del os.environ["DATABROKER_STREAMS_VERSION"] # reset - # default - assert streams.get_pdr_stream_version() == "0.23.0-9" - - # There are two configs for streams - # one is topology.conf another is azkarra.conf - # Make sure specifice fields would be replaced - # with correct value based on template file - def test_stream_config(self): - - self._check_folder_exist(STREAM_SIDECAR_DIR) - - streams.setup_configs( - self.complete_producer_conf, - 
get_dependency(streams.DEPENDENCY)["version"], - ) - - # verify topology config - assert os.path.isfile(STREAM_TOPOLOGY_CFG_PATH) - - expect_metadata_config = json.loads(self.metadata_config) - with open(STREAM_TOPOLOGY_CFG_PATH, "r") as f: - actual_config = json.loads(f.read()) - - assert actual_config["topologies"][0]["name"] == "{} topology".format( - expect_metadata_config["DataBrokerConfiguration"]["publishedServices"][ - 0 - ]["entities"][0]["publicEntityName"] - ) - assert ( - actual_config["topologies"][0]["source"] - == "mendix.public.myfirstmodule_company.private" - ) - assert ( - actual_config["topologies"][0]["sink"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][0]["topicName"] - ) - - assert ( - actual_config["topologies"][0]["originalEntityName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][0]["originalEntityName"] - ) - assert ( - actual_config["topologies"][0]["publicEntityName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][0]["publicEntityName"] - ) - assert ( - actual_config["topologies"][0]["attributeMapping"]["INT_CompanyName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][0]["attributeMapping"]["INT_CompanyName"] - ) - assert ( - actual_config["topologies"][0]["attributeMapping"]["INT_CompanyId"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][0]["attributeMapping"]["INT_CompanyId"] - ) - assert ( - actual_config["topologies"][0]["attributeMapping"]["INT_CompanyAddress"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][0]["attributeMapping"]["INT_CompanyAddress"] - ) - - assert ( - actual_config["topologies"][1]["originalEntityName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][1]["originalEntityName"] - ) - assert ( - actual_config["topologies"][1]["publicEntityName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][1]["publicEntityName"] - ) - assert ( - actual_config["topologies"][1]["attributeMapping"]["INT_ProjectName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][1]["attributeMapping"]["INT_ProjectName"] - ) - assert ( - actual_config["topologies"][1]["attributeMapping"]["INT_ProjectId"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][1]["attributeMapping"]["INT_ProjectId"] - ) - assert ( - actual_config["topologies"][1]["attributeMapping"]["INT_ProjectAddress"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][0]["entities"][1]["attributeMapping"]["INT_ProjectAddress"] - ) - - assert ( - actual_config["topologies"][2]["originalEntityName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][0]["originalEntityName"] - ) - assert ( - actual_config["topologies"][2]["publicEntityName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][0]["publicEntityName"] - ) - assert ( - actual_config["topologies"][2]["attributeMapping"]["INT_CompanyPubName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][0]["attributeMapping"]["INT_CompanyPubName"] - ) - assert ( - 
actual_config["topologies"][2]["attributeMapping"]["INT_CompanyPubId"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][0]["attributeMapping"]["INT_CompanyPubId"] - ) - assert ( - actual_config["topologies"][2]["attributeMapping"][ - "INT_CompanyPubAddress" - ] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][0]["attributeMapping"]["INT_CompanyPubAddress"] - ) - - assert ( - actual_config["topologies"][3]["originalEntityName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][1]["originalEntityName"] - ) - assert ( - actual_config["topologies"][3]["publicEntityName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][1]["publicEntityName"] - ) - assert ( - actual_config["topologies"][3]["attributeMapping"]["INT_MemberPubName"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][1]["attributeMapping"]["INT_MemberPubName"] - ) - assert ( - actual_config["topologies"][3]["attributeMapping"]["INT_MemberPubId"] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][1]["attributeMapping"]["INT_MemberPubId"] - ) - assert ( - actual_config["topologies"][3]["attributeMapping"][ - "INT_MemberPubAddress" - ] - == expect_metadata_config["DataBrokerConfiguration"][ - "publishedServices" - ][1]["entities"][1]["attributeMapping"]["INT_MemberPubAddress"] - ) - - # verify azkarra config - assert os.path.isfile(STREAM_AZKARRA_CFG_PATH) - with open(STREAM_AZKARRA_CFG_PATH, "r") as f: - actual_config = f.read() - - assert ( - str(actual_config).find( - 'bootstrap.servers = "{}"'.format( - os.environ.get("MX_MyFirstModule.Kafka_broker_url") - ) - ) - > 1 - ) - - # verify log4j configuration - assert os.path.isfile(LOG4J_DEBEZIUM_CFG_PATH) - assert ( - Path(LOG4J_DEBEZIUM_CFG_PATH) - .read_text() - .find("log4j.rootLogger=INFO, stdout") - > -1 - ) diff --git a/tests/unit/test_runtime_configuration.py b/tests/unit/test_runtime_configuration.py index bf8cacd31..716ff2cec 100644 --- a/tests/unit/test_runtime_configuration.py +++ b/tests/unit/test_runtime_configuration.py @@ -151,10 +151,12 @@ def _create_self_signed_cert(): # pylint: disable=no-method-argument @mock.patch.dict(os.environ, CERTIFICATE_ENV) def test_selfsigned_certificate_less_mx720(self): + util.mkdir_p(".local") # make sure .local exists in unit test result = security.get_client_certificates(MXVersion(7.16)) assert "WebServiceClientCertificates" in result @mock.patch.dict(os.environ, CERTIFICATE_ENV) def test_selfsigned_certificate_greq_mx720(self): + util.mkdir_p(".local") # make sure .local exists in unit test result = security.get_client_certificates(MXVersion(7.23)) assert "ClientCertificateUsages" in result
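
Note on the new header tests above: they exercise nginx._get_http_headers(), which reads the HTTP_RESPONSE_HEADERS environment variable as a JSON object and returns (header, value) tuples only for headers whose values pass validation, returning an empty list otherwise. The following is a minimal illustrative sketch, not part of this patch, combining the four newly allow-listed headers with values taken from the tests; the direct call to nginx._get_http_headers() outside a test is for demonstration only.

    # Illustrative sketch only: shows how the newly allow-listed headers are
    # supplied via HTTP_RESPONSE_HEADERS and which of them survive validation.
    import json
    import os

    from buildpack.core import nginx

    os.environ["HTTP_RESPONSE_HEADERS"] = json.dumps(
        {
            "Cross-Origin-Resource-Policy": "same-origin",
            "Cross-Origin-Opener-Policy": "same-origin",
            "Cross-Origin-Embedder-Policy": "require-corp",
            "Clear-Site-Data": "executionContexts",
        }
    )

    # Returns a list of (header, value) tuples for the headers that passed
    # validation; invalid values are dropped, as the negative tests assert.
    print(nginx._get_http_headers())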