Skip to content

Pass auth to Active RM check #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/test_application_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def setUp(self):
def test__init__(self, get_config_mock, request_mock):
get_config_mock.return_value = None
ApplicationMaster()
get_config_mock.assert_called_with(30)
get_config_mock.assert_called_with(30, None, True)

def test_application_information(self, request_mock):
self.app.application_information('app_100500')
Expand Down
62 changes: 35 additions & 27 deletions tests/test_hadoop_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@
from mock import patch
from tests import TestCase

import requests_mock
from yarn_api_client import hadoop_conf
import platform
import os
import sys

if sys.version_info[0] == 2:
_mock_exception_method = 'assertRaisesRegexp'
else:
_mock_exception_method = 'assertRaisesRegex'

_http_request_method = ''
_http_getresponse_method = ''
Expand Down Expand Up @@ -139,34 +146,35 @@ def test_get_rm_ids(self):
self.assertIsNone(rm_list)

@mock.patch('yarn_api_client.hadoop_conf._is_https_only')
@mock.patch(_http_request_method)
@mock.patch(_http_getresponse_method)
def test_check_is_active_rm(self, http_getresponse_mock, http_conn_request_mock, is_https_only_mock):
class ResponseMock():
def __init__(self, status, header_dict):
self.status = status
self.header_dict = header_dict

def getheader(self, header_key, default_return):
if header_key in self.header_dict:
return self.header_dict[header_key]
else:
return default_return

def test_check_is_active_rm(self, is_https_only_mock):
is_https_only_mock.return_value = False
http_conn_request_mock.return_value = None
http_getresponse_mock.return_value = ResponseMock(OK, {})
self.assertTrue(hadoop_conf.check_is_active_rm('example2:8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh': "testing"})
self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh': "testing"})
self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022'))
http_conn_request_mock.side_effect = Exception('error')
http_conn_request_mock.reset_mock()
http_conn_request_mock.return_value = None
self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022'))

# Success scenario
with requests_mock.mock() as requests_get_mock:
requests_get_mock.get('https://example2:8022/cluster', status_code=200)
self.assertTrue(hadoop_conf.check_is_active_rm('https://example2:8022'))

# Outage scenario
with requests_mock.mock() as requests_get_mock:
requests_get_mock.get('https://example2:8022/cluster', status_code=500)
self.assertFalse(hadoop_conf.check_is_active_rm('https://example2:8022'))

# Error scenario (URL is wrong - not found)
with requests_mock.mock() as requests_get_mock:
requests_get_mock.get('https://example2:8022/cluster', status_code=404)
self.assertFalse(hadoop_conf.check_is_active_rm('https://example2:8022'))

# Error scenario (necessary Auth is not provided or invalid credentials)
with requests_mock.mock() as requests_get_mock:
requests_get_mock.get('https://example2:8022/cluster', status_code=401)
self.assertFalse(hadoop_conf.check_is_active_rm('https://example2:8022'))

# Emulate requests library exception (socket timeout, etc)
with requests_mock.mock() as requests_get_mock:
requests_get_mock.side_effect = Exception('error')
# requests_get_mock.get('https://example2:8022/cluster', status_code=200)
requests_get_mock.return_value = None
self.assertFalse(hadoop_conf.check_is_active_rm('https://example2:8022'))

def test_get_resource_manager(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def setUp(self, check_is_active_rm_mock):
def test__init__(self, get_config_mock, request_mock):
get_config_mock.return_value = "https:localhost"
rm = ResourceManager()
get_config_mock.assert_called_with(30)
get_config_mock.assert_called_with(30, None, True)
self.assertEqual(rm.service_uri.is_https, True)

def test_cluster_information(self, request_mock):
Expand Down
2 changes: 1 addition & 1 deletion yarn_api_client/application_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ApplicationMaster(BaseYarnAPI):
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
if not service_endpoint:
self.logger.debug('Get configuration from hadoop conf dir')
service_endpoint = get_webproxy_endpoint(timeout)
service_endpoint = get_webproxy_endpoint(timeout, auth, verify)

super(ApplicationMaster, self).__init__(service_endpoint, timeout, auth, verify)

Expand Down
32 changes: 11 additions & 21 deletions yarn_api_client/hadoop_conf.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
# -*- coding: utf-8 -*-
import os
import xml.etree.ElementTree as ET
try:
from httplib import HTTPConnection, HTTPSConnection, OK
except ImportError:
from http.client import HTTPConnection, HTTPSConnection, OK
from .base import Uri
import requests

CONF_DIR = os.getenv('HADOOP_CONF_DIR', '/etc/hadoop/conf')

Expand Down Expand Up @@ -47,33 +43,27 @@ def _get_resource_manager(hadoop_conf_path, rm_id=None):
return rm_webapp_address or None


def check_is_active_rm(url, timeout=30):
uri = Uri(url)
if uri.is_https:
conn = HTTPSConnection(host=uri.hostname, port=uri.port, timeout=timeout)
else:
conn = HTTPConnection(host=uri.hostname, port=uri.port, timeout=timeout)
def check_is_active_rm(url, timeout=30, auth=None, verify=True):
try:
conn.request('GET', '/cluster')
response = requests.get(url + "/cluster", timeout=timeout, auth=auth, verify=verify)
except:
return False
response = conn.getresponse()
if response.status != OK:

if response.status_code != 200:
print("Error to access RM - HTTP Code {}".format(response.status_code))
return False
else:
if response.getheader('Refresh', None) is not None:
return False
return True
return True


def get_resource_manager_endpoint(timeout=30):
def get_resource_manager_endpoint(timeout=30, auth=None, verify=True):
hadoop_conf_path = CONF_DIR
rm_ids = _get_rm_ids(hadoop_conf_path)
if rm_ids:
for rm_id in rm_ids:
ret = _get_resource_manager(hadoop_conf_path, rm_id)
if ret:
if check_is_active_rm(ret, timeout):
if check_is_active_rm(ret, timeout, auth, verify):
return ret
return None
else:
Expand All @@ -92,11 +82,11 @@ def get_nodemanager_endpoint():
return parse(config_path, prop_name)


def get_webproxy_endpoint(timeout=30):
def get_webproxy_endpoint(timeout=30, auth=None, verify=True):
config_path = os.path.join(CONF_DIR, 'yarn-site.xml')
prop_name = 'yarn.web-proxy.address'
value = parse(config_path, prop_name)
return value or get_resource_manager_endpoint(timeout)
return value or get_resource_manager_endpoint(timeout, auth, verify)


def parse(config_path, key):
Expand Down
4 changes: 2 additions & 2 deletions yarn_api_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True):
active_service_endpoint = None
if not service_endpoints:
self.logger.debug('Get configuration from hadoop conf dir: {conf_dir}'.format(conf_dir=CONF_DIR))
active_service_endpoint = get_resource_manager_endpoint(timeout)
active_service_endpoint = get_resource_manager_endpoint(timeout, auth, verify)
else:
for endpoint in service_endpoints:
if check_is_active_rm(endpoint, timeout):
if check_is_active_rm(endpoint, timeout, auth, verify):
active_service_endpoint = endpoint
break

Expand Down