Last update Jun 10
"""Tiny job manager that runs callables on worker threads.

At most ``maxnumthreads`` managed jobs run at the same time; jobs that have
not started yet can be cancelled.  The code runs unchanged on Python 2.6+
and Python 3.
"""

import threading

try:
    # Python 2 module name.  Not used below, but kept so the name the
    # original module imported stays in scope.
    import thread
except ImportError:
    # Python 3 renamed the module to ``_thread``.
    import _thread as thread


class _WorkerThread(threading.Thread):
    """Thread that runs one job, optionally throttled by a shared semaphore.

    sem    -- semaphore limiting concurrency, or None to run unthrottled.
    job    -- the callable to execute.
    args / kargs -- positional / keyword arguments passed to ``job``.
    """

    def __init__(self, sem, job, *args, **kargs):
        self._workSem = sem
        super(_WorkerThread, self).__init__()
        self.job = job
        self.args = args
        self.kargs = kargs
        self.isCanceled = False
        self.hasStarted = False
        # Guards the isCanceled / hasStarted pair so that "cancel" and
        # "start" are decided atomically with respect to each other.
        self._cancelLock = threading.Lock()

    def run(self):
        """Run the job, waiting for a semaphore slot first when throttled."""
        if self._workSem is None:
            self.job(*self.args, **self.kargs)
            return
        with self._workSem:
            with self._cancelLock:
                if self.isCanceled:
                    return
                self.hasStarted = True
            # The job runs outside _cancelLock so cancelJob() never blocks
            # for the duration of a job.
            self.job(*self.args, **self.kargs)


class JobManager(object):
    """Runs posted jobs on worker threads with a cap on concurrency."""

    def __init__(self, maxnumthreads=1):
        """Set maxnumthreads to specify the max number of threads which
        run concurrently."""
        self._workers = {}
        self._unmanagedWorkers = []
        self._workSem = threading.Semaphore(maxnumthreads)
        self.maxnumthreads = maxnumthreads
        # Protects _workers, _unmanagedWorkers and _nextJobIdCounter.
        self._lock = threading.RLock()
        # Serialises executeWhenNoWorkerThreadsRunning() callers; without it
        # two callers could each hold part of the semaphore forever.
        self._exclusiveLock = threading.Lock()
        self._nextJobIdCounter = 1

    def postJob(self, job, *args, **kargs):
        """Queue a job and return its id.

        job must be a callable, args and kargs are arguments passed to it.
        The returned id can be handed to cancelJob().
        """
        wt = _WorkerThread(self._workSem, job, *args, **kargs)
        with self._lock:
            self._gc()
            jobId = self._nextJobIdCounter
            self._nextJobIdCounter += 1
            # Start before registering: _gc() drops threads that are not
            # alive, and a thread is only alive once start() has returned.
            wt.start()
            self._workers[jobId] = wt
        return jobId

    def cancelJob(self, jobid):
        """Cancel a posted job.

        jobid must be an object returned by postJob().  Returns True if the
        job gets canceled, False if it has already started (or finished).
        """
        with self._lock:
            self._gc()
            worker = self._workers.get(jobid)
        if worker is None:
            # It doesn't exist because it finished execution and was
            # removed from _workers.
            return False
        with worker._cancelLock:
            if worker.hasStarted:
                return False
            worker.isCanceled = True
            return True

    def waitOnIdle(self):
        """Block until every worker thread (managed or not) terminates."""
        while True:
            with self._lock:
                self._gc()
                pending = list(self._workers.values()) + self._unmanagedWorkers
            if not pending:
                return
            # Join outside the lock so other threads can keep posting jobs;
            # anything posted meanwhile is picked up by the next iteration.
            for wt in pending:
                wt.join()

    def getNumWaitingJobs(self):
        """Return the number of jobs waiting.

        It includes threads currently running.
        """
        with self._lock:
            self._gc()
            return len(self._workers)

    def forceExecuteOnWorkerThread(self, job, *args, **kargs):
        """Execute the job immediately on a thread.  It is not queued."""
        wt = _WorkerThread(None, job, *args, **kargs)
        with self._lock:
            wt.start()
            self._unmanagedWorkers.append(wt)

    def executeWhenNoWorkerThreadsRunning(self, job, *args, **kargs):
        """Run job on the calling thread while no managed worker is running.

        It blocks while a managed worker thread is running.  It DOESN'T mean
        job is executed after every worker thread has terminated: queued
        jobs that have not obtained a slot yet may run afterwards.  It
        doesn't take jobs launched by forceExecuteOnWorkerThread() into
        account.
        """
        with self._exclusiveLock:
            acquired = 0
            try:
                # Take every slot so no managed worker can run meanwhile.
                for _ in range(self.maxnumthreads):
                    self._workSem.acquire()
                    acquired += 1
                job(*args, **kargs)
            finally:
                # Release exactly what was acquired, even if acquiring was
                # interrupted part-way or the job raised.
                for _ in range(acquired):
                    self._workSem.release()

    def _gc(self):
        """Forget worker threads that are no longer alive."""
        with self._lock:
            self._workers = dict(
                (jobId, wt) for jobId, wt in self._workers.items()
                if wt.is_alive())
            self._unmanagedWorkers = [
                wt for wt in self._unmanagedWorkers if wt.is_alive()]


if __name__ == '__main__':
    import time

    def somejob(i, wait=0.1):
        time.sleep(wait)
        print('somejob %s' % (i,))
        time.sleep(1)

    def endMessage(msg):
        print(msg)

    jm = JobManager()
    jobIds = []
    for i in range(5):
        jobId = jm.postJob(somejob, i + 1)
        jobIds.append(jobId)
        print('posted %s' % (jobId,))
    time.sleep(2)
    print('cancel %s %s' % (jobIds[0], jm.cancelJob(jobIds[0])))
    print('cancel %s %s' % (jobIds[-1], jm.cancelJob(jobIds[-1])))
    jm.forceExecuteOnWorkerThread(somejob, 'unmanaged1', 4)
    jm.waitOnIdle()  # It waits for unmanaged1 termination
    jm.forceExecuteOnWorkerThread(somejob, 'unmanaged2')
    # It doesn't wait for unmanaged2
    jm.executeWhenNoWorkerThreadsRunning(endMessage, 'done')
No comments:
Post a Comment