Spinach¶
Release v0.0.17. (Installation)
Spinach is a Redis task queue for Python 3 heavily inspired by Celery and RQ.
Distinctive features:
- Threaded and asyncio workers
- At-least-once or at-most-once delivery per task
- Periodic tasks without an additional process
- Concurrency limits on queued jobs
- Scheduling of tasks in batch
- Embeddable workers for easier testing
- Integrations with Flask, Django, Logging, Sentry and Datadog
- See design choices for more details
Installation:
pip install spinach
Quickstart
from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker())

@spin.task(name='compute')
def compute(a, b):
    print('Computed {} + {} = {}'.format(a, b, a + b))

# Schedule a job to be executed ASAP
spin.schedule(compute, 5, 3)
print('Starting workers, ^C to quit')
spin.start_workers()
The Engine is the central part of Spinach. It is used to define tasks, schedule jobs to execute in the background and start background workers. More details.
The Broker is the backend that background workers use to retrieve jobs to execute. Spinach provides two brokers: MemoryBroker for development and RedisBroker for production.
The Engine.task() decorator is used to register tasks. It requires at least a name to identify the task, but other options can be given to customize how the task behaves. More details.
Background jobs can then be scheduled by using either the task name or the task function:
spin.schedule('compute', 5, 3) # identify a task by its name
spin.schedule(compute, 5, 3) # identify a task by its function
Getting started with spinach:
Installation¶
Prerequisites¶
Spinach is written in Python 3. Before using it, make sure you have a Python 3.6+ interpreter on your system.
Pip¶
If you are familiar with the Python ecosystem, you won’t be surprised that Spinach can be installed with:
$ pip install spinach
That’s it, you can call it a day!
From Source¶
Spinach is developed on GitHub, you can find the code at NicolasLM/spinach.
You can clone the public repository:
$ git clone https://github.com/NicolasLM/spinach.git
Once you have the sources, simply install it with:
$ cd spinach
$ pip install -e .
Tasks¶
A task is a unit of code, usually a function, to be executed in the background on remote workers.
To define a task:
from spinach import Tasks

tasks = Tasks()

@tasks.task(name='add')
def add(a, b):
    print('Computed {} + {} = {}'.format(a, b, a + b))
Note
The args and kwargs of a task must be JSON serializable.
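As a rough way to check arguments before scheduling a job, the sketch below tries to encode them with the standard json module. The helper is_json_serializable is hypothetical and not part of Spinach, and it assumes the standard encoder is representative of what the broker accepts.

```python
import json
from datetime import datetime, timezone


def is_json_serializable(*args, **kwargs) -> bool:
    """Return True if the arguments can be encoded by json.dumps()."""
    try:
        json.dumps({'args': args, 'kwargs': kwargs})
    except (TypeError, ValueError):
        return False
    return True


# Plain numbers, strings, lists and dicts are fine.
ok = is_json_serializable(5, 3, label='sum')

# A datetime is rejected by the standard encoder...
when = datetime(2024, 1, 1, tzinfo=timezone.utc)
bad = is_json_serializable(when)

# ...but its ISO 8601 string representation is accepted.
fixed = is_json_serializable(when.isoformat())

print(ok, bad, fixed)
```

Passing identifiers or strings rather than rich objects keeps job arguments small and easy to serialize.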
Retries¶
Spinach knows two kinds of tasks: the ones that can be retried safely (idempotent tasks) and the ones that cannot be retried safely (non-idempotent tasks). Since Spinach cannot guess if a task code is safe to be retried multiple times, it must be annotated when the task is created.
Note
Whether a task is retryable or not affects the behavior of jobs in case of normal errors during their execution but also when a worker catastrophically dies (power outage, OOM killed…).
Non-Retryable Tasks¶
Spinach assumes that by default tasks are not safe to be retried (tasks are assumed to have side effects).
These tasks are defined with max_retries=0 (the default):
@tasks.task(name='foo')
def foo(a, b):
    pass
- use at-most-once delivery
- it is guaranteed that the job will not run multiple times
- it is guaranteed that the job will not run simultaneously in multiple workers
- the job is not automatically retried in case of errors
- the job may never even start in very rare conditions
Retryable Tasks¶
Idempotent tasks can be executed multiple times without changing the result beyond the initial execution. It is a nice property to have and most tasks should try to be idempotent to gracefully recover from errors.
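To make the difference concrete, here is a small sketch that runs two plain functions twice, as a retry would. The names and the in-memory state are hypothetical stand-ins for an external system; nothing here uses Spinach itself.

```python
# Hypothetical in-memory state standing in for an external database.
newsletter_subscribers = set()
emails_sent = []


def subscribe(email):
    """Idempotent: running it again leaves the state unchanged."""
    newsletter_subscribers.add(email)


def send_welcome(email):
    """Not idempotent: each run produces one more side effect."""
    emails_sent.append(email)


# Simulate a job that gets executed twice because of a retry.
for _ in range(2):
    subscribe('user@example.com')
    send_welcome('user@example.com')

print(len(newsletter_subscribers))  # 1
print(len(emails_sent))  # 2
```

Only the first function would be a good candidate for a positive max_retries value.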
Retryable tasks are defined with a positive max_retries value:
@tasks.task(name='foo', max_retries=10)
def foo(a, b):
    pass
- use at-least-once delivery
- the job is automatically retried, up to max_retries times, in case of errors
- the job may be executed more than once
- the job may be executed simultaneously in multiple workers in very rare conditions
When a worker catastrophically dies it will be detected dead after 30 minutes of inactivity and the retryable jobs that were running will be rescheduled automatically.
Retrying¶
When a retryable task is being executed it will be retried when it encounters an unexpected exception:
@tasks.task(name='foo', max_retries=10)
def foo(a, b):
    l = [0, 1, 2]
    print(l[100])  # Raises IndexError
To allow the system to recover gracefully, a default backoff strategy is applied.
spinach.utils.exponential_backoff(attempt: int, cap: int = 1200) → datetime.timedelta¶
Calculate a delay to retry using an exponential backoff algorithm.
It is an exponential backoff with random jitter to prevent failures from being retried at the same time. It is a good fit for most applications.
Parameters:
- attempt – the number of attempts made
- cap – maximum delay, defaults to 20 minutes
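The idea of a capped exponential backoff with random jitter can be sketched as follows. This is an illustration of the general technique under the assumption that the cap is expressed in seconds; backoff_with_jitter is a hypothetical function and may differ from what spinach.utils.exponential_backoff actually computes.

```python
import random
from datetime import timedelta


def backoff_with_jitter(attempt: int, cap: int = 1200) -> timedelta:
    """Illustrative capped exponential backoff with full jitter."""
    # The upper bound doubles with each attempt until it reaches the cap.
    upper_bound = min(cap, 2 ** attempt)
    # Picking a random delay below the bound spreads retries over time.
    return timedelta(seconds=random.uniform(0, upper_bound))


for attempt in range(1, 6):
    delay = backoff_with_jitter(attempt)
    print('attempt {}: retry in {:.1f}s'.format(attempt, delay.total_seconds()))
```

The jitter matters when many jobs fail at once, for example during an outage of a shared dependency: without it they would all be retried at the same instants.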
To be more explicit, a task can also raise a RetryException, which gives precise control over when it should be retried:
from datetime import datetime, timedelta, timezone
from spinach import RetryException

@tasks.task(name='foo', max_retries=10)
def foo(a, b):
    # status_code stands for the outcome of a call made by the task,
    # for instance the status of an HTTP response
    if status_code == 429:
        raise RetryException(
            'Should retry in 10 minutes',
            at=datetime.now(tz=timezone.utc) + timedelta(minutes=10)
        )
class spinach.task.RetryException(message, at: Optional[datetime.datetime] = None)¶
Exception raised in a task to indicate that the job should be retried.
Even if this exception is raised, the max_retries defined in the task still applies.
Parameters: at – Optional date at which the job should be retried. If it is not given, the job will be retried after a randomized exponential backoff. It is advised to pass a timezone aware datetime to lift any ambiguity. However, if a timezone naive datetime is given, it will be assumed to contain UTC time.
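The rule for naive datetimes can be illustrated with the standard library alone. The helper as_utc below is hypothetical: it mirrors the documented behavior (naive values are read as UTC) and is not Spinach's own code.

```python
from datetime import datetime, timedelta, timezone


def as_utc(at: datetime) -> datetime:
    """Return a timezone aware datetime, treating naive values as UTC."""
    if at.tzinfo is None:
        return at.replace(tzinfo=timezone.utc)
    return at.astimezone(timezone.utc)


# Preferred: an aware datetime leaves no room for ambiguity.
aware = datetime.now(tz=timezone.utc) + timedelta(minutes=10)

# A naive datetime is read as UTC, whatever the local timezone is.
naive = datetime(2024, 1, 1, 12, 0)
print(as_utc(naive).isoformat())  # 2024-01-01T12:00:00+00:00
```

A naive value produced by datetime.now() on a machine that is not set to UTC would therefore be off by the local UTC offset, which is why aware datetimes are advised.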
A task can also raise an AbortException for short-circuit behavior:
class spinach.task.AbortException¶
Exception raised in a task to indicate that the job should NOT be retried.
If this exception is raised, all retry attempts are stopped immediately.
Limiting task concurrency¶
If a task is idempotent it may also have a limit on the number of concurrent jobs spawned across all workers. These types of tasks are defined with a positive max_concurrency value:
@tasks.task(name='foo', max_retries=10, max_concurrency=1)
def foo(a, b):
    pass
With this definition, no more than one instance of the Task will ever be spawned as a running Job, no matter how many are queued and waiting to run.
Periodic tasks¶
Tasks marked as periodic get automatically scheduled. To run a task every 5 seconds:
from datetime import timedelta

from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker())
every_5_sec = timedelta(seconds=5)

@spin.task(name='make_coffee', periodicity=every_5_sec)
def make_coffee():
    print("Making coffee...")

print('Starting workers, ^C to quit')
spin.start_workers()
Periodic tasks get scheduled by the workers themselves, so there is no need to run an additional process only for that. Having multiple workers on multiple machines is fine and will not result in duplicated tasks.
Periodic tasks run at most every period. If the system scheduling periodic tasks gets delayed, nothing compensates for the time lost. This has the added benefit of periodic tasks not being scheduled if all the workers are down for a prolonged amount of time. When they get back online, workers won’t have a storm of periodic tasks to execute.
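The absence of catch-up can be modeled in a few lines. This is a simplified sketch of the behavior described above, not Spinach's scheduler; next_run is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def next_run(now: datetime, period: timedelta) -> datetime:
    """The next execution is one period after the current time, not after
    the time at which the previous run was supposed to happen."""
    return now + period


period = timedelta(seconds=5)
start = datetime(2024, 1, 1, tzinfo=timezone.utc)

# Workers were down for one hour: the missed runs are not replayed.
back_online = start + timedelta(hours=1)
missed_runs = (back_online - start) // period
print(missed_runs)  # 720
print(next_run(back_online, period) - back_online)  # 0:00:05
```

The 720 executions that would have happened during the outage are simply skipped, and the task resumes its normal rhythm.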
Tasks Registry¶
Before being attached to a Spinach Engine, tasks are created inside a Tasks registry.
Attaching tasks to a Tasks registry instead of directly to the Engine makes it possible to compose large applications from smaller units independent from each other, the same way a Django project is composed of many small Django apps.
This may seem cumbersome for trivial applications, like the examples in this documentation or some single-module projects, so those can create tasks directly on the Engine using:
import time
from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker())

@spin.task(name='fast')
def fast():
    time.sleep(1)
Note
Creating tasks directly in the Engine is a bit like creating a Flask app globally instead of using an app factory: it works until a change introduces a circular import. Its usage should really be limited to tiny projects.
class spinach.task.Tasks(queue: Optional[str] = None, max_retries: Optional[numbers.Number] = None, periodicity: Optional[datetime.timedelta] = None, max_concurrency: Optional[int] = None)¶
Registry for tasks to be used by Spinach.
Parameters:
- queue – default queue for tasks
- max_retries – default retry policy for tasks
- periodicity – for periodic tasks, delay between executions as a timedelta
- max_concurrency – maximum number of simultaneous Jobs that can be started for this Task. Requires max_retries to be also set.
add(func: Callable, name: Optional[str] = None, queue: Optional[str] = None, max_retries: Optional[numbers.Number] = None, periodicity: Optional[datetime.timedelta] = None, max_concurrency: Optional[int] = None)¶
Register a task function.
Parameters:
- func – a callable to be executed
- name – name of the task, used later to schedule jobs
- queue – queue of the task, the default is used if not provided
- max_retries – maximum number of retries, the default is used if not provided
- periodicity – for periodic tasks, delay between executions as a timedelta
- max_concurrency – maximum number of simultaneous Jobs that can be started for this Task. Requires max_retries to be also set.
>>> tasks = Tasks()
>>> tasks.add(lambda x: x, name='do_nothing')
schedule(task: Union[str, Callable, spinach.task.Task], *args, **kwargs) → Job¶
Schedule a job to be executed as soon as possible.
Parameters:
- task – the task or its name to execute in the background
- args – args to be passed to the task function
- kwargs – kwargs to be passed to the task function
Returns: The Job that was created and scheduled.
This method can only be used once tasks have been attached to a Spinach Engine.
schedule_at(task: Union[str, Callable, spinach.task.Task], at: datetime.datetime, *args, **kwargs) → Job¶
Schedule a job to be executed in the future.
Parameters:
- task – the task or its name to execute in the background
- at – Date at which the job should start. It is advised to pass a timezone aware datetime to lift any ambiguity. However, if a timezone naive datetime is given, it will be assumed to contain UTC time.
- args – args to be passed to the task function
- kwargs – kwargs to be passed to the task function
Returns: The Job that was created and scheduled.
This method can only be used once tasks have been attached to a Spinach Engine.
schedule_batch(batch: Batch) → Iterable[Job]¶
Schedule many jobs at once.
Scheduling jobs in batches enqueues them quickly by avoiding round-trips to the broker.
Parameters: batch – Batch instance containing jobs to schedule
Returns: The Jobs that were created and scheduled.
task(func: Optional[Callable] = None, name: Optional[str] = None, queue: Optional[str] = None, max_retries: Optional[numbers.Number] = None, periodicity: Optional[datetime.timedelta] = None, max_concurrency: Optional[int] = None)¶
Decorator to register a task function.
Parameters:
- name – name of the task, used later to schedule jobs
- queue – queue of the task, the default is used if not provided
- max_retries – maximum number of retries, the default is used if not provided
- periodicity – for periodic tasks, delay between executions as a timedelta
- max_concurrency – maximum number of simultaneous Jobs that can be started for this Task. Requires max_retries to be also set.
>>> tasks = Tasks()
>>> @tasks.task(name='foo')
... def foo():
...     pass
Batch¶
class spinach.task.Batch¶
Container used to schedule many jobs at once.
Batching the scheduling of jobs avoids doing many round-trips to the broker, reducing the overhead and the chance of errors associated with doing network calls.
In this example 100 jobs are sent to Redis in one call:
>>> batch = Batch()
>>> for i in range(100):
...     batch.schedule('compute', i)
...
>>> spin.schedule_batch(batch)
Once the Batch is passed to the Engine it should be disposed of and not be reused.
schedule(task: Union[str, Callable, spinach.task.Task], *args, **kwargs)¶
Add a job to be executed ASAP to the batch.
Parameters:
- task – the task or its name to execute in the background
- args – args to be passed to the task function
- kwargs – kwargs to be passed to the task function
schedule_at(task: Union[str, Callable, spinach.task.Task], at: datetime.datetime, *args, **kwargs)¶
Add a job to be executed in the future to the batch.
Parameters:
- task – the task or its name to execute in the background
- at – Date at which the job should start. It is advised to pass a timezone aware datetime to lift any ambiguity. However, if a timezone naive datetime is given, it will be assumed to contain UTC time.
- args – args to be passed to the task function
- kwargs – kwargs to be passed to the task function
Jobs¶
A Job represents a specific execution of a task. To make an analogy with Python, a Task gets instantiated into many Jobs, like a class that gets instantiated into many objects.
Job¶
class spinach.job.Job¶
Represent the execution of a Task by background workers.
The Job class should not be instantiated by the user, instead jobs are automatically created when they are scheduled.
Variables:
- id – UUID of the job
- status – JobStatus
- task_name – string name of the task
- queue – string name of the queue
- at – timezone aware datetime representing the date at which the job should start
- max_retries – int representing how many times a failing job should be retried
- retries – int representing how many times the job was already executed
- task_args – optional tuple containing args passed to the task
- task_kwargs – optional dict containing kwargs passed to the task
Job Status¶
class spinach.job.JobStatus¶
Possible status of a Job.
Life-cycle:
- Newly created jobs first get the status NOT_SET
- Future jobs are then set to WAITING until they are ready to be QUEUED
- Jobs starting immediately get the status QUEUED directly when they are received by the broker
- Jobs are set to RUNNING when a worker starts their execution
  - if the job terminates without error it is set to SUCCEEDED
  - if the job terminates with an error and can be retried it is set to WAITING until it is ready to be queued again
  - if the job terminates with an error and cannot be retried it is set to FAILED forever
See Signals to be notified on some of these status transitions.
FAILED = 5¶ Job failed and will not be retried
NOT_SET = 0¶ Job created but not scheduled yet
QUEUED = 2¶ Job is in a queue, ready to be picked by a worker
RUNNING = 3¶ Job is being executed
SUCCEEDED = 4¶ Job is finished, execution was successful
WAITING = 1¶ Job is scheduled to start in the future
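The life-cycle described above can be written down as a small transition table. The Status enum and can_transition function below are hypothetical stand-ins that reuse the documented values; they are a reading of the life-cycle text, not code taken from spinach.job.

```python
import enum


class Status(enum.Enum):
    """Stand-in mirroring the documented JobStatus values."""
    NOT_SET = 0
    WAITING = 1
    QUEUED = 2
    RUNNING = 3
    SUCCEEDED = 4
    FAILED = 5


# Transitions read off the life-cycle description.
TRANSITIONS = {
    Status.NOT_SET: {Status.WAITING, Status.QUEUED},
    Status.WAITING: {Status.QUEUED},
    Status.QUEUED: {Status.RUNNING},
    Status.RUNNING: {Status.SUCCEEDED, Status.WAITING, Status.FAILED},
    Status.SUCCEEDED: set(),
    Status.FAILED: set(),
}


def can_transition(current: Status, new: Status) -> bool:
    """Tell whether the life-cycle allows going from current to new."""
    return new in TRANSITIONS[current]


print(can_transition(Status.RUNNING, Status.WAITING))  # True
print(can_transition(Status.FAILED, Status.QUEUED))  # False
```

SUCCEEDED and FAILED are terminal in this reading, while a retried job loops back from RUNNING to WAITING.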
Engine¶
The Spinach Engine is what connects tasks, jobs, brokers and workers together.
It is possible, but unusual, to have multiple Engines running in the same Python interpreter.
class spinach.engine.Engine(broker: spinach.brokers.base.Broker, namespace: str = 'spinach')¶
Spinach Engine coordinating a broker with workers.
This class orchestrates all components: it is the one that starts and terminates the whole machinery.
The Engine can be run in two modes:
- client: synchronously submits jobs.
- worker: asynchronously executes jobs.
Submitting jobs is quite easy, so running the Engine in client mode doesn’t require spawning any thread.
Executing jobs however is a bit more involved, so running the Engine in worker mode ends up spawning a few threads:
- a few worker threads: they are only responsible for executing the task function and advancing the job status once it is finished.
- a result notifier thread: sends back the result of job executions to the Broker backend, acts basically as a client.
- an arbiter thread: fetches jobs from the Broker and gives them to the workers as well as doing some periodic bookkeeping.
- a Broker subscriber thread: receives notifications from the backend when something happens, typically a job is enqueued.
- the process main thread: starts all the above threads, then does nothing waiting for the signal to terminate the threads it started.
This means that a Spinach worker process has at least 5 threads.
Parameters:
- broker – instance of a Broker
- namespace – name of the namespace used by the Engine. When different Engines use the same Redis server, they must use different namespaces to isolate themselves.
attach_tasks(tasks: spinach.task.Tasks)¶
Attach a set of tasks.
A task cannot be scheduled or executed before it is attached to an Engine.
>>> tasks = Tasks()
>>> spin.attach_tasks(tasks)
namespace¶
Namespace the Engine uses.
schedule(task: Union[str, Callable, spinach.task.Task], *args, **kwargs) → spinach.job.Job¶
Schedule a job to be executed as soon as possible.
Parameters:
- task – the task or its name to execute in the background
- args – args to be passed to the task function
- kwargs – kwargs to be passed to the task function
Returns: The Job that was created and scheduled.
schedule_at(task: Union[str, Callable, spinach.task.Task], at: datetime.datetime, *args, **kwargs) → spinach.job.Job¶
Schedule a job to be executed in the future.
Parameters:
- task – the task or its name to execute in the background
- at – date at which the job should start. It is advised to pass a timezone aware datetime to lift any ambiguity. However, if a timezone naive datetime is given, it will be assumed to contain UTC time.
- args – args to be passed to the task function
- kwargs – kwargs to be passed to the task function
Returns: The Job that was created and scheduled.
schedule_batch(batch: spinach.task.Batch) → Iterable[spinach.job.Job]¶
Schedule many jobs at once.
Scheduling jobs in batches enqueues them quickly by avoiding round-trips to the broker.
Parameters: batch – Batch instance containing jobs to schedule
Returns: The Jobs that were created and scheduled.
start_workers(number: int = 5, queue: str = 'spinach', block: bool = True, stop_when_queue_empty=False, workers_class: Type[spinach.worker.BaseWorkers] = <class 'spinach.worker.ThreadWorkers'>)¶
Start the worker threads.
Parameters:
- number – number of workers to launch, each job running uses one worker.
- queue – name of the queue to consume, see Queues.
- block – whether to block the calling thread until a signal arrives and workers get terminated.
- stop_when_queue_empty – automatically stop the workers when the queue is empty. Useful mostly for one-off scripts and testing.
- workers_class – Class to change the behavior of workers, defaults to threaded workers
stop_workers(_join_arbiter=True)¶
Stop the workers and wait for them to terminate.
Namespace¶
Namespaces identify and isolate multiple Spinach engines running on the same Python interpreter and/or sharing the same Redis server.
Having multiple engines on the same interpreter is rare but can happen when using the Flask integration with an app factory. In this case using different namespaces is important to prevent signals sent by one engine from being received by another engine.
When multiple Spinach Engines use the same Redis server, for example when production and staging share the same database, different namespaces must be used to make sure they do not step on each other’s toes.
The production application would contain:
spin = Engine(RedisBroker(), namespace='prod')
While the staging application would contain:
spin = Engine(RedisBroker(), namespace='stg')
Note
Using different Redis database numbers (0, 1, 2…) for different environments is not enough as Redis pubsubs are shared among databases. Namespaces solve this problem.
Queues¶
Queues are an optional feature that allows directing a set of tasks to specific workers.
Queues are useful when different tasks have different usage patterns, for instance one task being fast and high priority while another task is slow and low-priority. To prevent the slow task from blocking the execution of the fast one, each task can be attached to its own queue:
import time
import logging

from spinach import Engine, MemoryBroker

logging.basicConfig(
    format='%(asctime)s - %(threadName)s %(levelname)s: %(message)s',
    level=logging.DEBUG
)
spin = Engine(MemoryBroker())

@spin.task(name='fast', queue='high-priority')
def fast():
    time.sleep(1)

@spin.task(name='slow', queue='low-priority')
def slow():
    time.sleep(10)

spin.schedule(slow)
spin.schedule(fast)
spin.start_workers(number=1, queue='high-priority', stop_when_queue_empty=True)
The task decorator accepts an optional queue name that binds the task to a specific queue. Likewise, passing a queue name to start_workers restricts workers to executing only tasks of this particular queue.
Note
By default all tasks and all workers use the spinach queue.
Note
Namespaces and queues are different concepts. While queues share the same Spinach Engine, namespaces make two Spinach Engines invisible to each other while still using the same broker.
Asyncio¶
Spinach can define and run tasks as asyncio coroutines. In this mode the worker is a single thread that runs all tasks asynchronously. This allows for greater concurrency as well as compatibility with the asyncio ecosystem.
Creating async tasks¶
To define an asynchronous task, just prefix its definition with the async keyword:
import asyncio

@spin.task(name='compute')
async def compute(a, b):
    await asyncio.sleep(1)
    print('Computed {} + {} = {}'.format(a, b, a + b))
To run the workers in asynchronous mode, pass the AsyncioWorkers class to start_workers:
from spinach import AsyncioWorkers

spin.start_workers(number=256, workers_class=AsyncioWorkers)
When using the asyncio workers, the number argument can be set quite high because each worker is just a coroutine, consuming a negligible amount of resources.
Scheduling jobs¶
Because internally only workers are asyncio aware, jobs are still sent to Redis using a blocking socket. This means that to schedule jobs from asynchronous code, care must be taken to send jobs from outside the event loop. This can be achieved using asyncio.to_thread:
await asyncio.to_thread(spin.schedule, compute, 2, 4)
Code scheduling a lot of jobs should use batches to improve performance.
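The effect of asyncio.to_thread (available since Python 3.9) can be observed without Spinach. In the sketch below, blocking_schedule is a hypothetical stand-in for a blocking call such as spin.schedule; it reports whether it ran in the main thread, where the event loop lives.

```python
import asyncio
import threading
import time


def blocking_schedule(task_name, *args):
    """Stand-in for a blocking call such as spin.schedule()."""
    time.sleep(0.1)  # simulates the network round-trip to the broker
    in_main = threading.current_thread() is threading.main_thread()
    return task_name, args, in_main


async def main():
    # The blocking call runs in a worker thread, the event loop stays free.
    return await asyncio.to_thread(blocking_schedule, 'compute', 2, 4)


task_name, args, ran_in_main_thread = asyncio.run(main())
print(task_name, args, ran_in_main_thread)
```

When run as a script, the last value printed is False: the call was executed by a thread of the default executor rather than by the thread running the event loop.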
Example¶
import aiohttp
from spinach import Engine, MemoryBroker, Batch, AsyncioWorkers

spin = Engine(MemoryBroker())

@spin.task(name='get_pokemon_name')
async def get_pokemon_name(pokemon_id: int):
    """Call an HTTP API to retrieve a pokemon name by its ID."""
    url = f'https://pokeapi.co/api/v2/pokemon/{pokemon_id}'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            pokemon = await response.json()
    print(f'Pokemon #{pokemon_id} is {pokemon["name"]}')

# Schedule a batch of 150 tasks to retrieve the name of the
# first 150 pokemons.
batch = Batch()
for pokemon_id in range(1, 151):
    batch.schedule(get_pokemon_name, pokemon_id)
spin.schedule_batch(batch)

# Start the asyncio workers and process the tasks
spin.start_workers(
    number=256,
    workers_class=AsyncioWorkers,
    stop_when_queue_empty=True
)
Note
If an application defines both sync and async tasks, each kind of task should go in its own queue so that sync tasks are picked by threaded workers and async tasks by asyncio workers.
Note
Not all contrib integrations may work with asynchronous workers.
Integrations¶
Integration with third-party libraries and frameworks.
Logging¶
Spinach uses the standard Python logging package. Its logger prefix is spinach. Spinach does nothing else besides creating its loggers and emitting log records. The user is responsible for configuring logging before starting workers.
For simple applications it is enough to use:
import logging

logging.basicConfig(
    format='%(asctime)s - %(threadName)s %(levelname)s: %(message)s',
    level=logging.DEBUG
)
More complex applications will probably use dictConfig.
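A possible dictConfig setup is sketched below. It reuses the format string shown earlier; the formatter and handler names and the INFO level are arbitrary choices made for the illustration, not requirements of Spinach.

```python
import logging
import logging.config

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s - %(threadName)s %(levelname)s: %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'default',
        },
    },
    'loggers': {
        # Loggers created by Spinach all start with the "spinach" prefix,
        # so configuring the parent logger covers them.
        'spinach': {'handlers': ['console'], 'level': 'INFO'},
    },
}

logging.config.dictConfig(LOGGING)
logging.getLogger('spinach').info('Logging configured')
```

Configuring the spinach logger separately makes it possible to keep Spinach at one verbosity while the application's own loggers use another.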
Flask¶
The Flask integration follows the spirit of Flask very closely. It provides two ways of getting started: a single module approach for minimal applications and an application factory approach for more scalable code.
The Spinach extension for Flask pushes an application context for the duration of the tasks, which means that it plays well with other extensions like Flask-SQLAlchemy and doesn’t require extra precautions.
Single Module¶
from flask import Flask
from spinach.contrib.flask_spinach import Spinach

app = Flask(__name__)
spinach = Spinach(app)

@spinach.task(name='say_hello')
def say_hello():
    print('Hello from a task')

@app.route('/')
def home():
    spinach.schedule('say_hello')
    return 'Hello from HTTP'
Application Factory¶
This more complex layout includes an Application Factory create_app and an imaginary auth Blueprint containing routes and tasks.
app.py:
from flask import Flask
from spinach import RedisBroker
from spinach.contrib.flask_spinach import Spinach

spinach = Spinach()

def create_app():
    app = Flask(__name__)
    app.config['SPINACH_BROKER'] = RedisBroker()
    spinach.init_app(app)

    from . import auth
    app.register_blueprint(auth.blueprint)
    spinach.register_tasks(app, auth.tasks)

    return app
auth.py:
from flask import Blueprint, jsonify
from spinach import Tasks

from .app import spinach

blueprint = Blueprint('auth', __name__)
tasks = Tasks()

@blueprint.route('/')
def create_user():
    spinach.schedule('send_welcome_email')
    return jsonify({'user_id': 42})

@tasks.task(name='send_welcome_email')
def send_welcome_email():
    print('Sending email...')
Running workers¶
Workers can be launched from the Flask CLI:
$ FLASK_APP=examples.flaskapp flask spinach
The working queue and the number of threads can be changed with:
$ FLASK_APP=examples.flaskapp flask spinach --queue high-priority --threads 20
Note
When in development mode, Flask uses its reloader to automatically restart the process when the code changes. Users who define periodic tasks while using both the MemoryBroker and the Flask reloader may see their periodic tasks scheduled each time the code changes. If this is a problem, users are encouraged to switch to the RedisBroker for development.
Configuration¶
SPINACH_BROKER, default spinach.RedisBroker()
SPINACH_NAMESPACE, defaults to the Flask app name
Django¶
A Django application is available for integrating Spinach into Django projects.
To get started, add the application spinach.contrib.spinachd to settings.py:
INSTALLED_APPS = (
    ...
    'spinach.contrib.spinachd',
)
On startup, Spinach will look for a tasks.py module in all installed applications. For instance polls/tasks.py:
from spinach import Tasks

from .models import Question

tasks = Tasks()

@tasks.task(name='polls:close_poll')
def close_poll(question_id: int):
    Question.objects.get(pk=question_id).delete()
Tasks can be easily scheduled from views:
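from django.http import HttpResponse
from django.shortcuts import get_object_or_404
from .models import Question
from .tasks import tasks

def close_poll_view(request, question_id):
    question = get_object_or_404(Question, pk=question_id)
    tasks.schedule('polls:close_poll', question.id)
    return HttpResponse('Poll closing scheduled')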
Users of the Django Datadog app get their jobs reported to Datadog APM automatically in task workers.
Running workers¶
Workers can be launched from manage.py:
$ python manage.py spinach
The working queue and the number of threads can be changed with:
$ python manage.py spinach --queue high-priority --threads 20
Sending emails in the background¶
The Spinach app provides an EMAIL_BACKEND that sends emails as background tasks. To use it, simply add it to settings.py:
EMAIL_BACKEND = 'spinach.contrib.spinachd.mail.BackgroundEmailBackend'
SPINACH_ACTUAL_EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
Emails can then be sent using regular Django functions:
from django.core.mail import send_mail
send_mail('Subject', 'Content', 'sender@example.com', ['receiver@example.com'])
Periodically clearing expired sessions¶
Projects using django.contrib.sessions must remove expired sessions from the database from time to time. Django comes with a management command to do that manually, but this can be automated.
Spinach provides a periodic task, disabled by default, to do that. To enable it, give it a periodicity in settings.py. For instance, to clear sessions once per week:
from datetime import timedelta
SPINACH_CLEAR_SESSIONS_PERIODICITY = timedelta(weeks=1)
Configuration¶
SPINACH_BROKER, default spinach.RedisBroker()
SPINACH_NAMESPACE, default spinach
SPINACH_ACTUAL_EMAIL_BACKEND, default django.core.mail.backends.smtp.EmailBackend
SPINACH_CLEAR_SESSIONS_PERIODICITY, default None (disabled)
Sentry¶
With the Sentry integration, failing jobs can be automatically reported to Sentry with full traceback, log breadcrumbs and job information. Performance tracing of tasks is also enabled.
The Sentry integration requires Sentry SDK:
pip install sentry_sdk
It then just needs to be registered before starting workers:
import sentry_sdk
from spinach.contrib.sentry_sdk_spinach import SpinachIntegration

sentry_sdk.init(
    dsn="https://sentry_dsn/42",
    integrations=[SpinachIntegration(send_retries=False)]
)
Datadog¶
With the Datadog integration, all jobs are automatically reported to Datadog APM.
The integration requires ddtrace, the Datadog APM client for Python:
pip install ddtrace
The integration just needs to be registered before starting workers:
from spinach import Engine, MemoryBroker
from spinach.contrib.datadog import register_datadog

register_datadog()
spin = Engine(MemoryBroker())
spin.start_workers()
This only installs the integration with Spinach, other libraries still need to be patched by ddtrace. It is recommended to run your application patched as explained in the ddtrace documentation.
spinach.contrib.datadog.register_datadog(tracer=None, namespace: Optional[str] = None, service: str = 'spinach')¶
Register the Datadog integration.
Parameters:
- tracer – optionally use a custom ddtrace Tracer instead of the global one.
- namespace – optionally only register the Datadog integration for a particular Spinach Engine
- service – Datadog service associated with the trace, defaults to spinach
Signals¶
Signals are events broadcast when something happens in Spinach, like a job starting or a worker shutting down.
Subscribing to signals allows your code to react to internal events in a composable and reusable way.
Subscribing to signals¶
Subscribing to a signal is done via its connect decorator:
from spinach import signals

@signals.job_started.connect
def job_started(namespace, job, **kwargs):
    print('Job {} started'.format(job))
The first argument given to your function is always the namespace of your Spinach Engine, the following arguments depend on the signal itself.
Subscribing to signals of a specific Spinach Engine¶
As your application gets bigger you may end up running multiple Engines in the same interpreter.
The connect_via decorator subscribes to the signals sent by a specific Spinach Engine:
from spinach import Engine, MemoryBroker, signals

foo_spin = Engine(MemoryBroker(), namespace='foo')
bar_spin = Engine(MemoryBroker(), namespace='bar')

@signals.job_started.connect_via(foo_spin.namespace)
def job_started(namespace, job, **kwargs):
    print('Job {} started on Foo'.format(job))
In this example only signals sent by the foo Engine will be received.
Available signals¶
-
spinach.signals.
job_started
= SafeNamedSignal "job_started"¶ Sent by a worker when a job starts being executed.
Signal handlers receive:
- namespace Spinach namespace
- job
Job
being executed
-
spinach.signals.
job_finished
= SafeNamedSignal "job_finished"¶ Sent by a worker when a job finishes execution.
The signal is sent no matter the outcome, even if the job fails or gets rescheduled for retry.
Signal handlers receive:
- namespace Spinach namespace
- job
Job
being executed
-
spinach.signals.
job_schedule_retry
= SafeNamedSignal "job_schedule_retry"¶ Sent by a worker when a job gets rescheduled for retry.
Signal handlers receive:
- namespace Spinach namespace
- job
Job
being executed - err exception that made the job retry
spinach.signals.job_failed = SafeNamedSignal "job_failed"¶
Sent by a worker when a job failed.
A failed job will not be retried.
Signal handlers receive:
- namespace – Spinach namespace
- job – Job being executed
- err – exception that made the job fail
spinach.signals.worker_started = SafeNamedSignal "worker_started"¶
Sent by a worker when it starts.
Signal handlers receive:
- namespace – Spinach namespace
- worker_name – name of the worker starting
spinach.signals.worker_terminated = SafeNamedSignal "worker_terminated"¶
Sent by a worker when it shuts down.
Signal handlers receive:
- namespace – Spinach namespace
- worker_name – name of the worker shutting down
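As an illustration of the handler signatures listed above, here is a minimal sketch that counts job events per namespace. The names `event_counts`, `_bump` and `connect_handlers` are hypothetical helpers, not part of Spinach. The sketch assumes handlers may be called from several worker threads, hence the lock, and it only reads the received objects.

```python
import threading
from collections import Counter

# Assumption: handlers may be invoked from several worker threads,
# so the shared counter is guarded by a lock.
_lock = threading.Lock()
event_counts = Counter()


def _bump(namespace, event):
    """Increment the counter for one (namespace, event) pair."""
    with _lock:
        event_counts[(namespace, event)] += 1


def on_job_started(namespace, job, **kwargs):
    _bump(namespace, 'job_started')


def on_job_finished(namespace, job, **kwargs):
    _bump(namespace, 'job_finished')


def on_job_failed(namespace, job, err, **kwargs):
    _bump(namespace, 'job_failed')


def connect_handlers():
    """Wire the handlers to Spinach; call once at application start-up."""
    # Imported lazily so this module can be loaded without Spinach installed.
    from spinach import signals

    signals.job_started.connect(on_job_started)
    signals.job_finished.connect(on_job_finished)
    signals.job_failed.connect(on_job_failed)
```

Calling `connect(handler)` directly is equivalent to using `connect` as a decorator. The handlers are plain module-level functions, so they can be unit tested without starting any worker.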
Tips¶
Received objects¶
Objects received via signals should not be modified in handlers as it could break something in Spinach internals.
Exceptions¶
If your receiving function raises an exception while processing a signal, this exception will be logged in the spinach.signals logger.
Going further¶
Have a look at the blinker documentation for other ways of using signals.
Running in Production¶
Advice to read before deploying an application using Spinach to production.
Spinach¶
Since by default Spinach executes jobs in separate threads, the user’s code must be thread-safe. This is usually quite easy to achieve in a traditional web application because frameworks like Flask or Django make that straightforward.
Tasks should not store state in the process between invocations. Instead all state must be stored in an external system, like a database or a cache. This advice also applies to views in a web application.
Redis¶
Most Spinach features are implemented as Lua scripts running inside Redis. Having a solid installation of Redis is the key to Spinach reliability.
To ensure that no tasks are lost or duplicated, Redis must be configured with persistence enabled.
It is recommended to use AOF persistence (appendonly yes) instead of periodic RDB dumps. The default of fsync every second (appendfsync everysec) is a good trade-off between performance and security against sudden power failures.
Using Redis as a task queue is very different from using it as a cache. If an application uses Redis for both task queue and cache, it is recommended to have two separate Redis servers. One would be configured with persistence and without eviction while the other would have no persistence but would evict keys when running low on memory.
Finally standard security practices apply: Redis should not accept connections from the Internet and it should require a password even when connecting locally.
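The recommendations above can be turned into a small sanity check. The following is a sketch only: `check_redis_config` is a hypothetical helper, and it assumes the settings come as a dict of directive names to string values, in the shape returned by the Redis `CONFIG GET` command. Password detection via `requirepass` is also an assumption and will not notice ACL-based setups.

```python
def check_redis_config(config):
    """Return a list of warnings for a dict of Redis settings.

    `config` maps directive names to string values, for example
    {'appendonly': 'yes', 'appendfsync': 'everysec'}.
    """
    problems = []
    if config.get('appendonly') != 'yes':
        problems.append('AOF persistence is disabled (appendonly yes)')
    if config.get('appendfsync') not in ('everysec', 'always'):
        problems.append('AOF is not fsynced at least every second')
    if config.get('maxmemory-policy') != 'noeviction':
        problems.append('keys may be evicted (maxmemory-policy noeviction)')
    if not config.get('requirepass'):
        problems.append('no password is required (requirepass)')
    return problems
```

With the redis-py client, the dict returned by `config_get()` should be a suitable input, although that has to be verified against the client version in use.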
System¶
If the application is deployed on multiple servers it is important that their clocks be approximately synchronized. This is because Spinach uses the system time to know when a job should start. Running an ntp daemon is highly recommended.
Workers should be started by an init system that will restart them if they get killed or if the host reboots.
To gracefully shut down a worker, it is recommended to send it a SIGINT or a SIGTERM and let it finish its running jobs. If the worker gets killed before it terminates gracefully, non-retryable jobs will be lost and retryable jobs will be rescheduled automatically after the worker is identified as dead, which takes 30 minutes by default. This is important if Spinach workers run in Docker containers because Docker gives 10 seconds to a container to finish before killing it.
Production Checklist¶
Spinach:
- Tasks that are NOT safe to be retried have their max_retries set to 0
- Tasks that are safe to be retried have their max_retries set to a positive number
- Retries happen after an exponential delay with randomized jitter (the default)
- Task args and kwargs are JSON serializable and small in size
- Jobs are sent in Batch to the broker when multiple jobs are to be scheduled at once
- The user’s code is thread-safe when using the default threaded workers
- Tasks do not store state in the process between invocations
- Logging is configured and exceptions are sent to Sentry, see Integrations
- Different queues are used if tasks have different usage patterns, see Queues
- Different namespaces are used if multiple Spinach applications share the same Redis server, see Engine
Redis:
- Redis uses AOF persistence
- Redis does not evict keys when running low on memory
- The Redis server used by Spinach is not also used as a cache
- Connections are secured by a long password
- Connections are encrypted if they go through the public Internet
System:
- Servers have their clock synchronized by ntp
- Workers get restarted by an init system if they get killed
Design choices¶
I have used the Celery task queue for a long time and while it is a rock solid piece of software, there are some design decisions that just drive me crazy.
This page presents and explains the key design decisions behind Spinach. It can be summed up as: explicit is better than implicit. Spinach makes sure that it does not provide any convenient feature that can backfire in more complex usages.
Threaded & asynchronous workers¶
Spinach workers are either threaded or asynchronous while other task queues like Celery or RQ rely on processes by default.
Threaded and asynchronous workers work best with IO bound tasks: tasks that make requests to other services, query a database or read files. If your tasks are CPU bound, meaning that you do heavy computations in Python, a process based worker will be more efficient.
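A quick way to see why threads suit IO bound work is to time a few blocking calls run in a thread pool. This sketch uses only the standard library and is unrelated to Spinach's own worker implementation. `fake_io_task` is a hypothetical stand-in that sleeps instead of doing real IO.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_task(seconds):
    """Stand-in for a network call or a database query."""
    time.sleep(seconds)  # sleeping releases the GIL, like blocking IO does
    return seconds


def run_in_threads(durations, workers):
    """Run the tasks in a thread pool and return the elapsed wall time."""
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(fake_io_task, durations))
    return time.monotonic() - start


# Four tasks of 0.2 s each overlap instead of adding up to 0.8 s.
elapsed = run_in_threads([0.2] * 4, workers=4)
print('4 IO bound tasks took {:.2f}s'.format(elapsed))
```

Replacing the sleep with a pure Python computation would remove most of the speed-up, which is the case where process based workers do better.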
Tasks in a typical web application are more often than not IO bound. The choice of threads or coroutines as unit of concurrency is a sensible one.
Threads and coroutines also have the advantage of being lighter than processes: a system can handle more threads than processes before resources get exhausted.
Fork¶
Another reason why Spinach does not use processes for concurrency is that the fork system call used to create the workers is a very special one. It has Copy-On-Write semantics that are unfamiliar to many Python developers.
On the other hand, thread-safety is a better understood problem in Python, with the standard library providing most of the solutions to write thread-safe programs.
Not relying on fork also makes Spinach compatible with Windows.
Embeddable workers¶
As workers are just threads they are easily embeddable in any other Python process. This opens the door to two nice usages:
During automated tests a worker can be launched to process jobs exactly like a normal worker would in production. What is more, by using an in-memory broker there is no need to have a Redis server running during tests.
For small web projects, the task workers can be launched from the same process as the web application. As the application gets bigger the workers can be moved to a separate process very easily.
Logging¶
One issue I have with Celery is the way it handles logging: the framework tries to be too smart, resulting in great pain when the logging setup gets more complex.
That is why Spinach keeps it simple: as a well behaved library it uses the standard logging module and writes logs in its own loggers.
The choice of what to do with these log records is up to the final user.
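Since Spinach only emits records, the application decides where they go. Below is a minimal sketch using the standard `logging.config.dictConfig`. It assumes Spinach's loggers live under the `spinach` name, as the `spinach.signals` logger does. The format string and the INFO level are arbitrary choices.

```python
import logging
import logging.config

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'plain': {'format': '%(asctime)s %(name)s %(levelname)s %(message)s'},
    },
    'handlers': {
        'console': {'class': 'logging.StreamHandler', 'formatter': 'plain'},
    },
    'loggers': {
        # Assumption: Spinach loggers are children of 'spinach',
        # so configuring the parent covers 'spinach.signals' too.
        'spinach': {'handlers': ['console'], 'level': 'INFO'},
    },
}

logging.config.dictConfig(LOGGING)
```

Child loggers inherit the level and handlers of their parent through the dotted name hierarchy of the logging module, so one entry is enough.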
Jobs scheduled for the future¶
Spinach has full support for jobs that need to be executed in the future. These jobs go to a special queue until they are ready to be launched. At that time they are moved to a normal queue where they are picked by a worker.
Celery emulates this behavior by immediately sending the task to a worker and waiting there until the time has come to execute it. It means tasks cannot be scheduled much in advance without wasting resources in the worker.
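The two-queue idea can be modelled in a few lines. This is a toy in-memory sketch, not Spinach's implementation. `FutureQueue` and its methods are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```python
import heapq
from collections import deque


class FutureQueue:
    """Toy model: a heap of future jobs beside a FIFO of ready jobs."""

    def __init__(self):
        self._future = []      # min-heap of (due_time, seq, job)
        self._ready = deque()  # jobs a worker may pick up now
        self._seq = 0          # tie-breaker keeping insertion order

    def schedule_at(self, job, due_time):
        """Park a job until `due_time`."""
        heapq.heappush(self._future, (due_time, self._seq, job))
        self._seq += 1

    def move_due_jobs(self, now):
        """Move every job whose time has come to the ready queue."""
        moved = 0
        while self._future and self._future[0][0] <= now:
            _, _, job = heapq.heappop(self._future)
            self._ready.append(job)
            moved += 1
        return moved

    def pop_ready(self):
        """Return the next ready job, or None if nothing is ready."""
        return self._ready.popleft() if self._ready else None
```

A job parked in the future heap costs no worker resources, however far ahead it is scheduled.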
Periodic jobs¶
One annoying thing with Celery is that you can launch as many distributed workers as you want but there must be one and only one Celery beat process running in the cluster at a time.
This approach does not work well with containerized applications that run in a cluster that often redeploys and moves containers around.
All Spinach workers are part of the system that schedules periodic jobs, there is no need to have a pet in the cattle farm.
Only two brokers¶
Spinach lets the user pick between the in-memory broker for local development and the Redis broker for production. Both support exactly the same set of features.
Redis was chosen because it is an incredibly versatile database. With Lua scripting it becomes possible to develop entirely new patterns which are essential to create a useful and reliable task queue.
Other services like Google PubSub, Amazon SQS or AMQP are very opinionated and not as versatile as Redis, making them difficult to use within Spinach without cutting down on features.
Namespace¶
Multiple Spinach applications (production, staging…) can use the same Redis database without interfering with each other.
Likewise, a single interpreter can run multiple Spinach applications without them interfering with each other.
Minimize import side-effects¶
Spinach encourages users to write applications that have minimal side-effects when imported. There is no global state that gets created or modified when importing or using Spinach.
The user is free to use Spinach in a scoped fashion or declaring everything globally.
This makes it possible for a single interpreter to run multiple Spinach applications without them interfering with each other, which is particularly useful for running automated tests.
No worker entrypoint¶
Celery has this celery worker entrypoint that can be launched from the command line to load an application and spawn the workers.
The problem I often face is that I never know if a setting should be defined in my code as part of the app setup or as a flag of this command line.
Moreover command line flags and application settings often have slightly different names, making things more confusing.
Spinach thus makes it foolproof: you are responsible for configuring the Spinach app through your Python code. You can read settings from environment variables, from a file or anything else possible in Python.
It is then easy to use it to create your own entrypoint to launch the workers.
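A custom entrypoint could look like the sketch below. The `MYAPP_*` environment variables and both function names are hypothetical. Calling `RedisBroker()` without arguments and passing `number` to `start_workers()` are assumptions about the Spinach API that should be checked against the Engine and Broker reference.

```python
import os


def read_settings(environ=None):
    """Build the worker settings from environment variables."""
    environ = os.environ if environ is None else environ
    return {
        'namespace': environ.get('MYAPP_SPINACH_NAMESPACE', 'myapp'),
        'workers': int(environ.get('MYAPP_SPINACH_WORKERS', '5')),
        'use_redis': environ.get('MYAPP_SPINACH_BROKER', 'memory') == 'redis',
    }


def main():
    """Entrypoint: configure the Engine in code, then start the workers."""
    # Imported here so the settings logic can be tested without Spinach.
    from spinach import Engine, MemoryBroker, RedisBroker

    settings = read_settings()
    # Assumption: RedisBroker() without arguments targets a local Redis.
    broker = RedisBroker() if settings['use_redis'] else MemoryBroker()
    spin = Engine(broker, namespace=settings['namespace'])
    # Register the application's tasks on `spin` here before starting.
    # Assumption: `number` sets how many workers are started.
    spin.start_workers(number=settings['workers'])
```

Every setting has a single source of truth in Python code, so there is no command line flag that could disagree with it.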
Schedule tasks in batch¶
A pattern that is used frequently with task queues is to periodically scan all entities and schedule an individual task for each entity that needs further work. For instance, closing the user accounts of members who haven’t logged in in a year.
With Celery this results in having to do as many round-trips to the broker as there are tasks to schedule. There are some workarounds but they just move the problem elsewhere.
Spinach supports sending tasks to the broker in batch to avoid this overhead.
Written for the Cloud¶
Latency between workers and Redis can be high, for example when they are deployed in two separate regions. Spinach leverages Lua scripting in Redis to avoid unnecessary round-trips by batching calls as much as possible.
In a cloud environment network connections can get dropped and packets get lost. Spinach retries failed actions after applying an exponential backoff with randomized jitter to avoid the thundering herd problem when the network gets back to normal.
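Exponential backoff with randomized jitter can be sketched as follows. This uses the common "full jitter" variant, which is not necessarily the exact formula Spinach applies. `retry_delay` and its `base` and `cap` parameters are hypothetical.

```python
import random


def retry_delay(attempt, base=1.0, cap=60.0, rng=random):
    """Delay in seconds before retry number `attempt` (starting at 0).

    The upper bound doubles with each attempt until it reaches `cap`;
    the actual delay is drawn uniformly between 0 and that bound.
    """
    ceiling = min(cap, base * 2 ** attempt)
    return rng.uniform(0, ceiling)
```

Because each client draws a different random delay, clients that failed at the same moment do not all retry at the same moment once the network recovers.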
Workers are expected to be deployed in containers, probably managed by an orchestrator like Kubernetes or Nomad that often scale and shuffle containers around. Workers can join and leave the cluster at any time without impacting the ability to process jobs.
Because worker processes can die unexpectedly (power loss, OOM killed, extended network outage…), Spinach tries to detect dead workers and reschedule the jobs that were running on them if the jobs are safe to be retried.
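The recovery logic described above can be illustrated with a small model. This is a sketch under stated assumptions, not how Spinach implements it: workers are assumed to record a last-seen timestamp, jobs are plain dicts, and a job counts as safe to retry when its `max_retries` is positive. All names are hypothetical.

```python
def find_dead_workers(last_seen, now, threshold):
    """Names of workers whose last heartbeat is older than `threshold`."""
    return sorted(
        name for name, seen in last_seen.items() if now - seen > threshold
    )


def recover_jobs(running, dead_workers):
    """Split jobs held by dead workers into (to_reschedule, lost)."""
    to_reschedule, lost = [], []
    for worker in dead_workers:
        for job in running.get(worker, []):
            # Only jobs that allow retries are safe to run again.
            if job['max_retries'] > 0:
                to_reschedule.append(job['id'])
            else:
                lost.append(job['id'])
    return to_reschedule, lost
```

With a threshold of 1800 seconds, the 30 minute default mentioned in the production notes, a worker is only declared dead long after it stopped reporting, which is why shutting workers down gracefully matters.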
FAQ¶
Should I use Spinach?¶
Spinach was designed from the ground up to be reliable. It is built using proven technologies (Redis, Python queues, thread pools…), is heavily tested and in my experience just works.
The project has been around for long enough that I am now confident it is a good option among task frameworks. If after reading this documentation you feel like giving it a try, go for it!
Threads are not enough, can I use Processes?¶
Threading is the only concurrency primitive; however, it is possible to run many processes, each containing one worker thread. This will open more connections to Redis, but Redis is known to support thousands of concurrent connections so this should not be a problem.
The best approach to achieve this is to rely on an init system like systemd, supervisord or docker. The init system will be responsible for spawning the correct number of processes and making sure they are properly restarted if they terminate prematurely.
Writing this init system yourself in Python using the multiprocessing module is possible but it must not import your actual application using Spinach. This is because mixing threads and forks in a single interpreter is a minefield. Anyway you are probably better off using a battle tested init system.
How do I get in touch?¶
Bug reports and feature requests can be sent on GitHub.
For help with integrating Spinach into a project or to give feedback, there is the IRC channel #spinach on irc.libera.chat.
What is the licence?¶
Spinach is released under the BSD license.
Hacking guide:
Contributing¶
This page contains the few guidelines and conventions used in the code base.
Pull requests¶
The development of Spinach happens on GitHub, the main repository is https://github.com/NicolasLM/spinach. To contribute to Spinach:
- Fork NicolasLM/spinach
- Clone your fork
- Create a feature branch: git checkout -b my_feature
- Commit your changes
- Push your changes to your fork: git push origin my_feature
- Create a GitHub pull request against NicolasLM/spinach’s master branch
Note
Avoid including multiple commits in your pull request, unless it adds value to a future reader. If you need to modify a commit, git commit --amend is your friend. Write a meaningful commit message, see How to write a commit message.
Python sources¶
The code base follows pep8 guidelines with lines wrapping at the 79th character. You can verify that the code follows the conventions with:
$ pycodestyle --ignore=E252,W503,W504 spinach tests
Running tests is an invaluable help when adding a new feature or when refactoring. Try to add the proper test cases in tests/ together with your patch. The test suite can be run with pytest:
$ pytest tests
Because the Redis broker tests require a running Redis server, there is also a convenience tox.ini that runs all the tests and pep8 checks for you after starting Redis in a container via docker-compose. Simply running:
$ tox
will build a virtualenv, install Spinach and its dependencies into it, start the Redis server in the container, and run tests and pycodestyle, tearing down the Redis server container when done.
Compatibility¶
Spinach runs on all versions of Python starting from 3.6. Tests are run via GitHub Actions to ensure that.
Documentation sources¶
Documentation is located in the doc directory of the repository. It is written in reStructuredText and built with Sphinx.
If you modify the docs, make sure it builds without errors:
$ cd doc/
$ make html
The generated HTML pages should land in doc/_build/html.
Internals¶
This page provides the basic information needed to start reading and modifying the source code of Spinach. It presents how it works inside and how the project is designed.
Todo
Document how Spinach works internally