Starts any instantiated nsq.Reader or nsq.Writer
Reader provides high-level functionality for building robust NSQ consumers in Python on top of the async module.
Reader receives messages over the specified topic/channel and calls message_handler for each message (up to max_tries).
Multiple readers can be instantiated in a single process (to consume from multiple topics/channels at once).
Supports various hooks to modify behavior when heartbeats are received, to temporarily disable the reader, and to pre-process/validate messages.
When supplied a list of nsqlookupd addresses, it will periodically poll those addresses to discover new producers of the specified topic.
It maintains a sufficient RDY count based on the number of producers and your configured max_in_flight.
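As a rough illustration of the RDY bookkeeping described above, the sketch below splits max_in_flight evenly across the connected producers and never drops below 1 per connection. The helper name per_connection_rdy and the even-split rule are assumptions made for illustration, not a statement of how the library computes RDY internally.

```python
def per_connection_rdy(max_in_flight, num_producers):
    """Hypothetical helper: even share of max_in_flight per producer connection."""
    # Guard against zero connections and always allow at least 1 message
    # per connection so that no producer is starved entirely.
    return max(1, max_in_flight // max(1, num_producers))


# Show how the share shrinks as more producers are discovered.
for producers in (1, 2, 4, 9, 12):
    print(producers, per_connection_rdy(9, producers))
```

Under this assumed rule, max_in_flight=9 with 4 producers gives each connection a share of 2, so fewer than 9 messages may ever be in flight at once.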
Handlers should be defined as shown in the examples below. The handler receives an nsq.Message object that has instance methods nsq.Message.finish(), nsq.Message.requeue(), and nsq.Message.touch() to respond to nsqd.
Reader is responsible for sending FIN or REQ commands based on the return value of message_handler. When re-queueing, an increasing delay will be calculated automatically.
Additionally, when message processing fails, it will back off in increasing multiples of requeue_delay between updates of the RDY count.
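To make the "increasing delay" idea concrete, here is a minimal sketch of one plausible scheme: the delay grows linearly with the number of attempts and is capped at a maximum. The helper name requeue_delay_for, the linear rule, the cap, and both default values are assumptions for illustration; the text above only states that the delay increases.

```python
def requeue_delay_for(attempts, requeue_delay=90, max_requeue_delay=15 * 60):
    """Hypothetical helper: seconds to wait before a failed message is retried."""
    # Assumed rule: base delay multiplied by the attempt count, capped at a maximum.
    return min(requeue_delay * attempts, max_requeue_delay)


# Delays for the first five attempts under the assumed defaults.
delays = [requeue_delay_for(n) for n in range(1, 6)]
print(delays)
```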
Synchronous example:
import nsq

def handler(message):
    print(message)
    return True

r = nsq.Reader(message_handler=handler,
               lookupd_http_addresses=['http://127.0.0.1:4161'],
               topic="nsq_reader", channel="asdf", lookupd_poll_interval=15)
nsq.run()
Asynchronous example:
import nsq

buf = []

def process_message(message):
    global buf
    message.enable_async()
    # cache the message for later processing
    buf.append(message)
    if len(buf) >= 3:
        for msg in buf:
            print(msg)
            msg.finish()
        buf = []
    else:
        print('deferring processing')

r = nsq.Reader(message_handler=process_message,
               lookupd_http_addresses=['http://127.0.0.1:4161'],
               topic="nsq_reader", channel="async", max_in_flight=9)
nsq.run()
Adds a connection to nsqd at the specified address.
Called as part of RDY handling to identify whether this Reader has been disabled
This is useful to subclass and override to examine a file on disk or a key in cache to identify if this reader should pause execution (during a deploy, etc.).
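The file-on-disk check mentioned above could be sketched as a small mixin. The class name FileSwitchMixin and the flag path are hypothetical, and the sketch assumes that returning True from disabled() is what pauses the reader.

```python
import os


class FileSwitchMixin(object):
    """Hypothetical mixin: reports 'disabled' while a flag file exists on disk."""

    # Assumed path, chosen only for illustration.
    disable_flag_path = "/tmp/nsq_reader.disabled"

    def disabled(self):
        # True while the flag file exists, False once it has been removed.
        return os.path.exists(self.disable_flag_path)
```

Combined with the real class this might look like `class PausableReader(FileSwitchMixin, nsq.Reader): pass`; creating the flag file during a deploy would then pause consumption, and removing it would resume it.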
Called when a message has been received where msg.attempts > max_tries
This is useful to subclass and override to perform a task (such as writing to disk, etc.)
Parameters: message – the nsq.Message received
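As a sketch of the "writing to disk" use case, the mixin below appends each abandoned message to a local file as one JSON object per line. The class name DeadLetterMixin, the file path, and the record layout are hypothetical; the sketch also assumes the message exposes id, body, and attempts attributes.

```python
import json


def _text(value):
    # Message fields may arrive as bytes; decode them so they can be JSON-encoded.
    if isinstance(value, bytes):
        return value.decode("utf-8", "replace")
    return value


class DeadLetterMixin(object):
    """Hypothetical mixin: appends messages that exceeded max_tries to a file."""

    dead_letter_path = "nsq_dead_letters.jsonl"  # assumed location

    def giving_up(self, message):
        record = {
            "id": _text(message.id),
            "attempts": message.attempts,
            "body": _text(message.body),
        }
        # One JSON object per line keeps the file easy to replay later.
        with open(self.dead_letter_path, "a") as fh:
            fh.write(json.dumps(record) + "\n")
```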
Called whenever a heartbeat has been received
This is useful to subclass and override to perform an action based on liveness (for monitoring, etc.)
Parameters: conn – the nsq.AsyncConn over which the heartbeat was received
Used to identify when buffered messages should be processed and responded to.
When max_in_flight > 1 and you’re batching messages together to perform work, it isn’t possible to just compare the len of your list of buffered messages against your configured max_in_flight, because max_in_flight may not be evenly divisible by the number of producers you’re connected to; i.e. you might never get that many messages (it’s a max).
Example:
import functools
import nsq

def message_handler(nsq_msg, reader):
    # buffer messages
    if reader.is_starved():
        # perform work
        pass

reader = nsq.Reader(...)
reader.set_message_handler(functools.partial(message_handler, reader=reader))
nsq.run()
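A slightly fuller sketch of the same batching pattern follows, written so it can be exercised without a running nsqd. BatchingHandler and the work callable are hypothetical names; the reader argument only needs an is_starved() method, and each message is assumed to provide the enable_async() and finish() methods used in the asynchronous example earlier.

```python
class BatchingHandler(object):
    """Hypothetical handler: buffers messages and flushes when the reader is starved."""

    def __init__(self, reader, work):
        self.reader = reader  # anything exposing is_starved()
        self.work = work      # callable that receives the list of buffered messages
        self.buffer = []

    def __call__(self, message):
        # Take over responsibility for responding, since the reply is deferred.
        message.enable_async()
        self.buffer.append(message)
        if self.reader.is_starved():
            # Swap the buffer out first so new messages start a fresh batch.
            batch, self.buffer = self.buffer, []
            self.work(batch)
            for msg in batch:
                msg.finish()
```

It could be wired up with `reader.set_message_handler(BatchingHandler(reader, work=save_rows))`, where save_rows stands in for whatever batch operation you perform.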
Called when a message is received in order to execute the configured message_handler
This is useful to subclass and override if you want to change how your message handlers are called.
Parameters: message – the nsq.Message received
Trigger a query of the configured lookupd_http_addresses.
Assigns the callback method to be executed for each message received
Parameters: message_handler – a callable that takes a single argument