Last update Jun 10
import thread, threading
class _WorkerThread(threading.Thread):
    """Thread that runs a single job, optionally gated by a shared semaphore.

    Attributes:
        job, args, kargs: the callable and the arguments it is invoked with.
        isCanceled: set to True (while holding ``_cancelLock``) to ask a
            semaphore-gated worker to skip its job if it has not begun yet.
        hasStarted: True once the worker has committed to running the job.
    """

    def __init__(self, sem, job, *args, **kargs):
        """Prepare a worker for ``job(*args, **kargs)``.

        ``sem`` is the semaphore limiting how many jobs run at once, or None
        to run the job as soon as the thread starts.
        """
        super(_WorkerThread, self).__init__()
        self._workSem = sem
        self.job = job
        self.args = args
        self.kargs = kargs
        self.isCanceled = False
        self.hasStarted = False
        # Makes "check isCanceled, then set hasStarted" atomic with respect to
        # a canceller that sets isCanceled and reads hasStarted.
        self._cancelLock = threading.Lock()

    def run(self):
        """Run the job, waiting for a semaphore slot first when one was given."""
        if self._workSem is None:
            # Ungated worker: runs immediately and is not subject to
            # cancellation, but still records that the job began.
            with self._cancelLock:
                self.hasStarted = True
            self.job(*self.args, **self.kargs)
            return

        with self._workSem:
            with self._cancelLock:
                if self.isCanceled:
                    return
                self.hasStarted = True
            # The job runs outside _cancelLock so that a canceller is never
            # blocked for the duration of the job.
            self.job(*self.args, **self.kargs)
class JobManager(object):
    """Run callables on worker threads, limiting how many execute at once.

    Every posted job gets its own thread right away; the threads share one
    semaphore, so at most ``maxnumthreads`` of them execute their job at the
    same time while the others wait for a slot.
    """

    def __init__(self, maxnumthreads=1):
        """Set maxnumthreads to specify the max number of threads which run concurrently."""
        self._workers = {}           # job id -> worker thread, for posted jobs
        self._unmanagedWorkers = []  # threads from forceExecuteOnWorkerThread()
        self._workSem = threading.Semaphore(maxnumthreads)
        self.maxnumthreads = maxnumthreads
        # Guards the two registries and the id counter. Re-entrant because
        # the public methods call _gc() while already holding it.
        self._lock = threading.RLock()
        # Serialises executeWhenNoWorkerThreadsRunning() callers so that two
        # of them cannot each hold part of the semaphore permits forever.
        self._drainLock = threading.Lock()
        self._nextJobIdCounter = 1

    def postJob(self, job, *args, **kargs):
        """Queue ``job(*args, **kargs)`` and return its job id.

        job must be a callable; args and kargs are the arguments passed to it.
        The returned id can be handed to cancelJob().
        """
        wt = _WorkerThread(self._workSem, job, *args, **kargs)
        with self._lock:
            self._gc()
            wt.start()
            jobid = self._nextJobIdCounter
            self._nextJobIdCounter += 1
            self._workers[jobid] = wt
        return jobid

    def cancelJob(self, jobid):
        """Cancel a posted job. jobid must be an object returned by postJob().

        Returns True if the job gets canceled, False if it has already
        started (or finished, or the id is unknown).
        """
        with self._lock:
            self._gc()
            worker = self._workers.get(jobid)
        if worker is None:
            # Unknown id, or the job finished and was already collected.
            return False
        with worker._cancelLock:
            worker.isCanceled = True
            return not worker.hasStarted

    def waitOnIdle(self):
        """Block until every worker thread (posted or forced) has terminated."""
        while True:
            with self._lock:
                self._gc()
                pending = list(self._workers.values()) + self._unmanagedWorkers
            if not pending:
                return
            # Join without holding the lock so running jobs can still use
            # this manager; loop again in case new jobs arrived meanwhile.
            for wt in pending:
                wt.join()

    def getNumWaitingJobs(self):
        """Return the number of posted jobs not yet finished.

        It includes threads that are currently running their job.
        """
        with self._lock:
            self._gc()
            return len(self._workers)

    def forceExecuteOnWorkerThread(self, job, *args, **kargs):
        """Execute the job immediately on its own thread. It is not queued."""
        wt = _WorkerThread(None, job, *args, **kargs)
        wt.start()
        with self._lock:
            self._unmanagedWorkers.append(wt)

    def executeWhenNoWorkerThreadsRunning(self, job, *args, **kargs):
        """Run ``job`` on the calling thread while no posted job is executing.

        Blocks until every semaphore slot has been taken by the caller, so no
        posted job can be running while ``job`` executes. It does NOT mean
        that all previously posted jobs have finished: jobs still waiting for
        a slot simply keep waiting until ``job`` returns. Jobs launched by
        forceExecuteOnWorkerThread() are not taken into account.

        Because every slot is held while ``job`` runs, ``job`` must not wait
        for posted jobs to complete.
        """
        acquired = 0
        with self._drainLock:
            try:
                for _ in range(self.maxnumthreads):
                    self._workSem.acquire()
                    acquired += 1
                job(*args, **kargs)
            finally:
                # Give back exactly what was taken, even if acquiring was
                # interrupted part-way or the job raised.
                for _ in range(acquired):
                    self._workSem.release()

    def _gc(self):
        """Forget worker threads that are no longer alive."""
        with self._lock:
            self._workers = dict(
                (jobid, wt) for jobid, wt in self._workers.items()
                if wt.is_alive())
            self._unmanagedWorkers = [
                wt for wt in self._unmanagedWorkers if wt.is_alive()]
if __name__ == '__main__':
    # Demo: post five jobs to a single-slot manager, try to cancel the first
    # and the last one, then exercise the unmanaged and "run when idle" paths.
    import time

    def somejob(i, wait=0.1):
        """Sleep `wait` seconds, announce the job, then sleep one more second."""
        time.sleep(wait)
        print('somejob %s' % (i,))
        time.sleep(1)

    def endMessage(msg):
        """Print `msg`."""
        print(msg)

    jm = JobManager()
    jobIds = []
    for i in range(5):
        jobId = jm.postJob(somejob, i + 1)
        jobIds.append(jobId)
        print('posted %s' % (jobId,))
    time.sleep(2)
    # With these timings the first job has normally finished already (cancel
    # reports False) while the last one is still queued (cancel reports True).
    for target in (jobIds[0], jobIds[-1]):
        print('cancel %s %s' % (target, jm.cancelJob(target)))
    jm.forceExecuteOnWorkerThread(somejob, 'unmanaged1', 4)
    jm.waitOnIdle()  # It waits for unmanaged1 termination
    jm.forceExecuteOnWorkerThread(somejob, 'unmanaged2')
    # It doesn't wait for unmanaged2
    jm.executeWhenNoWorkerThreadsRunning(endMessage, 'done')
No comments:
Post a Comment