RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
Python38RabbitMQ消息队列

title: Python38 RabbitMQ
tags: Python学习
grammar_cjkRuby: true


RabbitMQ 消息队列介绍

RabbitMQ是一种消息队列,与线程queue和进程QUEUE作用是一样的。
RabbitMQ是一个中间程序,可以实现不同进程之间的通信(比如python和Java之间,QQ和Word之间等);
普通情况下A进程与B进程之间通信,两者之间需要建立很多连接和单独写一些代码,但是使用RabbitMQ的话就可以实现帮助不同进程之间的数据通信。
A进程交给RabbitMQ,RabbitMQ在交给B,同样B交给RabbitMQ,RabbitMQ在交给A,RabbitMQ可以实现A与B进程之间的连接和信息转换。
使用RabbitMQ可以实现很多个独立进程之间的交互,所有其他独立进程都可以用RabbitMQ作为中间程序。

创新互联建站专注于淮南企业网站建设,成都响应式网站建设公司,商城建设。淮南网站建设公司,为淮南等地区提供建站服务。全流程定制开发,专业设计,全程项目跟踪,创新互联建站专业和态度为您提供的服务

py 消息队列:
线程 queue(同一进程下线程之间进行交互)
进程 Queue(父子进程进行交互 或者 同属于同一进程下的多个子进程进行交互)

如果是两个完全独立的python程序,也是不能用上面两个queue进行交互的,或者和其他语言交互有哪些实现方式呢。
【Disk、Socket、其他中间件】这里中间件不仅可以支持两个程序之间交互,可以支持多个程序,可以维护好多个程序的队列。
虽然可以通过硬盘的方式实现多个独立进程交互,但是硬盘速度比较慢,而RabbitMQ则能够很好的、快速的帮助两个独立进程实现交互。

像这种公共的中间件有好多成熟的产品:
RabbitMQ
ZeroMQ
ActiveMQ
……

RabbitMQ:erlang语言 开发的。
Python中连接RabbitMQ的模块:pika 、Celery(分布式任务队列) 、haigha
可以维护很多的队列
其中pika是RabbitMQ常用的模块

RabbitMQ 教程官网:http://www.rabbitmq.com/getstarted.html

几个概念说明:

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

Python38 RabbitMQ 消息队列
RabbitMQ不像之前学的python Queue都在一个队列里实现交互,RabbitMQ有多个队列(图中红色部分代表队列),每个队列都可以将消息发给多个接收端(C是接收端,P是生产消息端)

RabbitMQ基本示例.

1、Rabbitmq 安装

Windos系统

pip install pika

ubuntu系统

install  rabbitmq-server  # 直接搞定

以下centos系统
1)Install Erlang

# For EL5:
rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6:
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7:
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm

yum install erlang

2)Install RabbitMQ Server

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm

3)use RabbitMQ Server

chkconfig rabbitmq-server on
service rabbitmq-server stop/start
或者
rabbitmq-server start

Python38 RabbitMQ 消息队列

rabbitmq已经开启,等待传输

2、基本示例

发送端 producer

import pika

# 建立一个实例;相当于建立一个socket。
#通过ctrl+ConnectionParameters可以看到能传很多参数,如果远程还可以传用户名密码。
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  # 默认端口5672,可不写
    )
# 声明一个管道,在管道里发消息
channel = connection.channel()
# 在管道里声明一个叫hello的queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',  # queue名字,将消息发给hello这个queue
                      body='Hello World!')  # 消息内容
print(" [x] Sent 'Hello World!'")
connection.close()  # 发完消息后关闭队列 

执行结果:

[x] Sent 'Hello World!'

注意一定要开启rabbitmq,否则会报错

接收端 consumer

import pika
import time

# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 声明管道
channel = connection.channel()

# 为什么又声明了一个‘hello’队列?
# 如果这个queue确定已经声明了,可以不声明。但是你不知道是生产者还是消费者先运行,所以要声明两次。如果消费者没声明,且消费者先运行的话,就会报错。
# 生产者先声明queue,消费者不声明,但是消费者后运行就不会报错。
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  # 四个参数为标准格式
    print(ch, method, properties)  # 打印看一下是什么
    # ch是管道内存对象地址;method是内容相关信息  properties后面讲  body消息内容
    print(" [x] Received %r" % body)
    #time.sleep(15)
    #ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume(  # 消费消息
        'hello',  # 你要从哪个队列里收消息 
        callback,  # 如果收到消息,就调用callback函数来处理消息  # 注意调用的函数(callback)以前在basic_consume模块是放在形参第一个位置的,后面修改到第二个位置了,如果放错位置会报错
        # auto_ack=True  # 写的话,如果接收消息,机器宕机消息就丢了
        # 一般不写。宕机则生产者检测到发给其他消费者
        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 开始消费消息(开始接收消息,一直收,如果没消息就卡主在这里了)

执行结果:
 [*] Waiting for messages. To exit press CTRL+C
 params=>>>  
 [x] Received b'Hello World!'

收到了bytes格式的 Hello World!

Python38 RabbitMQ 消息队列

消费者(接收端)这边看到已经卡主了

Python38 RabbitMQ 消息队列

如果此时单独在运行一下生产者(发送端),直接可以从消费者看到新收到的消息


rabbitmq 消息分发轮询

Python38 RabbitMQ 消息队列
重新开启rabbitmq

Python38 RabbitMQ 消息队列
运行三个接收者(消费者)

Python38 RabbitMQ 消息队列
运行发送者,可以看到被第一个接收者给收到信息了

Python38 RabbitMQ 消息队列
第二次运行发送者,第二个接收者收到信息了

Python38 RabbitMQ 消息队列
第三次运行发送者,第三个接收者收到信息了

上面几次运行说明了,依次的将信息发送每一个接收者


接收端 consumer

import pika
import time

# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 声明管道
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    # 正常回调函数(callback)执行完成就表示信息接收完成,如果在还没执行完成时就出现异常就表示信息没有正常接收,比如断网、断电等,会导致信息不能正常接收。
    # 下面sleep 60秒,在60秒之前就将该模块终止执行来模拟异常情况。
    time.sleep(60)  
    #ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume(  
        'hello',  
        callback, 
        # auto_ack=True 表示不管消息是否接收(处理)完成,都不会回复确认消息
        # 如果producer不关心 comsumer是否处理完,可以使用该参数
        # 但是一般都不会使用它

        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

Python38 RabbitMQ 消息队列
在centos中重新执行rabbitmq-server start来清空队列里的消息
然后在pycharm开启三个comsumer,在去运行等待接收消息
再去执行producer来发送消息,执行producer后,立即关闭第一个comsumer,这样消息就会因为第一个comsumer没接收成功跑到第二个comsumer去,以此类推。

Python38 RabbitMQ 消息队列
关闭第二个comsumer,第三个comsumer收到信息

Python38 RabbitMQ 消息队列
这张图是将三个comsumer同时都关闭了,这样三个comsumer都收不到消息,说明producer的消息没有被接收,此时再去开启第一个comsumer,这时第一个comsumer会将消息给接收过来。

我们将sleep注释掉,也是这种现象,这是因为comsumer并没有发送确认消息给producer

import pika
import time

# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 声明管道
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    time.sleep(30)  
    ch.basic_ack(delivery_tag = method.delivery_tag)   # 告诉生成者,消息处理完成

channel.basic_consume(  
        'hello',  
        callback, 
        # auto_ack=True  

        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

此时的代码:当其中一个comsumer执行完成,并发送确认消息后再去中断,下一个comsumer就不会收到消息;反之,如果还没发送确认消息就中断了,那么消息就会被下一个comsumer接收到。

rabbitmq 消息持久化

如果producer端宕机,那么队列的数据也会消失;这样就需要让队列消息持久化

# durable=True 该代码只是将生成的队列持久化(不是消息),如果producer宕机,队列会存在,单消息会丢
# 要注意需要在producer端和 comsumer端都要 写durable=True
channel.queue_declare(queue='hello',durable=True) 
在centos重新开启 rabbitmq-server start

在producer端

将producer代码执行三次,将三个消息放入队列

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  
    )
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',  
                      body='Hello World!',
                      # 下面的代码是让消息持久化
                      properties = pika.BasicProperties(delivery_mode=2)
                      )  
print(" [x] Sent 'Hello World!'")
connection.close()  

将producer代码执行三次,将三个消息放入队列
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body): 
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    # time.sleep(30) #注释掉
    ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume( 
        'hello', 
        callback
        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

Python38 RabbitMQ 消息队列
可以看到因为producer执行了三次,所以运行comsumer端收到了三条消息


  • 协商处理
producer端没有改变

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  
    )
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',  
                      body='Hello World!',
                      properties = pika.BasicProperties(delivery_mode=2)
                      )  
print(" [x] Sent 'Hello World!'")
connection.close()  
comsumer 1(消费者:1)

import pika 
import time 

connection = pika.BlockingConnection(pika.ConnectionParameters( 
               'localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello',durable=True) 
def callback(ch, method, properties, body):  
    print(ch, method, properties)  

    print(" [x] Received %r" % body) 
    # time.sleep(30) #注释掉 
    ch.basic_ack(delivery_tag = method.delivery_tag)   

# channel.basic_qos可以使其消费者最多同时多少个消息;如果其中一个消费者处理慢(如:CPU处理性能低下),达到了最多处理的限制的话 生产者就不会再发送给该消费者。

channel.basic_qos(prefetch_count=1)  #这里限制最多同时只处理1个消息

channel.basic_consume(  
        'hello',  
        callback 
        ) 

print(' [*] Waiting for messages. To exit press CTRL+C') 
channel.start_consuming()  #  

Python38 RabbitMQ 消息队列

此时有两个comsumer模块,comsumer2比comsumer1多用了sleep 30秒来模拟性能处理慢的情况

comsumer 2(消费者:2)
复制一个comsumer模块为comsumer2

import pika 
import time 

connection = pika.BlockingConnection(pika.ConnectionParameters( 
               'localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello',durable=True) 
def callback(ch, method, properties, body):  
    print(ch, method, properties)  

    print(" [x] Received %r" % body) 
    time.sleep(30) #comsumer2这里sleep30秒
    ch.basic_ack(delivery_tag = method.delivery_tag)   

channel.basic_qos(prefetch_count=1)  

channel.basic_consume(  
        'hello',  
        callback 
        ) 

print(' [*] Waiting for messages. To exit press CTRL+C') 
channel.start_consuming()  #  

Python38 RabbitMQ 消息队列

Python38 RabbitMQ 消息队列

我们运行两个comsumer后,一直去运行producer。 可以看到comsumer 1接收到了3条信息,而comsumer 2只接收到了1条信息,这是因为comsumer 2 sleep了30秒来模拟信息处理慢的情况;
comsumer 1 和 comsumer 2都指定了同时只能处理1条信息,producer会与comsumer 2协商,因为comsumer2一直没有处理完限制的1条信息,所以信息都被comsumer 1处理了。


Rabbitmq fanout广播模式

Python38 RabbitMQ 消息队列

新建fanout_publiser模块,用于发送广播的producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# 定义一个转发器叫logs,属于一个中间人的角色,用于将producer的消息转发给消费者(comsumer)
# 定义广播类型使用fanout
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# message = ''.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)   # routing_key为空即可,因为是广播没有定义队列,所以也不需要指定队列,但这里必须要定义为空

print(" [x] Send %r " % message)

connection.close()
新建fanout_consumer模块,用于接收广播的消费者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费断开后,自动将queue删除
# 就是这里会随机生成一个随机的唯一queue,用完之后会将生成的queue删除
# 这里要写queue='',如果不指定队列名字,但也要写一个空的字符串,不然会报错缺少参数
result = channel.queue_declare(queue='',exclusive=True)

queue_name = result.method.queue  # 拿到随机生成的queue名字

# producer绑定了logs转发器
# 消费者将随机生成的队列也绑定了logs转发器
# 这样producer将消息交给logs转发器,logs转发器将消息交给对应绑定的随机队列,消费者从队列里在拿消息
channel.queue_bind(exchange='logs',queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x] %r" % body)

channel.basic_consume(

    queue=queue_name, on_message_callback=callback

    # auto_ack=True  # 写的话,如果接收消息,机器宕机消息就丢了
                        )

channel.start_consuming()

Python38 RabbitMQ 消息队列
因为是广播,所以两个consumer都收到了发送者发送的消息。
不过有一点要注意!!!!!!!!!
要先运行consumer(接收者),在运行发送者。就好比收音机一样,只有你先打开收音机,发送者才能将信息发给你。 如果发送者先发送,你却没有接收,之前发送的信息,你就不会再接收到了。


Rabbitmq direct广播模式

Python38 RabbitMQ 消息队列
direct 可以区分广播,将指定的消息发送给指定的接收者;
图中显示了将error级别消息发送给C1,将info、error、warning级别消息发送给C2。

producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 定义消息级别
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ''.join(sys.argv[2:])  or "direct info: Hello World!"
# message = "direct info: Hello World!"

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)   # routing_key为空即可,因为是广播没有定义队列,所以也不需要指定队列,但这里必须要定义为空

print(" [x] Send %r " % message)

connection.close()

consumer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='',exclusive=True)

queue_name = result.method.queue

# 获取参数列表
log_levels = sys.argv[1:]

if not log_levels: # 如果没有参数,就报错,提示要指定消息级别
    sys.stderr.write("Usage: %s [info] [warning] [error] \n" % sys.argv[0])
    sys.exit(1) # 没有参数就退出程序

# print(log_levels)

for severity in log_levels:  # 循环参数列表并绑定
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    ) #所有发送到severity的参数,都接收

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print('[x] %r:%r' % (method.routing_key, body))

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

下面在centos中运行代码

Python38 RabbitMQ 消息队列
运行C1,只接收error的消息

Python38 RabbitMQ 消息队列
运行C2,接收 info、warning、error的消息

Python38 RabbitMQ 消息队列

Python38 RabbitMQ 消息队列
producer运行,指定发送消息给error,可以看到两个consumer都接收到了error的消息

Python38 RabbitMQ 消息队列
只有C2接收到了warning的消息


RabbitMQ topic细致的消息过滤广播模式

producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 定义消息级别
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'  # 发送*.info的信息

message = ''.join(sys.argv[2:])  or "topic info: Hello World!"
# message = "direct info: Hello World!"

channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)   

print(" [x] Send %r:%r " % (routing_key,message))

connection.close()

consumer


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='',exclusive=True)

queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
    sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

print('[*] Waiting for logs. To exit press CTRL+c')

def callback(ch, method, properties, body):
    print('[x %r:%r' % (method.routing_key, body))

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

Python38 RabbitMQ 消息队列
图中显示过滤中间有".orange."的数据,过滤以rabbit为结尾的数据,过滤以lazy开头的数据。

Python38 RabbitMQ 消息队列

Python38 RabbitMQ 消息队列

运行了两个consumer。C1接收.info为结尾的数据,C2接收.error为结尾和MySQL为开头的数据。

Python38 RabbitMQ 消息队列
在运行publisher(已经定义了发送anonymous.info,相当于以.info为结尾的信息)

Python38 RabbitMQ 消息队列
C1接收到了信息

Python38 RabbitMQ 消息队列
执行publisher代码时 后面加上 test.error,然后此时在去看C2

Python38 RabbitMQ 消息队列
C2 看到test.error相关信息

Python38 RabbitMQ 消息队列
执行publisher代码 加上 mysql.info,这样C1和C2都可以收到消息了

Python38 RabbitMQ 消息队列

Python38 RabbitMQ 消息队列

Python38 RabbitMQ 消息队列

运行C3,代码后面加一个 '#' 符号,表示C3可以接收所有信息(注意#号要被引号括起来)

Python38 RabbitMQ 消息队列
在publisher随意发送信息,C3都能收到


分享文章:Python38RabbitMQ消息队列
浏览地址:http://scpingwu.com/article/ggooeg.html