RabbitMQ-入门

RbbitMQ 学习笔记

AMQP协议组成部分

  1. Module layer:协议最高层,定义了供客户端使用的命令
  2. Session layer:中间层,负责将客户端的命令发送给服务端,再将服务端的命令返回给客户端,为客户端和服务端之间提供可靠的通信
  3. Transport layer:最底层,包括二进制流的传输,帧处理,信道复用,错误检测

生产者使用AMQP的过程

  1. Producter
  • 建立连接
  • 开启通道
  • 发送消息
  • 释放资源

消费者使用AMQP的过程

  1. Consumer
  • 建立连接
  • 开启通道
  • 准备接受消息
  • 发送确认
  • 释放资源

AMQP命令和javaAPI的对应

Connection.Start : factory.newConnection 新建连接 Connection.close : connection.close 关闭连接 Channel.Open : channel.openChannel 开启信道 Channel.close : channel.close 关闭信道 Exchange.Declare : channel.exchangeDeclare 声明交换器 Exchange.Delete : channel.exchangeDelete删除交换器 Exchange.Bind : channel.exchangeBind 交换器和交换器绑定 Exchange.Unbind : channel.exchangeUnbind 交换器和交换器解绑 Queue.Declare : channel.queueDeclare 声明队列 Queue.Bind : channel.queueBind 队列和交换机绑定 Queue.Purge : channel.queuePurge 清除队列 Queue.Delete : channel.queueDelect 删除队列 Queue.Unbind : channel.queueUnbind 队列和交换机解绑 Basic.Qos : channel.basicQos 设置未被确认消费的个数 Basic.Consume : channel.basicConsume 消费消息(推) Basic.Cancel : channel.basicCancel 取消 Basic.Publish : channel.basicPublish 发送消息 Basic.Get : channel.basicGet 消费消息(拉) Basic.Ack : channel.basicAck 确认消息 Basic.Reject : channel.basicReject 拒绝单条消息 Basic.Recover : channel.basicRecover 请求Broker重新发送未被确认的消息 Tx.Select : channel.txSelect 事务开启 Tx.Commit : channel.txCommit 事务提交 Tx.Rollback : channel.txRollback,事务回滚

rabbitmq_arc1.jpg

在rabbitMQ的使用中分为

Producer(生产者)、Exchange、Binding、Queue、Consumer(消费者)

其中在消息路由的过程中还涉及以下关系

Routing Key、Binding Key、Exchange Type 的关系

rabbitmq_arc2.png

客户端连接过程:

  1. 客户端连接到消息服务器,打开一个channel
  2. 客户端声明一个exchange,并设置相关属性
  3. 客户端声明一个queue,并设置相关属性
  4. 客户端使用routing key,在exchange和queue之间建立好绑定关系
  5. 客户端投递消息到exchange
  6. 客户端从指定的queue中消费消息
ItemComment
Exchange消息交换机,它指定消息按什么规则,路由到哪个队列
Queue消息队列,每个消息都会被投入到一个或多个队列
Binding绑定,它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key路由关键字,exchange根据这个关键字进行消息投递
Vhost虚拟主机,可以开设多个vhost,用作不同用户的权限分离
Producer消息生产者,就是投递消息的程序
Consumer消息消费者,就是接受消息的程序
Channel消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

#channel 会话通道,与客户端通过socket建立长连

发送消息过程:

  1. 生产者和Broker建立TCP链接
  2. 生产者和Broker建立通道
  3. 生产者通过通道将消息发送到Broker,有Exchange将消息进行转发
  4. Exchange将消息转发指定的Queue

接受消息过程:

  1. 消费者和Broker建立TCP链接
  2. 消费者和Broker建立通道
  3. 消费者监听指定的Queue
  4. 当有消息到达Queue时,Broker默认将消息推送给消费者
  5. 消费者接收到消息
  6. ack回复

Channel信道:

Channel是创建于连接对象之上的虚拟连接,每一个信道都分配一个唯一Id,由信道完成生产者或消费者和Broker之间的连接

为什么需要channel

为什么有TCP连接对象,还需要Channel信道,那是因为每一次TCP的创建都非常消耗资源,RabbitMQ采用一个NIO机制(io多路复用)进行通信连接达到减少性能消耗的目的

RabbitMQ 工作模式

  1. Work queuees工作队列
  2. Publish、Subscribe 发布订阅模式
  3. Routing 路由模式
  4. Topics 通配符模式
  5. Header Hearder转发器
  6. RPC远程调用模式

Work queues

work queues 和hello world程序相比,多了一个消费者 也就是多个消费者共同消费同一个队列中的消息

特点:

  1. 一个生产者讲消息发送给一个队列
  2. 多个消费者共同监听一个队列的消息
  3. 消息不能被重复消费 4.rabbit 采用轮询的方式将消息平均发送给消费者

Publish/Subscribe

特点:

  1. 一个生产者将消息发给交换机
  2. 与交换机绑定得有多个queue,每个消费者监听自己的队列
  3. 生产者讲消息发送给交换机,由交换机讲消息转发给绑定词交换机的每个队列,每个绑定交换机的队列都将收到该消息
  4. 如果消息发给没有绑定队列的交换机上,消息将会丢失

与work queue 模式的区别

  1. publish、subscribe可以定义一个交换机绑定多个队列,一个消息可以发送给多个队列
  2. work queues无需定义交换机,一个消息只能发送给一个队列
  3. pub/sub比work queues的功能更强大,pub/sub也可以将多个消费者监听同一个队列实现work queues的功能
  4. 交换机的类型为fanout

使用场景:

一个消息、任务。需要同时被多个客户端获取/执行

Routing

特点:

  1. 每个消费者监听自己的队列,并且设置routingkey,可以设置多个routingkey。我看范例里面是举得log日志的例子。一个队列监听所有级别(error,info)的routingkey。另一个队列可能是告警信息业务绑定的队列,他会设置(error)的routingkey
  2. 生产者将消息发送给交换机,发送消息时需要指定routingkey,由交换机根据routingkey来转发消息到指定的队列
  3. 生产者将消息发给交换机,发送消息时需要指定routingkey的值,交换机来判断该routingkey的值和哪个队列的routingkey相等,如果相等则将该消息转发给该队列

Routing与pub/sub的区别

  1. Pub/Sub模式在绑定交换机时不需要指定routingkey,交换机会将消息同时发送给绑定自己的多个队列。
  2. Routing模式要求队列在绑定交换机时指定routingkey,会在exchange那里做一层direct,有选择的将消息发送到,rroutingkey对应的队列中去
  3. 交换机类型为direct

Topics

特点:

  1. 每个消费者监听自己的队列,并且设置带通配符的routingkey
  2. 符号:① #,匹配一个或者多个词,info.# 可以匹配info.s,info.a。②*只能匹配一个词
  3. 一个交换机可以绑定等多个队列,每个队列需要设置一个或者多个带通配符的routingkey。
  4. 生产者将消息发送给交换机,交换机根据routingkey的值来匹配,匹配时采用通配符匹配

与Routing的区别

  1. Routing是完全匹配,Topics是通配符匹配
  2. 交换机类型为topic

header模式与routing模式不同的是,header模式取消routingkey,使用header中的key、value 匹配队列

RPC

RPC即客户端远程调用服务端的犯法,使用MQ可以实现RPC的远程异步调用,基于Direct交换机实现 流程如下:

  1. 客户端既是消费者也是生产者,向RPC请求队列发送RPC调用信息,同时会监听RPC响应队列
  2. 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
  3. 服务端讲RPC方法的结果发送到RPC响应队列

特点:

可以通过消息队列实现双方服务的异步通信

python 例程:

"""
整体实现基于同步模式:

- BlockingConnection 阻塞/连接

> 该适配器虽然是同步阻塞适配器,但是依然支持异步RPC特性
> 组成: BlockingConnection 和 BlockingChannel
> 异步适配器
> 1.Select Connection Adapter 没有第三方依赖包的异步模式
> 2.Tornado Connection Adapter 基于Tornado 的异步IO请求模式
> 3.Twisted Connection Adapter 基于Twisted’的异步IO请求模式
"""
# coding:utf8
from common.config.dev import RABBITMQ_HOST,RABBITMQ_PASS,RABBITMQ_PORT,RABBITMQ_USER
import uuid
import pika
import json
from common.utils.logger import Logger
from common.utils.helpers import JsonObject
logger = Logger()



class RabbitMQ():
    def __init__(self,v_host="/",host=RABBITMQ_HOST, port=RABBITMQ_PORT, user=RABBITMQ_USER, password=RABBITMQ_PASS):
        """
        RabbitMQ 封装
        :param v_host: 虚拟机host
        :param host: RabbitMQ 服务host
        :param port: RabbitMQ服务端口
        :param user: RabbitMq 用户认证-用户名
        :param password: RabbitMq 用户认证-密码
        """
        self._v_host = v_host
        self._host = host
        self._port = port
        self._user = user
        self._password = password
        self.exchange_type = ""
        self.exchange = ""
        self.responses = {} #rpc客户端收到的所有结果
        self.do_connection()

    def do_connection(self):
        """
        进行了连接这个动作
        :return:
        """
        try:
            self.connection = self.blocking_connection(host=self._host,
                                     port=self._port,
                                     user=self._user,
                                     password=self._password,
                                    v_host=self._v_host)
            self.channel = self.connection.channel()
        except ValueError as e:
            print(e)

    def blocking_connection(self,
                            host=RABBITMQ_HOST,
                            port=RABBITMQ_PORT,
                            user=RABBITMQ_USER,
                            password=RABBITMQ_PASS,
                            v_host="/"):
        """
        Connection Adapters 连接适配器
        pika.PlainCredentials(
        :param str用户名:用于验证的用户名,默认为guest
        :param str password:用于验证的密码,默认为guest
        :param bool delete_on_connect:连接完成后,凭证将不会存储在内存中。删除连接凭据。
        )
        pika.ConnectionParameters(
        host=host
        port=port,
        virtual_host=要使用的rabbitmq虚拟主机, 一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        credentials=PlainCredentials身份验证凭证,
        channel_max=允许的最大channel通道数,
        frame_max=AMQP frame的最大字节大小,
        heartbeat=AMQP callable 心跳超时协商,
        ssl_options=pika.SSLOptions`实例,
        connection_attempts=最大重试尝试次数,
        retry_delay=下一次等待的时间,以秒为单位,
        socket_timeout=套接字连接超时时间,
        stack_timeout=完整协议栈启动超时时间,应该高于socket_timeout,
        locale=设置语言环境值,
        blocked_connection_timeout=如果非负,阻止链接保持;如果
        tcp_options=为套接字设置的TCP选项的字典
        """
        credential_params = pika.PlainCredentials(user, password)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host, port, v_host, credential_params)
        )
        return connection

    def model(self,exchange,model="fanout"):
        """
        可以选择,自己想构建的模式,
        然后该函数会把需要设置的交换机参数帮你设置好
        :param model:
            model_exchangeType = {
                "fanout":"fanout",
                "worker":"fanout",
                "routing":"direct",
                "direct":"direct",
                "route":"direct",
                "topic":"topic",
                "rpc":"direct"#默认为路由模式
            }
        :return:
        """
        model_exchangeType = {
            "fanout": "fanout",
            "worker": "fanout",
            "routing": "direct",
            "direct": "direct",
            "route": "direct",
            "topic": "topic",
            "rpc": "direct"
        }
        self.exchange_type = model_exchangeType[model]
        self.exchange = exchange
        return self

    def producer_init(self,exchange=None,exchange_type=None):
        """
        生产者初始化代码
        :param v_host: 默认为/,默认虚拟机
        :param exchange:交换机名字,默认为“”
        :param exchange_type:交换机类型,默认为“fanout”,广播模式
        :return: channel
        """
        try:
            self.exchange = exchange if exchange else self.exchange
            self.exchange_type = exchange_type if exchange_type else self.exchange_type
            self.check_Value(self.exchange, self.exchange_type, self.channel)
            """默认这里connection 和 channel都有了,没有raise 异常"""
            # 声明交换机
            self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type)
            return self
        except Exception as e:
            print(e)

    def consumer_init(self,exchange=None,exchange_type=None,queue="",routings=[""]):
        """
        消费者初始化代码
        :param exchange: 交换机名字,默认为
        :param exchange_type: 交换机类型,默认为“fanout”,广播模式
        :param queue: 队列名,默认为“”,为随机队列
        :param routings: 路由key列表,默认为[""],会将这些路由key都绑定到该队列上
        :return: channel
        """
        try:
            self.exchange = exchange if exchange else self.exchange
            self.exchange_type = exchange_type if exchange_type else self.exchange_type
            self.check_Value(self.exchange,self.exchange_type,self.channel)
            """默认这里connection 和 channel都有了,没有raise 异常"""
            # 声明交换机
            self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.exchange_type)
            # 随机生成临时队列,绑定到交换机.当然也可以指定出多条名字固定的队列,exclusive设置为True当接收端退出时,自动销毁队列
            self.queue = self.channel.queue_declare(queue=queue, auto_delete=True)
            self.qname = self.queue.method.queue
            # 将这个随机队列绑定到交换机上,并设置路由键
            for routing in routings:
                self.channel.queue_bind(exchange=self.exchange,queue=self.qname,routing_key=routing)
            return self
        except Exception as e:
            print(e)

    def rpc_client_init(self,routings:list,exchange=None,exchange_type=None,queue="",auto_ack=True):
        """
        rpc 模型
        1. 先定义消费者
        2. 再定义生产者
        :param exchange: 客户端接受调用结果的交换机
        :param exchange_type: 客户端接受调用结果的交换机的类型
        :param queue: 客户端接受调用结果的队列,当为空时,该队列名字随机
        :param routings: 客户端接受调用结果的队列路由键,必选参数,客户端接受调用结果的队列的路由
        :param auto_ack: 是否自动确认
        :return:
        """
        try:
            self.consumer_init(exchange=exchange, exchange_type=exchange_type, queue=queue, routings=routings)
            self.channel.basic_consume(queue=self.qname, auto_ack=auto_ack, on_message_callback=self.rpc_client_response)
            return self
        except Exception as e:
            print(e)

    def rpc_client_response(self,ch, method, properties, body)->any:
        """
        收到结果后的回调函数
        :param ch:
        :param method:
        :param properties:
        :param body:
        :return:
        """
        if self.corr_id == properties.correlation_id:
            self.response = json.loads(body)

    def rpc_client_request(self,body:any,exchange:str,routing_key:str):
        """
        启动rpc客户端请求循环,拿到消息后返回
        :param body: 请求体
        :param exchange:服务端的交换机
        :param routing_key:请求发送到服务端的哪个队列,由路由指定
        :return:
        """

        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.responses[self.corr_id] = None
        self.channel.basic_publish(exchange=exchange,
                                   routing_key=routing_key,
                                   properties=pika.BasicProperties(
                                       reply_to=self.qname,
                                       correlation_id=self.corr_id
                                   ),
                                   body=json.dumps(body))
        while self.response is None:
            self.connection.process_data_events()
        self.responses[self.corr_id] = self.response
        return self.response

    def rpc_server_init(self,call=None,exchange=None,exchange_type=None,queue="",routings=[""]):
        """
        rpc服务端初始化,会初始化一条消费者队列,这个队列的到达路由需要和客户端进行约定
        :param call: 处理远程调用请求的函数
        :param exchange: 服务端的客户端约定好的交换机
        :param exchange_type: 交换机类型
        :param queue: 队列名,默认为"",为临时队列,名字随机
        :param routings:路由键,服务端的队列需要绑定到某个路由上,默认为"",即该交换机为广播模式时生效
        :return:self
        """
        try:
            #将可以主动设置处理远程调用请求的函数,也可以由回调函数rpc_server_callback自己去找
            self.remote_callback = call
            self.consumer_init(exchange=exchange,exchange_type=exchange_type,queue=queue,routings=routings)
            self.channel.basic_qos(prefetch_count=100)
            self.channel.basic_consume(queue=self.qname, auto_ack=False,on_message_callback=self.rpc_server_callback)
            return self
        except Exception as e:
            print(e)

    def rpc_server_callback(self,ch, method, properties, body):
        """
        这里应该是解析参数,寻找被调用者,然后进行调用后
        将调用结果发送会请求中要求返回的路由中
        :param ch: channel
        :param method: method
        :param properties: 客户端发的消息带的参数
        :param body: 请求体,会封装调用信息
        :return:
        """
        # 将计算结果发送回该请求体要求发送的路由中
        # 这里的发送,直接发送到请求体传过来的队列中,所以设置exchange=""
        ch.basic_publish(exchange="",
                         routing_key=properties.reply_to,
                         properties=pika.BasicProperties(
                             reply_to=self.qname,
                             correlation_id=properties.correlation_id
                         ),
                         body=json.dumps(self.remote_callback(json.loads(body))),
                         )
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def rpc_server_start(self):
        """
        启动rpc的服务端
        :return:
        """
        self.channel.start_consuming()

    def check_Value(self,*args):
        """参数校验"""
        for i in args:
            assert i != None

CC BY-NC 4.0

爬虫学习1-概念及urllib2
Unix/Linux编程实践1

Comments