Wednesday, July 8, 2009

Thread Manager

Last update Jun 10

import thread, threading

class _WorkerThread(threading.Thread):
    def __init__(self, sem, job, *args, **kargs):
        self._workSem = sem
        super(_WorkerThread, self).__init__()
        self.job = job
        self.args = args
        self.kargs = kargs
        self.isCanceled = False
        self.hasStarted = False
        self._cancelLock = threading.Lock()

    def run(self):
        if self._workSem:
            with self._workSem:
                with self._cancelLock:
                    if self.isCanceled:
                        return
                    self.hasStarted = True
                self.job(*self.args, **self.kargs)
        else:
            self.job(*self.args, **self.kargs)

class JobManager(object):
    """Runs posted jobs on worker threads, limiting how many run at once.

    Every posted job gets its own thread immediately; a shared semaphore with
    ``maxnumthreads`` permits decides how many of them may execute their job
    at the same time. Jobs started with forceExecuteOnWorkerThread() bypass
    the semaphore.
    """

    def __init__(self, maxnumthreads = 1):
        """Set maxnumthreads to specify the max number of threads which run concurrently."""
        self._workers = {}           # job id -> managed worker thread (pruned by _gc)
        self._unmanagedWorkers = []  # threads started by forceExecuteOnWorkerThread()
        self._workSem = threading.Semaphore(maxnumthreads)
        self.maxnumthreads = maxnumthreads
        # Guards _workers, _unmanagedWorkers and _nextJobIdCounter. Reentrant
        # because public methods call _gc() while already holding it.
        self._lock = threading.RLock()
        # Serializes executeWhenNoWorkerThreadsRunning() callers: two callers
        # each holding part of the permits would otherwise wait forever.
        self._exclusiveLock = threading.Lock()
        self._nextJobIdCounter = 1

    def postJob(self, job, *args, **kargs):
        """Queue a job and return its id.

        job must be a callable; args and kargs are the arguments passed to it.
        The returned id can be handed to cancelJob()."""
        wt = _WorkerThread(self._workSem, job, *args, **kargs)
        with self._lock:
            self._gc()
            # Start before registering: _gc() drops threads that are not
            # alive, and holding the lock keeps a concurrent _gc() from
            # rebuilding the registry between these two steps.
            wt.start()
            jobid = self._nextJobIdCounter
            self._nextJobIdCounter += 1
            self._workers[jobid] = wt
        return jobid

    def cancelJob(self, jobid):
        """Cancel a posted job. jobid must be an object returned by postJob().

        Returns True if the job gets canceled, False if it has already started
        (or finished, or the id is unknown)."""
        with self._lock:
            self._gc()
            worker = self._workers.get(jobid)
        if worker is None:
            # Unknown id, or the thread already terminated and was pruned.
            return False
        with worker._cancelLock:
            if worker.hasStarted:
                return False
            worker.isCanceled = True
            return True

    def waitOnIdle(self):
        """Block until every worker thread (managed and unmanaged) has terminated."""
        while True:
            with self._lock:
                self._gc()
                pending = list(self._workers.values()) + self._unmanagedWorkers
            if not pending:
                return
            # Join outside the lock so other threads can keep posting jobs;
            # the loop then re-checks for workers added in the meantime.
            for wt in pending:
                wt.join()

    def getNumWaitingJobs(self):
        """Return the number of posted jobs not yet terminated.

        It includes jobs currently running, not only those still waiting."""
        with self._lock:
            self._gc()
            return len(self._workers)

    def forceExecuteOnWorkerThread(self, job, *args, **kargs):
        """Execute the job immediately on a new thread. It is not queued."""
        wt = _WorkerThread(None, job, *args, **kargs)
        with self._lock:
            # Prune here too, so the list does not grow without bound when
            # only this method is used.
            self._gc()
            wt.start()
            self._unmanagedWorkers.append(wt)

    def executeWhenNoWorkerThreadsRunning(self, job, *args, **kargs):
        """Run job (a callable) on the calling thread while no managed worker runs.

        It blocks while a managed worker is running a job. It DOESN'T mean job
        is executed after every worker thread has terminated: workers still
        waiting for a permit may run after job returns.
        (Though it looks like the current Python implementation awakens a
        thread which called acquire() earlier.)
        It doesn't take jobs launched by forceExecuteOnWorkerThread() into account."""
        with self._exclusiveLock:
            acquired = 0
            try:
                # Taking every permit keeps all managed workers out.
                for _ in range(self.maxnumthreads):
                    self._workSem.acquire()
                    acquired += 1
                job(*args, **kargs)
            finally:
                # Release exactly what was taken, even if an acquire was
                # interrupted part-way or the job raised.
                for _ in range(acquired):
                    self._workSem.release()

    def _gc(self):
        """Drop terminated threads from both registries."""
        with self._lock:
            self._workers = dict(
                (jobid, wt) for jobid, wt in self._workers.items() if wt.is_alive())
            self._unmanagedWorkers = [
                wt for wt in self._unmanagedWorkers if wt.is_alive()]


if __name__ == '__main__':
    # Small demo: post five throttled jobs, cancel two of them, then show the
    # difference between waitOnIdle() and executeWhenNoWorkerThreadsRunning().
    import time

    def somejob(i, wait = 0.1):
        """Sleep for `wait` seconds, report, then stay busy for one more second."""
        time.sleep(wait)
        # Single pre-formatted argument: same output on Python 2 and 3.
        print('somejob %s' % (i,))
        time.sleep(1)

    def endMessage(msg):
        """Print the closing message."""
        print(msg)

    jm = JobManager()
    jobIds = []
    for i in range(5):
        jobid = jm.postJob(somejob, i + 1)
        jobIds.append(jobid)
        print('posted %s' % (jobid,))
    time.sleep(2)
    # Each cancelJob() result is printed next to the id it was called with.
    print('cancel %s %s' % (jobIds[0], jm.cancelJob(jobIds[0])))
    print('cancel %s %s' % (jobIds[-1], jm.cancelJob(jobIds[-1])))
    jm.forceExecuteOnWorkerThread(somejob, 'unmanaged1', 4)
    jm.waitOnIdle() #It waits for unmanaged1 termination
    jm.forceExecuteOnWorkerThread(somejob, 'unmanaged2')
    jm.executeWhenNoWorkerThreadsRunning(endMessage, 'done') #It doesn't wait for unmanaged2

No comments: