Python连接MQ核心是选对客户端库、建立可靠连接、正确收发消息并做好异常与确认处理;主流MQ对应库包括RabbitMQ用pika、Kafka用kafka-python、Redis用redis-py、RocketMQ用rocketmq-client-python。
Python连接消息队列(MQ)系统,核心是选对客户端库、建立可靠连接、正确收发消息,并做好异常与确认处理。不同MQ系统协议和API略有差异,但通用逻辑一致。
主流MQ及推荐库:
安装示例(以RabbitMQ为例):
pip install pika
典型步骤:建立连接 → 创建信道 → 声明交换机/队列 → 发布消息
import pika1. 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
2. 确保队列存在(不存在则自动创建)
channel.queue_declare(queue='task_queue', durable=True)
3. 发送消息(持久化 + 消息确认)
channel.basic_publish( exchange='', routing_key='task_queue', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # 消息持久化,重启后不丢失 ) ) print(" [x] Sent 'Hello World!'") connection.close()
避免消息丢失的关键是关闭自动确认(auto_ack=False),并显式调用 basic_ack。
def callback(ch, method, properties, body):
print(f" [x] Receiv
ed {body.decode()}")
# 模拟处理耗时任务
import time
time.sleep(2)
# 手动确认消息已处理完成
ch.basic_ack(delivery_tag=method.delivery_tag)
关闭自动确认
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
注意:若处理中崩溃且未ack,RabbitMQ会将消息重新入队(前提是队列和消息都设为durable,且消费者未设置requeue=False)。
基本上就这些。MQ不是黑盒,理解“连接-声明-发/收-确认-异常兜底”这条主线,就能稳住大多数业务场景。