Skip to content

Commit 391a9a7

Browse files
Merge pull request #44 from openvstorage/develop
Develop in master
2 parents 6d0eff1 + 70e4784 commit 391a9a7

File tree

9 files changed

+134
-101
lines changed

9 files changed

+134
-101
lines changed

helpers/api.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,8 @@ def __init__(self, *args, **kwargs):
5858

5959
class TimeOutError(RuntimeError):
6060
"""
61-
Custom tineout class
61+
Custom timeout class
6262
"""
63-
def __init__(self, status_code, *args, **kwargs):
64-
super(TimeOutError, self).__init__(*args, **kwargs)
6563

6664

6765
class OVSClient(object):
@@ -187,7 +185,7 @@ def _process(self, response, overrule_raw=False):
187185
else:
188186
raise HttpException(status_code, message)
189187

190-
def _call(self, api, params, function, **kwargs):
188+
def _call(self, api, params, func, **kwargs):
191189
if not api.endswith('/'):
192190
api = '{0}/'.format(api)
193191
if not api.startswith('/'):
@@ -197,15 +195,15 @@ def _call(self, api, params, function, **kwargs):
197195
first_connect = self._token is None
198196
headers, url = self._prepare(params=params)
199197
try:
200-
return self._process(function(url=url.format(api), headers=headers, verify=self._verify, **kwargs))
198+
return self._process(func(url=url.format(api), headers=headers, verify=self._verify, **kwargs))
201199
except ForbiddenException:
202200
if self._volatile_client is not None:
203201
self._volatile_client.delete(self._key)
204202
if first_connect is True: # First connect, so no token was present yet, so no need to try twice without token
205203
raise
206204
self._token = None
207205
headers, url = self._prepare(params=params)
208-
return self._process(function(url=url.format(api), headers=headers, verify=self._verify, **kwargs))
206+
return self._process(func(url=url.format(api), headers=headers, verify=self._verify, **kwargs))
209207
except Exception:
210208
if self._volatile_client is not None:
211209
self._volatile_client.delete(self._key)
@@ -216,15 +214,15 @@ def delete(self, api):
216214
Executes a DELETE call
217215
:param api: Specification for to fill out in the URL, eg: /alba/backends/<albabackend_guid>
218216
"""
219-
return self._call(api=api, params={}, function=requests.delete)
217+
return self._call(api=api, params={}, func=requests.delete)
220218

221219
def get(self, api, params=None):
222220
"""
223221
Executes a GET call
224222
:param api: Specification for to fill out in the URL, eg: /vpools/<vpool_guid>/shrink_vpool
225223
:param params: Additional query parameters, eg: _dynamics
226224
"""
227-
return self._call(api=api, params=params, function=requests.get)
225+
return self._call(api=api, params=params, func=requests.get)
228226

229227
def post(self, api, data=None, params=None):
230228
"""
@@ -233,7 +231,7 @@ def post(self, api, data=None, params=None):
233231
:param data: Data to post
234232
:param params: Additional query parameters, eg: _dynamics
235233
"""
236-
return self._call(api=api, params=params, function=requests.post, data=self._to_json(data))
234+
return self._call(api=api, params=params, func=requests.post, data=self._to_json(data))
237235

238236
def put(self, api, data=None, params=None):
239237
"""
@@ -242,7 +240,7 @@ def put(self, api, data=None, params=None):
242240
:param data: Data to put
243241
:param params: Additional query parameters, eg: _dynamics
244242
"""
245-
return self._call(api=api, params=params, function=requests.put, data=self._to_json(data))
243+
return self._call(api=api, params=params, func=requests.put, data=self._to_json(data))
246244

247245
def patch(self, api, data=None, params=None):
248246
"""
@@ -251,7 +249,7 @@ def patch(self, api, data=None, params=None):
251249
:param data: Data to patch
252250
:param params: Additional query parameters, eg: _dynamics
253251
"""
254-
return self._call(api=api, params=params, function=requests.patch, data=self._to_json(data))
252+
return self._call(api=api, params=params, func=requests.patch, data=self._to_json(data))
255253

256254
def wait_for_task(self, task_id, timeout=None):
257255
"""

helpers/hypervisor/apis/kvm/sdk.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@ def _recurse(treeitem):
4949
return result
5050

5151

52-
def authenticated(function):
52+
def authenticated(func):
5353
"""
5454
Decorator that make sure all required calls are running onto a connected SDK
5555
"""
5656
def wrapper(self, *args, **kwargs):
57-
self.__doc__ = function.__doc__
57+
self.__doc__ = func.__doc__
5858
# determine if connection isn't closed.
5959
try:
6060
self._conn = self.connect(self.login, self.host)
@@ -64,7 +64,7 @@ def wrapper(self, *args, **kwargs):
6464
except:
6565
pass
6666
raise
67-
return function(self, *args, **kwargs)
67+
return func(self, *args, **kwargs)
6868
return wrapper
6969

7070

@@ -622,7 +622,7 @@ def create_vm(self, name, vcpus, ram, disks, cdrom_iso=None, os_type=None, os_va
622622

623623
ovs_vm = False
624624
if edge_configuration is not None:
625-
required_edge_params = {'port': (int, {'min': 1, 'max': 65565}),
625+
required_edge_params = {'port': (int, {'min': 1, 'max': 65535}),
626626
'protocol': (str, ['tcp', 'udp', 'rdma']),
627627
'hostname': (str, None),
628628
'username': (str, None, False),

helpers/hypervisor/apis/vmware/sdk.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,23 @@ def authenticated(force=False):
3838
Decorator to make that a login is executed in case the current session isn't valid anymore
3939
@param force: Force a (re)login, as some methods also work when not logged in
4040
"""
41-
def wrapper(function):
41+
def wrapper(func):
4242
def new_function(self, *args, **kwargs):
43-
self.__doc__ = function.__doc__
43+
self.__doc__ = func.__doc__
4444
try:
4545
if force:
4646
self._login()
47-
return function(self, *args, **kwargs)
47+
return func(self, *args, **kwargs)
4848
except WebFault as fault:
4949
if 'The session is not authenticated' in str(fault):
5050
logger.debug('Received WebFault authentication failure, logging in...')
5151
self._login()
52-
return function(self, *args, **kwargs)
52+
return func(self, *args, **kwargs)
5353
raise
5454
except NotAuthenticatedException:
5555
logger.debug('Received NotAuthenticatedException, logging in...')
5656
self._login()
57-
return function(self, *args, **kwargs)
57+
return func(self, *args, **kwargs)
5858
return new_function
5959
return wrapper
6060

helpers/system.py

Lines changed: 76 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@
1313
#
1414
# Open vStorage is distributed in the hope that it will be useful,
1515
# but WITHOUT ANY WARRANTY of any kind.
16-
16+
import time
1717
from ovs.log.log_handler import LogHandler
18-
from ovs.extensions.generic.system import System
18+
from ovs.extensions.services.service import ServiceManager
1919
from ovs.extensions.generic.sshclient import SSHClient
20-
from ovs.extensions.packages.package import PackageManager
21-
from ..helpers.init_manager import InitManager, InitManagerSupported
20+
from ovs.extensions.generic.system import System
2221

2322

2423
class SystemHelper(object):
@@ -31,40 +30,20 @@ def __init__(self):
3130
pass
3231

3332
@staticmethod
34-
def get_non_running_ovs_services(storagerouter_ip):
33+
def get_non_running_ovs_services(client):
3534
"""
3635
get all non-running ovs services
37-
:param storagerouter_ip: ip address of a existing storagerouter
38-
:type storagerouter_ip: str
36+
:param client: sshclient instance
3937
:return: list of non running ovs services
4038
:rtype: list
4139
"""
42-
client = SSHClient(storagerouter_ip, username='root')
43-
if InitManager.INIT_MANAGER == InitManagerSupported.INIT:
44-
ovs_services = [service for service in client.dir_list(InitManager.UPSTART_BASEDIR) if 'ovs-' in service]
45-
return [ovs_service.split('.')[0] for ovs_service in ovs_services
46-
if not InitManager.service_running(ovs_service.split('.')[0], storagerouter_ip)]
47-
elif InitManager.INIT_MANAGER == InitManagerSupported.SYSTEMD:
48-
ovs_services = [service for service in client.dir_list(InitManager.SYSTEMD_BASEDIR) if 'ovs-' in service]
49-
return [ovs_service.split('.')[0] for ovs_service in ovs_services
50-
if not InitManager.service_running(ovs_service.split('.')[0], storagerouter_ip)]
51-
52-
@staticmethod
53-
def get_missing_packages(ip, required_packages):
54-
"""
55-
Get all missing packages based on required packages
56-
57-
:param ip: ip address of a server
58-
:type ip: str
59-
:param required_packages: a list of required packages (e.g. ['openvstorage', 'qemu', 'fio'])
60-
:type required_packages: list
61-
:return: list of missing packages
62-
:rtype: list
63-
"""
64-
client = SSHClient(ip, username='root')
65-
return [package for package in required_packages
66-
if client.run("dpkg -s {0} | grep Status | cut -d ' ' -f 3".format(package),
67-
allow_insecure=True) != "ok"]
40+
non_running_ovs_services = []
41+
for service in ServiceManager.list_services(client):
42+
if not service.startswith('ovs-'):
43+
continue
44+
if ServiceManager.get_service_status(service, client) != 'active':
45+
non_running_ovs_services.append(service)
46+
return non_running_ovs_services
6847

6948
@staticmethod
7049
def get_local_storagerouter():
@@ -75,31 +54,76 @@ def get_local_storagerouter():
7554
"""
7655
return System.get_my_storagerouter()
7756

78-
@staticmethod
79-
def get_ovs_version(client):
57+
@classmethod
58+
def get_ovs_version(cls, storagerouter=None):
8059
"""
8160
Gets the installed ovs version
82-
:param client: Sshclient instance
83-
:type client: ovs.extensions.generic.sshclient.SSHClient
61+
:param storagerouter: Storagerouter to fetch info from
8462
:return: ovs version identifier. Either ee or ose
8563
:rtype: str
8664
"""
87-
# @todo replace with storagerouter.features instead
88-
# Version mapping with identifier
89-
mapping = {'ee': 'volumedriver-ee-base',
90-
'ose': 'volumedriver-no-dedup-base'}
91-
installed_versions = PackageManager.get_installed_versions(client=client)
92-
for ovs_version, detection_key in mapping.iteritems():
93-
if detection_key in installed_versions:
94-
return ovs_version
65+
if storagerouter is None:
66+
storagerouter = cls.get_local_storagerouter()
67+
if storagerouter.features['alba']['edition'] == 'community':
68+
return 'ose'
69+
else:
70+
return 'ee'
9571

9672
@staticmethod
97-
def upper_case_first_letter(x):
73+
def idle_till_ovs_is_up(ip, username, password=None, connection_timeout=300, service_timeout=60, logger=LOGGER):
9874
"""
99-
Upper case the first letter of a string
100-
:param x: a normal string
101-
:type x: str
102-
:return: a normal string with the first letter uppercases
103-
:rtype: str
75+
wait until a node is back up and all ovs related are running (or potentially stuck)
76+
:param ip: ip of the node
77+
:param username: username to login with
78+
:param password: password to login with
79+
:param connection_timeout: raise when not online after these seconds
80+
:param service_timeout: poll for x seconds when checking services
81+
:param logger: logging instance
82+
:raise RuntimeError: when the timeout has been reached
83+
:return: dict with services mapped by their state
10484
"""
105-
return x[0].upper() + x[1:]
85+
# neutral_states = ['inactive', 'deactivating']
86+
failed_states = ['failed', 'error']
87+
active_states = ['active', 'reloading']
88+
activating_state = 'activating'
89+
start_time = time.time()
90+
client = None
91+
while client is None:
92+
delta = time.time() - start_time
93+
if delta > connection_timeout:
94+
raise RuntimeError('Idling has timed out after {0}s'.format(delta))
95+
try:
96+
client = SSHClient(ip, username=username, password=password)
97+
except:
98+
logger.debug('Could not establish a connection yet to {0} after {1}s'.format(ip, delta))
99+
time.sleep(1)
100+
ovs_services = [service for service in ServiceManager.list_services(client) if service.startswith('ovs-')]
101+
active_services = []
102+
failed_service = []
103+
activating_services = []
104+
# Initially class these services
105+
for service in ovs_services:
106+
logger.debug('Initially classifying {0}'.format(service))
107+
service_state = ServiceManager.get_service_status(service, client)
108+
logger.debug('Service {0} - State {1}'.format(service, service_state))
109+
if service_state in failed_states:
110+
failed_service.append(service)
111+
elif service_state in active_states:
112+
active_services.append(service)
113+
elif service_state == activating_state:
114+
activating_services.append(service)
115+
else:
116+
logger.error('Unable to process service state {0}'.format(service_state))
117+
start_time = time.time()
118+
while len(activating_services) > 0:
119+
if time.time() - start_time > service_timeout:
120+
break
121+
service = activating_services.pop()
122+
service_state = ServiceManager.get_service_status(service, client)
123+
if service_state in failed_states:
124+
failed_service.append(service)
125+
elif service_state in active_states:
126+
active_services.append(service)
127+
elif service_state == activating_state:
128+
activating_services.append(service)
129+
return {'active': active_services, 'failed': failed_service, 'activating': activating_services}

helpers/thread.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def start_thread_with_event(target, name, args=(), kwargs=None):
4242
args = args + (event,)
4343
thread = threading.Thread(target=target, args=tuple(args), kwargs=kwargs)
4444
thread.setName(str(name))
45+
thread.setDaemon(True)
4546
thread.start()
4647
return thread, event
4748

@@ -52,18 +53,22 @@ def start_thread(target, name, args=(), kwargs=None):
5253
ThreadHelper.LOGGER.info('Starting thread with target {0}'.format(target))
5354
thread = threading.Thread(target=target, args=tuple(args), kwargs=kwargs)
5455
thread.setName(str(name))
56+
thread.setDaemon(True)
5557
thread.start()
5658
return thread
5759

5860
@staticmethod
59-
def stop_evented_threads(thread_pairs, r_semaphore=None, logger=LOGGER):
61+
def stop_evented_threads(thread_pairs, r_semaphore=None, logger=LOGGER, timeout=300):
6062
for thread_pair in thread_pairs:
6163
if thread_pair[0].isAlive():
6264
thread_pair[1].set()
6365
# Wait again to sync
6466
logger.info('Syncing threads')
6567
if r_semaphore is not None:
68+
start = time.time()
6669
while r_semaphore.get_counter() < len(thread_pairs): # Wait for the number of threads we currently have.
70+
if time.time() - start > timeout:
71+
raise RuntimeError('Synching the thread with the r_semaphore has timed out.')
6772
time.sleep(0.05)
6873
r_semaphore.wait() # Unlock them to let them stop (the object is set -> wont loop)
6974
# Wait for threads to die

remove/vdisk.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,24 @@ class VDiskRemover(object):
3030
def __init__(self):
3131
pass
3232

33+
@staticmethod
34+
def remove_vdisks_with_structure(vdisks):
35+
"""
36+
Remove many vdisks at once. Will keep the parent structure in mind
37+
:param vdisks: list of vdisks
38+
:return:
39+
"""
40+
removed_guids = []
41+
for vdisk in vdisks:
42+
if vdisk.guid in removed_guids:
43+
continue
44+
if len(vdisk.child_vdisks_guids) > 0:
45+
for vdisk_child_guid in vdisk.child_vdisks_guids:
46+
VDiskRemover.remove_vdisk(vdisk_child_guid)
47+
removed_guids.append(vdisk_child_guid)
48+
VDiskRemover.remove_vdisk(vdisk.guid)
49+
removed_guids.append(vdisk.guid)
50+
3351
@staticmethod
3452
def remove_snapshot(snapshot_guid, vdisk_name, vpool_name, api, timeout=REMOVE_SNAPSHOT_TIMEOUT):
3553
"""

0 commit comments

Comments
 (0)