时间:2021-07-01 10:21:17 帮助过:35人阅读
消息也可以理解为任务,消息发送者可以理解为任务分配者,消息接收者可以理解为工作者,当工作者接收到一个任务,还没完成的时候,任务分配者又发一个任务过来,那就忙不过来了,于是就需要多个工作者来共同处理这些任务,这些工作者,就称为工作队列。结构图如下:
rabbitmq的python实例工作队列
准备工作(Preparation)
在实例程序中,用new_task.py来模拟任务分配者, worker.py来模拟工作者。
修改send.py,从命令行参数里接收信息,并发送
- import sys
- message= ' '.join(sys.argv[1:])or "Hello World!"
- channel.basic_publish(exchange='',
- routing_key='hello',
- body=message)
- print " [x] Sent %r" % (message,)
修改receive.py的回调函数。
- import time
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep( body.count('.') )
- print " [x] Done"
这边先打开两个终端,都运行worker.py,处于监听状态,这边就相当于两个工作者。打开第三个终端,运行new_task.py
- $ python new_task.py First message.
- $ python new_task.py Second message..
- $ python new_task.py Third message...
- $ python new_task.py Fourth message....
- $ python new_task.py Fifth message.....
观察worker.py接收到任务,其中一个工作者接收到3个任务 :
- $ python worker.py
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'First message.'
- [x] Received 'Third message...'
- [x] Received 'Fifth message.....'
另外一个工作者接收到2个任务 :
- $ python worker.py
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'Second message..'
- [x] Received 'Fourth message....'
从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
消息确认(Message acknowledgment)
消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep(5)
- print " [x] Done"
- ch.basic_ack(delivery_tag= method.delivery_tag)
这边停顿5秒,可以方便ctrl+c退出。
去除no_ack=True参数或者设置为False也可以。
- channel.basic_consume(callback, queue='hello', no_ack=False)
用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
消息持久化存储(Message durability)
虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:
- channel.queue_declare(queue='hello', durable=True)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:
- channel.queue_declare(queue='task_queue', durable=True)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
- channel.basic_publish(exchange='',
- routing_key="task_queue",
- body=message,
- properties=pika.BasicProperties(
- delivery_mode= 2,# make message persistent
- ))
公平调度(Fair dispatch)
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
- channel.basic_qos(prefetch_count=1)
new_task.py完整代码
- #!/usr/bin/env python
- import pika
- import sys
- connection= pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel= connection.channel()
- channel.queue_declare(queue='task_queue', durable=True)
- message= ' '.join(sys.argv[1:])or "Hello World!"
- channel.basic_publish(exchange='',
- routing_key='task_queue',
- body=message,
- properties=pika.BasicProperties(
- delivery_mode= 2,# make message persistent
- ))
- print " [x] Sent %r" % (message,)
- connection.close()
worker.py完整代码
- #!/usr/bin/env python
- import pika
- import time
- connection= pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel= connection.channel()
- channel.queue_declare(queue='task_queue', durable=True)
- print ' [*] Waiting for messages. To exit press CTRL+C'
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep( body.count('.') )
- print " [x] Done"
- ch.basic_ack(delivery_tag= method.delivery_tag)
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(callback,
- queue='task_queue')
- channel.start_consuming()
以上就是Python rabbitmq的使用(二)的内容,更多相关内容请关注PHP中文网(www.gxlcms.com)!