pytask

A simple asynchronous Python daemon for IO bound tasks, based on greenlets. It uses Redis or Redis Cluster to store state; workers are stateless. Monitor and Cleanup tasks are included to handle worker failure. pytask is designed to be distributed and fault tolerant.

Starting Workers

To create a pytask worker you create an instance of PyTask, add your task classes to it and then call its run method:

# Gevent monkey patching for async tasks
from gevent import monkey
monkey.patch_all()

from pytask import PyTask
from mytasks import MyTask

# Create pytask "app"
task_app = PyTask({'host': 'localhost', 'port': 6379})

# Add our task to it
task_app.add_task(MyTask)

# Run the worker
task_app.run()

Writing Tasks

This is the definitive guide to writing pytask tasks. The first step is to name your task, like so:

from pytask.task import Task

class MyTask(Task):
    NAME = 'mytask'

It is recommended to namespace your names, for example the Monitor and Cleanup tasks included with pytask are named pytask/monitor and pytask/cleanup respectively.

Data & Init

Tasks should define an __init__ function, which will be passed the task data as kwargs. Task data is stored as JSON, so any value that is not JSON-serializable must be converted before it is passed in and restored inside the task.

class MyTask(Task):
    def __init__(self, my_task_option='default'):
        self.option = my_task_option
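
Because task data travels as JSON, values such as datetimes need converting on the way in and out. The following is a minimal sketch, assuming a hypothetical ReportTask and a stand-in Task base class so the snippet runs without pytask installed. The run_at option and the ISO 8601 encoding are illustrative choices, not part of pytask:

```python
import json
from datetime import datetime

ISO_FORMAT = '%Y-%m-%dT%H:%M:%S'


class Task(object):
    """Stand-in for pytask's Task base class (assumption, for illustration)."""


class ReportTask(Task):
    NAME = 'myapp/report'  # hypothetical task name

    def __init__(self, run_at=None, recipients=()):
        # JSON has no datetime type, so the caller sends an ISO 8601 string
        # and the task converts it back into a datetime here.
        self.run_at = datetime.strptime(run_at, ISO_FORMAT) if run_at else None
        self.recipients = list(recipients)


# Caller side: encode non-JSON-serializable values before building task data.
task_data = json.dumps({
    'run_at': datetime(2024, 1, 2, 3, 4, 5).strftime(ISO_FORMAT),
    'recipients': ['ops@example.com'],
})

# Worker side: the decoded JSON object is passed to __init__ as kwargs.
task = ReportTask(**json.loads(task_data))
```

Keeping the conversion inside __init__ means callers only ever deal with plain JSON types.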

Start & Stop

Tasks must implement a .start method. It is called directly in a greenlet to start and execute the task. When this function returns, the task is considered to have ended.

class MyTask(Task):
    ...
    def start(self):
        do_some_work()

This works well for short-lived tasks, but sometimes we want a task that runs forever or for a long time. To achieve this we can use a separate greenlet and the .stop method to handle shutdown gracefully:

import gevent

class MyTask(Task):
    ...
    def start(self):
        self.loop = gevent.spawn(self.work)
        self.loop.get()

    def stop(self):
        self.loop.kill()

    def work(self):
        do_some_work()

Emitting Events

Tasks can emit events to Redis pubsub:

class MyTask(Task):
    ...
    def start(self):
        self.emit('event-name', {'some': 'data'})

Handling Errors

When tasks encounter an error, they should raise a self.Error exception. This puts them into the ERROR state, which is crucially different from EXCEPTION, the state reserved for unexpected errors.

class MyTask(Task):
    ...
    def start(self):
        raise self.Error('Task failz!')
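
The difference between the two states can be illustrated with a small stand-alone sketch. TaskError and run_and_classify are hypothetical names standing in for self.Error and the worker's internal handling, and treating the return value of start as the task output is an assumption. This is not pytask's actual implementation:

```python
class TaskError(Exception):
    """Stand-in for a task's self.Error (assumption, for illustration)."""


def run_and_classify(start):
    """Call a task's start function and map the outcome to a state name."""
    try:
        output = start()
    except TaskError as exc:
        # Raised deliberately by the task: an expected failure.
        return 'ERROR', str(exc)
    except Exception as exc:
        # Anything else is an unexpected failure.
        return 'EXCEPTION', repr(exc)
    return 'SUCCESS', output


def expected_failure():
    raise TaskError('upstream returned no data')


def unexpected_failure():
    return 1 / 0  # ZeroDivisionError, not anticipated by the task


results = [
    run_and_classify(func)
    for func in (expected_failure, unexpected_failure, lambda: 'done')
]
```

Here the first call ends in ERROR, the second in EXCEPTION and the third in SUCCESS.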

Context

Sometimes it’s desirable to wrap task method calls in some kind of application-specific context (e.g. a Flask app). To do this, a task needs to define a static provide_context method, like so:

class MyTask(Task):
    @staticmethod
    def provide_context():
        # Wrap task methods in Flask.app_context
        return web_app.app_context()

The context will be created before the task instance is created, and all task methods (__init__, start & stop) will be nested inside the context.
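
The nesting order described above can be sketched without Flask. The app_context context manager, the calls list and the stand-in Task base class are all hypothetical, and the driver code at the bottom only imitates one plausible way a worker could apply the context:

```python
from contextlib import contextmanager

calls = []  # records the order in which things happen


@contextmanager
def app_context():
    """Hypothetical application context, standing in for Flask's app_context()."""
    calls.append('enter context')
    try:
        yield
    finally:
        calls.append('exit context')


class Task(object):
    """Stand-in for pytask's Task base class (assumption)."""


class MyTask(Task):
    @staticmethod
    def provide_context():
        return app_context()

    def __init__(self):
        calls.append('__init__')

    def start(self):
        calls.append('start')


# Imitation of the worker: enter the context first, then create and run the task.
with MyTask.provide_context():
    task = MyTask()
    task.start()
```

Because provide_context is static, the context can be obtained before any task instance exists.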

pytask API

class pytask.pytask.PyTask(redis_instance, update_task_interval=5, **kwargs)

Bases: pytask.helpers._PyTaskRedisConf

A daemon that starts and stops tasks and replicates their state to a Redis instance. Tasks can be controlled via Redis pubsub.

Redis Instance:
The first argument can be either a Redis client or a list of host/port details. When using a list, pytask will use redis-py with one host and redis-py-cluster where multiple hosts are present.
Parameters:
  • redis_instance (client or list) – Redis client or list of (host, port) tuples
  • task_set (str) – name of task set
  • task_prefix (str) – prefix for task names
  • new_queue (str) – queue to read new task IDs from
  • end_queue (str) – where to push complete task IDs
  • update_task_interval (int) – interval in s to update task times
exception PyTaskException

Bases: exceptions.Exception

exception PyTask.StopTask

Bases: pytask.pytask.PyTaskException

PyTask.add_exception_handler(handler)

Add an exception handler.

PyTask.add_task(task_class)

Add a task class.

PyTask.add_tasks(*task_classes)

Add multiple task classes.

PyTask.run(task_map=None)

Run pytask. This is essentially a wrapper that handles KeyboardInterrupt.

PyTask.start_local_task(task_name, **task_data)

Start a local task on this worker. The task will start when .run is called.

Helpers

class pytask.helpers.PyTaskHelpers(redis_instance, task_set='tasks', task_prefix='task-', new_queue='new-task', end_queue='end-task')

Bases: pytask.helpers._PyTaskRedisConf

Helper functions for managing task data within Redis. All PyTask instances have an instance attached on their helpers attribute.

Parameters:
  • redis_instance (client or list) – Redis client or list of (host, port) tuples
  • task_set (str) – name of task set
  • task_prefix (str) – prefix for task names
  • new_queue (str) – queue to read new task IDs from
  • end_queue (str) – where to push complete task IDs
get_active_task_ids()

Get a list of active task_ids.

get_end_task_ids()

Get task IDs in the end queue.

get_new_task_ids()

Get task IDs in the new queue.

get_task(task_id, keys=None)

Get task hash data.

reload_task(task_id)

Reload a task.

remove_task(task_id)
restart_if_state(task_id, states)
restart_task(task_id)
set_task(task_id, data, value=None)

Set task hash data.

start_task(task_name, task_id=None, cleanup=True, **task_data)

Start a new task.

stop_task(task_id)

Stop a task.

pytask.helpers.run_loop(function, interval)

Like JavaScript’s setInterval: calls function repeatedly every interval, with the usual slight time drift. Useful in long-running tasks.
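
To show where the drift comes from, here is a minimal sketch of a setInterval-style loop. It is not pytask's implementation: run_loop_sketch, the iterations limit and the injectable sleep argument are assumptions added so the example terminates and can be run on its own.

```python
import time


def run_loop_sketch(function, interval, iterations, sleep=time.sleep):
    """Call function, then sleep for interval seconds, iterations times.

    Each cycle takes interval plus however long function itself runs,
    which is why the schedule slowly drifts.
    """
    started = time.monotonic()
    for _ in range(iterations):
        function()
        sleep(interval)
    return time.monotonic() - started


ticks = []


def tick():
    time.sleep(0.01)  # pretend to do 10 ms of work
    ticks.append(time.monotonic())


elapsed = run_loop_sketch(tick, interval=0.02, iterations=3)
# elapsed is roughly 3 * (0.02 + 0.01) seconds rather than 3 * 0.02
```

A loop that needs a fixed schedule would have to subtract the work time from each sleep; this sketch deliberately does not.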

Included Tasks

class pytask.tasks.Cleanup(task_handler=None)

Bases: pytask.task.Task

A pytask which cleans up other pytasks! Drains the end queue and optionally triggers a handler/callback function.

Parameters:
  • task_handler (str) – string of the form module:attribute
NAME = 'pytask/cleanup'
start()
class pytask.tasks.Monitor(loop_interval=10, task_timeout=60)

Bases: pytask.task.Task

A pytask which monitors pytasks! Checks all task state in Redis at a configured interval and requeues tasks that time out.

NAME = 'pytask/monitor'
check_tasks()
start()
stop()

Clustering pytask

pytask leans on Redis to handle its state - this means pytask workers are stateless and can be scaled horizontally. Combined with a Redis cluster, you have a fully distributed and somewhat fault tolerant task system. For example:

  • 3x servers
  • Each has one instance of Redis clustered
  • Each runs 4 workers

Fault Tolerance

This document describes the data model and related fault tolerance properties of pytask instances and clusters.

Data Model

  • New task IDs and ended task IDs are pushed to two Redis list queues (LPUSH/RPOP)
  • “Active” tasks are stored in a set
  • A task is represented by a Redis hash (its key is the task ID with the task prefix):
{
    'task': 'task_name', # the NAME attribute of the task class
    'data': 'json_data', # JSON data passed to the task
    'last_update': 0, # Internal update counter, used to requeue failed worker tasks
    'state': '[RUNNING|STOPPED|SUCCESS|ERROR|EXCEPTION]', # task's current state

    # Present on ended tasks:
    'output': 'json_data' # output JSON data or exception data
}

At any one time, every task ID the system is aware of is stored in the task set, the new queue or the end queue. A task hash whose ID is in none of these would be orphaned.
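
The bookkeeping above can be sketched in a few lines. This illustrates the data model only and is not pytask's own code: InMemoryRedis is a hypothetical stand-in that mimics just the hset, lpush, rpop and sadd commands so the example runs without a server, and queue_task and claim_task are made-up helper names. The key names use the defaults documented for PyTaskHelpers.

```python
import json
from collections import deque


class InMemoryRedis(object):
    """Tiny stand-in mimicking a few Redis commands (illustration only)."""

    def __init__(self):
        self.hashes, self.lists, self.sets = {}, {}, {}

    def hset(self, name, mapping):
        self.hashes.setdefault(name, {}).update(mapping)

    def lpush(self, name, value):
        self.lists.setdefault(name, deque()).appendleft(value)

    def rpop(self, name):
        items = self.lists.get(name)
        return items.pop() if items else None

    def sadd(self, name, value):
        self.sets.setdefault(name, set()).add(value)


def queue_task(client, task_id, task_name, data):
    """Write the task hash, then push its ID onto the new queue."""
    client.hset('task-' + task_id,
                mapping={'task': task_name, 'data': json.dumps(data)})
    client.lpush('new-task', task_id)


def claim_task(client):
    """Pop the oldest queued ID and record it in the active task set."""
    task_id = client.rpop('new-task')
    if task_id is not None:
        client.sadd('tasks', task_id)
        client.hset('task-' + task_id,
                    mapping={'state': 'RUNNING', 'last_update': 0})
    return task_id


client = InMemoryRedis()
queue_task(client, 'a1', 'mytask', {'my_task_option': 'x'})
queue_task(client, 'b2', 'mytask', {})
first = claim_task(client)  # LPUSH + RPOP gives first-in, first-out order
```

After this runs, a1 is in the task set, b2 is still waiting in the new queue, and neither hash is orphaned.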

Task States

Before task starts:
When the task is waiting to be picked up by a worker, its ID should be in the new task queue. This state is represented by no value or by WAIT.
RUNNING:
These tasks are monitored and “locked” to their worker by update time. Monitor tasks will requeue these tasks when their update time is over a configurable threshold.
STOPPED:
These tasks were intentionally stopped before they could complete. They can be restarted (possibly by any worker) by pushing the task ID onto the new queue.
SUCCESS, ERROR & EXCEPTION:
These tasks have finished, and should have an associated output in their hash.

Worker Failure

Expected/SIGINT:
The worker will stop and requeue any running tasks, and set their state to WAIT.
Unexpected:
The worker will stop updating the update time (last_update) on its tasks. Any monitor tasks will pick this up (depending on configuration) and requeue the failed worker’s tasks.
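
The timeout check a monitor performs can be sketched as a pure function. This is an assumption about the logic, not the Monitor task's actual code: find_timed_out is a hypothetical helper, last_update is treated here as a Unix timestamp, and the 60 second default mirrors Monitor's task_timeout argument.

```python
def find_timed_out(tasks, now, task_timeout=60):
    """Return IDs of RUNNING tasks whose last update is older than task_timeout."""
    return sorted(
        task_id
        for task_id, task in tasks.items()
        if task.get('state') == 'RUNNING'
        and now - float(task.get('last_update', 0)) > task_timeout
    )


tasks = {
    'a1': {'state': 'RUNNING', 'last_update': 1000},  # stale: its worker died
    'b2': {'state': 'RUNNING', 'last_update': 1090},  # recently updated
    'c3': {'state': 'SUCCESS', 'last_update': 900},   # finished, so ignored
}

stale = find_timed_out(tasks, now=1100)
```

Only a1 is returned, so only that task ID would be pushed back onto the new queue.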

Redis Failure

Partitions:
When workers can’t reach Redis, they stop all their tasks. So long as there is still a Redis cluster and some worker instances reaching it, these tasks will be requeued.
Complete failure:
In the case where Redis is offline for all workers, the task cluster will essentially stop. A usable Redis cluster is always required.