Creating a queue is simple.
>>> from bunnymq import Queue
>>> queue = Queue('test', host='localhost', port=5672, vhost='/', username='guest', password='guest')
This will create a queue named bunnymq.test
on the RabbitMQ server running at localhost:5672
.
The username
, password
, host
, port
, vhost
arguments are optional; if none are given the defaults will be used.
You can have one (or many) Python programs pushing to the queue using Queue.put
>>> queue.put(1)
>>> queue.put('hello')
>>> queue.put({'a': 1})
You can push any python object that can be pickled. For example:
>>> import datetime
>>> queue.put(datetime.now())
If you want to turn off serialization or change the serializer from pickle
to something else e.g. json
, refer this section.
You can indicate the priority of the item:
>>> queue.put({'b': 1}, priority=8)
This item will be consumed before the ones with lower priority. The priority values range from 1 (low) through 10 (high). The default priority is 5.
You can use the len
function to find the number of items currently present in the queue
>>> len(queue)
8
You can have one (or many) Python programs pulling items off the queue using Queue.get
>>> queue.get()
1
If you try again,
>>> q.get()
This will raise an exception:
Exception: The previous message was neither marked done nor requeued.
The server needs to know what happened with the message you just pulled. This brings us to the topic of consumer acknowledement.
The consumer processes the message once it pulls it off the queue. One of three things can happen
Inform RabbitMQ that the message should be deleted:
>>> queue.task_done()
On the other hand, if you want to retry again:
>>> queue.requeue()
You can optionally specify a priority while requeuing,
>>> queue.requeue(priority=3)
Here is an example:
>>> queue.get()
'hello'
>>> queue.task_done()
>>> queue.get()
{'a': 1}
>>> queue.requeue()
>>> queue.get() # some other consumer may get this message
{'a': 1}
The Queue
object implements the iterable protocol, which means you can pull items off the queue as follows:
>>> for item in queue:
... process(item)
... queue.task_done() # <- do not forget
If there are no more items in the queue, this will block indefinitely.
You can register any function of one argument as a worker, by using the decorator Queue.worker
:
@queue.worker
def process(item):
print(item)
Then start the consumer with
>>> queue.consume()
This will run the iteration internally.
The processing logic usually contains try/except to catch errors and decide wheather to mark it done or requeue. The decorator approach decreases one level of indentation resulting in somewhat more readable code.
Once you have the consumer code ready (iterative or decorator version), you can start multiple of them, in different python processes. If your code is dockerized, using docker-compose makes this really straight forward.
:warning: A
Queue
object must never be accessed by multiple threads. One worker per process, that’s a rule, else bad things will happen.
You can spawn as many workers as you need.
To purge all the messages in the queue
>>> queue.clear()
To delete a queue
>>> queue.delete()
You can change the default pickle
serializer easily. You can turn off
serialization as follows:
>>> queue = Queue('test', serializer=None)
Any object that implements the following two methods can work as a serializer:
dumps
, takes an object and returns str/bytes
loads
, takes a bytes
objectFor example, the standard library json
module would be a drop in replacement.
>>> import json
>>> queue = Queue('test', serializer=json)
When using custom serializers make sure that both producer and consumers are using the same serializer.