RabbitMQ Publish/Subscribe

RabbitMQ支持一对多的模式,一般称为发布/订阅。也就是说,生产者产生一条消息后,RabbitMQ会把该消息分发给所有的消费者。

Exchanges

之前的教程中,仅仅使用了基本的消息模型:

  • 生产者产生消息
  • 把消息添加到消息队列
  • 消费者接收消息

而在rabbitmq完整的消息模型中,并不是这样的。事实上,生产者并不知道消息是否发送到队列,而是把消息直接发送给Exchanges

Exchanges的功能理解起来非常简单,它只负责接收生产者发送的数据并把这些数据添加到消息队列。但是,在存在多个消息队列的情况下,Exchanges必须知道每条消息要添加到哪一个消息队列。

RabbitMQ提供了几种Exchanges,包括:direct, topic, headers and fanout

这里,仅仅介绍fanout的使用。

1
channel.exchange_declare(exchange='news', type='fanout')

那么,发布消息:

1
channel.basic_publish(exchange='news', routing_key='', body=message)

Temporary queues

由于在生产者和消费者中需要指定相同的消息队列才能实现消息通信,那么如果不特殊指定某个消息队列会如何呢?
那么需要使用默认参数让系统给生成一个特定的消息队列。

1
result = channel.queue_declare()

Bindings

为了发送指定发送的消息队列,必须创建exchange和消息队列之间的关系:

1
channel.queue_bind(exchange='news', queue=result.method.queue)

例子

作为生产者的publish:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
# coding=utf-8

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='news',
type='fanout')
for i in range(100):
message = str(i) + 'Hello World!'
channel.basic_publish(exchange='news', routing_key='', body=message)
print " [x] Sent %r" % (message,)
import time
time.sleep(2)
connection.close()

作为消费者的subscribe:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/usr/bin/env python
# coding=utf-8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='news', type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='news',
queue=queue_name)

print ' [*] Waiting for news. To exit press CTRL+C'

def callback(ch, method, properties, body):
print " [x] %r" % (body,)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()