Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/test_application_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def setUp(self):
def test__init__(self, get_config_mock, request_mock):
get_config_mock.return_value = None
ApplicationMaster()
get_config_mock.assert_called_with(30, None, True)
get_config_mock.assert_called_with(30, None, True, None)

def test_application_information(self, request_mock):
self.app.application_information('app_100500')
Expand Down
6 changes: 3 additions & 3 deletions yarn_api_client/application_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ class ApplicationMaster(BaseYarnAPI):
we verify the server's TLS certificate, or a string, in which case it must
be a path to a CA bundle to use. Defaults to ``True``
"""
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True, proxies=None):
if not service_endpoint:
service_endpoint = get_webproxy_endpoint(timeout, auth, verify)
service_endpoint = get_webproxy_endpoint(timeout, auth, verify, proxies)

super(ApplicationMaster, self).__init__(service_endpoint, timeout, auth, verify)
super(ApplicationMaster, self).__init__(service_endpoint, timeout, auth, verify, proxies)

def application_information(self, application_id):
"""
Expand Down
12 changes: 6 additions & 6 deletions yarn_api_client/hadoop_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ def _get_resource_manager(hadoop_conf_path, rm_id=None):
return ('https://' if is_https_only else 'http://') + rm_address if rm_address else None


def check_is_active_rm(url, timeout=30, auth=None, verify=True):
def check_is_active_rm(url, timeout=30, auth=None, verify=True, proxies=None):
try:
response = requests.get(url + "/cluster", timeout=timeout, auth=auth, verify=verify)
response = requests.get(url + "/cluster", timeout=timeout, auth=auth, verify=verify, proxies=proxies)
except requests.RequestException as e:
log.warning("Exception encountered accessing RM '{url}': '{err}', continuing...".format(url=url, err=e))
return False
Expand All @@ -64,15 +64,15 @@ def check_is_active_rm(url, timeout=30, auth=None, verify=True):
return True


def get_resource_manager_endpoint(timeout=30, auth=None, verify=True):
def get_resource_manager_endpoint(timeout=30, auth=None, verify=True, proxies=None):
log.info('Getting resource manager endpoint from config: {config_path}'.format(config_path=os.path.join(CONF_DIR, 'yarn-site.xml')))
hadoop_conf_path = CONF_DIR
rm_ids = _get_rm_ids(hadoop_conf_path)
if rm_ids:
for rm_id in rm_ids:
ret = _get_resource_manager(hadoop_conf_path, rm_id)
if ret:
if check_is_active_rm(ret, timeout, auth, verify):
if check_is_active_rm(ret, timeout, auth, verify, proxies):
return ret
return None
else:
Expand All @@ -93,12 +93,12 @@ def get_nodemanager_endpoint():
return parse(config_path, prop_name)


def get_webproxy_endpoint(timeout=30, auth=None, verify=True):
def get_webproxy_endpoint(timeout=30, auth=None, verify=True, proxies=None):
config_path = os.path.join(CONF_DIR, 'yarn-site.xml')
log.info('Getting webproxy endpoint from config: {config_path}'.format(config_path=config_path))
prop_name = 'yarn.web-proxy.address'
value = parse(config_path, prop_name)
return value or get_resource_manager_endpoint(timeout, auth, verify)
return value or get_resource_manager_endpoint(timeout, auth, verify, proxies)


def parse(config_path, key):
Expand Down
4 changes: 2 additions & 2 deletions yarn_api_client/history_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ class HistoryServer(BaseYarnAPI):
we verify the server's TLS certificate, or a string, in which case it must
be a path to a CA bundle to use. Defaults to ``True``
"""
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True, proxies=None):
if not service_endpoint:
service_endpoint = get_jobhistory_endpoint()

super(HistoryServer, self).__init__(service_endpoint, timeout, auth, verify)
super(HistoryServer, self).__init__(service_endpoint, timeout, auth, verify, proxies)

def application_information(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions yarn_api_client/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ class NodeManager(BaseYarnAPI):
we verify the server's TLS certificate, or a string, in which case it must
be a path to a CA bundle to use. Defaults to ``True``
"""
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True, proxies=None):
if not service_endpoint:
service_endpoint = get_nodemanager_endpoint()

super(NodeManager, self).__init__(service_endpoint, timeout, auth, verify)
super(NodeManager, self).__init__(service_endpoint, timeout, auth, verify, proxies)

def node_information(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions yarn_api_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ResourceManager(BaseYarnAPI):
we verify the server's TLS certificate, or a string, in which case it must
be a path to a CA bundle to use. Defaults to ``True``
"""
def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True):
def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True, proxies=None):
active_service_endpoint = None
if not service_endpoints:
active_service_endpoint = get_resource_manager_endpoint(timeout, auth, verify)
Expand All @@ -94,7 +94,7 @@ def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True):
break

if active_service_endpoint:
super(ResourceManager, self).__init__(active_service_endpoint, timeout, auth, verify)
super(ResourceManager, self).__init__(active_service_endpoint, timeout, auth, verify, proxies)
else:
raise Exception("No active RMs found")

Expand Down