This Asphalt framework component provides a WAMP (Web Application Messaging Protocol) client, implemented on top of the autobahn library.
Configuration¶
WAMP, being a routed protocol, requires a router to connect to. If you do not have one already, the reference implementation, Crossbar, should work nicely. The recommended way of setting it up is with Docker, though setting up a dedicated virtualenv for it would also do the trick.
Most WAMP clients need very little configuration. You usually only have to set the realm name, the host name (if the router is not running on localhost), the port (if it is not 8080) and TLS (if connecting to a remote instance securely).
Suppose you are connecting to the realm myrealmname on crossbar.example.org, port 8181, using TLS. Your configuration would then look like this:
components:
  wamp:
    realm: myrealmname
    host: crossbar.example.org
    port: 8181
    tls: true
Your WAMP client resource (named default) would then be accessible on the context as ctx.wamp.
Multiple clients¶
You can also configure multiple WAMP clients if necessary. For that, you will need to have a structure along the lines of:
components:
  wamp:
    tls: true
    clients:
      wamp1:
        realm: myrealmname
        host: crossbar.example.org
        port: 8181
      wamp2:
        realm: otherrealm
        host: crossbar.company.com
In this example, two client resources (wamp1 / ctx.wamp1 and wamp2 / ctx.wamp2) are created. The first one is like the one in the previous example. The second connects to the realm named otherrealm on crossbar.company.com on the default port using TLS. Setting tls: true (or any other option) on the same level as clients means it is the default value for all clients.
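The defaulting rule can be pictured as a plain dictionary merge. The sketch below only illustrates the semantics described above; it is not the component's actual implementation, and `resolve_clients` is a hypothetical helper introduced for this example.

```python
from typing import Any, Dict


def resolve_clients(config: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Merge top-level options into each entry under "clients" (hypothetical helper)."""
    # Everything next to "clients" is treated as a default for every client
    defaults = {key: value for key, value in config.items() if key != 'clients'}
    # Options set on an individual client override the shared defaults
    return {name: {**defaults, **(options or {})}
            for name, options in config.get('clients', {}).items()}


wamp_config = {
    'tls': True,
    'clients': {
        'wamp1': {'realm': 'myrealmname', 'host': 'crossbar.example.org', 'port': 8181},
        'wamp2': {'realm': 'otherrealm', 'host': 'crossbar.company.com'},
    },
}
resolved = resolve_clients(wamp_config)
print(resolved['wamp2'])
```

Here wamp2 picks up tls from the shared level and has no explicit port, so the client's default port would apply.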
For a comprehensive list of all client options, see the documentation of the WAMPClient class.
User guide¶
The following sections explain how to use the most common functions of a WAMP client. The more advanced options have been documented in the API reference.
For practical examples, see the examples directory.
Calling remote procedures¶
To call a remote procedure, use the call() method:
result = await ctx.wamp.call('procedurename', arg1, arg2, arg3='foo')
To receive progressive results from the call, you can give a callback as the on_progress option:
def progress(status):
    print('operation status: {}'.format(status))

result = await ctx.wamp.call('procedurename', arg1, arg2, arg3='foo',
                             options=dict(on_progress=progress))
To set a time limit for how long to wait for the call to complete, use the timeout option:
# Wait 10 seconds until giving up
result = await ctx.wamp.call('procedurename', arg1, arg2, arg3='foo', options=dict(timeout=10))
Note
This will not stop the remote handler from finishing; it will just make the client stop waiting and discard the results of the call.
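The behaviour in the note can be mimicked locally with plain asyncio. The following is only an analogy and does not use WAMP at all: `remote_handler` is a hypothetical stand-in for a procedure running on another client, and `asyncio.shield()` plays the part of the network boundary that keeps the handler running after the caller gives up.

```python
import asyncio

finished = []


async def remote_handler() -> str:
    # Stand-in for a procedure running on another WAMP client
    await asyncio.sleep(0.2)
    finished.append('handler completed')
    return 'result'


async def main() -> str:
    task = asyncio.ensure_future(remote_handler())
    try:
        # shield() keeps the "remote" work running when the wait times out
        outcome = await asyncio.wait_for(asyncio.shield(task), timeout=0.05)
    except asyncio.TimeoutError:
        outcome = 'timed out'

    # The caller has given up, but the handler still runs to completion
    await task
    return outcome


outcome = asyncio.run(main())
print(outcome, finished)
```

If the remote work is expensive, the handler itself needs its own way to stop early; the client-side timeout alone does not provide one.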
Registering procedure handlers¶
To register a procedure on the router, create a callable that takes a CallContext as the first argument and use the register() method to register it:
async def procedure_handler(ctx: CallContext, *args, **kwargs):
    ...

await ctx.wamp.register(procedure_handler, 'my_remote_procedure')
The handler can be either an asynchronous function or a regular function, but the latter will obviously have fewer use cases due to the lack of await.
To send progressive results, you can call the progress callback on the CallContext object. For this to work, the caller must have used the on_progress option when making the call. Otherwise progress will be None.
For example:
import asyncio

from asphalt.wamp import CallContext

async def procedure_handler(ctx: CallContext, *args, **kwargs):
    for i in range(1, 11):
        await asyncio.sleep(1)
        # progress is None unless the caller passed on_progress
        if ctx.progress:
            ctx.progress('{}% complete'.format(i * 10))

    return 'Done'

await ctx.wamp.register(procedure_handler, 'my_remote_procedure')
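One way to exercise such a handler without a router is to pass it a stand-in context. The sketch below assumes the handler only reads the progress attribute; `SimpleNamespace` is used as a hypothetical substitute for CallContext, and the `steps` parameter is invented for this example.

```python
import asyncio
from types import SimpleNamespace


async def procedure_handler(ctx, steps: int) -> str:
    for i in range(1, steps + 1):
        await asyncio.sleep(0)  # placeholder for real work
        # Only report progress if the caller asked for it
        if ctx.progress:
            ctx.progress('{}% complete'.format(i * 100 // steps))

    return 'Done'


updates = []
# Caller asked for progressive results: progress is a callable
with_progress = SimpleNamespace(progress=updates.append)
# Caller did not ask for progressive results: progress is None
without_progress = SimpleNamespace(progress=None)

first = asyncio.run(procedure_handler(with_progress, 4))
second = asyncio.run(procedure_handler(without_progress, 4))
print(updates, first, second)
```

Both calls return the same final result; only the first one produces intermediate updates.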
Publishing messages¶
To publish a message on the router, call publish() with the topic as the first argument and then add any positional and keyword arguments you want to include in the message:
await ctx.wamp.publish('some_topic', 'hello', 'world', another='argument')
By default, publications are not acknowledged by the router. This means that a published message could be silently discarded if, for example, the publisher does not have proper permissions to publish it. To avoid this, use the acknowledge option:
await ctx.wamp.publish('some_topic', 'hello', 'world', another='argument',
                       options=dict(acknowledge=True))
Subscribing to topics¶
You can use the subscribe() method to receive published messages from the router:
async def subscriber(ctx: EventContext, *args, **kwargs):
    print('new message: args={}, kwargs={}'.format(args, kwargs))

await ctx.wamp.subscribe(subscriber, 'some_topic')
Just like procedure handlers, subscription handlers can be either asynchronous or regular functions.
Mapping WAMP exceptions to Python exceptions¶
Exceptions transmitted over WAMP are identified by a specific URI. WAMP errors can be mapped to Python exceptions by linking a specific URI to a specific exception class by means of either WAMPRegistry.exception(), WAMPRegistry.map_exception() or WAMPClient.map_exception().
When you map an exception, you can raise it in your procedure or subscription handlers and it will be automatically translated using the given error URI so that the recipients will be able to properly map it on their end as well. Likewise, when a matching error is received from the router, the appropriate exception class is instantiated and raised in the calling code.
Any unmapped exceptions manifest themselves as ApplicationError exceptions.
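Conceptually, the mapping is a two-way lookup between error URIs and exception classes. The sketch below illustrates that idea with plain dictionaries and is not how the library implements it. The URIs, `AccountNotFound`, `to_wire` and `from_wire` are all hypothetical, and `UnmappedError` merely stands in for ApplicationError.

```python
from typing import Dict, Tuple, Type


class AccountNotFound(Exception):
    """Hypothetical application error."""


class UnmappedError(Exception):
    """Stand-in for the generic error raised for unmapped URIs."""


# Hypothetical URI chosen for this illustration
error_map: Dict[str, Type[Exception]] = {'myapp.error.account_not_found': AccountNotFound}
uri_map = {cls: uri for uri, cls in error_map.items()}


def to_wire(exc: Exception) -> Tuple[str, tuple]:
    """Translate a raised exception into (error URI, args) for sending."""
    return uri_map.get(type(exc), 'myapp.error.unmapped'), exc.args


def from_wire(uri: str, args: tuple) -> Exception:
    """Instantiate the mapped class, or fall back to the generic error."""
    return error_map.get(uri, UnmappedError)(*args)


uri, args = to_wire(AccountNotFound('no such account: 42'))
print(uri, repr(from_wire(uri, args)))
```

Because both sides share the same URI, the receiving end can raise the same exception class that the sending end raised.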
Using registries to structure your application¶
While it may at first seem convenient to register every procedure and subscription handler using register() and subscribe(), it does not scale very well when your handlers are distributed over several packages and modules.
The WAMPRegistry class provides an alternative to this.
Each registry object stores registered procedure handlers, subscription handlers and mapped
exceptions, and can apply defaults on each of these. Each registry can have a separate namespace
prefix so you don’t have to repeat it in every single procedure name, topic or mapped error.
Suppose you want to register two procedures and one subscriber, all under the foo prefix and you want to apply the invoke='roundrobin' setting to all procedures:
from asphalt.wamp import WAMPRegistry

registry = WAMPRegistry('foo', procedure_defaults={'invoke': 'roundrobin'})

@registry.procedure
def multiply(ctx, factor1, factor2):
    return factor1 * factor2

@registry.procedure
def divide(ctx, numerator, denominator):
    return numerator / denominator

@registry.subscriber
def message_received(ctx, message):
    print('new message: %s' % message)
To use the registry, pass it to the WAMP component as an option:
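from asphalt.core import ContainerComponent

class ApplicationComponent(ContainerComponent):
    async def start(self, ctx):
        # Child components are added on the container component itself
        self.add_component('wamp', registry=registry)
        await super().start(ctx)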
This will register the foo.multiply and foo.divide procedures and a subscription for the foo.message_received topic.
If your procedures and/or subscribers are spread over several modules and you want a different namespace for every module, you could have a separate registry in every module and then combine them into a single registry using add_from():
from asphalt.wamp import WAMPRegistry
from myapp.services import accounting, deliveries, production # these are modules
registry = WAMPRegistry()
registry.add_from(accounting.registry, 'accounting')
registry.add_from(deliveries.registry, 'deliveries')
registry.add_from(production.registry, 'production')
You can set the prefix either in the call to add_from() or when creating the registry of each subsection. Note that if you do both, you end up with two prefixes!
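The doubled prefix is easy to see if you think of the final name as the dot-joined chain of every prefix on the way down. The following sketch only models that naming rule, assuming prefixes are joined with dots; `join_prefix` is a hypothetical helper and not part of the library.

```python
def join_prefix(*parts: str) -> str:
    """Join non-empty name parts with dots (hypothetical helper)."""
    return '.'.join(part for part in parts if part)


# Prefix given only in add_from()
only_parent = join_prefix('accounting', '', 'multiply')
# Prefix given both in add_from() and on the child registry
both = join_prefix('accounting', 'accounting', 'multiply')
print(only_parent)
print(both)
```

Picking one place for each prefix, either the child registry or the add_from() call, keeps the resulting names predictable.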
Implementing dynamic authentication and authorization¶
While static configuration of users and permissions may work for trivial applications, you will probably find yourself wanting more flexibility for both authentication and authorization as your application grows larger. Crossbar, the reference WAMP router implementation, supports dynamic authentication and dynamic authorization. That means that instead of using a preconfigured list of users or permissions, the router itself will call named remote procedures to determine whether the credentials are valid (authentication) or whether a session has permission to register/call a procedure or subscribe/publish to a topic (authorization).
The catch-22 in this is that the WAMP client that provides these procedures has to have permission to register these procedures. This chicken and egg problem can be solved by providing a trusted backdoor for this particular client. In the example below, the client providing the authenticator and authorizer services connects via port 8081, which will only be made accessible to that particular client. Unlike the other two configured roles, the server role has a static authorization configuration, which is required for this to work.
version: 2
workers:
- type: router
  realms:
  - name: myrealm
    roles:
    - name: regular
      authorizer: authorize
    - name: admin
      authorizer: authorize
    - name: server
      permissions:
      - uri: "*"
        allow: {call: true, publish: true, register: true, subscribe: true}
  transports:
  - type: websocket
    endpoint:
      type: tcp
      port: 8080
    auth:
      ticket:
        type: dynamic
        authenticator: authenticate
  - type: websocket
    endpoint:
      type: tcp
      port: 8081
    auth:
      anonymous:
        type: static
        role: server
The client performing the server role will then register the authenticate() and authorize() procedures on the router:
from typing import Any, Dict

from asphalt.core import ContainerComponent
from asphalt.wamp import CallContext, WAMPRegistry
from autobahn.wamp.exception import ApplicationError

registry = WAMPRegistry()
users = {
    'joe_average': ('1234', 'regular'),
    'bofh': ('B3yt&4_+', 'admin')
}


@registry.procedure
def authenticate(ctx: CallContext, realm: str, authid: str, details: Dict[str, Any]):
    # Don't do this in real apps! This is a security hazard!
    # Instead, use a password hashing algorithm like argon2, scrypt or bcrypt
    user = users.get(authid)
    if user:
        # This applies for "ticket" authentication as configured above
        password, role = user
        if password == details['ticket']:
            return {'authrole': role}

    raise ApplicationError(ApplicationError.AUTHENTICATION_FAILED, 'Authentication failed')


@registry.procedure
def authorize(ctx: CallContext, session: Dict[str, Any], uri: str, action: str):
    # Cache any positive answers
    if session['authrole'] == 'regular':
        # Allow regular users to call and subscribe to public.*
        if action in ('call', 'subscribe') and uri.startswith('public.'):
            return {'allow': True, 'cache': True}
    elif session['authrole'] == 'admin':
        # Allow admins to call, subscribe and publish anything anywhere
        # (but not register procedures)
        if action in ('call', 'subscribe', 'publish'):
            return {'allow': True, 'cache': True}

    return {'allow': False}


class ServerComponent(ContainerComponent):
    async def start(self, ctx):
        self.add_component('wamp', registry=registry)
        await super().start(ctx)
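The comment in authenticate() advises against comparing plain-text passwords. As a rough sketch of the alternative, the snippet below stores a salted scrypt hash using only the standard library. The cost parameters are illustrative assumptions rather than a vetted recommendation, and `hash_password` and `verify_password` are hypothetical helpers, not part of asphalt-wamp or Crossbar.

```python
import hashlib
import hmac
import os
from typing import Optional, Tuple


def hash_password(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]:
    """Return (salt, digest) for the given password using scrypt."""
    salt = salt or os.urandom(16)
    # n, r and p are illustrative cost parameters (assumption)
    digest = hashlib.scrypt(password.encode('utf-8'), salt=salt, n=2 ** 14, r=8, p=1)
    return salt, digest


def verify_password(password: str, salt: bytes, expected: bytes) -> bool:
    """Recompute the hash and compare it in constant time."""
    _, digest = hash_password(password, salt)
    return hmac.compare_digest(digest, expected)


# The user table then holds salts and digests instead of passwords
salt, stored = hash_password('1234')
hashed_users = {'joe_average': (salt, stored, 'regular')}
print(verify_password('1234', salt, stored))
```

Inside authenticate(), the equality check on details['ticket'] would then be replaced by a call along the lines of verify_password(details['ticket'], salt, stored).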
For more information, see the Crossbar documentation.
Warning
At the time of this writing (2017-04-29), caching of authorizer responses has not been implemented in Crossbar. This documentation assumes that it will be present in a future release.
Version history¶
This library adheres to Semantic Versioning.
2.2.2 (2018-03-02)
- Fixed error in Client.stop() when the session is already None
2.2.1 (2018-02-22)
- Fixed mapped custom exceptions being reported via asphalt-exceptions
2.2.0 (2018-02-15)
- Added integration with asphalt-exceptions
- Raised connection logging level to INFO
- Added a configurable shutdown timeout
- Renamed WAMPClient.close() to WAMPClient.stop()
- Improved the reliability of the connection/session teardown process
2.1.0 (2017-09-21)
- Added the protocol_options option to WAMPClient
- Added the connection_timeout option to WAMPClient
2.0.1 (2017-06-07)
- Fixed failure to register option-less procedures and subscriptions added from a registry
2.0.0 (2017-06-07)
- BACKWARD INCOMPATIBLE Upgraded minimum Autobahn version to v17.5.1
- BACKWARD INCOMPATIBLE Changed the default value of the path option on WAMPClient to /ws to match the default Crossbar configuration
- BACKWARD INCOMPATIBLE Changed subscriptions to use the details keyword argument to accept subscription details (since details_arg is now deprecated in Autobahn)
- BACKWARD INCOMPATIBLE Replaced SessionJoinEvent.session_id with the details attribute which directly exposes all session details provided by Autobahn
- BACKWARD INCOMPATIBLE Changed the way registration/subscription/call/publish options are passed. Keyword arguments were replaced with a single options keyword-only argument.
- BACKWARD INCOMPATIBLE Registry-based subscriptions and exception mappings now inherit the parent prefixes, just like procedures did previously
- Added compatibility with Asphalt 4.0
- Added the WAMPClient.details property which returns the session details when joined to one
- Fixed error during WAMPClient.close() if a connection attempt was in progress
- Fixed minor documentation errors
1.0.0 (2017-04-29)
- Initial release