Skip to content

Commit 4ff475c

Browse files
rsuplinawmudge
andauthored
Add cm_cluster module and update assemble_cluster_template module (#197)
* Add cm_cluster module * Relocate cluster template merge logic to cm_utils * Update logic in cm_cluster module and use shared cluster template merge functions * Add pytest resources * Add unit tests for assemble_cluster_template module * Add unit tests for assemble_cluster_template action * Add unit tests for multiple idempotent keys and elements in merge functions * Add short-circuit merger for initial fragment * Add 'regex' alias to assemble_cluster_template module * Add "fix" for VSCode pytest discovery * Add pythonpath parameter to pytest.ini * Add wait_for_command_state Signed-off-by: rsuplina <[email protected]> Signed-off-by: Webster Mudge <[email protected]> Co-authored-by: Webster Mudge <[email protected]>
1 parent d5a771f commit 4ff475c

File tree

16 files changed

+1790
-150
lines changed

16 files changed

+1790
-150
lines changed

plugins/action/assemble_cluster_template.py

Lines changed: 12 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -29,72 +29,15 @@
2929
from ansible.plugins.action import ActionBase
3030
from ansible.utils.hashing import checksum_s
3131

32+
from ansible_collections.cloudera.cluster.plugins.module_utils.cm_utils import ClusterTemplate
3233

3334
class ActionModule(ActionBase):
3435
TRANSFERS_FILES = True
35-
36-
MERGED = {}
37-
IDEMPOTENT_IDS = ["refName", "name", "clusterName", "hostName", "product"]
38-
UNIQUE_IDS = ["repositories"]
39-
40-
def update_object(self, base, template, breadcrumbs=""):
41-
if isinstance(base, dict) and isinstance(template, dict):
42-
self.update_dict(base, template, breadcrumbs)
43-
return True
44-
elif isinstance(base, list) and isinstance(template, list):
45-
self.update_list(base, template, breadcrumbs)
46-
return True
47-
return False
48-
49-
def update_dict(self, base, template, breadcrumbs=""):
50-
for key, value in template.items():
51-
crumb = breadcrumbs + "/" + key
52-
53-
if key in self.IDEMPOTENT_IDS:
54-
if base[key] != value:
55-
self._display.error(
56-
"Objects with distinct IDs should not be merged: " + crumb
57-
)
58-
continue
59-
60-
if key not in base:
61-
base[key] = value
62-
elif not self.update_object(base[key], value, crumb) and base[key] != value:
63-
self._display.warning(
64-
f"Value being overwritten for key [{crumb}]], Old: [{base[key]}], New: [{value}]"
65-
)
66-
base[key] = value
67-
68-
if key in self.UNIQUE_IDS:
69-
base[key] = list(set(base[key]))
70-
71-
def update_list(self, base, template, breadcrumbs=""):
72-
for item in template:
73-
if isinstance(item, dict):
74-
for attr in self.IDEMPOTENT_IDS:
75-
if attr in item:
76-
idempotent_id = attr
77-
break
78-
else:
79-
idempotent_id = None
80-
if idempotent_id:
81-
namesake = [
82-
i for i in base if i[idempotent_id] == item[idempotent_id]
83-
]
84-
if namesake:
85-
self.update_dict(
86-
namesake[0],
87-
item,
88-
breadcrumbs
89-
+ "/["
90-
+ idempotent_id
91-
+ "="
92-
+ item[idempotent_id]
93-
+ "]",
94-
)
95-
continue
96-
base.append(item)
97-
base.sort(key=lambda x: json.dumps(x, sort_keys=True))
36+
37+
def __init__(self, task, connection, play_context, loader, templar, shared_loader_obj):
38+
super().__init__(task, connection, play_context, loader, templar, shared_loader_obj)
39+
self.TEMPLATE = ClusterTemplate(warn_fn=self._display.warning, error_fn=self._display.error)
40+
self.MERGED = {}
9841

9942
def assemble_fragments(
10043
self, assembled_file, src_path, regex=None, ignore_hidden=True, decrypt=True
@@ -121,7 +64,10 @@ def assemble_fragments(
12164
encoding="utf-8",
12265
) as fragment_file:
12366
try:
124-
self.update_object(self.MERGED, json.loads(fragment_file.read()))
67+
if not self.MERGED:
68+
self.MERGED = json.loads(fragment_file.read())
69+
else:
70+
self.TEMPLATE.merge(self.MERGED, json.loads(fragment_file.read()))
12571
except json.JSONDecodeError as e:
12672
raise AnsibleActionFail(
12773
message=f"JSON parsing error: {to_text(e.msg)}",
@@ -155,6 +101,8 @@ def run(self, tmp=None, task_vars=None):
155101
regexp = self._task.args.get("regexp", None)
156102
if regexp is None:
157103
regexp = self._task.args.get("filter", None)
104+
if regexp is None:
105+
regexp = self._task.args.get("regex", None)
158106

159107
remote_src = boolean(self._task.args.get("remote_src", False))
160108
follow = boolean(self._task.args.get("follow", False))

plugins/module_utils/cm_utils.py

Lines changed: 102 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
import logging
2222

2323
from functools import wraps
24+
from typing import Union
2425
from urllib3 import disable_warnings
2526
from urllib3.exceptions import InsecureRequestWarning, MaxRetryError, HTTPError
2627
from urllib3.util import Url
2728
from urllib.parse import urljoin
2829
from time import sleep
2930
from ansible.module_utils.basic import AnsibleModule
3031
from ansible.module_utils.common.text.converters import to_text
31-
32+
from time import sleep
3233
from cm_client import ApiClient, Configuration
3334
from cm_client.rest import ApiException, RESTClientObject
3435
from cm_client.apis.cloudera_manager_resource_api import ClouderaManagerResourceApi
@@ -39,6 +40,101 @@
3940
__maintainer__ = ["[email protected]"]
4041

4142

43+
class ClusterTemplate(object):
44+
IDEMPOTENT_IDS = frozenset(
45+
["refName", "name", "clusterName", "hostName", "product"]
46+
)
47+
UNIQUE_IDS = frozenset(["repositories"])
48+
49+
def __init__(self, warn_fn, error_fn) -> None:
50+
self._warn = warn_fn
51+
self._error = error_fn
52+
53+
def merge(self, base: Union[dict, list], fragment: Union[dict, list]) -> bool:
54+
if isinstance(base, dict) and isinstance(fragment, dict):
55+
self._update_dict(base, fragment)
56+
elif isinstance(base, list) and isinstance(fragment, list):
57+
self._update_list(base, fragment)
58+
else:
59+
raise TypeError(
60+
f"Base and fragment arguments must be the same type: base[{type(base)}], fragment[{type(fragment)}]"
61+
)
62+
63+
def _update_dict(self, base, fragment, breadcrumbs="") -> None:
64+
for key, value in fragment.items():
65+
crumb = breadcrumbs + "/" + key
66+
67+
# If the key is idempotent, error that the values are different
68+
if key in self.IDEMPOTENT_IDS:
69+
if base[key] != value:
70+
self._error(f"Unable to override value for distinct key [{crumb}]")
71+
continue
72+
73+
# If it's a new key, add to the bae
74+
if key not in base:
75+
base[key] = value
76+
# If the value is a dictionary, merge
77+
elif isinstance(value, dict):
78+
self._update_dict(base[key], value, crumb)
79+
# If the value is a list, merge
80+
elif isinstance(value, list):
81+
self._update_list(base[key], value, crumb)
82+
# Else the value is a scalar
83+
else:
84+
# If the value is different, override
85+
if base[key] != value:
86+
self._warn(
87+
f"Overriding value for key [{crumb}]], Old: [{base[key]}], New: [{value}]"
88+
)
89+
base[key] = value
90+
91+
if key in self.UNIQUE_IDS:
92+
base[key] = list(set(base[key]))
93+
base[key].sort(key=lambda x: json.dumps(x, sort_keys=True))
94+
95+
def _update_list(self, base, fragment, breadcrumbs="") -> None:
96+
for entry in fragment:
97+
if isinstance(entry, dict):
98+
# Discover if the incoming dict has an idempotent key
99+
idempotent_key = next(
100+
iter(
101+
[
102+
id
103+
for id in set(entry.keys()).intersection(
104+
self.IDEMPOTENT_IDS
105+
)
106+
]
107+
),
108+
None,
109+
)
110+
111+
# Merge the idemponent key's dictionary rather than appending as a new entry
112+
if idempotent_key:
113+
existing_entry = next(
114+
iter(
115+
[
116+
i
117+
for i in base
118+
if isinstance(i, dict)
119+
and idempotent_key in i
120+
and i[idempotent_key] == entry[idempotent_key]
121+
]
122+
),
123+
None,
124+
)
125+
if existing_entry:
126+
self._update_dict(
127+
existing_entry,
128+
entry,
129+
f"{breadcrumbs}/[{idempotent_key}={entry[idempotent_key]}]",
130+
)
131+
continue
132+
# Else, drop to appending the entry as net new
133+
base.append(entry)
134+
135+
base.sort(key=lambda x: json.dumps(x, sort_keys=True))
136+
137+
42138
class ClouderaManagerModule(object):
43139
"""Base Ansible Module for API access to Cloudera Manager."""
44140

@@ -61,13 +157,12 @@ def _add_log(err):
61157
err = dict(
62158
msg="API error: " + to_text(ae.reason),
63159
status_code=ae.status,
64-
body=ae.body.decode("utf-8"),
65160
)
66-
if err["body"] != "":
161+
if ae.body:
67162
try:
68-
err.update(body=json.loads(err["body"]))
69-
except Exception as te:
70-
pass
163+
err.update(body=json.loads(ae.body))
164+
except Exception:
165+
err.update(body=ae.body.decode("utf-8")),
71166

72167
self.module.fail_json(**_add_log(err))
73168
except MaxRetryError as maxe:
@@ -285,7 +380,7 @@ def ansible_module(
285380
mutually_exclusive=[],
286381
required_one_of=[],
287382
required_together=[],
288-
**kwargs
383+
**kwargs,
289384
):
290385
"""
291386
Creates the base Ansible module argument spec and dependencies,

0 commit comments

Comments
 (0)