diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..dba4d019 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,7 @@ +language: python +services: + - redis-server +before_script: + - pip install pyflakes +script: + - python -m unittest tests/test* diff --git a/jobManager.py b/jobManager.py index 5f24bc33..3798a764 100644 --- a/jobManager.py +++ b/jobManager.py @@ -63,61 +63,56 @@ def _getNextID(self): def __manage(self): self.running = True while True: - id = self.jobQueue.getNextPendingJob() - - if id: - job = self.jobQueue.get(id) - - # job could no longer exist if it was completed by someone else - if job == None: - continue - - if not job.accessKey and Config.REUSE_VMS: - id, vm = self.jobQueue.getNextPendingJobReuse(id) - job = self.jobQueue.get(id) - if job == None: - continue - - try: - # Mark the job assigned - self.jobQueue.assignJob(job.id) - # if the job has specified an account - # create an VM on the account and run on that instance - if job.accessKeyId: - from vmms.ec2SSH import Ec2SSH - vmms = Ec2SSH(job.accessKeyId, job.accessKey) - newVM = copy.deepcopy(job.vm) - newVM.id = self._getNextID() - preVM = vmms.initializeVM(newVM) + # Blocks until we get a next job + job = self.jobQueue.getNextPendingJob() + + if not job.accessKey and Config.REUSE_VMS: + vm = None + while vm is None: + vm = self.jobQueue.reuseVM(job) + # Sleep for a bit and then check again + time.sleep(Config.DISPATCH_PERIOD) + + try: + # Mark the job assigned + job.makeAssigned() + # if the job has specified an account + # create an VM on the account and run on that instance + if job.accessKeyId: + from vmms.ec2SSH import Ec2SSH + vmms = Ec2SSH(job.accessKeyId, job.accessKey) + newVM = copy.deepcopy(job.vm) + newVM.id = self._getNextID() + preVM = vmms.initializeVM(newVM) + else: + # Try to find a vm on the free list and allocate it to + # the worker if successful. + if Config.REUSE_VMS: + preVM = vm else: - # Try to find a vm on the free list and allocate it to - # the worker if successful. - if Config.REUSE_VMS: - preVM = vm - else: - preVM = self.preallocator.allocVM(job.vm.name) - vmms = self.vmms[job.vm.vmms] # Create new vmms object - - # Now dispatch the job to a worker + preVM = self.preallocator.allocVM(job.vm.name) + vmms = self.vmms[job.vm.vmms] # Create new vmms object + + if preVM.name is not None: self.log.info("Dispatched job %s:%d to %s [try %d]" % (job.name, job.id, preVM.name, job.retries)) - job.appendTrace( - "%s|Dispatched job %s:%d [try %d]" % - (datetime.utcnow().ctime(), job.name, job.id, job.retries)) - - Worker( - job, - vmms, - self.jobQueue, - self.preallocator, - preVM - ).start() - - except Exception as err: - self.jobQueue.makeDead(job.id, str(err)) - - # Sleep for a bit and then check again - time.sleep(Config.DISPATCH_PERIOD) + else: + self.log.info("Unable to pre-allocate a vm for job job %s:%d [try %d]" % (job.name, job.id, job.retries)) + + job.appendTrace( + "%s|Dispatched job %s:%d [try %d]" % + (datetime.utcnow().ctime(), job.name, job.id, job.retries)) + + Worker( + job, + vmms, + self.jobQueue, + self.preallocator, + preVM + ).start() + + except Exception as err: + self.jobQueue.makeDead(job.id, str(err)) if __name__ == "__main__": diff --git a/jobQueue.py b/jobQueue.py index c64484a6..f5b445e2 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -13,7 +13,7 @@ import threading, logging, time from datetime import datetime -from tangoObjects import TangoDictionary, TangoJob +from tangoObjects import TangoDictionary, TangoJob, TangoQueue from config import Config # @@ -34,8 +34,30 @@ class JobQueue(object): def __init__(self, preallocator): + """ + Here we maintain several data structures used to keep track of the + jobs present for the autograder. + + Live jobs contains: + - jobs that are yet to be assigned and run + - jobs that are currently running + + Dead jobs contains: + - jobs that have been completed, or have been 'deleted' when in + the live jobs queue + + Unassigned jobs: + This is a FIFO queue of jobs that are pending assignment. + - We enforce the invariant that all jobs in this queue must be + present in live jobs + + queueLock protects all the internal data structure of JobQueue. This + is needed since there are multiple worker threads and they might be + using the makeUnassigned api. + """ self.liveJobs = TangoDictionary("liveJobs") self.deadJobs = TangoDictionary("deadJobs") + self.unassignedJobs = TangoQueue("unassignedLiveJobs") self.queueLock = threading.Lock() self.preallocator = preallocator self.log = logging.getLogger("JobQueue") @@ -43,7 +65,6 @@ def __init__(self, preallocator): def _getNextID(self): """_getNextID - updates and returns the next ID to be used for a job - Jobs have ID's between 1 and MAX_JOBID. """ self.log.debug("_getNextID|Acquiring lock to job queue.") @@ -51,8 +72,12 @@ def _getNextID(self): self.log.debug("_getNextID|Acquired lock to job queue.") id = self.nextID - # If a job already exists in the queue at nextID, then try to find - # an empty ID. If the queue is full, then return -1. + # If there is an livejob in the queue with with nextID, + # this means that the id is already taken. + # We try to find a free id to use by looping through all + # the job ids possible and finding one that is + # not used by any of the livejobs. + # Return -1 if no such free id is found. keys = self.liveJobs.keys() if (str(id) in keys): id = -1 @@ -63,27 +88,57 @@ def _getNextID(self): self.nextID += 1 if self.nextID > Config.MAX_JOBID: + # Wrap around if job ids go over max job ids avail self.nextID = 1 self.queueLock.release() self.log.debug("_getNextID|Released lock to job queue.") return id + def remove(self, id): + """remove - Remove job from live queue + """ + status = -1 + self.log.debug("remove|Acquiring lock to job queue.") + self.queueLock.acquire() + self.log.debug("remove|Acquired lock to job queue.") + if id in self.liveJobs: + self.liveJobs.delete(id) + status = 0 + self.unassignedJobs.remove(int(id)) + + self.queueLock.release() + self.log.debug("remove|Relased lock to job queue.") + + if status == 0: + self.log.debug("Removed job %s from queue" % id) + else: + self.log.error("Job %s not found in queue" % id) + return status + def add(self, job): """add - add job to live queue - - This function assigns an ID number to a job and then adds it - to the queue of live jobs. + This function assigns an ID number to a *new* job and then adds it + to the queue of live jobs. + Returns the job id on success, -1 otherwise """ if (not isinstance(job, TangoJob)): return -1 + + # Get an id for the new job self.log.debug("add|Getting next ID") - job.setId(self._getNextID()) - if (job.id == -1): + nextId = self._getNextID() + if (nextId == -1): self.log.info("add|JobQueue is full") return -1 + job.setId(nextId) self.log.debug("add|Gotten next ID: " + str(job.id)) + self.log.info("add|Unassigning job ID: %d" % (job.id)) + # Make the job unassigned job.makeUnassigned() + + # Since we assume that the job is new, we set the number of retries + # of this job to 0 job.retries = 0 # Add the job to the queue. Careful not to append the trace until we @@ -92,7 +147,12 @@ def add(self, job): self.queueLock.acquire() self.log.debug("add| Acquired lock to job queue.") + # Adds the job to the live jobs dictionary self.liveJobs.set(job.id, job) + + # Add this to the unassigned job queue too + self.unassignedJobs.put(int(job.id)) + job.appendTrace("%s|Added job %s:%d to queue" % (datetime.utcnow().ctime(), job.name, job.id)) @@ -103,19 +163,28 @@ def add(self, job): self.queueLock.release() self.log.debug("add|Releasing lock to job queue.") - self.log.info("Added job %s:%d to queue, details = %s" % + self.log.info("Added job %s:%s to queue, details = %s" % (job.name, job.id, str(job.__dict__))) return str(job.id) def addDead(self, job): """ addDead - add a job to the dead queue. - - Called by validateJob when a job validation fails. + Called by validateJob when a job validation fails. + Returns -1 on failure and the job id on success """ if (not isinstance(job, TangoJob)): return -1 - job.setId(self._getNextID()) + + # Get an id for the new job + self.log.debug("add|Getting next ID") + nextId = self._getNextID() + if (nextId == -1): + self.log.info("add|JobQueue is full") + return -1 + job.setId(nextId) + self.log.debug("addDead|Gotten next ID: " + str(job.id)) + self.log.info("addDead|Unassigning job %s" % str(job.id)) job.makeUnassigned() job.retries = 0 @@ -124,32 +193,13 @@ def addDead(self, job): self.queueLock.acquire() self.log.debug("addDead|Acquired lock to job queue.") + # We add the job into the dead jobs dictionary self.deadJobs.set(job.id, job) self.queueLock.release() self.log.debug("addDead|Released lock to job queue.") return job.id - def remove(self, id): - """remove - Remove job from live queue - """ - status = -1 - self.log.debug("remove|Acquiring lock to job queue.") - self.queueLock.acquire() - self.log.debug("remove|Acquired lock to job queue.") - if str(id) in self.liveJobs.keys(): - self.liveJobs.delete(id) - status = 0 - - self.queueLock.release() - self.log.debug("remove|Relased lock to job queue.") - - if status == 0: - self.log.debug("Removed job %s from queue" % id) - else: - self.log.error("Job %s not found in queue" % id) - return status - def delJob(self, id, deadjob): """ delJob - Implements delJob() interface call @param id - The id of the job to remove @@ -163,7 +213,7 @@ def delJob(self, id, deadjob): status = -1 self.queueLock.acquire() self.log.debug("delJob| Acquired lock to job queue.") - if str(id) in self.deadJobs.keys(): + if id in self.deadJobs: self.deadJobs.delete(id) status = 0 self.queueLock.release() @@ -181,56 +231,22 @@ def get(self, id): """ self.queueLock.acquire() self.log.debug("get| Acquired lock to job queue.") - if str(id) in self.liveJobs.keys(): - job = self.liveJobs.get(id) - else: - job = None + job = self.liveJobs.get(id) self.queueLock.release() self.log.debug("get| Released lock to job queue.") return job - def getNextPendingJob(self): - """getNextPendingJob - Returns ID of next pending job from queue. - Called by JobManager when Config.REUSE_VMS==False - """ - self.queueLock.acquire() - for id, job in self.liveJobs.items(): - if job.isNotAssigned(): - self.queueLock.release() - return id - self.queueLock.release() - return None - - def getNextPendingJobReuse(self, target_id=None): - """getNextPendingJobReuse - Returns ID of next pending job and its VM. - Called by JobManager when Config.REUSE_VMS==True - """ - self.queueLock.acquire() - for id, job in self.liveJobs.items(): - # if target_id is set, only interested in this id - if target_id and target_id != id: - continue - # Create a pool if necessary - if self.preallocator.poolSize(job.vm.name) == 0: - self.preallocator.update(job.vm, Config.POOL_SIZE) - - # If the job hasn't been assigned to a worker yet, see if there - # is a free VM - if (job.isNotAssigned()): - vm = self.preallocator.allocVM(job.vm.name) - if vm: - self.queueLock.release() - return (id, vm) - - self.queueLock.release() - return (None, None) - def assignJob(self, jobId): """ assignJob - marks a job to be assigned """ self.queueLock.acquire() self.log.debug("assignJob| Acquired lock to job queue.") + job = self.liveJobs.get(jobId) + + # Remove the current job from the queue + self.unassignedJobs.remove(int(jobId)) + self.log.debug("assignJob| Retrieved job.") self.log.info("assignJob|Assigning job ID: %s" % str(job.id)) job.makeAssigned() @@ -240,11 +256,18 @@ def assignJob(self, jobId): self.log.debug("assignJob| Released lock to job queue.") def unassignJob(self, jobId): - """ assignJob - marks a job to be unassigned - """ + """ unassignJob - marks a job to be unassigned + Note: We assume here that a job is to be rescheduled or + 'retried' when you unassign it. This retry is done by + the worker. + """ self.queueLock.acquire() self.log.debug("unassignJob| Acquired lock to job queue.") + + # Get the current job job = self.liveJobs.get(jobId) + + # Increment the number of retires if job.retries is None: job.retries = 0 else: @@ -253,6 +276,12 @@ def unassignJob(self, jobId): self.log.info("unassignJob|Unassigning job %s" % str(job.id)) job.makeUnassigned() + + # Since the assumption is that the job is being retried, + # we simply add the job to the unassigned jobs queue without + # removing anything from it + self.unassignedJobs.put(int(jobId)) + self.queueLock.release() self.log.debug("unassignJob| Released lock to job queue.") @@ -263,14 +292,22 @@ def makeDead(self, id, reason): self.queueLock.acquire() self.log.debug("makeDead| Acquired lock to job queue.") status = -1 - if str(id) in self.liveJobs.keys(): - self.log.info("makeDead| Found job ID: %d in the live queue" % (id)) + # Check to make sure that the job is in the live jobs queue + if id in self.liveJobs: + self.log.info("makeDead| Found job ID: %s in the live queue" % (id)) status = 0 job = self.liveJobs.get(id) - self.log.info("Terminated job %s:%d: %s" % + self.log.info("Terminated job %s:%s: %s" % (job.name, job.id, reason)) - self.deadJobs.set(id, job) + + # Add the job to the dead jobs dictionary + self.deadJobs.set(id, job) + # Remove the job from the live jobs dictionary self.liveJobs.delete(id) + + # Remove the job from the unassigned live jobs queue + self.unassignedJobs.remove(int(id)) + job.appendTrace("%s|%s" % (datetime.utcnow().ctime(), reason)) self.queueLock.release() self.log.debug("makeDead| Released lock to job queue.") @@ -281,9 +318,60 @@ def getInfo(self): info = {} info['size'] = len(self.liveJobs.keys()) info['size_deadjobs'] = len(self.deadJobs.keys()) + info['size_unassignedjobs'] = self.unassignedJobs.qsize() return info def reset(self): + """ reset - resets and clears all the internal dictionaries + and queues + """ self.liveJobs._clean() self.deadJobs._clean() + self.unassignedJobs._clean() + + + def getNextPendingJob(self): + """Gets the next unassigned live job. Note that this is a + blocking function and we will block till there is an available + job. + """ + # Blocks till the next item is added + id = self.unassignedJobs.get() + + self.log.debug("_getNextPendingJob|Acquiring lock to job queue.") + self.queueLock.acquire() + self.log.debug("_getNextPendingJob|Acquired lock to job queue.") + + # Get the corresponding job + job = self.liveJobs.get(id) + if job is None: + raise Exception("Cannot find unassigned job in live jobs") + + self.log.debug("getNextPendingJob| Releasing lock to job queue.") + self.queueLock.release() + self.log.debug("getNextPendingJob| Released lock to job queue.") + return job + + def reuseVM(self, job): + """Helps a job reuse a vm. This is called if CONFIG.REUSE_VM is + set to true. + """ + + # Create a pool if necessary + # This is when there is no existing pool for the vm name required. + if self.preallocator.poolSize(job.vm.name) == 0: + self.preallocator.update(job.vm, Config.POOL_SIZE) + + # If the job hasn't been assigned to a worker yet, we try to + # allocate a new vm for this job + if (job.isNotAssigned()): + # Note: This could return None, when all VMs are being used + return self.preallocator.allocVM(job.vm.name) + else: + # In the case where a job is already assigned, it should have + # a vm, and we just return that vm here + if job.vm: + return job.vm + else: + raise Exception("Job assigned without vm") diff --git a/preallocator.py b/preallocator.py index 8eb3f190..fc9f25ec 100644 --- a/preallocator.py +++ b/preallocator.py @@ -31,7 +31,7 @@ def __init__(self, vmms): def poolSize(self, vmName): """ poolSize - returns the size of the vmName pool, for external callers """ - if vmName not in self.machines.keys(): + if vmName not in self.machines: return 0 else: return len(self.machines.get(vmName)[0]) @@ -46,7 +46,7 @@ def update(self, vm, num): of machines as necessary. """ self.lock.acquire() - if vm.name not in self.machines.keys(): + if vm.name not in self.machines: self.machines.set(vm.name, [[], TangoQueue(vm.name)]) self.log.debug("Creating empty pool of %s instances" % (vm.name)) self.lock.release() @@ -72,7 +72,7 @@ def allocVM(self, vmName): """ allocVM - Allocate a VM from the free list """ vm = None - if vmName in self.machines.keys(): + if vmName in self.machines: self.lock.acquire() if not self.machines.get(vmName)[1].empty(): @@ -203,7 +203,7 @@ def destroyVM(self, vmName, id): this function when the system is queiscent (pool size == free size) """ - if vmName not in self.machines.keys(): + if vmName not in self.machines: return -1 dieVM = None @@ -228,7 +228,7 @@ def destroyVM(self, vmName, id): def getAllPools(self): result = {} - for vmName in self.machines.keys(): + for vmName in self.machines: result[vmName] = self.getPool(vmName) return result @@ -236,7 +236,7 @@ def getPool(self, vmName): """ getPool - returns the members of a pool and its free list """ result = {} - if vmName not in self.machines.keys(): + if vmName not in self.machines: return result result["total"] = [] diff --git a/tangoObjects.py b/tangoObjects.py index ec389a3e..288aa6d9 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -9,7 +9,7 @@ from builtins import str import redis import pickle -import queue +from queue import Queue from config import Config redisConnection = None @@ -203,9 +203,19 @@ def TangoQueue(object_name): if Config.USE_REDIS: return TangoRemoteQueue(object_name) else: - return queue.Queue() + return ExtendedQueue() +class ExtendedQueue(Queue): + """ Python Thread safe Queue with the remove and clean function added """ + + def remove(self, value): + with self.mutex: + self.queue.remove(value) + def _clean(self): + with self.mutex: + self.queue.clear() + class TangoRemoteQueue(object): """Simple Queue with Redis Backend""" @@ -238,8 +248,11 @@ def get(self, block=True, timeout=None): else: item = self.__db.lpop(self.key) - # if item: - # item = item[1] + if item is None: + return None + + if block and item: + item = item[1] item = pickle.loads(item) return item @@ -257,6 +270,13 @@ def __setstate__(self, dict): self.__db = getRedisConnection() self.__dict__.update(dict) + def remove(self, item): + items = self.__db.lrange(self.key, 0, -1) + pickled_item = pickle.dumps(item) + return self.__db.lrem(self.key, 0, pickled_item) + + def _clean(self): + self.__db.delete(self.key) # This is an abstract class that decides on # if we should initiate a TangoRemoteDictionary or TangoNativeDictionary @@ -274,6 +294,9 @@ def __init__(self, object_name): self.r = getRedisConnection() self.hash_name = object_name + def __contains__(self, id): + return self.r.hexists(self.hash_name, str(id)) + def set(self, id, obj): pickled_obj = pickle.dumps(obj) @@ -284,7 +307,7 @@ def set(self, id, obj): return str(id) def get(self, id): - if self.r.hexists(self.hash_name, str(id)): + if id in self: unpickled_obj = self.r.hget(self.hash_name, str(id)) obj = pickle.loads(unpickled_obj) return obj @@ -319,11 +342,14 @@ class TangoNativeDictionary(object): def __init__(self): self.dict = {} + def __contains__(self, id): + return str(id) in self.dict + def set(self, id, obj): self.dict[str(id)] = obj def get(self, id): - if str(id) in self.dict: + if id in self: return self.dict[str(id)] else: return None diff --git a/tests/testJobQueue.py b/tests/testJobQueue.py index 6bd65266..fed20a6f 100644 --- a/tests/testJobQueue.py +++ b/tests/testJobQueue.py @@ -66,27 +66,25 @@ def test_add(self): info = self.jobQueue.getInfo() self.assertEqual(info['size'], 2) - def test_addDead(self): - return self.assertEqual(1, 1) - - def test_remove(self): - self.jobQueue.remove(self.jobId1) + def test_addToUnassigned(self): info = self.jobQueue.getInfo() - self.assertEqual(info['size'], 1) + self.assertEqual(info['size_unassignedjobs'], 2) - self.jobQueue.remove(self.jobId2) - info = self.jobQueue.getInfo() - self.assertEqual(info['size'], 0) + def test_addDead(self): + return self.assertEqual(1, 1) def test_delJob(self): self.jobQueue.delJob(self.jobId1, 0) info = self.jobQueue.getInfo() self.assertEqual(info['size'], 1) self.assertEqual(info['size_deadjobs'], 1) + self.assertEqual(info['size_unassignedjobs'], 1) self.jobQueue.delJob(self.jobId1, 1) info = self.jobQueue.getInfo() self.assertEqual(info['size_deadjobs'], 0) + self.assertEqual(info['size'], 1) + self.assertEqual(info['size_unassignedjobs'], 1) return False @@ -99,12 +97,24 @@ def test_get(self): def test_getNextPendingJob(self): self.jobQueue.assignJob(self.jobId2) + # job 2 should have been removed from unassigned queue + info = self.jobQueue.getInfo() + self.assertEqual(info['size_unassignedjobs'], 1) + self.jobQueue.assignJob(self.jobId1) + info = self.jobQueue.getInfo() + self.assertEqual(info['size_unassignedjobs'], 0) self.jobQueue.unassignJob(self.jobId1) - exp_id = self.jobQueue.getNextPendingJob() - self.assertMultiLineEqual(exp_id, self.jobId1) + info = self.jobQueue.getInfo() + self.assertEqual(info['size_unassignedjobs'], 1) + job = self.jobQueue.getNextPendingJob() + self.assertMultiLineEqual(str(job.id), self.jobId1) + + def test_getNextPendingJob2(self): + job = self.jobQueue.getNextPendingJob() + self.assertMultiLineEqual(str(job.id), self.jobId1) + job = self.jobQueue.getNextPendingJob() + self.assertMultiLineEqual(str(job.id), self.jobId2) - def test_getNextPendingJobReuse(self): - return False def test_assignJob(self): self.jobQueue.assignJob(self.jobId1) @@ -123,17 +133,17 @@ def test_unassignJob(self): def test_makeDead(self): info = self.jobQueue.getInfo() self.assertEqual(info['size_deadjobs'], 0) + self.assertEqual(info['size_unassignedjobs'], 2) self.jobQueue.makeDead(self.jobId1, "test") info = self.jobQueue.getInfo() self.assertEqual(info['size_deadjobs'], 1) + self.assertEqual(info['size_unassignedjobs'], 1) def test__getNextID(self): - init_id = self.jobQueue.nextID for i in range(1, Config.MAX_JOBID + 100): id = self.jobQueue._getNextID() self.assertNotEqual(str(id), self.jobId1) - self.jobQueue.nextID = init_id if __name__ == '__main__': diff --git a/tests/testObjects.py b/tests/testObjects.py new file mode 100644 index 00000000..4f40f3c6 --- /dev/null +++ b/tests/testObjects.py @@ -0,0 +1,126 @@ +from __future__ import print_function +from builtins import range +from builtins import str +import unittest +import redis + +from tangoObjects import TangoDictionary, TangoJob, TangoQueue +import config + + +class TestDictionary(unittest.TestCase): + + def setUp(self): + if config.Config.USE_REDIS: + __db = redis.StrictRedis( + config.Config.REDIS_HOSTNAME, config.Config.REDIS_PORT, db=0) + __db.flushall() + + self.test_entries = { + "key": "value", + 0: "0_value", + 123: 456, + } + + def runDictionaryTests(self): + test_dict = TangoDictionary("test") + self.assertEqual(test_dict.keys(), []) + self.assertEqual(test_dict.values(), []) + + for key in self.test_entries: + test_dict.set(key, self.test_entries[key]) + + for key in self.test_entries: + self.assertTrue(key in test_dict) + self.assertEqual(test_dict.get(key), self.test_entries[key]) + + for (key, val) in test_dict.items(): + self.assertEqual(self.test_entries.get(key), val) + + self.assertEqual(test_dict.keys(), [str(key) for key in self.test_entries.keys()]) + self.assertEqual(test_dict.values(), list(self.test_entries.values())) + self.assertTrue("key_not_present" not in test_dict) + self.assertEqual(test_dict.get("key_not_present"), None) + + test_dict.set("key", "new_value") + self.assertEqual(test_dict.get("key"), "new_value") + + test_dict.delete("key") + self.assertTrue("key" not in test_dict) + + def test_nativeDictionary(self): + config.Config.USE_REDIS = False + self.runDictionaryTests() + + def test_remoteDictionary(self): + config.Config.USE_REDIS = True + self.runDictionaryTests() + + +class TestQueue(unittest.TestCase): + def setUp(self): + if config.Config.USE_REDIS: + __db = redis.StrictRedis( + config.Config.REDIS_HOSTNAME, config.Config.REDIS_PORT, db=0) + __db.flushall() + self.test_entries = [i for i in range(10)] + + def addAllToQueue(self): + # Add all items into the queue + for x in self.test_entries: + self.testQueue.put(x) + self.expectedSize += 1 + self.assertEqual(self.testQueue.qsize(), self.expectedSize) + + def runQueueTests(self): + self.testQueue = TangoQueue("self.testQueue") + self.expectedSize = 0 + self.assertEqual(self.testQueue.qsize(), self.expectedSize) + self.assertTrue(self.testQueue.empty()) + + self.addAllToQueue() + + # Test the blocking get + for x in self.test_entries: + item = self.testQueue.get() + self.expectedSize -= 1 + self.assertEqual(self.testQueue.qsize(), self.expectedSize) + self.assertEqual(item, x) + + self.addAllToQueue() + + # Test the blocking get + for x in self.test_entries: + item = self.testQueue.get_nowait() + self.expectedSize -= 1 + self.assertEqual(self.testQueue.qsize(), self.expectedSize) + self.assertEqual(item, x) + + self.addAllToQueue() + + # Remove all the even entries + for x in self.test_entries: + if (x % 2 == 0): + self.testQueue.remove(x) + self.expectedSize -= 1 + self.assertEqual(self.testQueue.qsize(), self.expectedSize) + + # Test that get only returns odd keys in order + for x in self.test_entries: + if (x % 2 == 1): + item = self.testQueue.get_nowait() + self.expectedSize -= 1 + self.assertEqual(self.testQueue.qsize(), self.expectedSize) + self.assertEqual(item, x) + + def test_nativeQueue(self): + config.Config.USE_REDIS = False + self.runQueueTests() + + def test_remoteQueue(self): + config.Config.USE_REDIS = True + self.runQueueTests() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/testPreallocator.py b/tests/testPreallocator.py index ffca5562..9e803c91 100644 --- a/tests/testPreallocator.py +++ b/tests/testPreallocator.py @@ -1,51 +1,155 @@ import unittest +import random +import redis from preallocator import * from config import Config - +from tangoObjects import TangoMachine class TestPreallocator(unittest.TestCase): + def createTangoMachine(self, image, vmms, + vmObj={'cores': 1, 'memory': 512}): + """ createTangoMachine - Creates a tango machine object from image + """ + return TangoMachine( + name=image, + vmms=vmms, + image="%s" % (image), + cores=vmObj["cores"], + memory=vmObj["memory"], + disk=None, + network=None) + def setUp(self): - return False + # Add more machine types to test here in future + self.testMachines = ["localDocker"] + + def createVM(self): + if Config.USE_REDIS: + __db = redis.StrictRedis( + Config.REDIS_HOSTNAME, Config.REDIS_PORT, db=0) + __db.flushall() + + if Config.VMMS_NAME == "ec2SSH": + from vmms.ec2SSH import Ec2SSH + vmms = Ec2SSH() + self.preallocator = Preallocator({"ec2SSH": vmms}) + + elif Config.VMMS_NAME == "localDocker": + from vmms.localDocker import LocalDocker + vmms = LocalDocker() + self.preallocator = Preallocator({"localDocker": vmms}) + + elif Config.VMMS_NAME == "distDocker": + from vmms.distDocker import DistDocker + vmms = DistDocker() + self.preallocator = Preallocator({"distDocker": vmms}) + else: + vmms = None + self.preallocator = Preallocator({"default": vmms}) + self.vm = self.createTangoMachine(image="autograding_image", vmms=Config.VMMS_NAME) def test_poolSize(self): - return False + for machine in self.testMachines: + Config.VMMS_NAME = machine + self.createVM() + # VM with empty pool + self.assertEqual(self.preallocator.poolSize(self.vm.name), 0) + + # VM post pool update + self.preallocator.update(self.vm, 5) + self.assertEqual(self.preallocator.poolSize(self.vm.name), 5) def test_update(self): - return False - - def test_allocVM(self): - return False - - def test_freeVM(self): - return False + for machine in self.testMachines: + Config.VMMS_NAME = machine + self.createVM() - def test_addVM(self): - return False + # Addition of machines (delta > 0) + self.preallocator.update(self.vm, 10) + self.assertEqual(self.preallocator.poolSize(self.vm.name), 10) - def test_removeVM(self): - return False + # Deletion of machines (delta < 0) + self.preallocator.update(self.vm, 5) + self.assertEqual(self.preallocator.poolSize(self.vm.name), 5) - def test__getNextID(self): - return False + def test_allocVM(self): + for machine in self.testMachines: + Config.VMMS_NAME = machine + self.createVM() + + # No machines to allocate in pool + self.preallocator.update(self.vm, 0) + vm = self.preallocator.allocVM(self.vm.name) + self.assertIsNone(vm) + + # Regular behavior + self.preallocator.update(self.vm, 5) + vm = self.preallocator.allocVM(self.vm.name) + self.assertIsNotNone(vm) - def test___create(self): - return False + def test_freeVM(self): + for machine in self.testMachines: + Config.VMMS_NAME = machine + self.createVM() + # Allocating single, free machine + self.preallocator.update(self.vm, 1) + vm = self.preallocator.allocVM(self.vm.name) + self.preallocator.freeVM(vm) + free = self.preallocator.getPool(self.vm.name)['free'] + self.assertFalse(free == []) + + # Revert pool for other tests + self.preallocator.update(self.vm, 5) + + def test_getNextID(self): + for machine in self.testMachines: + Config.VMMS_NAME = machine + self.createVM() + + # Obtain valid machine id during creation/update + idx = self.preallocator._getNextID() + self.assertGreaterEqual(idx, 1000) + self.assertLessEqual(idx, 9999) + + def test_createVMPool(self): + for machine in self.testMachines: + Config.VMMS_NAME = machine + self.createVM() + + # Create single VM + self.preallocator.update(self.vm, 1) + allPools = self.preallocator.getAllPools() + self.assertIn(self.vm.name, allPools.keys()) - def test___destroy(self): - return False + def test_destroyVM(self): + for machine in self.testMachines: + Config.VMMS_NAME = machine + self.createVM() - def test_createVM(self): - return False + # Destroy non existent VM + res = self.preallocator.destroyVM("nonExistent", 1001) + self.assertEqual(res, -1) - def test_destroyVM(self): - return False + # Destroy existent VM + self.preallocator.update(self.vm, 1) + prevPool = self.preallocator.getPool(self.vm.name) + rand = random.choice(prevPool['total']) + res = self.preallocator.destroyVM(self.vm.name, rand) + self.assertEqual(res, 0) def test_getPool(self): - return False - + for machine in self.testMachines: + Config.VMMS_NAME = machine + self.createVM() + + # Empty pool + self.preallocator.update(self.vm, 0) + pool = self.preallocator.getPool(self.vm.name) + self.assertEqual(pool["total"], []) + self.assertEqual(pool["free"], []) if __name__ == '__main__': unittest.main() diff --git a/vmms/Dockerfile b/vmms/Dockerfile index 649e9dea..eca9b547 100644 --- a/vmms/Dockerfile +++ b/vmms/Dockerfile @@ -1,12 +1,15 @@ # Autolab - autograding docker image -FROM ubuntu:14.04 -MAINTAINER Mihir Pandya +FROM ubuntu:18.04 +MAINTAINER Autolab Team -RUN apt-get update --fix-missing -RUN apt-get install -y gcc -RUN apt-get install -y make -RUN apt-get install -y build-essential +RUN apt-get update && apt-get install -y \ + build-essential \ + gcc \ + git \ + make \ + sudo \ + && rm -rf /var/lib/apt/lists/* # Install autodriver WORKDIR /home @@ -16,8 +19,7 @@ RUN mkdir autolab autograde output RUN chown autolab:autolab autolab RUN chown autolab:autolab output RUN chown autograde:autograde autograde -RUN apt-get install -y git -RUN git clone https://github.com/autolab/Tango.git +RUN git clone --depth 1 https://github.com/autolab/Tango.git WORKDIR Tango/autodriver RUN make clean && make RUN cp autodriver /usr/bin/autodriver @@ -25,9 +27,7 @@ RUN chmod +s /usr/bin/autodriver # Clean up WORKDIR /home -RUN apt-get remove -y git -RUN apt-get -y autoremove -RUN rm -rf Tango/ +RUN apt-get remove -y git && apt-get -y autoremove && rm -rf Tango/ # Check installation RUN ls -l /home diff --git a/vmms/localDocker.py b/vmms/localDocker.py index 1c89f5c9..0f7563ff 100644 --- a/vmms/localDocker.py +++ b/vmms/localDocker.py @@ -92,6 +92,11 @@ def getVolumePath(self, instanceName): volumePath = os.path.join(volumePath, instanceName, "") return volumePath + def getDockerVolumePath(self, dockerPath, instanceName): + # Last empty string to cause trailing '/' + volumePath = os.path.join(dockerPath, instanceName, "") + return volumePath + def domainName(self, vm): """ Returns the domain name that is stored in the vm instance. @@ -121,6 +126,9 @@ def copyIn(self, vm, inputFiles): # Create a fresh volume os.makedirs(volumePath) for file in inputFiles: + # Create output directory if it does not exist + os.makedirs(os.path.dirname(volumePath), exist_ok=True) + shutil.copy(file.localFile, volumePath + file.destFile) self.log.debug('Copied in file %s to %s' % (file.localFile, volumePath + file.destFile)) return 0 @@ -134,6 +142,9 @@ def runJob(self, vm, runTimeout, maxOutputFileSize): """ instanceName = self.instanceName(vm.id, vm.image) volumePath = self.getVolumePath(instanceName) + if os.getenv("DOCKER_TANGO_HOST_VOLUME_PATH"): + volumePath = self.getDockerVolumePath( + os.getenv("DOCKER_TANGO_HOST_VOLUME_PATH"), instanceName) args = ['docker', 'run', '--name', instanceName, '-v'] args = args + ['%s:%s' % (volumePath, '/home/mount')] args = args + [vm.image] @@ -220,7 +231,7 @@ def existsVM(self, vm): """ instanceName = self.instanceName(vm.id, vm.name) ret = timeout(['docker', 'inspect', instanceName]) - return (ret is 0) + return ret == 0 def getImages(self): """ getImages - Executes `docker images` and returns a list of