Henson¶
Henson is a library for building services that are driven by consumers. Henson applications read from objects that implement the Consumer Interface and provide the message received to a callback for processing. The messsage can be processed before handing it off to the callback, and the callback’s results can be processed after they are returned to the application.
Note
This documentation uses the async
/await
syntax introduced to Python
3.5 by way of PEP 492. If you
are using an older version of Python, replace async
with the
@asyncio.coroutine
decorator and await
with yield from
.
Installation¶
You can install Henson using Pip:
$ python -m pip install henson
You can also install it from source:
$ python setup.py install
Quickstart¶
from henson import Abort, Application
class FileConsumer:
"""Read lines from a file."""
def __init__(self, filename):
self.filename = filename
self._file = None
def __iter__(self):
"""FileConsumer objects are iterators."""
return self
def __next__(self):
"""Return the next line of the file, if available."""
if not self._file:
self._file = open(self.filename)
try:
return next(self._file)
except StopIteration:
self._file.close()
raise Abort('Reached end of file', None)
async def read(self):
"""Return the next line in the file."""
return next(self)
async def callback(app, message):
"""Print the message retrieved from the file consumer."""
print(app.name, 'received:', message)
return message
app = Application(
__name__,
callback=callback,
consumer=FileConsumer(__file__),
)
@app.startup
async def print_header(app):
"""Print a header for the file being processed."""
print('# Begin processing', app.consumer.filename)
@app.teardown
async def print_footer(app):
"""Print a footer for the file being processed."""
print('# Done processing', app.consumer.filename)
@app.message_preprocessor
async def remove_comments(app, line):
"""Abort processing of comments (lines that start with #)."""
if line.strip().startswith('#'):
raise Abort('Line is a comment', line)
return line
Running Applications¶
Henson provides a henson
command to run your applications from the command
line. To run the application defined in the quickstart above, cd
to the
directory containing the module and run:
$ henson run file_printer
Henson’s CLI can also be invoked by running the installed package as a script. To avoid confusion and prevent different installations of Henson from interfering with one another, this is the recommended way to run Henson applications:
$ python -m henson run file_printer
If a module contains only one instance of a Henson
Application
, python -m henson run
will automatically
detect and run it. If more than one instance exists, the desired application’s
name must be specified:
$ python -m henson run file_printer:app
This form always takes precedence over the former, and the henson
command
won’t attempt to auto-detect an instance even if there is a problem with the
name specified. If the attribute specified by the name after :
is callable,
python -m henson run
will call it and use the returned value as the
application. Any callable specified this way should require no arguments and
return an instance of Application
. Autodiscovery of
callables that return applications is not currently supported.
More detailed information about Henson’s command line interface can be found in Command Line Interface.
Logging¶
Henson applications provide a default logger. The logger returned by calling
logging.getLogger()
will be used. The name of the logger is the name
given to the application. Any configuration needed (e.g.,
logging.basicConfig()
, logging.config.dictConfig()
, etc.) should be
done before the application is started.
Debug Mode¶
Debugging with asyncio can be tricky. Henson provides a debug mode enables asyncio’s debug mode as well as debugging information through Henson’s logger.
Debug mode can be enabled through a configuration setting:
app.settings['DEBUG'] = True
or by providing a truthy value for debug
when calling
run_forever()
:
app.run_forever(debug=True)
Contents:
Consumer Interface¶
To work with Henson, a consumer must conform to the Consumer Interface. To
conform to the interface, the object must expose a coroutine()
function named read
.
Below is a sample implementation.
from henson import Abort, Application
class FileConsumer:
"""Read lines from a file."""
def __init__(self, filename):
self.filename = filename
self._file = None
def __iter__(self):
"""FileConsumer objects are iterators."""
return self
def __next__(self):
"""Return the next line of the file, if available."""
if not self._file:
self._file = open(self.filename)
try:
return next(self._file)
except StopIteration:
self._file.close()
raise Abort('Reached end of file', None)
async def read(self):
"""Return the next line in the file."""
return next(self)
async def callback(app, message):
"""Print the message retrieved from the file consumer."""
print(app.name, 'received:', message)
return message
app = Application(
__name__,
callback=callback,
consumer=FileConsumer(__file__),
)
@app.startup
async def print_header(app):
"""Print a header for the file being processed."""
print('# Begin processing', app.consumer.filename)
@app.teardown
async def print_footer(app):
"""Print a footer for the file being processed."""
print('# Done processing', app.consumer.filename)
@app.message_preprocessor
async def remove_comments(app, line):
"""Abort processing of comments (lines that start with #)."""
if line.strip().startswith('#'):
raise Abort('Line is a comment', line)
return line
Callbacks¶
Henson operates on messages through a series of asyncio.coroutine()
callback functions. Each callback type serves a unique purpose.
callback
¶
This is the only one of the callback settings that is required. Its purpose is to process the incoming message. If desired, it should return the result(s) of processing the message as an iterable.
async def callback(application, message):
return ['spam']
Application('name', callback=callback)
Note
There can only be one function registered as callback
.
error
¶
These callbacks are called when an exception is raised while processing a message.
app = Application('name')
@app.error
async def log_error(application, message, exception):
logger.error('spam')
Note
Exceptions raised while postprocessing a result will not be processed through these callbacks.
message_acknowledgement
¶
These callbacks are intended to acknowledge that a message has been received and should not be made available to other consumers. They run after a message and its result(s) have been fully processed.
app = Application('name')
@app.message_acknowledgement
async def acknowledge_message(application, original_message):
await original_message.acknowledge()
message_preprocessor
¶
These callbacks are called as each message is first received. Any modifications
they make to the message will be reflected in what is passed to callback
for processing.
app = Application('name')
@app.message_preprocessor
async def add_process_id(application, message):
message['pid'] = os.getpid()
return message
result_postprocessor
¶
These callbacks will operate on the result(s) of callback
. Each callback is
applied to each result.
app = Application('name')
@app.result_postprocessor
async def store_result(application, result):
with open('/tmp/result', 'w') as f:
f.write(result)
startup
¶
These callbacks will run as an application is starting.
app = Application('name')
@app.startup
async def connect_to_database(application):
await db.connect(application.settings['DB_HOST'])
teardown
¶
These callbacks will run as an application is shutting down.
app = Application('name')
@app.teardown
async def disconnect_from_database(application):
await db.close()
Command Line Interface¶
Henson provides the following command line interface.
henson¶
usage: henson [-h] [--version] [-a APP] {run} ...
-
-h
,
--help
¶
show this help message and exit
-
--version
¶
show program’s version number and exit
-
-a
,
--app
¶
the path to the application to run
henson run¶
Import and run an application.
usage: henson run [-h] [--verbose | --quiet] [-r] [-w WORKERS] [-d]
application-path
-
application-path
¶
the path to the application to run
-
-h
,
--help
¶
show this help message and exit
-
--verbose
,
-v
¶
verbose mode
-
--quiet
,
-q
¶
quiet mode
-
-r
,
--reloader
¶
reload the application on changes
-
-w
<workers>
,
--workers
<workers>
¶ the number of asynchronous tasks to run
-
-d
,
--debug
¶
enable debug mode
Further Details¶
When developing locally, applications often need to be restarted as changes are
made. To make this easier, Henson provides a --reloader
option to the
run
command. With this option enabled, Henson will watch an application’s
root directory and restart the application automatically when changes are
detected:
$ python -m henson run file_printer --reloader
Note
The --reloader
option is not recommended for production use.
It’s also possible to enable Henson’s Debug Mode through the --debug
option:
$ python -m henson run file_printer --debug
Note
The --debug
option is not recommended for production use.
This will also enable the reloader.
Extending the Command Line¶
For information about how to extension Henson’s command line interface, see Extending the Command Line.
Extensions¶
Extensions provide additional functionality to applications. Configuration management is shared between applications and extensions in a central location.
Using Extensions¶
from henson import Application
from henson_sqlite import SQLite
app = Application(__name__)
db = SQLite(app)
db.connection.execute('SELECT 1;')
Developing Extensions¶
Henson provides an Extension
base class to make
extension development easier.
from henson import Extension
class SQLite(Extension):
DEFAULT_SETTINGS = {'SQLITE_CONNECTION_STRING': ':memory:'}
def __init__(self, app=None):
self._connection = None
super().__init__(app)
@property
def connection(self):
if not self._connection:
conn_string = self.app.settings['SQLITE_CONNECTION_STRING']
self._connection = sqlite3.connect(conn_string)
return self._connection
The Extension
class provides two special attributes
that are meant to be overridden:
DEFAULT_SETTINGS
provides default values for an extension’s settings during theinit_app()
step. When a value is used by an extension and has a sensible default, it should be stored here (e.g., a database hostname).REQUIRED_SETTINGS
provides a list of keys that are checked for existence during theinit_app()
step. If one or more required settings are not set on the application instance assigned to the extension, aKeyError
is raised. Extensions should set this when a value is required but has no default (e.g., a database password).
Extending the Command Line¶
Henson offers an extensible command line interface. To register your own
commands, use register_commands()
. Any function passed to it
will have its usage created directly from its signature. During the course of
initializing the application for use with the extension (i.e.,
init_app()
), Henson will check for a method
on the extension’s instance named register_cli
and call it. If you place
any calls to register_commands()
inside it, the command line
interface will be extended automatically.
In order to access the new commands, the henson
command line utility must
be given a reference to an Application
. This is done
through the --app
argument:
$ henson --app APP_PATH
Note
For details about the syntax to use when passing a reference to an
Application
, see Running Applications.
A positional argument in the Python function will result in a required positional argument in the command:
def trash(grouch):
pass
$ henson --app APP_PATH NAMESPACE trash GROUCH
A keyword argument in the Python function will result in a positional argument in the command with a default value to be used when the argument is omitted:
def trash(grouch='oscar'):
pass
$ henson --app APP_PATH NAMESPACE trash [GROUCH]
A keyword-only argument in the Python function will result in an optional argument in the command:
def trash(*, grouch='oscar'):
pass
$ henson --app APP_PATH NAMESPACE trash [--grouch GROUCH]
By default, all optional arguments will have a flag that matches the function argument’s name. When no other optional arguments start with the same character, a single-character abbreviated flag can also be used.
$ henson --app APP_PATH NAMESPACE trash [-g GROUCH]
The trash
function can then be registered with the CLI:
register_commands('sesame', [trash])
$ henson --app APP_PATH sesame trash --help
Additionally, if a command includes a quiet
or verbose
argument, it
will automatically receive the count of the number of times it was specified
(e.g., -v
will have the value 1
, -vv
will have the value 2
).
When both arguments are included, they will be added as a mutually exclusive
group.
Note
Due to how argparse
handles argument counts, quiet
and verbose
will be set to None
rather than 0
when the flag isn’t specified when the command is
invoked.
$ henson --app APP_PATH sesame trash -vvvv
$ henson --app APP_PATH sesame trash --quiet
Available Extensions¶
Several extensions are available for use:
contrib
Packages¶
While it is possible to build your own plugins, the Henson contrib package contains those that we think will most enhance your application.
Retry¶
Retry is a plugin to add the ability for Henson applications to automatically retry messages that fail to process.
Warning
Retry registers itself as an error callback on the
Application
instance. When doing so, it inserts itself
at the beginning of the list of error callbacks. It does this so that it can
prevent other callbacks from running.
If you have an error callback that you want to run even when retrying a message, you will need to manually inject it into the list of error callbacks after initializing Retry.
Configuration¶
Retry provides a couple of settings to control how many times a message will be
retried. RETRY_THESHOLD
and RETRY_TIMEOUT
work in tandem. If values are
specified for both, whichever limit is reached first will cause Henson to stop
retrying the message. By default, Henson will try forever (yes, this is
literally insane).
RETRY_BACKOFF |
A number that, if provided, will be used in conjunction with the number of retry attempts already made to calculate the total delay for the current retry. Defaults to 1. |
RETRY_CALLBACK |
A coroutine that encapsulates the functionality
needed to retry the message. TypeError will be
raised if the callback isn’t a
coroutine() . |
RETRY_DELAY |
The number of seconds to wait before scheduling a
retry. If RETRY_BACKOFF has a value greater than
1, the delay will increase between each retry.
Defaults to 0. |
RETRY_EXCEPTIONS |
An exception or tuple of exceptions that will cause
Henson to retry the message. Defaults to
RetryableException . |
RETRY_THRESHOLD |
The maximum number of times that a Henson
application will try to process a message before
marking it as a failure. if set to 0, the message
will not be retried. If set to None, the limit will
be controlled by RETRY_TIMEOUT . Defaults to
None. |
RETRY_TIMEOUT |
The maximum number of seconds during which a message
can be retried. If set to None, the limit will be
controlled by RETRY_THRESHOLD . Defaults to None. |
Usage¶
Application definition:
from henson import Application
from henson.contrib.retry import Retry
async def print_message(app, message):
print(message)
app = Application('retryable-application', callback=my_callback)
app.settings['RETRY_CALLBACK'] = print_message
Retry(app)
Somwhere inside the application:
from henson.contrib.retry import RetryableException
async def my_callback(app, message):
raise RetryableException
API¶
-
class
henson.contrib.retry.
Retry
(app=None)[source]¶ A class that adds retries to an application.
-
init_app
(app)[source]¶ Initialize an
Application
instance.Parameters: app (henson.base.Application) – Application instance to be initialized.
Raises: TypeError
– If the callback isn’t a coroutine.ValueError
– If the delay or backoff is negative.
-
Sphinx¶
The Sphinx contrib plugin adds a directive that can be used to document extensions to the Henson command line interface.
-
class
henson.contrib.sphinx.
HensonCLIDirective
(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)[source]¶ A Sphinx directive that can be used to document a CLI extension.
This class wraps around autoprogram to generate Sphinx documentation for extensions that extend the Henson CLI.
.. hensoncli:: henson_database:Database :start_command: db
New in version 1.1.0.
Changed in version 1.2.0: The
prog
option will default to the proper way to invoke command line extensions.
For full details of the options support by the hensoncli
directive, please
refer to the
sphinxcontrib-autoprogram documentation.
API¶
Here’s the public API for Henson.
Application¶
-
class
henson.base.
Application
(name, settings=None, *, consumer=None, callback=None)[source]¶ A service application.
Each message received from the consumer will be passed to the callback.
Parameters: - name (str) – The name of the application.
- settings (Optional[object]) – An object with attributed-based settings.
- consumer (optional) – Any object that is an iterator or an
iterable and yields instances of any type that is supported
by
callback
. While this isn’t required, it must be provided before the application can be run. - callback (Optional[asyncio.coroutine]) – A callable object that
takes two arguments, an instance of
henson.base.Application
and the (possibly) preprocessed incoming message. While this isn’t required, it must be provided before the application can be run.
-
error
(callback)[source]¶ Register an error callback.
Parameters: callback (asyncio.coroutine) – A callable object that takes three arguments: an instance of henson.base.Application
, the incoming message, and the exception that was raised. It will be called any time there is an exception while reading a message from the queue.Returns: The callback. Return type: asyncio.coroutine Raises: TypeError
– If the callback isn’t a coroutine.
-
message_acknowledgement
(callback)[source]¶ Register a message acknowledgement callback.
Parameters: callback (asyncio.coroutine) – A callable object that takes two arguments: an instance of henson.base.Application
and the original incoming message as its only argument. It will be called once a message has been fully processed.Returns: The callback. Return type: asyncio.coroutine Raises: TypeError
– If the callback isn’t a coroutine.
-
message_preprocessor
(callback)[source]¶ Register a message preprocessing callback.
Parameters: callback (asyncio.coroutine) – A callable object that takes two arguments: an instance of henson.base.Application
and the incoming message. It will be called for each incoming message with its result being passed tocallback
.Returns: The callback. Return type: asyncio.coroutine Raises: TypeError
– If the callback isn’t a coroutine.
-
result_postprocessor
(callback)[source]¶ Register a result postprocessing callback.
Parameters: callback (asyncio.coroutine) – A callable object that takes two arguments: an instance of henson.base.Application
and a result of processing the incoming message. It will be called for each result returned fromcallback
.Returns: The callback. Return type: asyncio.coroutine Raises: TypeError
– If the callback isn’t a coroutine.
-
run_forever
(num_workers=1, loop=None, debug=False)[source]¶ Consume from the consumer until interrupted.
Parameters: - num_workers (Optional[int]) – The number of asynchronous tasks to use to process messages received through the consumer. Defaults to 1.
- loop (Optional[asyncio.asyncio.BaseEventLoop]) – An event loop that, if provided, will be used for running the application. If none is provided, the default event loop will be used.
- debug (Optional[bool]) – Whether or not to run with debug mode enabled. Defaults to True.
Raises: TypeError
– If the consumer is None or the callback isn’t a coroutine.Changed in version 1.2: Unhandled exceptions resulting from processing a message while the consumer is still active will stop cause the application to shut down gracefully.
-
startup
(callback)[source]¶ Register a startup callback.
Parameters: callback (asyncio.coroutine) – A callable object that takes an instance of Application
as its only argument. It will be called once when the application first starts up.Returns: The callback. Return type: asyncio.coroutine Raises: TypeError
– If the callback isn’t a coroutine.
-
teardown
(callback)[source]¶ Register a teardown callback.
Parameters: callback (asyncio.coroutine) – A callable object that takes an instance of Application
as its only argument. It will be called once when the application is shutting down.Returns: The callback. Return type: asyncio.coroutine Raises: TypeError
– If the callback isn’t a coroutine.
Command Line Interface¶
Collection of Henson CLI tasks.
-
henson.cli.
register_commands
(namespace, functions, namespace_kwargs=None, func_kwargs=None)[source]¶ Register commands with the henson CLI.
The signature of each function provided through
functions
will be mapped to its command’s interface. Any positional arguments in the function’s signature will become required positional arguments to the command. Keyword arguments in the signature will also become positional arguments, although they will use the default value from the signature when not specified on the command line. Keyword-only arguments in the signature will become optional arguments on the command line.Parameters: - namespace (str) – A name representing the group of commands. The namespace is required to access the commands being added.
- functions (List[callable]) – A list of callables that are used to
create subcommands. More details can be found in the
documentation for
add_commands()
.
Note
This function is a wrapper around
add_commands()
. Please refer to its documentation for any arguments not explained here.New in version 1.1.0.
Configuration¶
-
class
henson.config.
Config
[source]¶ Custom mapping used to extend and override an app’s settings.
Exceptions¶
Custom exceptions used by Henson.
-
exception
henson.exceptions.
Abort
(reason, message)[source]¶ An exception that signals to Henson to stop processing a message.
When this exception is caught by Henson it will immediately stop processing the message. None of the remaining callbacks will be called.
If the exception is caught while processing a result, that result will no longer be processed. Any other results generated by the same message will still be processed.
Parameters: - reason (str) – The reason the message is being aborted. It should be in the form of “noun.verb” (e.g., “provider.ignored”).
- message – The message that is being aborted. Usually this will be the incoming message, but it can also be the result.
Extensions¶
-
class
henson.extensions.
Extension
(app=None)[source]¶ A base class for Hension extensions.
Parameters: app (Optional[henson.base.Application]) – An application instance that has an attribute named settings that contains a mapping of settings to interact with a database. -
DEFAULT_SETTINGS
¶ A
dict
of default settings for the extension.When a setting is not specified by the application instance and has a default specified, the default value will be used. Extensions should define this where appropriate. Defaults to
{}
.
-
REQUIRED_SETTINGS
¶ An
iterable
of required settings for the extension.When an extension has required settings that do not have default values, their keys may be specified here. Upon extension initialization, an exception will be raised if a value is not set for each key specified in this list. Extensions should define this where appropriate. Defaults to
()
.
-
app
¶ Return the registered app.
-
init_app
(app)[source]¶ Initialize the application.
In addition to associating the extension’s default settings with the application, this method will also check for the extension’s required settings.
Parameters: app (henson.base.Application) – An application instance that will be initialized.
-
Changelog¶
Version 1.2.0¶
Released 2018-04-04
- Unhandled exceptions raised while processing a message will stop the application
- Set the event loop when running with the reloader
Version 1.1.0¶
Released 2016-11-11
- Add
henson.cli.register_commands
to extend the command line interface - Messages are logged using
logging.DEBUG
instead oflogging.INFO
- Calls to
print
inhenson.cli.run
are updated toapp.logger.info
- References to objects used by
henson.Application
are removed once they are no longer needed to allow the memory to be freed up before the next message is received. - uvloop will be used for the event loop if it’s installed.
- Automatically register extensions to a registry on the application
- Add
hensoncli
Sphinx directive to document extensions to the command line interface henson.cli.run
and any command line extensions that request it supportquiet
andverbose
flags to set verbosity