pytask
A simple asynchronous Python daemon for IO-bound tasks, based on greenlets. It uses Redis or Redis Cluster to store state, so workers are stateless. It includes Monitor and Cleanup tasks to handle worker failures. pytask is designed to be distributed and fault tolerant.
Starting Workers
To create a pytask worker, create an instance of PyTask, add your task classes to it, and then call its run method:
```python
# Gevent monkey patching for async tasks
from gevent import monkey
monkey.patch_all()

from redis import StrictRedis

from pytask import PyTask
from mytasks import MyTask

# Create pytask "app"; the first argument is a Redis client
# (or a list of (host, port) tuples, see the API below)
task_app = PyTask(StrictRedis(host='localhost', port=6379))
# Add our task to it
task_app.add_task(MyTask)
# Run the worker
task_app.run()
```
Writing Tasks
The definitive guide to writing pytask tasks. The first step is to name your task, like so:
```python
class MyTask(Task):
    NAME = 'mytask'
```
It is recommended to namespace your task names; for example, the Monitor and Cleanup tasks included with pytask are named pytask/monitor and pytask/cleanup respectively.
Data & Init
Tasks should define an __init__ method, which will be passed the task data as kwargs. Remember that task data is stored as JSON, so you'll need to handle objects that are not JSON-serializable yourself.
```python
class MyTask(Task):
    def __init__(self, my_task_option='default'):
        self.option = my_task_option
```
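As a rough sketch of the data flow described above, the snippet below decodes a task's stored JSON data and passes it to the task class as keyword arguments. The `Task` stand-in, the `TASKS` registry, `ReportTask` and `build_task` are hypothetical illustrations, not pytask's API. It also shows one way to handle a non-JSON type: the caller sends a `datetime` as an ISO 8601 string and `__init__` parses it back.

```python
import json
from datetime import datetime


class Task:
    """Hypothetical stand-in for the pytask Task base class."""
    NAME = None


class ReportTask(Task):
    NAME = 'myapp/report'

    def __init__(self, since=None, limit=10):
        # JSON has no datetime type, so the caller sends an ISO 8601 string
        self.since = datetime.fromisoformat(since) if since else None
        self.limit = limit


# Hypothetical registry mapping task names to task classes
TASKS = {ReportTask.NAME: ReportTask}


def build_task(task_name, json_data):
    """Decode the stored JSON data and pass it to the task class as kwargs."""
    task_class = TASKS[task_name]
    kwargs = json.loads(json_data) if json_data else {}
    return task_class(**kwargs)


task = build_task('myapp/report', json.dumps({'since': '2020-01-31T12:00:00', 'limit': 5}))
print(task.since.year, task.limit)  # 2020 5
```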
Start & Stop
Tasks must implement a .start method. It is called directly in a greenlet to start and execute the task; when it returns, the task is considered ended.
```python
class MyTask(Task):
    ...

    def start(self):
        do_some_work()
```
This works well for short-lived tasks, but sometimes we want to run something that runs forever, or for a long time. To achieve this, we can use a separate greenlet and the .stop method to handle shutdown gracefully:
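```python
import gevent


class MyTask(Task):
    ...

    def start(self):
        # Run the work in its own greenlet and block until it finishes
        self.loop = gevent.spawn(self.work)
        self.loop.get()

    def stop(self):
        # Killing the work greenlet lets start() return
        self.loop.kill()

    def work(self):
        do_some_work()
```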
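The long-running pattern above can also be sketched with only the standard library, which makes the start/stop contract easy to try without gevent. This is a swapped-in technique, not how pytask runs tasks: a `threading.Thread` stands in for the greenlet, and because threads cannot be killed, `stop` sets a `threading.Event` that the work loop checks. `LoopTask` and its `interval` parameter are hypothetical.

```python
import threading
import time


class LoopTask:
    """Stdlib analogue of the start/stop pattern (threads instead of greenlets)."""

    def __init__(self, interval=0.01):
        self.interval = interval
        self.iterations = 0
        self._stopped = threading.Event()

    def start(self):
        # Blocks until the work loop exits, like self.loop.get() above
        self.loop = threading.Thread(target=self.work)
        self.loop.start()
        self.loop.join()

    def stop(self):
        # Threads cannot be killed, so signal the loop to exit instead
        self._stopped.set()

    def work(self):
        while not self._stopped.is_set():
            self.iterations += 1  # stand-in for do_some_work()
            self._stopped.wait(self.interval)


task = LoopTask()
runner = threading.Thread(target=task.start)
runner.start()
time.sleep(0.05)  # let the loop run a few times
task.stop()
runner.join(timeout=1)
print(task.iterations > 0, runner.is_alive())
```

With greenlets, `kill()` is available and simpler; the flag-based version is the usual choice when the work must finish its current iteration cleanly.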
Emitting Events
Tasks can emit events to Redis pubsub:
```python
class MyTask(Task):
    ...

    def start(self):
        self.emit('event-name', {'some': 'data'})
```
Handling Errors
When a task encounters an error, it should raise a self.Error exception. This puts it into the ERROR state, which is crucially different from EXCEPTION, a state reserved for unexpected errors.
```python
class MyTask(Task):
    ...

    def start(self):
        raise self.Error('Task failz!')
```
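The distinction between ERROR and EXCEPTION can be illustrated with a small hypothetical runner that maps a task's outcome to a state: a `self.Error` raised by the task gives ERROR, any other exception gives EXCEPTION, and a normal return gives SUCCESS. `TaskError`, `run_and_classify` and the JSON shape of the output are assumptions for illustration, not pytask's internal code.

```python
import json


class TaskError(Exception):
    """Hypothetical stand-in for a task's self.Error exception class."""


def run_and_classify(task):
    """Run task.start() and return (state, output_json) following the rules above."""
    try:
        result = task.start()
    except TaskError as e:
        # Expected failure, reported by the task itself
        return 'ERROR', json.dumps({'error': str(e)})
    except Exception as e:
        # Anything else is unexpected
        return 'EXCEPTION', json.dumps({'exception': type(e).__name__, 'message': str(e)})
    return 'SUCCESS', json.dumps(result)


class GoodTask:
    def start(self):
        return {'done': True}


class FailingTask:
    Error = TaskError

    def start(self):
        raise self.Error('Task failz!')


class BuggyTask:
    def start(self):
        return {}['missing']  # KeyError: an unexpected failure


for t in (GoodTask(), FailingTask(), BuggyTask()):
    print(run_and_classify(t)[0])
# SUCCESS
# ERROR
# EXCEPTION
```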
Context
Sometimes it's desirable to wrap task method calls in some application-specific context (e.g. a Flask app context). To do this, a task needs to define a static provide_context method, like so:
```python
class MyTask(Task):
    @staticmethod
    def provide_context():
        # Wrap task methods in the Flask app context
        return web_app.app_context()
```
The context is created before the task instance, and all task methods (__init__, start & stop) are run nested inside it.
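A minimal sketch of the nesting, assuming a single context spans the task's creation and its `start` call (the text above does not say whether pytask enters one context for the whole task or one per method call). `fake_app_context` stands in for Flask's `app_context()`, and `run_with_context` is a hypothetical runner, not part of pytask.

```python
from contextlib import contextmanager, nullcontext

events = []  # records the order of calls


@contextmanager
def fake_app_context():
    # Stand-in for something like Flask's web_app.app_context()
    events.append('enter')
    try:
        yield
    finally:
        events.append('exit')


class MyTask:
    @staticmethod
    def provide_context():
        return fake_app_context()

    def __init__(self, **task_data):
        events.append('__init__')

    def start(self):
        events.append('start')


def run_with_context(task_class, **task_data):
    """Hypothetical runner: enter the context, then create and start the task inside it."""
    provide = getattr(task_class, 'provide_context', None)
    with (provide() if provide else nullcontext()):
        task = task_class(**task_data)
        return task.start()


run_with_context(MyTask)
print(events)  # ['enter', '__init__', 'start', 'exit']
```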
pytask API
class pytask.pytask.PyTask(redis_instance, update_task_interval=5, **kwargs)
Bases: pytask.helpers._PyTaskRedisConf
A daemon that starts/stops tasks and replicates their state to a Redis instance. Tasks can be controlled via Redis pubsub.
Redis instance: the first argument can be either a Redis client or a list of host/port details. When given a list, pytask uses redis-py if there is one host and redis-py-cluster if there are several.
Parameters:
- redis_instance (client or list) – Redis client or list of (host, port) tuples
- task_set (str) – name of the task set
- task_prefix (str) – prefix for task names
- new_queue (str) – queue to read new task IDs from
- end_queue (str) – queue to push completed task IDs onto
- update_task_interval (int) – interval, in seconds, at which to update task times
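The list form of `redis_instance` can be sketched as a small normalization step. This hypothetical helper only builds constructor arguments: `host`/`port` keywords for a single node, as accepted by redis-py's `StrictRedis`, and a `startup_nodes` list of dicts for several nodes, as accepted by redis-py-cluster's `StrictRedisCluster`. It is not pytask's actual code.

```python
def redis_connection_kwargs(hosts):
    """Map a list of (host, port) tuples to (client library, constructor kwargs).

    Hypothetical sketch: one host -> redis-py style kwargs,
    several hosts -> redis-py-cluster style startup_nodes.
    """
    if not hosts:
        raise ValueError('at least one (host, port) pair is required')
    if len(hosts) == 1:
        host, port = hosts[0]
        return 'redis-py', {'host': host, 'port': port}
    return 'redis-py-cluster', {
        'startup_nodes': [{'host': host, 'port': port} for host, port in hosts],
    }


print(redis_connection_kwargs([('localhost', 6379)]))
# ('redis-py', {'host': 'localhost', 'port': 6379})
```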
exception pytask.pytask.PyTaskException
Bases: exceptions.Exception
exception PyTask.StopTask
Bases: pytask.pytask.PyTaskException
- PyTask.add_exception_handler(handler): Add an exception handler.
- PyTask.add_task(task_class): Add a task class.
- PyTask.add_tasks(*task_classes): Add multiple task classes.
- PyTask.run(task_map=None): Run pytask; essentially a wrapper that handles KeyboardInterrupt.
- PyTask.start_local_task(task_name, **task_data): Start a local task on this worker; it will start when .run is called.
Helpers
class pytask.helpers.PyTaskHelpers(redis_instance, task_set='tasks', task_prefix='task-', new_queue='new-task', end_queue='end-task')
Bases: pytask.helpers._PyTaskRedisConf
Helper functions for managing task data within Redis. Every PyTask instance has one attached as its helpers attribute.
Parameters:
- redis_instance (client or list) – Redis client or list of (host, port) tuples
- task_set (str) – name of the task set
- task_prefix (str) – prefix for task names
- new_queue (str) – queue to read new task IDs from
- end_queue (str) – queue to push completed task IDs onto
- get_active_task_ids(): Get a list of active task_ids.
- get_end_task_ids(): Get task IDs in the end queue.
- get_new_task_ids(): Get task IDs in the new queue.
- get_task(task_id, keys=None): Get task hash data.
- reload_task(task_id): Reload a task.
- remove_task(task_id)
- restart_if_state(task_id, states)
- restart_task(task_id)
- set_task(task_id, data, value=None): Set task hash data.
- start_task(task_name, task_id=None, cleanup=True, **task_data): Start a new task.
- stop_task(task_id): Stop a task.
pytask.helpers.run_loop(function, interval)
Calls function repeatedly at the given interval, like JavaScript's setInterval (with the usual slight time drift). Useful in long-running tasks.
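A minimal sketch of a setInterval-style loop, showing where the drift comes from: each iteration sleeps a fixed interval and then calls the function, so the function's own runtime accumulates. `run_loop_sketch` and its `max_runs` argument are hypothetical (`max_runs` only lets the example terminate), and this is not pytask's implementation. After gevent's `monkey.patch_all()`, `time.sleep` yields to other greenlets, so a loop like this running in its own greenlet lets other tasks proceed.

```python
import time


def run_loop_sketch(function, interval, max_runs=None):
    """Call function every `interval` seconds, in the spirit of setInterval.

    Sleeping a fixed interval between calls means each call's runtime adds
    to the drift. max_runs is a hypothetical extra, not part of run_loop.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        time.sleep(interval)
        function()
        runs += 1
    return runs


calls = []
start = time.monotonic()
run_loop_sketch(lambda: calls.append(time.monotonic() - start), 0.01, max_runs=3)
print(len(calls))  # 3
```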
Clustering pytask
pytask leans on Redis to handle its state, which means pytask workers are stateless and can be scaled horizontally. Combined with a Redis cluster, you get a fully distributed and somewhat fault-tolerant task system. For example:
- 3 servers
- Each runs one Redis instance, and the three instances form a Redis cluster
- Each runs 4 workers
Fault Tolerance
This document describes the data model and related fault tolerance properties of pytask instances and clusters.
Data Model
- New task IDs and ended task IDs are pushed to two Redis list queues (LPUSH/RPOP)
- “Active” tasks are stored in a set
- A task is represented by a Redis hash (its key is the task ID with the task prefix):
```python
{
    'task': 'task_name',  # the NAME attribute of the task class
    'data': 'json_data',  # JSON data passed to the task
    'last_update': 0,  # internal update counter, used to requeue tasks from failed workers
    'state': '[WAIT|RUNNING|STOPPED|SUCCESS|ERROR|EXCEPTION]',  # the task's current state
    # Present on ended tasks:
    'output': 'json_data',  # output JSON data or exception data
}
```
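To make the queue semantics concrete, here is an in-memory sketch of the data model, with a `deque` standing in for a Redis list and a dict for the task hashes. Pushing with LPUSH (`appendleft`) and popping with RPOP (`pop`) gives first-in, first-out order. The `'task-'` prefix matches the documented `task_prefix` default; `queue_task`, `lpush` and `rpop` are hypothetical helpers, and a new task is given the state `WAIT` here, although no value is also allowed for waiting tasks.

```python
import json
from collections import deque

TASK_PREFIX = 'task-'  # documented default task_prefix

hashes = {}          # stands in for the per-task Redis hashes
new_queue = deque()  # stands in for the 'new-task' list


def lpush(queue, value):
    # LPUSH adds to the left (head) of the list
    queue.appendleft(value)


def rpop(queue):
    # RPOP removes from the right (tail); None mirrors Redis's nil on an empty list
    return queue.pop() if queue else None


def queue_task(task_id, task_name, task_data):
    """Write the task hash, then push the bare task ID onto the new queue."""
    hashes[TASK_PREFIX + task_id] = {
        'task': task_name,
        'data': json.dumps(task_data),
        'last_update': 0,
        'state': 'WAIT',
    }
    lpush(new_queue, task_id)


queue_task('a', 'mytask', {'my_task_option': 'x'})
queue_task('b', 'mytask', {})
print(rpop(new_queue), rpop(new_queue), rpop(new_queue))  # a b None
```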
At any one time, every task ID the system knows about is stored in the task set, the new queue or the end queue. A task hash whose ID is in none of these would be orphaned.
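Checking for orphans follows directly from that invariant: collect the task IDs behind all task hashes and subtract everything in the task set and both queues. The helper below is hypothetical and works on plain Python collections; against real Redis the inputs could come from commands such as `SCAN`, `SMEMBERS` and `LRANGE`.

```python
def find_orphans(hash_keys, task_set, new_queue, end_queue, task_prefix='task-'):
    """Return IDs of task hashes that are in neither the task set nor either queue."""
    known = set(task_set) | set(new_queue) | set(end_queue)
    orphans = set()
    for key in hash_keys:
        if not key.startswith(task_prefix):
            continue  # not a task hash
        task_id = key[len(task_prefix):]
        if task_id not in known:
            orphans.add(task_id)
    return orphans


print(find_orphans(['task-a', 'task-b', 'task-c'], {'a'}, ['b'], []))  # {'c'}
```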
Task States
- WAIT (or no value): the task has not started yet and is waiting to be picked up by a worker. Its ID should be in the new task queue.
- RUNNING: these tasks are monitored and “locked” to their worker by update time. Monitor tasks will requeue them when their update time exceeds a configurable threshold.
- STOPPED: these tasks were intentionally stopped before they could complete. They can be restarted (possibly on any worker) by pushing the task ID onto the new queue.
- SUCCESS, ERROR & EXCEPTION: these tasks have finished, and should have an associated output in their hash.
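The monitor's requeue condition for RUNNING tasks can be sketched as a staleness check. This hypothetical helper assumes `last_update` holds a Unix timestamp; the data model calls it an internal update counter, so pytask's real representation and threshold handling may differ.

```python
import time


def stale_running_tasks(tasks, threshold, now=None):
    """Return IDs of RUNNING tasks whose last_update is more than threshold seconds old.

    tasks maps task_id -> task hash (a dict like the one in the data model).
    """
    now = time.time() if now is None else now
    return sorted(
        task_id
        for task_id, task in tasks.items()
        if task.get('state') == 'RUNNING'
        and now - float(task.get('last_update', 0)) > threshold
    )


tasks = {
    'a': {'state': 'RUNNING', 'last_update': 90},  # updated recently
    'b': {'state': 'RUNNING', 'last_update': 50},  # worker likely gone
    'c': {'state': 'SUCCESS', 'last_update': 0},   # finished, never requeued
}
print(stale_running_tasks(tasks, threshold=30, now=100))  # ['b']
```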
Worker Failure
- Expected (SIGINT): the worker will stop and requeue any running tasks, setting their state to WAIT.
- Unexpected: the worker will stop updating the update time (last_update) on its tasks. Any monitor tasks will pick this up (depending on configuration) and requeue the failed worker's tasks.
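The expected-shutdown path can be sketched in the same in-memory style as the data model: each RUNNING task owned by the stopping worker is set back to WAIT and its ID is pushed onto the new queue, where another worker can pop it. `requeue_on_shutdown` and the in-memory stand-ins for Redis are hypothetical, not pytask's code.

```python
from collections import deque


def requeue_on_shutdown(local_task_ids, hashes, new_queue, task_prefix='task-'):
    """Hand this worker's RUNNING tasks back to the new queue; return their IDs."""
    requeued = []
    for task_id in local_task_ids:
        task = hashes[task_prefix + task_id]
        if task['state'] == 'RUNNING':
            task['state'] = 'WAIT'         # back to waiting
            new_queue.appendleft(task_id)  # LPUSH so another worker can RPOP it
            requeued.append(task_id)
    return requeued


hashes = {
    'task-a': {'state': 'RUNNING'},
    'task-b': {'state': 'SUCCESS'},
}
new_queue = deque()
print(requeue_on_shutdown(['a', 'b'], hashes, new_queue))  # ['a']
```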
Redis Failure
- Partitions: when workers can't reach Redis, they stop all their tasks. As long as a Redis cluster is still up and some worker instances can reach it, these tasks will be requeued.
- Complete failure: if Redis is offline for all workers, the task cluster will essentially stop. A usable Redis cluster is always required.