时间:2021-07-01 10:21:17 帮助过:2人阅读
consumer
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 import pika 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 ‘localhost‘)) 7 channel = connection.channel() 8 9 # You may ask why we declare the queue again ? we have already declared it in our previous code. 10 # We could avoid that if we were sure that the queue already exists. For example if send.py program 11 # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good 12 # practice to repeat declaring the queue in both programs. 13 channel.queue_declare(queue=‘hello‘) 14 15 16 def callback(ch, method, properties, body): 17 print(" 收到: %r" % body.decode("utf-8")) 18 19 channel.basic_consume(callback, 20 queue=‘hello‘, 21 no_ack=True) 22 print(‘ 等待。。。‘) 23 channel.start_consuming()
注意代码中的英文注释,特别是为什么又一次声明queue。。。
1.4 轮询原理
1.3中如果依次运行两个consumer,分别记consumer1、consumer2,那么producer第一次发消息是consumer1收到,第二次发是consumer2收到,第三次发又是consumer1收到......也就是说,rabbitMQ是依次把消息发给consumer端。
1.5 消息持久化
producer
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 import pika 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 相当于建立一个socket连接 6 channel = connection.channel() # 定义一个管道 7 # 声明Queue 8 channel.queue_declare(queue="hello2",durable=True) # durable=True 是把这个队列持久化,如果rabbitMQ挂掉,队列还在;如果 9 # 队列中的消息没有持久化,则消息会丢失 10 channel.basic_publish(exchange="", 11 routing_key="hello2", 12 body="Hi,how are you?", 13 properties=pika.BasicProperties( 14 delivery_mode=2,)) # properties=pika.BasicProperties(delivery_mode=2,) 这是队列中的消息持久化 15 print("发送了一句话。。。") 16 connection.close()
上述代码中,第8行只是队列持久化,如果rabbitMQ挂掉,队列还在;但如果队列中的消息没有持久化,则消息会丢失。
1.6 消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
1.7 消息发布/订阅(Publish/Subscribe)
1.8 有选择地接收消息(exchange type = direct)
python之路day11【RabbitMQ、Redis、Mysql】
标签: