Source code for pulpdist.cli.thread_pool

"""Simple size constrained thread pool that blocks when all threads are busy"""

# Based on Python recipe: http://code.activestate.com/recipes/577187/ (r9)
# Posted by Emilio Monti: http://code.activestate.com/recipes/users/4173642/
# Used under MIT License: http://www.opensource.org/licenses/mit-license.php
# Retrieved 3rd April, 2012

# Updates relative to original recipe:
#   - uses the traceback module to print a full traceback for *all* exceptions
#   - uses a PriorityQueue
#   - uses a namedtuple for Task objects (with a leading priority field)
#   - saves references to the workers from the pool
#   - customises the thread identifiers for the worker threads
#   - timeout support when waiting for task completion

from Queue import PriorityQueue
from threading import Thread
from traceback import format_exc
from collections import namedtuple
from time import time
import sys

Task = namedtuple("Task", "priority func args kwds")

[docs]class Worker(Thread): """Thread executing tasks from a given tasks queue""" def __init__(self, tasks, name=None): Thread.__init__(self, name=name) self.tasks = tasks self.daemon = True self.start()
[docs] def run(self): while True: task = self.tasks.get() try: task.func(*task.args, **task.kwds) except: header = "Error in {0}:\n".format(self.name) tb = format_exc() sys.stderr.write(header+tb) sys.stderr.flush() finally: self.tasks.task_done()
[docs]class PendingTasks(Exception): """Exception thrown if ThreadPool.wait_for_tasks() times out"""
[docs]class ThreadPool: """Pool of threads consuming tasks from a queue""" def __init__(self, num_threads, name="ThreadPool"): self.name=name self.tasks = PriorityQueue(num_threads) self.workers = [self._add_worker(i) for i in range(num_threads)] def _add_worker(self, index): name = "{0}-Worker-{1}".format(self.name, index) return Worker(self.tasks, name)
[docs] def add_task(self, priority, func, *args, **kwds): """Add a task to the queue. Blocks if all threads are busy.""" self.tasks.put(Task(priority, func, args, kwds))
def _join_with_timeout(self, timeout): """Workaround for the fact Queue.join() doesn't support timeouts""" tasks = self.tasks tasks.all_tasks_done.acquire() try: endtime = time() + timeout while tasks.unfinished_tasks: remaining = endtime - time() if remaining <= 0.0: raise PendingTasks tasks.all_tasks_done.wait(remaining) finally: tasks.all_tasks_done.release()
[docs] def wait_for_tasks(self, timeout=None): """Wait for completion of all the tasks in the queue""" if timeout is None: self.tasks.join() elif timeout < 0: raise ValueError("'timeout' must be a positive number") else: self._join_with_timeout(timeout)