Index¶
Quick start¶
Supported versions¶
- Python 2.7
- Python 3.5+
Requirements¶
- pika
- gevent (server only)
Installation¶
pip install isclib
Usage¶
Comprehensive example¶
server.py
#!/usr/bin/env python3.6
from isc.server import Node, expose, on, local_timer
from time import sleep
import logging
class ExampleService(object):
    """Demo service registered under the name ``example``.

    Methods decorated with ``@expose`` are the ones the client example
    invokes remotely; ``@on('boom')`` subscribes ``do_stuff`` to the
    ``boom`` notification.
    """

    name = 'example'

    def __init__(self):
        # Set by `start_tracking`; remains None until that is called.
        self.tracker = None

    @expose
    def add(self, a, b, wait=0):
        """Sleep `wait` seconds, then return ``str(a + b)`` repeated 8000 times."""
        sleep(wait)
        total = a + b
        return str(total) * 8000

    @expose
    def raise_error(self):
        """Always raise, to demonstrate error propagation to the caller."""
        raise Exception('testing')

    def private_method(self):
        """Not decorated with ``@expose``; the client example expects calling it to fail."""
        return 'Cannot call me!'

    @on('boom')
    def do_stuff(self, arg):
        """Print the payload received with the ``boom`` notification."""
        print('Got stuff:', arg)

    @expose
    def slow_method(self):
        """Block for three seconds; the client example uses it to trigger a timeout."""
        sleep(3)

    @expose
    def start_tracking(self):
        """Create a summary tracker and keep it on the instance."""
        # NOTE(review): `tracker` is not imported anywhere in this example.
        # `SummaryTracker` / `format_diff` look like pympler's `tracker`
        # module -- confirm and add the import before running this method.
        self.tracker = tracker.SummaryTracker()

    @expose
    def get_summary(self):
        """Return the tracker's formatted diff as a list."""
        diff_lines = self.tracker.format_diff()
        return list(diff_lines)

    # @local_timer(timeout=3)
    # def print_stats(self):
    #     print('Stats: foobar')
# Wire the example service into a node on the `isctest` exchange.
service = ExampleService()
node = Node(exchange='isctest')
node.set_logging_level(logging.DEBUG)
node.register_service(service)

if __name__ == '__main__':
    # Serve until interrupted with Ctrl+C, then stop the node.
    try:
        node.run()
    except KeyboardInterrupt:
        node.stop()
client.py
#!/usr/bin/env python3.6
from isc.client import Client, RemoteException, TimeoutException
import time

# Connect to the same exchange the example server listens on.
client = Client(exchange='isctest')
client.start()

# Presumably gives the connection time to come up before the first call.
time.sleep(2)

# Fire-and-forget notification; handled by `ExampleService.do_stuff`.
client.notify('boom', dict(foo='bar'))

# Attribute chaining and the explicit `invoke` form are equivalent.
assert client.example.add(2, 3) == '5' * 8000
assert client.invoke('example', 'add', 2, 3) == '5' * 8000

# An exception raised inside the remote method (here: int + str) surfaces
# on the client side as RemoteException.
try:
    client.example.add(2, '3')
except RemoteException:
    pass
else:
    assert False

# Explicitly raised remote exceptions are reported the same way.
try:
    client.example.raise_error()
except RemoteException:
    pass
else:
    assert False

# Methods that are not decorated with @expose cannot be invoked.
try:
    client.example.private_method()
except RemoteException:
    pass
else:
    assert False

# Neither can methods that do not exist at all.
try:
    client.example.unexisting_method()
except RemoteException:
    pass
else:
    assert False

# `slow_method` sleeps for 3 seconds, so a 1-second timeout must trip.
client.set_invoke_timeout(1)
try:
    client.example.slow_method()
except TimeoutException:
    pass
else:
    assert False

client.stop()
Caveats¶
If you import anything from isc.server, keep in mind that this library uses gevent to patch built-in libraries. This doesn’t apply to isc.client, though.
Simple server & client¶
# users_service.py
from isc.server import Node, expose
class UserService(object):
    """Minimal service registered under the name ``users``."""

    name = 'users'

    @expose
    def get_user(self, id):
        """Return the hard-coded user name for `id`, or None if unknown."""
        if id == 1:
            return 'Andrew'
        if id == 2:
            return 'Victoria'
        return None
# Host the service on a node with default settings.
node = Node()
node.register_service(UserService())

# Serve until interrupted with Ctrl+C, then stop the node.
try:
    node.run()
except KeyboardInterrupt:
    node.stop()
# app.py
from isc.client import Client
client = Client()

# Prints 'Andrew', then 'Victoria', then 'None' (no user with id 3).
for user_id in (1, 2, 3):
    print(client.users.get_user(user_id))
Two services¶
# users_service.py
from isc.server import Node, expose, on
from superapp.models import User
class UserService(object):
    """User lookup service that also listens for ``new_message`` events."""

    name = 'users'

    @expose
    def get_user(self, id):
        """Return ``{'username': ...}`` for the user with `id`, or None."""
        # Let's use some ORM to retrieve the user from DB
        user = User.objects.filter(id=id).first()
        if not user:
            # User not found!
            return None
        return {'username': user.username}

    @on('new_message')
    def on_new_message(self, username, message):
        """Print the message announced by a ``new_message`` notification."""
        print('New message for user {}: {}'.format(username, message))
# Host the users service on a node with default settings.
node = Node()
node.register_service(UserService())

# Serve until interrupted with Ctrl+C, then stop the node.
try:
    node.run()
except KeyboardInterrupt:
    node.stop()
# messages_service.py
from isc.server import Node, expose
from isc.client import Client
from superapp.models import Message
client = Client()
class MessageService(object):
    """Service registered as ``messages``; it calls the ``users`` service itself."""

    name = 'messages'

    @expose
    def send_message(self, body, receipt):
        """Store a message for a user and announce it.

        `body` is the message text and `receipt` is the id passed to
        ``users.get_user``. Raises Exception when that lookup returns
        nothing.
        """
        user = client.users.get_user(receipt)
        if not user:
            # User not found!
            raise Exception('Cannot send message: user not found')

        Message.objects.create(receipt=receipt, message=body)

        # Broadcast to all instances. The text lives in `body`; the
        # original referenced an undefined name `message` here.
        client.notify('new_message', user['username'], body)
# Host the messages service on a node with default settings.
node = Node()
node.register_service(MessageService())

# Serve until interrupted with Ctrl+C, then stop the node.
try:
    node.run()
except KeyboardInterrupt:
    node.stop()
# app.py
# RemoteException is caught below, so it has to be imported as well.
from isc.client import Client, RemoteException

client = Client()

# ...

try:
    # `some_user_id` is a placeholder for an id obtained elsewhere in the app.
    client.messages.send_message('Hello!', some_user_id)
except RemoteException as e:
    print('Failed to send message, error was: {}'.format(e))
else:
    print('Message sent!')
Timers¶
# tick_service.py
from isc.server import Node, expose, local_timer
class TickerService(object):
    """Service registered as ``ticker``; counts timer ticks on the instance."""

    name = 'ticker'

    def __init__(self):
        """
        WARNING:
        Do NOT do this in real projects. (I'm speaking about local state
        which is represented by `self.ticks` attribute here.)
        Services MUST be stateless.

        This dirty trick right here is used just to demonstrate
        how the timer works without involving any external storage.

        In a real project:
        - ALWAYS use a database or any other external storage instead of `self`
        - NEVER mutate the service object.

        So in a real project you would have done something like this:

            self.db_conn = SomeDatabaseConnection()
            # ...
            spam = next(self.db_conn.query('SELECT spam;').fetchone(), None)
        """
        self.ticks = 0

    @expose
    def get_ticks(self, id=None):
        """Return the current tick count.

        `id` is unused. It now defaults to None so the call shown in the
        client example, ``client.ticker.get_ticks()``, works without
        arguments, while callers that still pass a value keep working.
        """
        return self.ticks

    @expose
    def reset_ticks(self):
        """Reset the tick count to zero."""
        self.ticks = 0

    @local_timer(timeout=5)
    def local_timer(self):
        """Timer callback registered with ``timeout=5``; adds one tick per call."""
        self.ticks += 1
# Host the ticker service on a node with default settings.
node = Node()
node.register_service(TickerService())

# Serve until interrupted with Ctrl+C, then stop the node.
try:
    node.run()
except KeyboardInterrupt:
    node.stop()
# app.py
from isc.client import Client
from time import sleep
client = Client()

# Start from a known state.
client.ticker.reset_ticks()
print(client.ticker.get_ticks())  # prints 0

# The service timer is declared with timeout=5, so 10 seconds add two ticks.
sleep(10)
print(client.ticker.get_ticks())  # prints 2

sleep(10)
print(client.ticker.get_ticks())  # prints 4

# Resetting brings the counter back to zero and counting resumes.
client.ticker.reset_ticks()
print(client.ticker.get_ticks())  # prints 0

sleep(10)
print(client.ticker.get_ticks())  # prints 2
Server bindings¶
Client bindings¶
-
class
isc.client.
Client
(host='amqp://guest:guest@127.0.0.1:5672/', exchange='isc', codec=None, connect_timeout=2, reconnect_timeout=3, invoke_timeout=20)[source]¶ Represents a single low-level connection to the ISC messaging broker. Thread-safe.
-
class
isc.client.
ConsumerThread
(hostname, exchange_name, connect_timeout, reconnect_timeout, codec)[source]¶ Internal class. Represents connection & message consuming thread.
-
class
isc.client.
FutureResult
(cannonical_name, **extra)[source]¶ Encapsulates future result. Provides interface to block until future data is ready. Thread-safe.
-
class
isc.client.
MethodProxy
(client, service_name, method_name)[source]¶ Convenience wrapper for method.
It allows you to perform attribute chaining (e.g. client.example.add(2, 3)).
- call_async(*args, **kwargs)[source]¶ Finalizes the chain & performs the actual RPC invocation. Does not block.
Returns a FutureResult. This is the same as calling invoke_async().
-
-
class
isc.client.
PublisherThread
(consumer)[source]¶ Internal class. Represents message publishing thread.
-
class
isc.client.
QueuedInvocation
(codec, service, method, args, kwargs)[source]¶ Internal class. Represents pending outgoing method call.
-
class
isc.client.
QueuedNotification
(codec, event, data)[source]¶ Internal class. Represents pending outgoing notification.
Custom codecs¶
-
class
isc.codecs.
AbstractCodec
[source]¶ Abstract base class for implementing codecs.
“Codec” is a class that tells ISC how to encode & decode message payloads.
The server can support multiple codecs, while a client can use only one at a time.
The default codec is PickleCodec. You can implement your own codec by extending AbstractCodec and overriding its methods.
Important: Don’t forget that both the client and the server must have the codec installed!