retools - A Python Redis Toolset
retools is a concise set of well-tested, extensible Python Redis tools.
retools is available on PyPI at https://pypi.python.org/pypi/retools
- Caching
  - Hit/Miss Statistics
  - Regions for common expiration periods and invalidating batches of functions at once
  - Write-lock to prevent the Thundering Herd
- Distributed Locking
  - Python context-manager with lock timeouts and retries
- Queuing
  - Simple forking worker based on Resque
  - Jobs stored as JSON in Redis for easy introspection
  - setproctitle used by workers for easy worker introspection on the command line
  - Rich event system for extending job processing behavior
- Limiter
  - Useful for making sure that only N operations for a given process happen at the same time
- Well Tested [1]
  - 100% statement coverage
  - 100% condition coverage (via instrumental)
Reference Material
Reference material includes documentation for every retools API.
API Documentation
Comprehensive reference material for every public API exposed by retools is available within this chapter. The API documentation is organized alphabetically by module name.
retools.cache
Caching
Cache regions are used to simplify common expirations and group function caches.
To indicate functions should use cache regions, apply the decorator:
from retools.cache import cache_region

@cache_region('short_term')
def myfunction(arg1):
    return arg1
To configure the cache regions, setup the CacheRegion object:
from retools.cache import CacheRegion
CacheRegion.add_region("short_term", expires=60)
Constants
- retools.cache.NoneMarker
A module global returned to indicate no value is present in Redis rather than a None object.
Functions
- retools.cache.cache_region(region, *deco_args, **kwargs)
Decorate a function such that its return result is cached, using a “region” to indicate the cache arguments.
Parameters: - region (string) – Name of the region to cache to
- *deco_args – Optional str()-compatible arguments which will uniquely identify the key used by this decorated function, in addition to the positional arguments passed to the function itself at call time. This is recommended as it is needed to distinguish between any two functions or methods that have the same name (regardless of parent class or not).
Note
The function being decorated must only be called with positional arguments, and the arguments must support being stringified with str(). The concatenation of the str() version of each argument, combined with that of the *args sent to the decorator, forms the unique cache key.
Example:
from retools.cache import cache_region

@cache_region('short_term', 'load_things')
def load(search_term, limit, offset):
    '''Load from a database given a search term, limit, offset.'''
    return database.query(search_term)[offset:offset + limit]
The decorator can also be used with object methods. The self argument is not part of the cache key. This is based on the actual string name self being in the first argument position:
class MyThing(object):
    @cache_region('short_term', 'load_things')
    def load(self, search_term, limit, offset):
        '''Load from a database given a search term, limit, offset.'''
        return database.query(search_term)[offset:offset + limit]
Classmethods work as well - use cls as the name of the class argument, and place the decorator around the function underneath @classmethod:
class MyThing(object):
    @classmethod
    @cache_region('short_term', 'load_things')
    def load(cls, search_term, limit, offset):
        '''Load from a database given a search term, limit, offset.'''
        return database.query(search_term)[offset:offset + limit]
Note
When a method on a class is decorated, the self or cls argument in the first position is not included in the “key” used for caching.
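The key construction described in the notes above can be illustrated with a minimal sketch. It is not the retools implementation: the helper name sketch_cache_key and the single-space separator are assumptions made for illustration. It only shows the documented idea that the str() of the decorator arguments and of the positional call arguments are combined, with a leading self or cls skipped by name.

```python
import inspect


def sketch_cache_key(func, deco_args, call_args):
    """Hypothetical helper: combine decorator args and call args into one key."""
    params = list(inspect.signature(func).parameters)
    # Skip the first positional argument when the function names it self or cls.
    if params and params[0] in ("self", "cls"):
        call_args = call_args[1:]
    parts = [str(a) for a in deco_args] + [str(a) for a in call_args]
    # The separator is an assumption; the docs only say the values are concatenated.
    return " ".join(parts)


def load(search_term, limit, offset):
    return (search_term, limit, offset)


class MyThing(object):
    def load(self, search_term, limit, offset):
        return (search_term, limit, offset)


plain_key = sketch_cache_key(load, ("load_things",), ("fred", 10, 0))
method_key = sketch_cache_key(MyThing.load, ("load_things",), (MyThing(), "fred", 10, 0))
print(plain_key)
print(method_key)
```

Because the two keys come out identical here, the sketch also shows why a distinguishing decorator argument is recommended when two functions or methods share a name.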
- retools.cache.invalidate_region(region)
Invalidate all the namespaces in a given region
Note
This does not actually clear the region of data, but just sets the value to expire on next access.
Parameters: region (string) – Region name
- retools.cache.invalidate_function(callable, *args)
Invalidate the cache for a callable
Parameters: - callable (callable object) – The callable that was cached
- *args – Arguments the function was called with that should be invalidated. If args contains only the differentiator for the function, or is not present, then all values for the function will be invalidated.
Example:
from retools.cache import cache_region, invalidate_function

@cache_region('short_term', 'small_engine')
def local_search(search_term):
    # do search and return it
    pass

@cache_region('long_term')
def lookup_folks():
    # look them up and return them
    pass

# To clear local_search for search_term = 'fred'
invalidate_function(local_search, 'fred')

# To clear all cached variations of the local_search function
invalidate_function(local_search)

# To clear out lookup_folks
invalidate_function(lookup_folks)
Classes
- class retools.cache.CacheKey(region, namespace, key, today=None)
Cache Key object
Generator of cache keys for a variety of purposes once provided with a region, namespace, and key (args).
- class retools.cache.CacheRegion
CacheRegion manager and configuration object
For organization's sake, the CacheRegion object is used to configure the available cache regions, query regions for currently cached keys, and set batches of keys by region for immediate expiration.
Caching can be turned off globally by setting enabled to False:
CacheRegion.enabled = False
Statistics should also be turned on or off globally:
CacheRegion.statistics = False
However, if only some namespaces should have statistics recorded, then load() should be used directly with its statistics argument.
- classmethod add_region(name, expires, redis_expiration=604800)
Add a cache region to the current configuration
Parameters: - name (string) – The name of the cache region
- expires (integer) – The expiration in seconds.
- redis_expiration (integer) – How long the Redis key expiration is set for. Defaults to 1 week.
- classmethod invalidate(region)
Invalidate an entire region
Note
This does not actually clear the region of data, but just sets the value to expire on next access.
Parameters: region (string) – Region name
- classmethod load(region, namespace, key, regenerate=True, callable=None, statistics=None)
Load a value from Redis, and possibly recreate it
This method is used to load a value from Redis, and usually regenerates the value using the callable when provided.
If regenerate is False and a callable is not passed in, then NoneMarker will be returned.
Parameters: - region (string) – Region name
- namespace (string) – Namespace for the value
- key (string) – Key for this value under the namespace
- regenerate (bool) – If False, then existing keys will always be returned regardless of cache expiration. In the event that there is no existing key and no callable was provided, then a NoneMarker will be returned.
- callable – A callable to use when the cached value needs to be created
- statistics (bool) – Whether or not hit/miss statistics should be updated
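The interaction between regenerate, callable, and NoneMarker can be sketched with an in-memory stand-in. Everything here is hypothetical: SketchStore, NONE_MARKER, and the injectable clock are illustration-only names, no Redis is involved, and the behavior for a stale value with no callable is an assumption rather than something the documentation above states.

```python
import time

NONE_MARKER = object()  # stand-in for retools.cache.NoneMarker


class SketchStore(object):
    """In-memory illustration of the documented load() semantics."""

    def __init__(self, expires, clock=time.time):
        self.expires = expires
        self.clock = clock
        self.data = {}  # key -> (created_at, value)

    def load(self, key, regenerate=True, callable=None):
        now = self.clock()
        entry = self.data.get(key)
        if entry is not None:
            created_at, value = entry
            fresh = (now - created_at) < self.expires
            # With regenerate=False an existing value is returned even if stale.
            if fresh or not regenerate:
                return value
        if callable is None:
            # Nothing usable is cached and there is no way to create a value.
            return NONE_MARKER
        value = callable()
        self.data[key] = (now, value)
        return value


now = [0.0]  # fake clock so the example needs no sleeping
store = SketchStore(expires=60, clock=lambda: now[0])

missing = store.load("k", regenerate=False)          # no key, no callable
created = store.load("k", callable=lambda: "v1")     # value is generated
now[0] = 120.0                                       # the entry is now stale
stale = store.load("k", regenerate=False)            # stale value still returned
refreshed = store.load("k", callable=lambda: "v2")   # stale value regenerated
```

Using a dedicated marker object rather than None lets a cached None be told apart from a missing value.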
retools.exc
retools exceptions
retools.limiter
Generic Limiter to ensure N parallel operations
Note
The limiter functionality is new. Please report any issues found on the retools GitHub issue tracker.
The limiter is useful when you want to make sure that only N operations for a given process happen at the same time, for example, concurrent requests to the same domain.
The limiter works by acquiring and releasing limits.
Creating a limiter:
from retools.limiter import Limiter

def do_something():
    limiter = Limiter(limit=10, prefix='my-operation')  # using default redis connection
    for i in range(100):
        if limiter.acquire_limit('operation-%d' % i):
            execute_my_operation()
            # Since we are releasing it synchronously, all 100 operations
            # will be performed with one of them locked at a time.
            limiter.release_limit('operation-%d' % i)
Specifying a default expiration in seconds:
def do_something():
    limiter = Limiter(limit=10, expiration_in_seconds=45)  # using default redis connection
Specifying a redis connection:
def do_something():
    limiter = Limiter(limit=10, redis=my_redis_connection)
Every time you try to acquire a limit, the expired limits you previously acquired get removed from the set.
This way, if your process dies in the middle of its operation, the keys will eventually expire.
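The purge-on-acquire behavior described above can be sketched in memory. SketchLimiter is a hypothetical stand-in, not the retools class: it keeps expiry timestamps in a dict instead of Redis, and the injectable clock exists only so the example runs without sleeping.

```python
import time


class SketchLimiter(object):
    """In-memory illustration; the real Limiter keeps its state in Redis."""

    def __init__(self, limit, expiration_in_seconds=10, clock=time.time):
        self.limit = limit
        self.expiration_in_seconds = expiration_in_seconds
        self.clock = clock
        self.held = {}  # key -> expiry timestamp

    def acquire_limit(self, key, expiration_in_seconds=None):
        now = self.clock()
        # Purge expired entries first, so holders that died stop counting.
        self.held = {k: exp for k, exp in self.held.items() if exp > now}
        if len(self.held) >= self.limit:
            return False
        if expiration_in_seconds is None:
            expiration_in_seconds = self.expiration_in_seconds
        self.held[key] = now + expiration_in_seconds
        return True

    def release_limit(self, key):
        self.held.pop(key, None)


now = [0.0]  # fake clock
limiter = SketchLimiter(limit=2, expiration_in_seconds=45, clock=lambda: now[0])

first = limiter.acquire_limit("operation-1")
second = limiter.acquire_limit("operation-2")
third = limiter.acquire_limit("operation-3")         # limit of 2 already reached
now[0] = 46.0                                        # both earlier limits expired
after_expiry = limiter.acquire_limit("operation-3")  # stale entries are purged
```

Neither of the first two keys is ever released in this example, which mimics a process that died mid-operation.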
Public API Classes
- class retools.limiter.Limiter(limit, redis=None, prefix='retools_limiter', expiration_in_seconds=10)
Configures and limits operations
- __init__(limit, redis=None, prefix='retools_limiter', expiration_in_seconds=10)
Initializes a Limiter.
Parameters: - limit – An integer that describes the limit on the number of items
- redis – A Redis instance. Defaults to the redis instance on the global_connection.
- prefix – The default limit set name. Defaults to ‘retools_limiter’.
- expiration_in_seconds – The number of seconds that keys should be locked if not explicitly released.
- acquire_limit(key, expiration_in_seconds=None, retry=True)
Tries to acquire a limit for a given key. Returns True if the limit can be acquired.
Parameters: - key – A string with the key to acquire the limit for. This key should be used when releasing.
- expiration_in_seconds – The number of seconds that this key should be locked if not explicitly released. If this is not passed, the default is used.
- retry – Internal parameter that specifies if the operation should be retried. Defaults to True.
retools.lock
A Redis backed distributed global lock
This code uses the formula here: https://github.com/jeffomatic/redis-exp-lock-js
It provides several improvements over the original version based on: http://chris-lamb.co.uk/2010/06/07/distributing-locking-python-and-redis/
It also provides a few improvements over the lock present in the Python redis library. For example, because it uses Redis Lua scripting, it no longer requires every client to have synchronized time.
Classes
- class retools.lock.Lock(key, expires=60, timeout=10, redis=None)
- __init__(key, expires=60, timeout=10, redis=None)
Distributed locking using Redis Lua scripting for CAS operations.
Usage:
from retools.lock import Lock

with Lock('my_lock'):
    print("Critical section")
Parameters: - expires – We consider any existing lock older than expires seconds to be invalid in order to detect crashed clients. This value must be higher than the time it takes the critical section to execute.
- timeout – If another client has already obtained the lock, sleep for a maximum of timeout seconds before giving up. A value of 0 means we never wait.
- redis – The redis instance to use if the default global redis connection is not desired.
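How expires and timeout interact can be sketched with a single-process stand-in. This is not how retools implements its lock (which relies on Redis and Lua scripting): SketchLock, SketchLockTimeout, and the injectable clock are hypothetical names, and raising an exception when the timeout elapses is an assumption made for illustration.

```python
import time


class SketchLockTimeout(Exception):
    """Hypothetical exception raised when the lock is not obtained in time."""


class SketchLock(object):
    _held = {}  # key -> (expiry timestamp, owner), shared within this process

    def __init__(self, key, expires=60, timeout=10, clock=time.time):
        self.key = key
        self.expires = expires
        self.timeout = timeout
        self.clock = clock

    def __enter__(self):
        deadline = self.clock() + self.timeout
        while True:
            now = self.clock()
            entry = self._held.get(self.key)
            # A missing or expired entry means the lock is free to take.
            if entry is None or entry[0] <= now:
                self._held[self.key] = (now + self.expires, self)
                return self
            # With timeout=0 the deadline is already reached: never wait.
            if now >= deadline:
                raise SketchLockTimeout(self.key)
            time.sleep(0.01)

    def __exit__(self, exc_type, exc_value, traceback):
        entry = self._held.get(self.key)
        # Only release the lock if this instance still owns it.
        if entry is not None and entry[1] is self:
            del self._held[self.key]
        return False


now = [0.0]  # fake clock so the example needs no real waiting


def fake_clock():
    return now[0]


crashed = SketchLock("my_lock", expires=60, timeout=0, clock=fake_clock)
crashed.__enter__()  # simulate a client that dies before releasing

try:
    with SketchLock("my_lock", timeout=0, clock=fake_clock):
        blocked = False
except SketchLockTimeout:
    blocked = True

now[0] = 61.0  # older than expires, so the stale lock is treated as invalid
with SketchLock("my_lock", timeout=0, clock=fake_clock):
    recovered = True
```

The example shows why expires must exceed the duration of the critical section: once it elapses, another client is free to take the lock even though the first holder never released it.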
retools.queue
Queue worker and manager
Note
The queueing functionality is new, and has gone through some preliminary testing. Please report any issues found on the retools GitHub issue tracker.
Any function that takes keyword arguments can be a job that a worker runs. The QueueManager handles configuration and enqueuing jobs to be run.
Declaring jobs:
# mypackage/jobs.py

# jobs
def default_job():
    # do some basic thing
    pass

def important(somearg=None):
    # do an important thing
    pass

# event handlers
def my_event_handler(sender, **kwargs):
    # do something
    pass

def save_error(sender, **kwargs):
    # record error
    pass
Running Jobs:
from retools.queue import QueueManager

qm = QueueManager()
qm.subscriber('job_failure', handler='mypackage.jobs:save_error')
qm.subscriber('job_postrun', 'mypackage.jobs:important',
              handler='mypackage.jobs:my_event_handler')
qm.enqueue('mypackage.jobs:important', somearg='fred')
Note
The events for a job are registered with the QueueManager and are encoded in the job’s JSON blob. Updating events for a job will therefore only take effect for new jobs queued, and not existing ones on the queue.
Events
The retools queue has events available for additional functionality without having to subclass or directly extend retools. These functions will be run by the worker when the job is handled.
Available events to register for:
- job_prerun: Runs immediately before the job is run.
- job_wrapper: Wraps the execution of the job. These should be context managers.
- job_postrun: Runs after the job completes successfully. This will not be run if the job throws an exception.
- job_failure: Runs when a job throws an exception.
Event Function Signatures
Event functions have different call semantics. The following list shows how each event function will be called:
- job_prerun: (job=job_instance)
- job_wrapper: (job_function, job_instance, **job_keyword_arguments)
- job_postrun: (job=job_instance, result=job_function_result)
- job_failure: (job=job_instance, exc=job_exception)
Attributes of interest on the job instance are documented in the Job.__init__() method.
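The call signatures listed above can be exercised with a small stand-alone sketch. FakeJob and run_with_events are hypothetical stand-ins written for this example, not the retools worker loop, and the ordering of the failure handler relative to the wrapper exit is an assumption.

```python
from contextlib import contextmanager

log = []  # records what each handler observed


def on_prerun(job=None, **kwargs):
    log.append(("prerun", job.job_name))


@contextmanager
def timing_wrapper(job_function, job_instance, **job_kwargs):
    # job_wrapper handlers are context managers around the job's execution.
    log.append(("wrapper-enter", job_instance.job_name))
    try:
        yield
    finally:
        log.append(("wrapper-exit", job_instance.job_name))


def on_postrun(job=None, result=None, **kwargs):
    log.append(("postrun", result))


def on_failure(job=None, exc=None, **kwargs):
    log.append(("failure", type(exc).__name__))


class FakeJob(object):
    """Hypothetical stand-in exposing only the attributes this sketch reads."""

    def __init__(self, job_name, func, kwargs):
        self.job_name = job_name
        self.func = func
        self.kwargs = kwargs


def run_with_events(job):
    """Hypothetical dispatcher that calls handlers with the documented signatures."""
    on_prerun(job=job)
    try:
        with timing_wrapper(job.func, job, **job.kwargs):
            result = job.func(**job.kwargs)
    except Exception as exc:
        on_failure(job=job, exc=exc)
        return None
    on_postrun(job=job, result=result)
    return result


def important(somearg=None):
    return "hello %s" % somearg


def broken():
    raise ValueError("boom")


ok = run_with_events(FakeJob("mypackage.jobs:important", important, {"somearg": "fred"}))
failed = run_with_events(FakeJob("mypackage.jobs:broken", broken, {}))
```

Accepting **kwargs in each handler keeps it tolerant of extra keyword arguments, which is a design choice of this sketch rather than a documented requirement.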
Running the Worker
After installing retools, a retools-worker command will be available that can spawn a worker. Queues to watch can be listed in order for priority queueing, in which case the worker will try each queue in order looking for jobs to process.
Example invocation:
$ retools-worker high,medium,main
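The priority behavior of a queue list such as high,medium,main can be sketched with in-memory deques. This illustrates only the "try each queue in order" rule; the helper next_job is hypothetical and the real worker reads its queues from Redis.

```python
from collections import deque


def next_job(queues, names):
    """Return (queue_name, job) from the first non-empty queue, else None."""
    for name in names:
        if queues[name]:
            return name, queues[name].popleft()
    return None


queues = {
    "high": deque(),
    "medium": deque(["m1"]),
    "main": deque(["a1", "a2"]),
}
order = "high,medium,main".split(",")

first = next_job(queues, order)    # high is empty, so medium is served
queues["high"].append("h1")
second = next_job(queues, order)   # high now has work and wins
third = next_job(queues, order)    # high and medium are empty, main is served
```

A consequence of this ordering is that jobs on later queues wait for as long as earlier queues keep receiving work.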
Public API Classes
- class retools.queue.QueueManager(redis=None, default_queue_name='main', serializer=json.dumps, deserializer=json.loads)
Configures and enqueues jobs
- __init__(redis=None, default_queue_name='main', serializer=json.dumps, deserializer=json.loads)
Initialize a QueueManager
Parameters: - redis – A Redis instance. Defaults to the redis instance on the global_connection.
- default_queue_name – The default queue name. Defaults to ‘main’.
- serializer – A callable to serialize json data, defaults to json.dumps().
- deserializer – A callable to deserialize json data, defaults to json.loads().
- enqueue(job, **kwargs)
Enqueue a job
Parameters: - job – The pkg_resources name of the function, e.g. retools.jobs:my_function
- kwargs – Keyword arguments the job should be called with. These arguments must be serializable by JSON.
Returns: The job id that was queued.
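The two constraints on enqueue arguments, a "module:function" job name and JSON-serializable keyword arguments, can be illustrated as follows. The helpers resolve_job and check_kwargs are hypothetical, and importlib is used here as a stand-in; this is not a claim about how retools resolves names internally.

```python
import importlib
import json


def resolve_job(name):
    """Resolve a 'package.module:function' string to the callable it names."""
    module_name, _, func_name = name.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, func_name)


def check_kwargs(kwargs):
    """Round-trip kwargs through JSON; raises TypeError if not serializable."""
    return json.loads(json.dumps(kwargs))


# A standard-library function stands in for a real job function.
func = resolve_job("os.path:basename")
result = func("/tmp/report.txt")

roundtrip = check_kwargs({"somearg": "fred"})

try:
    check_kwargs({"somearg": object()})
    rejected = False
except TypeError:
    rejected = True
```

Checking serializability before enqueuing surfaces a bad argument at the call site instead of later in the worker.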
Private API Classes
- class retools.queue.Job(queue_name, job_payload, redis, serializer=json.dumps, deserializer=json.loads)
- __init__(queue_name, job_payload, redis, serializer=json.dumps, deserializer=json.loads)
Create a job instance given a JSON job payload
Parameters: - job_payload – A JSON string representing a job.
- queue_name – The queue this job was pulled off of.
- redis – The redis instance used to pull this job.
A Job instance is created when the Worker pulls a job payload off the queue. The current_job global is set upon creation to indicate the current job being processed.
Attributes of interest for event functions:
- job_id: The Job’s ID
- job_name: The Job’s name (its package + function name)
- queue_name: The queue this job came from
- kwargs: The keyword arguments the job is called with
- state: The state dict. This can be used by events to retain additional arguments. For example, a retry extension can store retry information in the state dict.
- func: A reference to the job function
- redis: A redis.Redis instance.
- serializer: A callable to serialize json data, defaults to json.dumps().
- deserializer: A callable to deserialize json data, defaults to json.loads().
- class retools.queue.Worker(queues, redis=None, serializer=json.dumps, deserializer=json.loads)
A Worker works on jobs
- __init__(queues, redis=None, serializer=json.dumps, deserializer=json.loads)
Create a worker
Parameters: - queues (list) – List of queues to process
- redis – Redis instance to use, defaults to the global_connection.
In the event that there is only a single queue in the list, Redis list blocking will be used for lower-latency job processing.
- work(interval=5, blocking=False)
Work on jobs
This is the main method of the Worker, and will register itself with Redis as a Worker, wait for jobs, then process them.
Parameters: - interval (int) – Time in seconds between polling.
- blocking (bool) – Whether or not blocking pop should be used. If the blocking pop is used, then the worker will block for interval seconds at a time waiting for a new job. This affects how often the worker can respond to signals.
Changelog
0.4.1 (02/19/2014)
Bug Fixes
- Properly support StrictRedis with ZADD (used in the limiter). Patch by Bernardo Heynemann.
0.3 (08/13/2012)
Bug Fixes
- Call redis.expire with proper expires value for RedisLock. Patch by Mike McCabe.
- Use functools.wraps to preserve doc strings for cache_region. Patch by Daniel Holth.
API Changes
- Added get_job/get_jobs methods to QueueManager class to get information on a job or get a list of jobs for a queue.
0.2 (02/01/2012)
Bug Fixes
- Critical fix for caching that prevents old values from being displayed forever. Thanks to Daniel Holth for tracking down the problem.
- Actually sets the Redis expiration for a value when setting the cached value in Redis. This defaults to 1 week.
Features
- Statistics for the cache are now optional and can be disabled to slightly reduce the Redis queries used to store/retrieve cache data.
- Added first revision of worker/job Queue system, with event support.
Internals
- Heavily refactored Connection to not be a class singleton, instead a global_connection instance is created and used by default.
- Increased conditional coverage to 100% (via instrumental).
Backwards Incompatibilities
Changing the default global Redis connection has changed semantics. Instead of using Connection.set_default, you should set the global_connection’s redis property directly:
import redis
from retools import global_connection

global_connection.redis = redis.Redis(host='myhost')
Incompatibilities
- Removed clear argument from invalidate_region, as removing keys from the set but not removing the hit statistics can lead to data accumulating in Redis that has no easy removal other than .keys() which should not be run in production environments.
- Removed deco_args from invalidate_callable (invalidate_function) as it's not actually needed since the namespace is already on the callable to invalidate.