Phillip Pearson
Posts: 1083
Nickname: myelin
Registered: Aug, 2003
|
Phillip Pearson is a Python hacker from New Zealand
|
|
|
|
Initial queuing experiments
|
Posted: Aug 3, 2008 10:05 PM
|
|
|
This post originated from an RSS feed registered with Python Buzz
by Phillip Pearson.
|
Original Post: Initial queuing experiments
Feed Title: Second p0st
Feed URL: http://www.myelin.co.nz/post/rss.xml
Feed Description: Tech notes and web hackery from the guy that brought you bzero, Python Community Server, the Blogging Ecosystem and the Internet Topic Exchange
|
Latest Python Buzz Posts
Latest Python Buzz Posts by Phillip Pearson
Latest Posts From Second p0st
|
|
I've been playing with RabbitMQ over the last couple of days. I've found that the reports of AMQP being a pain in the ass to get started with a right on the mark.
py-amqplib was quite good, although when I use basic_consume and a callback, it seems to use a lot of CPU and I can only handle about 170 msgs/sec (which remains fairly constant as I add processes - i.e. two processes can do ~85 msgs/sec each). With basic_get it never gets far above 20 msgs/sec but doesn't use much CPU - I guess it spends most of its time waiting for data.
I gave the QPid Python library a go after that and couldn't get basic_consume to work, but basic_get was all right. Forking off 20 worker processes and getting them to poll with basic_get, it would handle about 300 msgs/sec easily (with the generator and RabbitMQ, on the same machine, each using 20-30% CPU). Each worker sat at around 3% CPU. Trying again with py-amqplib, the worker processes used about 9% CPU each (with the same throughput).
So if you're looking for higher performance but don't mind battling through the complete lack of documentation, QPid appears to be the way to go. Here's some working receiver code:
import qpid, time
conn = qpid.client.Client('localhost', 5672, qpid.spec.load('qpid/specs/amqp.0-8.xml'), vhost='/')
print conn.start({"LOGIN": "your login name here", "PASSWORD": "your password here"})
ch = conn.channel(1)
print ch.channel_open()
r = ch.access_request('/data', active=True, read=True, write=True)
ticket = 0
ch.exchange_declare(ticket, "tempexch", "direct", durable=False, auto_delete=False)
ch.queue_declare(queue="tempqueue", durable=False, exclusive=False, auto_delete=False)
ch.queue_bind(queue="tempqueue", exchange="tempexch", routing_key="tempqueue")
while 1:
msg = ch.basic_get(queue="tempqueue")
c = msg.content
if c is not None:
# handle message now
print c.body
ch.basic_ack(msg.delivery_tag)
else:
time.sleep(1)
Comment
Read: Initial queuing experiments
|
|