Seth-Pusher User Documentation

Seth-Pusher is a Tornado-backed pub/sub websocket server implementation designed as a set of replaceable components. Install it with:

$ pip install -U sethpusher

Seth-Pusher may also be used with redis pub/sub capabilities, which need a few extra packages:

$ pip install -U redis && pip install -U hiredis && pip install -U tornado-redis==2.4.18

Tutorial
A quick Seth-Pusher tutorial & overview.
Core API
The complete API documentation.

Tutorial

This tutorial introduces Seth-Pusher core concepts by example.

Getting started

Creating a virtualenv before installation is advised. To install the server, run:

$ pip install -U sethpusher

Basic usage

$ seth-pusher --debug=true

SethPusher relies on sockjs, so when developing a JS client make sure to take a look at:

  1. https://github.com/mrjoes/sockjs-tornado
  2. https://github.com/sockjs/sockjs-client

When the debug flag is set to true, basic statistics can be inspected with:

$ curl http://localhost:8080/debug/

or

$ curl http://localhost:8080/debug/channels/<channel_name>/
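The same statistics can also be fetched from Python. This is a minimal sketch using only the Python 3 standard library. It assumes the server runs on localhost:8080 with --debug=true and that the debug endpoints answer with JSON (the response format is not documented here, so that part is an assumption). The names debug_url and fetch_debug_stats are hypothetical helpers, not part of sethpusher.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

BASE_URL = "http://localhost:8080"  # assumed default host and port


def debug_url(channel=None, base_url=BASE_URL):
    """Build the overall or the per-channel debug URL."""
    if channel is None:
        return base_url + "/debug/"
    # Percent-encode the channel name so it is safe inside a URL path.
    return base_url + "/debug/channels/" + quote(channel, safe="") + "/"


def fetch_debug_stats(channel=None, base_url=BASE_URL, timeout=5):
    """GET a debug endpoint; assumes the body is JSON (not confirmed)."""
    with urlopen(debug_url(channel, base_url), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Only prints the URLs; call fetch_debug_stats() against a running server.
    print(debug_url())
    print(debug_url("news"))
```

Calling fetch_debug_stats("news") needs a running server started with the debug flag; without one, urlopen raises an error.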

Using redis pubsub functionality

This requires redis-server and some additional setup. First install and start redis:

$ sudo apt-get install redis-server
$ sudo service redis-server start

Install additional requirements:

$ pip install -U redis
$ pip install -U hiredis  # optional
$ pip install -U tornado-redis==2.4.18

Run:

$ seth-pusher --debug=true --app='sethpusher.applications.SethPusherOnRedisApplication'

Specify host & port

$ seth-pusher --debug=true --host=127.0.0.1 --port=8003

Connecting via websocket or other transport

By default, the predefined applications expose the transport connection via the /connect endpoint.

Connecting, subscribing & unsubscribing

These messages are handled by the /connect endpoint by default.

  1. Connecting:

    {
        'type': 'connect',
        'subscriber_id': '<unique_user_id>'
    }
    
  2. Subscribing:

    {
        'type': 'subscribe',
        'channel': '<channel_name>'
    }
    
  3. Unsubscribing:

    {
        'type': 'unsubscribe',
        'channel': '<channel_name>'
    }
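The three messages above can be built programmatically before they are sent over the SockJS connection. The sketch below assumes that messages travel as JSON-encoded strings, which this page does not state explicitly. The function names are hypothetical helpers, not part of sethpusher.

```python
import json


def connect_message(subscriber_id):
    """Serialize a 'connect' message for the given unique user id."""
    return json.dumps({"type": "connect", "subscriber_id": subscriber_id})


def subscribe_message(channel):
    """Serialize a 'subscribe' message for the given channel name."""
    return json.dumps({"type": "subscribe", "channel": channel})


def unsubscribe_message(channel):
    """Serialize an 'unsubscribe' message for the given channel name."""
    return json.dumps({"type": "unsubscribe", "channel": channel})


if __name__ == "__main__":
    # Order a client would typically follow: connect first, then subscribe.
    for raw in (connect_message("user-1"),
                subscribe_message("news"),
                unsubscribe_message("news")):
        print(raw)
```

Each returned string is what a client would pass to its transport's send call. Connecting before subscribing follows the order of the list above.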
    

Publishing to channels

To publish a message to a channel, POST JSON data to that channel's API endpoint:

$ curl -X POST -H "Content-Type: application/json" -d '{"my": "data"}' http://localhost:8080/api/channels/<channel_name>/
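The curl call above has a direct Python equivalent. This is a minimal sketch using the Python 3 standard library, assuming the default localhost:8080 address. build_publish_request is a hypothetical helper name. Like the curl example, it sends no credentials, so an application with an authentication policy may need more than this.

```python
import json
from urllib.parse import quote
from urllib.request import Request, urlopen


def build_publish_request(channel, payload, base_url="http://localhost:8080"):
    """Prepare a JSON POST to /api/channels/<channel_name>/ without sending it."""
    url = base_url + "/api/channels/" + quote(channel, safe="") + "/"
    body = json.dumps(payload).encode("utf-8")
    return Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_publish_request("news", {"my": "data"})
    print(request.get_method(), request.full_url)
    # To actually publish against a running server:
    # with urlopen(request, timeout=5) as response:
    #     print(response.status)
```

Building the request separately from sending it makes it easy to inspect the URL and body before any network traffic happens.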

Sending message to user

By default, every user who connects to sethpusher is subscribed to all channel, which allows us to send that user a private message.

$ curl -X POST -H "Content-Type: application/json" -d '{"my": "data"}' http://localhost:8080/api/users/<subscriber_id>/

or

$ curl -X POST -H "Content-Type: application/json" -d '{"my": "data", "sender_id": "<author_id>"}' http://localhost:8080/api/users/<subscriber_id>/
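The two variants above differ only in the optional sender_id field. The sketch below shows one way to build either body from Python. build_user_request is a hypothetical helper name and the localhost:8080 address is assumed. Sending it works the same way as any other urllib request.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def build_user_request(subscriber_id, data, sender_id=None,
                       base_url="http://localhost:8080"):
    """Prepare a JSON POST to /api/users/<subscriber_id>/ without sending it."""
    payload = dict(data)  # copy so the caller's dict is not mutated
    if sender_id is not None:
        # Optional field, as in the second curl example.
        payload["sender_id"] = sender_id
    url = base_url + "/api/users/" + quote(subscriber_id, safe="") + "/"
    return Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_user_request("user-1", {"my": "data"}, sender_id="user-2")
    print(request.full_url)
    print(request.data.decode("utf-8"))
```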

Creating your own app

Do you want your own application class? Use our AppRunner and mixins / components.

  1. Create file my_app.py
  2. Run:
$ python my_app.py --logging=debug
  3. Test it with:
$ curl http://localhost:6666/debug/\?batman\=bruce_wayne

Code:

from tornado.options import parse_command_line

from sethpusher.default_server import AppRunner
from sethpusher.applications import SethPusherInMemoryApplication
from sethpusher.authenticators import ApiKeyAuthenticationPolicy
from sethpusher.persistors import ExternalApiPersistencePolicy


class MyPersistence(ExternalApiPersistencePolicy):
    api_url = 'http://www.my_super_api.com'

    def persist(self, data, **kwargs):
        # Demo override: prints the message; the parent persist() is not called.
        print(data)
        print(kwargs)
        print("KABOOM !")


class MyAuth(ApiKeyAuthenticationPolicy):
    # Matches the ?batman=bruce_wayne query string used in the test request.
    key_name = u'batman'
    key_secret_value = u'bruce_wayne'


class MyVeryOwnApp(SethPusherInMemoryApplication):

    def get_authentication_policy(self, **kwargs):
        return MyAuth()

    def get_persistence_policy(self, **kwargs):
        return MyPersistence()


if __name__ == '__main__':
    parse_command_line()
    runner = AppRunner(port=6666, debug=True)
    runner.start(app=MyVeryOwnApp)

Core API

class sethpusher.applications.DefaultSethApplication(**kwargs)

Bases: tornado.web.Application

authentication_policy = None
get_authentication_policy(**kwargs)
get_dispatcher()
get_persistence_policy(**kwargs)
persist(data, **kwargs)
persistence_policy = None

class sethpusher.applications.SethPusherInMemoryApplication(**kwargs)

Bases: sethpusher.applications.DefaultSethApplication, sethpusher.mixins.InMemoryPubSubMixin

class sethpusher.applications.SethPusherOnRedisApplication(**kwargs)

Bases: sethpusher.applications.DefaultSethApplication, sethpusher.mixins.RedisPubSubMixin

class sethpusher.authenticators.ApiKeyAuthenticationPolicy

Bases: sethpusher.authenticators.BaseAuthenticationPolicy

authenticate(request, **kwargs)
key_name = u'api_key'
key_secret_value = u'secret'

class sethpusher.authenticators.BaseAuthenticationPolicy

Bases: object

authenticate(request, **kwargs)

class sethpusher.authenticators.EchoAuthenticationPolicy

Bases: sethpusher.authenticators.BaseAuthenticationPolicy

authenticate(request, **kwargs)

class sethpusher.authenticators.HeaderAuthenticationPolicy

Bases: sethpusher.authenticators.BaseAuthenticationPolicy

authenticate(request, **kwargs)
secret_header = u'Seth-Authentication-Header'
secret_header_value = u'Seth-Value'

class sethpusher.default_server.AppRunner(*args, **kwargs)

Bases: object

host = '127.0.0.1'
port = 8080
start(app)

sethpusher.default_server.main()
sethpusher.default_server.shutdown(server_instance)

class sethpusher.dispatchers.DefaultMessageDispatcher

Bases: object

CONNECT = 'connect'
DISCONNECT = 'disconnect'
SUBSCRIBE = 'subscribe'
UNSUBSCRIBE = 'unsubscribe'
channels
dispatch(msg)
handle_connect(msg)
handle_subscribe(msg)
handle_unsubscribe(msg)
is_connected
register_handler(handler)
validate_message(msg)

class sethpusher.mixins.InMemoryPubSubMixin

Bases: sethpusher.mixins.PubSubMixin

get_publisher(**kwargs)
get_subscriber(**kwargs)
get_subscriber_count(channel=None)
get_subscribers(channel=None)
get_user(user_id)
pub_sub

class sethpusher.mixins.PubSubMixin

Bases: object

add_subscriber(channel, subscriber)
broadcast(message, channel=None)
get_publisher()
get_subscriber()
get_subscriber_count(channel=None)
get_subscribers(channel=None)
get_user(user)
remove_subscriber(channel, subscriber)
send_to_channel(msg, channel)
send_to_user(msg, user)

class sethpusher.mixins.RedisPubSubMixin

Bases: sethpusher.mixins.PubSubMixin

add_subscriber(channel, subscriber)
get_publisher(**kwargs)
get_subscriber(**kwargs)
get_subscriber_count(channel=None)
get_subscribers(channel=None)
get_user(user_id)
is_subscribed_to_channel(subscriber, channel)
redis_host = 'localhost'
redis_password = None
redis_port = 6379
redis_selected_db = None

class sethpusher.persistors.DefaultPersistencePolicy

Bases: object

Provides interface to persistence policies.

persist(data, **kwargs)

class sethpusher.persistors.ExternalApiPersistencePolicy

Bases: sethpusher.persistors.DefaultPersistencePolicy

Posts message to selected API endpoint using AsyncHTTPClient

api_url = ''
get_api_url(**kwargs)
persist(data, **kwargs)

class sethpusher.persistors.RedisPersistencePolicy

Bases: sethpusher.persistors.DefaultPersistencePolicy

Stores messages in redis

persist(data, **kwargs)

class sethpusher.utils.BasicPubSubManager

Bases: object

publish(channel, message)
subscribe(channels, subscriber)
unsubscribe(channel, subscriber)

sethpusher.utils.is_request_body_json(f)
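To illustrate the publish / subscribe / unsubscribe interface listed for BasicPubSubManager, here is a standalone in-memory sketch with the same three method names and argument order. It is not the sethpusher implementation. The class name SketchPubSubManager, the use of plain callables as subscribers, and the delivered-count return value are all assumptions made for the example.

```python
from collections import defaultdict


class SketchPubSubManager:
    """Illustrative stand-in; not the sethpusher implementation."""

    def __init__(self):
        # channel name -> set of subscriber callables
        self._subscribers = defaultdict(set)

    def subscribe(self, channels, subscriber):
        """Register one subscriber on every channel in the iterable."""
        for channel in channels:
            self._subscribers[channel].add(subscriber)

    def unsubscribe(self, channel, subscriber):
        """Remove a subscriber; unknown channels or subscribers are ignored."""
        self._subscribers[channel].discard(subscriber)
        if not self._subscribers[channel]:
            # Drop empty channels so the mapping does not grow forever.
            del self._subscribers[channel]

    def publish(self, channel, message):
        """Deliver message to each subscriber; return how many received it."""
        delivered = 0
        # Iterate over a copy so a callback may unsubscribe during delivery.
        for subscriber in list(self._subscribers.get(channel, ())):
            subscriber(message)
            delivered += 1
        return delivered


if __name__ == "__main__":
    inbox = []
    manager = SketchPubSubManager()
    manager.subscribe(["news", "sports"], inbox.append)
    manager.publish("news", {"my": "data"})
    print(inbox)
```

A set per channel keeps duplicate subscriptions from producing duplicate deliveries, which is one reasonable design choice for an in-memory backend.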

Websocket API

class sethpusher.ws.urls.SethSockJSRouter(connection, prefix='', user_settings={}, io_loop=None, **kwargs)

Bases: sockjs.tornado.router.SockJSRouter

sethpusher.ws.urls.get_ws_handlers(application_instance)

class sethpusher.ws.views.SethSocketHandler(session)

Bases: sockjs.tornado.conn.SockJSConnection

application
check_origin(origin)
close(code=3000, message='')
handle_connect(source_msg, subscriber_id)
handle_subscribe(source_msg)
handle_unsubscribe(source_msg)
on_close()
on_message(raw_msg)
on_open(request)

Web API

sethpusher.api.urls.get_api_handlers(application_instance)

class sethpusher.api.views.ChannelDebugHandler(application, request, **kwargs)

Bases: sethpusher.api.views.SethHandler

get(*args, **kwargs)

class sethpusher.api.views.ChannelHandler(application, request, **kwargs)

Bases: sethpusher.api.views.SethHandler

post(*args, **kwargs)

class sethpusher.api.views.OverallDebugHandler(application, request, **kwargs)

Bases: sethpusher.api.views.SethHandler

get(*args, **kwargs)

class sethpusher.api.views.SethHandler(application, request, **kwargs)

Bases: tornado.web.RequestHandler

error(message, code=500)
json_response(data, code=200)
prepare()

class sethpusher.api.views.UserHandler(application, request, **kwargs)

Bases: sethpusher.api.views.SethHandler

post(*args, **kwargs)
