Skip to content
Closed

Jojo #188

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 66 additions & 62 deletions jobManager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
from __future__ import print_function
import copy
import time
import logging
import threading
from config import Config
from tangoObjects import TangoQueue
from worker import Worker
from preallocator import Preallocator
from jobQueue import JobQueue
from tango import *
from datetime import datetime
from builtins import str
#
# JobManager - Thread that assigns jobs to worker threads
#
Expand All @@ -13,17 +25,7 @@
from builtins import object
from future import standard_library
standard_library.install_aliases()
from builtins import str
import threading, logging, time, copy

from datetime import datetime
from tango import *
from jobQueue import JobQueue
from preallocator import Preallocator
from worker import Worker

from tangoObjects import TangoQueue
from config import Config

class JobManager(object):

Expand Down Expand Up @@ -63,59 +65,61 @@ def _getNextID(self):
def __manage(self):
self.running = True
while True:
id = self.jobQueue.getNextPendingJob()

if id:
job = self.jobQueue.get(id)

# job could no longer exist if it was completed by someone else
if job == None:
continue

if not job.accessKey and Config.REUSE_VMS:
id, vm = self.jobQueue.getNextPendingJobReuse(id)
job = self.jobQueue.get(id)
if job == None:
continue

try:
# Mark the job assigned
self.jobQueue.assignJob(job.id)
# if the job has specified an account
# create an VM on the account and run on that instance
if job.accessKeyId:
from vmms.ec2SSH import Ec2SSH
vmms = Ec2SSH(job.accessKeyId, job.accessKey)
newVM = copy.deepcopy(job.vm)
newVM.id = self._getNextID()
preVM = vmms.initializeVM(newVM)
# Gets the next pending job/ id
# NOTE: This grabs and acquires the lock on the queue
# If there is no pending job, job and id is None
job, _ = self.jobQueue.getNextPendingJobFromQueue()
if job is None:
continue

if not job.accessKey and Config.REUSE_VMS:
vm = self.jobQueue.reuseVM(job)

try:
# Mark the job assigned
job.makeAssigned()
# If the job has specified an account,
# create a VM on that account and run the job on that instance
if job.accessKeyId:
from vmms.ec2SSH import Ec2SSH
vmms = Ec2SSH(job.accessKeyId, job.accessKey)
newVM = copy.deepcopy(job.vm)
newVM.id = self._getNextID()
preVM = vmms.initializeVM(newVM)

else:
# Try to find a vm on the free list and allocate it to
# the worker if successful.
if Config.REUSE_VMS:
preVM = vm
else:
# Try to find a vm on the free list and allocate it to
# the worker if successful.
if Config.REUSE_VMS:
preVM = vm
else:
preVM = self.preallocator.allocVM(job.vm.name)
vmms = self.vmms[job.vm.vmms] # Create new vmms object

# Now dispatch the job to a worker
self.log.info("Dispatched job %s:%d to %s [try %d]" %
(job.name, job.id, preVM.name, job.retries))
job.appendTrace(
"%s|Dispatched job %s:%d [try %d]" %
(datetime.utcnow().ctime(), job.name, job.id, job.retries))

Worker(
job,
vmms,
self.jobQueue,
self.preallocator,
preVM
).start()

except Exception as err:
self.jobQueue.makeDead(job.id, str(err))

preVM = self.preallocator.allocVM(job.vm.name)
vmms = self.vmms[job.vm.vmms] # Create new vmms object

if job is None:
raise Exception("here1")

# Now dispatch the job to a worker
self.log.info("Dispatched job %s:%d to %s [try %d]" %
(job.name, job.id, preVM.name, job.retries))
if job is None:
raise Exception("here")

# job.appendTrace(
# "%s|Dispatched job %s:%d [try %d]" %
# (datetime.utcnow().ctime(), job.name, job.id, job.retries))

Worker(
job,
vmms,
self.jobQueue,
self.preallocator,
preVM
).start()
except Exception as err:
print("EXCEPTION")
print(str(err))

# Sleep for a bit and then check again
time.sleep(Config.DISPATCH_PERIOD)

Expand Down
111 changes: 98 additions & 13 deletions jobQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
from builtins import range
from builtins import object
from builtins import str
import threading, logging, time
import threading
import multiprocessing
import logging
import time

from datetime import datetime
from tangoObjects import TangoDictionary, TangoJob
from tangoObjects import TangoDictionary, TangoJob, TangoQueue
from config import Config

#
Expand All @@ -34,8 +37,9 @@
class JobQueue(object):

def __init__(self, preallocator):
self.liveJobs = TangoDictionary("liveJobs")
self.deadJobs = TangoDictionary("deadJobs")
self.liveJobs = TangoDictionary("liveJobs2")
self.deadJobs = TangoDictionary("deadJobs2")
self.unassignedJobs = TangoQueue("unassigneds2")
self.queueLock = threading.Lock()
self.preallocator = preallocator
self.log = logging.getLogger("JobQueue")
Expand All @@ -51,8 +55,12 @@ def _getNextID(self):
self.log.debug("_getNextID|Acquired lock to job queue.")
id = self.nextID

# If a job already exists in the queue at nextID, then try to find
# an empty ID. If the queue is full, then return -1.
# If a live job in the queue already has nextID, that id is
# taken. In that case, look for a free id by looping through
# all possible job ids and picking one that is not used by
# any of the live jobs.
# Return -1 if no such free id is found.
keys = self.liveJobs.keys()
if (str(id) in keys):
id = -1
Expand All @@ -61,8 +69,12 @@ def _getNextID(self):
id = i
break

if (id == -1):
# No free id found, return -1
return -1
self.nextID += 1
if self.nextID > Config.MAX_JOBID:
# Wrap around if job ids go over max job ids avail
self.nextID = 1
self.queueLock.release()
self.log.debug("_getNextID|Released lock to job queue.")
Expand Down Expand Up @@ -93,6 +105,9 @@ def add(self, job):
self.log.debug("add| Acquired lock to job queue.")

self.liveJobs.set(job.id, job)
# Add this to the unassigned job queue too
self.unassignedJobs.put(job)

job.appendTrace("%s|Added job %s:%d to queue" %
(datetime.utcnow().ctime(), job.name, job.id))

Expand All @@ -103,7 +118,7 @@ def add(self, job):
self.queueLock.release()
self.log.debug("add|Releasing lock to job queue.")

self.log.info("Added job %s:%d to queue, details = %s" %
self.log.info("Added job %s:%d to queue, details = %s" %
(job.name, job.id, str(job.__dict__)))

return str(job.id)
Expand All @@ -115,7 +130,11 @@ def addDead(self, job):
"""
if (not isinstance(job, TangoJob)):
return -1

job.setId(self._getNextID())
if (job.id == -1):
self.log.info("add|JobQueue is full")
return -1
self.log.info("addDead|Unassigning job %s" % str(job.id))
job.makeUnassigned()
job.retries = 0
Expand All @@ -130,6 +149,7 @@ def addDead(self, job):

return job.id

# TODO: remove the remove() method since it's not used
def remove(self, id):
"""remove - Remove job from live queue
"""
Expand Down Expand Up @@ -197,9 +217,9 @@ def getNextPendingJob(self):
for id, job in self.liveJobs.items():
if job.isNotAssigned():
self.queueLock.release()
return id
return job, id
self.queueLock.release()
return None
return None, None

def getNextPendingJobReuse(self, target_id=None):
"""getNextPendingJobReuse - Returns ID of next pending job and its VM.
Expand All @@ -225,12 +245,17 @@ def getNextPendingJobReuse(self, target_id=None):
self.queueLock.release()
return (None, None)

# TODO Remove this interface
def assignJob(self, jobId):
""" assignJob - marks a job to be assigned
"""
self.queueLock.acquire()
self.log.debug("assignJob| Acquired lock to job queue.")

job = self.liveJobs.get(jobId)
# Remove from the unassigned jobs list
# self.unassignedJobs.popBack()

self.log.debug("assignJob| Retrieved job.")
self.log.info("assignJob|Assigning job ID: %s" % str(job.id))
job.makeAssigned()
Expand All @@ -239,6 +264,8 @@ def assignJob(self, jobId):
self.queueLock.release()
self.log.debug("assignJob| Released lock to job queue.")

# Note: It seems here that you also assume that a job is being
# retried when you unassign it
def unassignJob(self, jobId):
""" assignJob - marks a job to be unassigned
"""
Expand All @@ -251,6 +278,10 @@ def unassignJob(self, jobId):
job.retries += 1
Config.job_retries += 1

# Add to the unassigned jobs list if it is not already there
# if str(jobId) not in self.unassignedJobs.keys():
# self.unassignedJobs.pushBack(jobId)

self.log.info("unassignJob|Unassigning job %s" % str(job.id))
job.makeUnassigned()
self.queueLock.release()
Expand All @@ -264,26 +295,80 @@ def makeDead(self, id, reason):
self.log.debug("makeDead| Acquired lock to job queue.")
status = -1
if str(id) in self.liveJobs.keys():
self.log.info("makeDead| Found job ID: %d in the live queue" % (id))
self.log.info(
"makeDead| Found job ID: %d in the live queue" % (id))
status = 0
job = self.liveJobs.get(id)
self.log.info("Terminated job %s:%d: %s" %
(job.name, job.id, reason))
self.deadJobs.set(id, job)
self.deadJobs.set(id, job)
self.liveJobs.delete(id)
job.appendTrace("%s|%s" % (datetime.utcnow().ctime(), reason))

# If this job is in unassigned jobs, remove it from the
# unassigned jobs list too
# if str(id) in self.unassignedJobs.keys():
# self.unassignedJobs.delete(id)

self.queueLock.release()
self.log.debug("makeDead| Released lock to job queue.")
return status

def getInfo(self):
    """getInfo - Report the sizes of the live and dead job dictionaries.

    Returns a dict with two entries:
      'size'          - number of keys in self.liveJobs
      'size_deadjobs' - number of keys in self.deadJobs
    """
    liveCount = len(self.liveJobs.keys())
    deadCount = len(self.deadJobs.keys())
    return {'size': liveCount, 'size_deadjobs': deadCount}

def reset(self):
    """reset - Call _clean() on the live and then the dead job dictionary.

    Only liveJobs and deadJobs are touched here; no other queue state is
    reset by this method.
    """
    # TODO fix this
    for storeName in ("liveJobs", "deadJobs"):
        # Look each store up just before cleaning it so the order of
        # attribute access and cleaning stays live first, dead second.
        getattr(self, storeName)._clean()

def _getPendingJobNoLock(self):
    """_getPendingJobNoLock - Find the first live job that is not assigned.

    Walks self.liveJobs.items() in iteration order and returns a
    (job, id) pair for the first job whose isNotAssigned() is true, or
    (None, None) when there is no such job. This method does not acquire
    or release the queue lock itself.
    """
    candidates = (
        (job, jobId)
        for jobId, job in self.liveJobs.items()
        if job.isNotAssigned()
    )
    return next(candidates, (None, None))



def getNextPendingJobFromQueue(self):
    """getNextPendingJobFromQueue - Take the next job off the unassigned queue.

    Acquires the job queue lock, fetches one job from
    self.unassignedJobs and releases the lock again.

    Returns:
        (job, id) where id is job.id.

    Raises:
        Exception: if the unassigned queue hands back None. The lock is
        released before the exception propagates, so one failure does
        not leave the job queue locked for every later caller.
    """
    self.log.debug("_getNextPendingJob|Acquiring lock to job queue.")
    self.queueLock.acquire()
    self.log.debug("_getNextPendingJob|Acquired lock to job queue.")

    try:
        # NOTE(review): if unassignedJobs.get() blocks while the queue is
        # empty, the lock stays held for that whole time -- confirm that
        # producers needing the same lock can still make progress.
        job = self.unassignedJobs.get()
        if job is None:
            raise Exception("Unassigned job not found in live job queue")
    finally:
        # Always give the lock back, including on the error path.
        self.log.debug("getNextPendingJob| Releasing lock to job queue.")
        self.queueLock.release()
        self.log.debug("getNextPendingJob| Released lock to job queue.")

    # Return the job's own id; a bare `id` here would be the builtin
    # function, because no local of that name is ever bound.
    return job, job.id

def reuseVM(self, job):
    """reuseVM - Pick the VM a job should run on when VMs are reused.

    Args:
        job: the job to find a VM for; job.vm.name selects the pool.

    Returns:
        For a job that is not yet assigned, whatever the preallocator's
        allocVM() hands back for the pool, or None when it returns
        nothing usable. For a job that is already assigned, the job's
        own job.vm.

    Raises:
        Exception: if the job is already assigned but has no vm.
    """
    # Create a pool if necessary
    if self.preallocator.poolSize(job.vm.name) == 0:
        self.preallocator.update(job.vm, Config.POOL_SIZE)

    # If the job hasn't been assigned to a worker yet, see if there
    # is a free VM
    if job.isNotAssigned():
        # A falsy result means there are no VMs to assign.
        return self.preallocator.allocVM(job.vm.name) or None

    # The job is already assigned. No VM is allocated on this path, so
    # the VM to hand back is the one recorded on the job itself.
    # NOTE(review): presumably job.vm is the VM the job was assigned to
    # -- confirm against the callers.
    if job.vm:
        return job.vm
    raise Exception("Job assigned without vm")

5 changes: 3 additions & 2 deletions tangoObjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,9 @@ def get(self, block=True, timeout=None):
else:
item = self.__db.lpop(self.key)

# if item:
# item = item[1]
if block and item:
print(item[0])
item = item[1]

item = pickle.loads(item)
return item
Expand Down