From 6f5e9afb65b749e64aa50adfcfe0a277795a83d6 Mon Sep 17 00:00:00 2001 From: Dmitry Romanenko Date: Sun, 21 Nov 2021 22:10:21 -0500 Subject: [PATCH] Fix proxies --- tests/test_application_master.py | 2 +- yarn_api_client/application_master.py | 6 +++--- yarn_api_client/hadoop_conf.py | 12 ++++++------ yarn_api_client/history_server.py | 4 ++-- yarn_api_client/node_manager.py | 4 ++-- yarn_api_client/resource_manager.py | 4 ++-- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/test_application_master.py b/tests/test_application_master.py index 0dce2c7..35f5f3f 100644 --- a/tests/test_application_master.py +++ b/tests/test_application_master.py @@ -14,7 +14,7 @@ def setUp(self): def test__init__(self, get_config_mock, request_mock): get_config_mock.return_value = None ApplicationMaster() - get_config_mock.assert_called_with(30, None, True) + get_config_mock.assert_called_with(30, None, True, None) def test_application_information(self, request_mock): self.app.application_information('app_100500') diff --git a/yarn_api_client/application_master.py b/yarn_api_client/application_master.py index 1b9a7e9..d33449e 100644 --- a/yarn_api_client/application_master.py +++ b/yarn_api_client/application_master.py @@ -26,11 +26,11 @@ class ApplicationMaster(BaseYarnAPI): we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to ``True`` """ - def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True): + def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True, proxies=None): if not service_endpoint: - service_endpoint = get_webproxy_endpoint(timeout, auth, verify) + service_endpoint = get_webproxy_endpoint(timeout, auth, verify, proxies) - super(ApplicationMaster, self).__init__(service_endpoint, timeout, auth, verify) + super(ApplicationMaster, self).__init__(service_endpoint, timeout, auth, verify, proxies) def application_information(self, application_id): """ diff --git a/yarn_api_client/hadoop_conf.py b/yarn_api_client/hadoop_conf.py index f9624d4..0c15d8f 100644 --- a/yarn_api_client/hadoop_conf.py +++ b/yarn_api_client/hadoop_conf.py @@ -50,9 +50,9 @@ def _get_resource_manager(hadoop_conf_path, rm_id=None): return ('https://' if is_https_only else 'http://') + rm_address if rm_address else None -def check_is_active_rm(url, timeout=30, auth=None, verify=True): +def check_is_active_rm(url, timeout=30, auth=None, verify=True, proxies=None): try: - response = requests.get(url + "/cluster", timeout=timeout, auth=auth, verify=verify) + response = requests.get(url + "/cluster", timeout=timeout, auth=auth, verify=verify, proxies=proxies) except requests.RequestException as e: log.warning("Exception encountered accessing RM '{url}': '{err}', continuing...".format(url=url, err=e)) return False @@ -64,7 +64,7 @@ def check_is_active_rm(url, timeout=30, auth=None, verify=True): return True -def get_resource_manager_endpoint(timeout=30, auth=None, verify=True): +def get_resource_manager_endpoint(timeout=30, auth=None, verify=True, proxies=None): log.info('Getting resource manager endpoint from config: {config_path}'.format(config_path=os.path.join(CONF_DIR, 'yarn-site.xml'))) hadoop_conf_path = CONF_DIR rm_ids = _get_rm_ids(hadoop_conf_path) @@ -72,7 +72,7 @@ def get_resource_manager_endpoint(timeout=30, auth=None, verify=True): for rm_id in rm_ids: ret = _get_resource_manager(hadoop_conf_path, rm_id) if ret: - if check_is_active_rm(ret, timeout, auth, verify): + if check_is_active_rm(ret, timeout, auth, verify, proxies): return ret return None else: @@ -93,12 +93,12 @@ def get_nodemanager_endpoint(): return parse(config_path, prop_name) -def get_webproxy_endpoint(timeout=30, auth=None, verify=True): +def get_webproxy_endpoint(timeout=30, auth=None, verify=True, proxies=None): config_path = os.path.join(CONF_DIR, 'yarn-site.xml') log.info('Getting webproxy endpoint from config: {config_path}'.format(config_path=config_path)) prop_name = 'yarn.web-proxy.address' value = parse(config_path, prop_name) - return value or get_resource_manager_endpoint(timeout, auth, verify) + return value or get_resource_manager_endpoint(timeout, auth, verify, proxies) def parse(config_path, key): diff --git a/yarn_api_client/history_server.py b/yarn_api_client/history_server.py index 92b131d..34fc9c2 100644 --- a/yarn_api_client/history_server.py +++ b/yarn_api_client/history_server.py @@ -25,11 +25,11 @@ class HistoryServer(BaseYarnAPI): we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to ``True`` """ - def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True): + def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True, proxies=None): if not service_endpoint: service_endpoint = get_jobhistory_endpoint() - super(HistoryServer, self).__init__(service_endpoint, timeout, auth, verify) + super(HistoryServer, self).__init__(service_endpoint, timeout, auth, verify, proxies) def application_information(self): """ diff --git a/yarn_api_client/node_manager.py b/yarn_api_client/node_manager.py index 26fe5a0..20084e1 100644 --- a/yarn_api_client/node_manager.py +++ b/yarn_api_client/node_manager.py @@ -35,11 +35,11 @@ class NodeManager(BaseYarnAPI): we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to ``True`` """ - def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True): + def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True, proxies=None): if not service_endpoint: service_endpoint = get_nodemanager_endpoint() - super(NodeManager, self).__init__(service_endpoint, timeout, auth, verify) + super(NodeManager, self).__init__(service_endpoint, timeout, auth, verify, proxies) def node_information(self): """ diff --git a/yarn_api_client/resource_manager.py b/yarn_api_client/resource_manager.py index 91ac74e..3f27c4d 100644 --- a/yarn_api_client/resource_manager.py +++ b/yarn_api_client/resource_manager.py @@ -83,7 +83,7 @@ class ResourceManager(BaseYarnAPI): we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to ``True`` """ - def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True): + def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True, proxies=None): active_service_endpoint = None if not service_endpoints: active_service_endpoint = get_resource_manager_endpoint(timeout, auth, verify) @@ -94,7 +94,7 @@ def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True): break if active_service_endpoint: - super(ResourceManager, self).__init__(active_service_endpoint, timeout, auth, verify) + super(ResourceManager, self).__init__(active_service_endpoint, timeout, auth, verify, proxies) else: raise Exception("No active RMs found")