RabbitMQ Routing

RabbitMQ可以通过路由选择订阅者来发布消息。

Bindings

通过下面的函数绑定Exchange与消息队列:

1
channel.queue_bind(exchange=exchange_name, queue=queue_name)

可以通过添加routing_key来做路由选择,如下:

1
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')

Direct Exchange

为了使用直接转发,可以设置Exchange的类型为direct

rabbitmq中直接转发的算法很简单,如果binding keyrouting key相同,消息会直接添加到相应的消息队列中取。

Multiple bindings

rabbitmq允许一个Exchange绑定多个消息队列,那么该Exchange会把消息分别发布到绑定的多个消息队列中。

例子

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env python
# coding=utf-8

import pika
import sys
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='news', type='fanout')
for i in range(100):
message = str(i) + 'Hello World!'
if i%2 == 0:
channel.basic_publish(exchange='news', routing_key='0', body=message)
else:
channel.basic_publish(exchange='news', routing_key='1', body=message)
print " [x] Sent %r" % (message,)
time.sleep(2)
connection.close()

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import pika
import sys

parameters = pika.URLParameters('amqp://mtest:root@rabbit-server:5672/%2F')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.exchange_declare(exchange='news', type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='news', queue=queue_name, routing_key=sys.argv[1])

print ' [*] Waiting for news. To exit press CTRL+C'

def callback(ch, method, properties, body):
print " [x] %r" % (body,)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

parameters = pika.URLParameters('amqp://guest:guest@rabbit-server1:5672/%2F')