Welcome to messaging’s documentation!¶
Contents:
Message Abstraction¶
Message
- abstraction of a message
Synopsis¶
Example:
import messaging.message as message
# constructor + setters
msg = message.Message()
msg.body = "hello world"
msg.header = {"subject" : "test"}
msg.header["message-id"] = "123"
# fancy constructor
msg = message.Message(
body = "hello world",
header = {
"subject" : "test",
"message-id" : "123",
},
)
# getters
if (msg.body) {
...
}
id = msg.header["message-id"]
# serialize it
msg.serialize()
# serialize it and compress the body with zlib
msg.serialize({"compression" : "zlib"})
# serialize it and compress the body with lz4
msg.serialize({"compression" : "lz4"})
# serialize it and compress the body with snappy
msg.serialize({"compression" : "snappy"})
Description¶
This module provides an abstraction of a message, as used in messaging, see for instance: http://en.wikipedia.org/wiki/Enterprise_messaging_system.
A message consists of header fields (collectively called “the header of the message”) and a body.
Each header field is a key/value pair where the key and the value are text strings. The key is unique within the header so we can use a dict to represent the header of the message.
The body is either a text string or a binary string. This distinction is needed because text may need to be encoded (for instance using UTF-8) before being stored on disk or sent across the network.
To make things clear:
- a text string (aka character string) is a sequence of Unicode characters
- a binary string (aka byte string) is a sequence of bytes
Both the header and the body can be empty.
Json Mapping¶
In order to ease message manipulation (e.g. exchanging between applications, maybe written in different programming languages), we define here a standard mapping between a Message object and a JSON object.
A message as defined above naturally maps to a JSON object with the following fields:
- header
- the message header as a JSON object (with all values being JSON strings).
- body
- the message body as a JSON string.
- text
- a JSON boolean specifying whether the body is text string (as opposed to binary string) or not.
- encoding
- a JSON string describing how the body has been encoded (see below).
All fields are optional and default to empty/false if not present.
Since JSON strings are text strings (they can contain any Unicode character), the message header directly maps to a JSON object. There is no need to use encoding here.
For the message body, this is more complex. A text body can be put as-is in the JSON object but a binary body must be encoded beforehand because JSON does not handle binary strings. Additionally, we want to allow body compression in order to optionally save space. This is where the encoding field comes into play.
The encoding field describes which transformations have been applied to the message body. It is a + separated list of transformations that can be:
- base64
base64
encoding (for binary body or compressed body).- lz4
lz4
compression.- snappy
snappy
(http://code.google.com/p/snappy/).- utf8
utf8
encoding (only needed for a compressed text body).- zlib
zlib
compression.
Here is for instance the JSON object representing an empty message
(i.e. the result of Message()
):
{}
Here is a more complex example, with a binary body:
{
"header":{"subject":"demo","destination":"/topic/test"},
"body":"YWJj7g==",
"encoding":"base64"
}
You can use the Message.jsonify()
method to convert a
Message
object into a dict representing the equivalent
JSON object.
Conversely, you can create a new Message
object from a
compatible JSON object (again, a dict
) with the
dejsonify()
method.
Using this JSON mapping of messages is very convenient because you can easily put messages in larger JSON data structures. You can for instance store several messages together using a JSON array of these messages.
Here is for instance how you could construct a message containing in its body another message along with error information:
try:
import simplejson as json
except ImportError:
import json
import time
# get a message from somewhere...
msg1 = ...
# jsonify it and put it into a simple structure
body = {
"message" : msg1.jsonify(),
"error" : "an error message",
"time" : time.time(),
}
# create a new message with this body
msg2 = message.Message(body = json.dumps(body))
msg2.header["content-type"] = "message/error"
A receiver of such a message can easily decode it:
try:
import simplejson as json
except ImportError:
import json
# get a message from somewhere...
msg2 = ...
# extract the body which is a JSON object
body = json.loads(msg2.body)
# extract the inner message
msg1 = message.dejsonify(body['message'])
Stringification and Serialization¶
In addition to the JSON mapping described above, we also define how to stringify and serialize a message.
A stringified message is the string representing its equivalent
JSON object. A stringified message is a text string and can for
instance be used in another message. See the Message.stringify()
and destringify()
methods.
A serialized message is the UTF-8 encoding of its stringified
representation. A serialized message is a binary string and can for
instance be stored in a file. See the Message.serialize()
and deserialize()
methods.
For instance, here are the steps needed in order to store a message into a file:
- transform the programming language specific abstraction of the message into a JSON object
- transform the JSON object into its (text) string representing
- transform the JSON text string into a binary string using UTF-8 encoding
1 is called Message.jsonify()
, 1 + 2 is called
Message.stringify()
and 1 + 2 + 3 is called
Message.serialize()
.
To sum up:
Message object
| ^
jsonify() | | dejsonify()
v |
JSON compatible dict
| ^
JSON encode | | JSON decode
v |
text string
| ^
UTF-8 encode | | UTF-8 decode
v |
binary string
Copyright (C) 2013-2016 CERN
-
class
messaging.message.
Message
(body='', header=None)¶ A Message abstraction class.
-
body
¶ Returns the body of the message.
-
clone
()¶ Returns a clone of the message.
-
equals
(other)¶ Check if the message is equal to the given one.
-
get_body
()¶ Returns the body of the message.
-
get_header
()¶ Return the header of the message.
-
get_text
()¶ Is it a text message?
-
header
¶ Return the header of the message.
-
is_text
()¶ Is it a text message?
-
jsonify
(option={})¶ Transforms the message to JSON.
-
md5
()¶ Return the checksum of the message.
-
serialize
(option={})¶ Serialize message.
-
set_body
(value)¶ Set the message body to new value.
-
set_header
(value)¶ Set the message header to new value.
-
set_text
(value)¶ Set if the message is text.
-
size
()¶ Returns an approximation of the message size.
-
stringify
(option={})¶ Transforms the message to string.
-
text
¶ Is it a text message?
-
-
messaging.message.
dejsonify
(obj)¶ Returns a message from json structure.
-
messaging.message.
deserialize
(binary)¶ Deserialize a message.
-
messaging.message.
destringify
(string)¶ Destringify the given message.
-
messaging.message.
is_ascii
(string)¶ Returns True is the string is ascii.
-
messaging.message.
is_bytes
(string)¶ Check if given string is a byte string.
Message Generator¶
Synopsis¶
Example:
import messaging.generator as generator;
# create the generator
mg = generator.Generator(
body_content = "binary",
body_size = 1024,
)
# use it to generate 10 messages
for i in range(10):
msg = mg.message()
... do something with it ...
Description¶
This module provides a versatile message generator that can be useful for stress testing or benchmarking messaging brokers or libraries.
Copyright (C) 2013-2016 CERN
-
class
messaging.generator.
Generator
(**kwargs)¶ A message generator tool.
-
message
()¶ Returns a newly generated Message object
Options
When creating a message generator, the following options can be given:
body-content
- string: specifying the body content type; depending on this value, the body will be made of:
- base64: only Base64 characters
- binary: anything
- index: the message index number, starting at 1, optionally adjusted to match the C<body-size> (this is the default)
- text: only printable 7-bit ASCII characters
- body-size
- integer specifying the body size
- header-count
- integer specifying the number of header fields
- header-value-size
- integer specifying the size of each header field value (default is -32)
- header-name-size
- integer specifying the size of each header field name (default is -16)
- header-name-prefix
- string to prepend to all header field names (default is C<rnd->)
Note: all integer options can be either positive (meaning exactly this value) or negative (meaning randomly distributed around the value).
For instance:
mg = Generator( header_count = 10, header_value_size = -20, )
It will generate messages with exactly 10 random header fields, each field value having a random size between 0 and 40 and normally distributed around 20.
-
set
(option, value)¶ Set Generator option to value provided.
-
-
messaging.generator.
maybe_randomize
(size)¶ Maybe randomize int.
-
messaging.generator.
rndb64
(size)¶ Returns a random text string of the given size (Base64 characters).
-
messaging.generator.
rndbin
(size)¶ Returns a random binary string of the given size.
-
messaging.generator.
rndint
(size)¶ Returns a random integer between 0 and 2*size with a normal distribution.
See Irwin-Hall in http://en.wikipedia.org/wiki/Normal_distribution
-
messaging.generator.
rndstr
(size)¶ Returns a random text string of the given size (all printable characters).
Message Queue¶
Queue
- abstraction of a message queue
Synopsis¶
Example:
import messaging.queue as queue
mq = queue.new({"type":"Foo", ... options ...});
# is identical too
mq = queue.foo.Foo(... options ...);
Description¶
This module provides an abstraction of a message queue. Its only
purpose is to offer a unified method to create a new queue. The
functionality is implemented in child modules such as
messaging.queue.dqs.DQS
.
Copyright (C) 2013-2016 CERN
-
messaging.queue.
new
(option)¶ Create a new message queue object; options must contain the type of queue (which is the name of the child class), see above.
Directory Queue Normal¶
DQN
- abstraction of a dirq.queue.Queue
message queue.
Synopsis¶
Example:
from messaging.message import Message
from messaging.queue.dqn import DQN
# create a message queue
mq = DQN(path = "/some/where")
# add a message to the queue
msg = Message(body="hello world")
print("msg added as %s" % mq.add_message(msg))
# browse the queue
for name in mq:
if mq.lock(name):
msg = mq.get_message(name)
# unlock the element
mq.unlock(name)
# othwerwise, if you want to remove the element
# mq.remove(name)
Description¶
This module provides an abstraction of a message queue. It derives
from the dirq.queue.Queue
module that provides a generic
directory-based queue.
It uses the following dirq.queue.Queue
schema to store a
message:
schema = {
"header" = "table",
"binary" = "binary?",
"text" = "string?",
}
The message header is therefore stored as a table and the message body is stored either as a text or binary string.
Copyright (C) 2013-2016 CERN
-
class
messaging.queue.dqn.
DQN
(**data)¶ Abstraction of a Normal Queue message queue.
-
add_message
(msg)¶ Add the given message (a
messaging.message.Message
object) to the queue and return the corresponding element name.- Raise:
TypeError
if the parameter is not amessaging.message.Message
.
-
dequeue_message
(element)¶ Dequeue the message from the given element and return a
messaging.message.Message
object.- Raise:
TypeError
if the parameter is not astring
.
-
get_message
(element)¶ Get the message from the given element (which must be locked) and return a
messaging.message.Message
object.
-
Directory Queue Simple¶
DQS
- abstraction of a dirq.QueueSimple.QueueSimple
message queue.
Synopsis¶
Example:
from messaging.message import Message
from messaging.queue.dqs import DQS
# create a message queue
mq = DQS(path = "/some/where")
# add a message to the queue
msg = Message(body = "hello world")
print("msg added as %s" % mq.add_message(msg))
# browse the queue
for name in mq:
if mq.lock(name):
msg = mq.get_message(name)
# unlock the element
mq.unlock(name)
# othwerwise, if you want to remove the element
# mq.remove(name)
Description¶
This module provides an abstraction of a message queue. It derives
from the dirq.QueueSimple.QueueSimple
module that provides
a generic directory-based queue.
It simply stores the serialized message (with optional compression) as
a dirq.QueueSimple.QueueSimple
element.
Copyright (C) 2013-2016 CERN
-
class
messaging.queue.dqs.
DQS
(**data)¶ Abstraction of a
dirq.QueueSimple.QueueSimple
message queue.-
add_message
(msg)¶ Add the given message (a
messaging.message.Message
object) to the queue and return the corresponding element name.- Raise:
- TypeError if the parameter is not a
messaging.message.Message
.
-
get_message
(element)¶ Dequeue the message from the given element and return a
messaging.message.Message
object.
-
Stomppy helper¶
This class add support for Message module in stomppy default listener.
It can be passed directly to Connection.set_listener()
.
Copyright (C) 2013-2016 CERN
-
class
messaging.stomppy.
MessageListener
¶ This class add support for Message module in stomppy default listener. It can be passed directly to Connection.set_listener().
-
connected
(message)¶ Called by the STOMP connection when a CONNECTED frame is received, that is after a connection has been established or re-established.
Parameters: message – the message received from server
-
error
(message)¶ Called by the STOMP connection when an ERROR frame is received.
Parameters: message – the message received from server
-
message
(message)¶ Called by the STOMP connection when a MESSAGE frame is received.
Parameters: message – the message received from server
-
on_connected
(headers, body)¶ Translate standard call to custom one.
-
on_connecting
(host_and_port)¶ Called by the STOMP connection once a TCP/IP connection to the STOMP server has been established or re-established. Note that at this point, no connection has been established on the STOMP protocol level. For this, you need to invoke the “connect” method on the connection.
Parameters: host_and_port – a tuple containing the host name and port number to which the connection has been established
-
on_disconnected
()¶ Called by the STOMP connection when a TCP/IP connection to the STOMP server has been lost. No messages should be sent via the connection until it has been reestablished.
-
on_error
(headers, body)¶ Translate standard call to custom one.
-
on_heartbeat_timeout
()¶ Called by the STOMP connection when a heartbeat message has not been received beyond the specified period.
-
on_message
(headers, body)¶ Translate standard call to custom one.
-
on_receipt
(headers, body)¶ Translate standard call to custom one.
-
on_send
(headers, body)¶ Translate standard call to custom one.
-
receipt
(message)¶ Called by the STOMP connection when a RECEIPT frame is received, sent by the server if requested by the client using the ‘receipt’ header.
Parameters: message – the message received from server
-
send
(message)¶ Called by the STOMP connection when it is in the process of sending a message.
Parameters: message – the message being sent to server
-
Messaging is a set of Python modules useful to deal with messages, as used in messaging, see for instance: http://en.wikipedia.org/wiki/Enterprise_messaging_system.
The modules include a transport independent message abstraction, a versatile message generator and several message queues/spools to locally store messages.
You can download the module at the following link: http://pypi.python.org/pypi/messaging/
An Perl implementation of the same abstractions and queue algorithms is available at the following page: http://search.cpan.org/dist/Messaging-Message/
Copyright (C) 2013-2016 CERN