avatar

目录
RabbitMQ 入门

RabbitMQ 入门全攻略

目录

[TOC]

RabbitMQ 介绍

这部分参考了 《RabbitMQ实战指南》这本书的第 1 章和第 2 章。

RabbitMQ 简介

RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

RabbitMQ 发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。RabbitMQ 的具体特点可以概括为以下几点:

  • 可靠性: RabbitMQ使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。
  • 灵活的路由: 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。这个后面会在我们将 RabbitMQ 核心概念的时候详细介绍到。
  • 扩展性: 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  • 高可用性: 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
  • 支持多种协议: RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
  • 多语言客户端: RabbitMQ几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript等。
  • 易用的管理界面: RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。在安装 RabbitMQ 的时候会介绍到,安装好 RabbitMQ 就自带管理界面。
  • 插件机制: RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。感觉这个有点类似 Dubbo 的 SPI机制。

RabbitMQ 核心概念

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。

下面再来看看图1—— RabbitMQ 的整体模型架构。

图1-RabbitMQ 的整体模型架构

下面我会一一介绍上图中的一些概念。

Producer 和 Consumer

  • Producer(生产者) :生产消息的一方(邮件投递者)
  • Consumer(消费者) :消费消息的一方(邮件收件人)

消息一般由 2 部分组成:消息头(或者说是标签 Label)和 消息体。消息体也可以称为 payLoad ,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。生产者把消息交由 RabbitMQ 后,RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)。

Exchange(交换器)

在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。

Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 。这里可以将RabbitMQ中的交换器看作一个简单的实体。

RabbitMQ 的 Exchange(交换器) 有4种类型,不同的类型对应着不同的路由策略direct(默认)fanout, topic, 和 headers,不同类型的Exchange转发消息的策略有所区别。这个会在介绍 Exchange Types(交换器类型) 的时候介绍到。

Exchange(交换器) 示意图如下:

Exchange(交换器) 示意图

生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键)*,用来指定这个消息的路由规则,而这个 *RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

RabbitMQ 中通过 Binding(绑定)Exchange(交换器)Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。

Binding(绑定) 示意图:

Binding(绑定) 示意图

生产者将消息发送给交换器时,==需要一个RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中==。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。

channel

简单理解:==信道是connection基于NIO的优化升级,实现连接复用==。

一个应用程序中有很多个线程需要从RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个Connection ,也就是许多个TCP 连接。然而对于操作系统而言,建立和销毁TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ 采用类似NIO’ (Non-blocking 1/0) 的做法,选择TCP 连接复用,不仅可以减少性能开销,同时也便于管理。

每个线程把持一个信道,所以信道复用了Connection 的TCP 连接。同时RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection ,将这些信道均摊到这些Connection 中, 至于这些相关的调优策略需要根据业务自身的实际情况进行调节。

RabbitMQ 为什么需要信道?为什么不是TCP直接通信?

  1. TCP的创建和销毁,开销大,创建需要三次握手,销毁需要四次分手

  2. 如果不使用信道,那么引用程序就会使用TCP的方式连接到rabbitmq,高峰时每秒成千上万条连接会造成资源的巨大浪费(一条tcp消耗资源,成千上万的tcp会非常消耗资源),而且操作系统每秒处理TCP连接数量也是有限的,必定会造成性能瓶颈

  3. 信道的原理是一条线程一条信道,多条线程多条信道共同使用一条TCP连接。一条TCP连接可以容纳无限的信道,及时每秒造成成千上万的请求也不会造成性能瓶颈

Queue(消息队列)

Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免的消息被重复消费。

RabbitMQ 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做。

Broker(消息中间件的服务节点)

对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

下图展示了生产者将消息存入 RabbitMQ Broker,以及消费者从Broker中消费数据的整个流程。

消息队列的运转过程

这样图1中的一些关于 RabbitMQ 的基本概念我们就介绍完毕了,下面再来介绍一下 Exchange Types(交换器类型)

vhost(虚拟主机)

虚拟主机,一个消息代理(Broker)里可以开设多个虚拟主机(vhost),用作不同用户的权限分离。可以理解为一个子数据库

Exchange Types(交换器类型)

RabbitMQ 常用的 Exchange Type 有 fanoutdirecttopicheaders 这四种(AMQP规范里还提到两种 Exchange Type,分别为 system 与 自定义,这里不予以描述)。

① fanout(广播)

fanout 类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。

② direct(精准定向)

direct 类型的Exchange路由规则也很简单,它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。

direct 类型交换器

以上图为例,如果发送消息的时候设置路由键为“warning”,那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置路由键为”Info”或者”debug”,消息只会路由到Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。

direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

③ topic(模糊类型)

前面讲到direct类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

  • RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
  • BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
  • BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

topic 类型交换器

以上图为例:

  • 路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queuel 和 Queue2;
  • 路由键为 “com.hidden.client” 的消息只会路由到 Queue2 中;
  • 路由键为 “com.hidden.demo” 的消息只会路由到 Queue2 中;
  • 路由键为 “java.rabbitmq.demo” 的消息只会路由到Queuel中;
  • 路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。
④ headers(不推荐)

headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的 headers(也是一个键值对的形式)’对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

总结

1、信道才是rabbit通信本质,生产者和消费者都是通过信道完成消息生产消费的。

2、交换器本质是一张路由查询表(名称和队列id,类似于hash表),这是一个虚拟出来的东西,并不存在真实的交换器。

3、消息的生命周期:生产者生产消息A 交由信道,信道通过消息(消息由载体和标签)的标签(路由键)放到交换器发送到队列上(其实就是查询匹配,一旦匹配到了规则,信道就直接和队列产生连接,然后将消息发送过去)

4、Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

安装 RabbitMq

通过 Docker 安装非常方便,只需要几条命令就好了,我这里是只说一下常规安装方法。

前面提到了 RabbitMQ 是由 Erlang语言编写的,也正因如此,在安装RabbitMQ 之前需要安装 Erlang。

注意:在安装 RabbitMQ 的时候需要注意 RabbitMQ 和 Erlang 的版本关系,如果不注意的话会导致出错,两者对应关系如下:

RabbitMQ 和 Erlang 的版本关系

安装 erlang

1 下载 erlang 安装包

在官网下载然后上传到 Linux 上或者直接使用下面的命令下载对应的版本。

[root@SnailClimb local]#wget http://erlang.org/download/otp_src_19.3.tar.gz

erlang 官网下载:http://www.erlang.org/downloads

2 解压 erlang 安装包

[root@SnailClimb local]#tar -xvzf otp_src_19.3.tar.gz

3 删除 erlang 安装包

[root@SnailClimb local]#rm -rf otp_src_19.3.tar.gz

4 安装 erlang 的依赖工具

[root@SnailClimb local]#yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel

5 进入erlang 安装包解压文件对 erlang 进行安装环境的配置

新建一个文件夹

[root@SnailClimb local]# mkdir /usr/local/erlang

对 erlang 进行安装环境的配置

[root@SnailClimb otp_src_19.3]# 
./configure --prefix=/usr/local/erlang --without-javac

6 编译安装

[root@SnailClimb otp_src_19.3]# 
make && make install

7 验证一下 erlang 是否安装成功了

[root@SnailClimb otp_src_19.3]# ./bin/erl

运行下面的语句输出“hello world”

 io:format("hello world~n", []).

输出“hello world”

大功告成,我们的 erlang 已经安装完成。

8 配置 erlang 环境变量

[root@SnailClimb etc]# vim profile

追加下列环境变量到文件末尾

#erlang
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH

运行下列命令使配置文件profile生效

[root@SnailClimb etc]# source /etc/profile

输入 erl 查看 erlang 环境变量是否配置正确

[root@SnailClimb etc]# erl

输入 erl 查看 erlang 环境变量是否配置正确

安装 RabbitMQ

1. 下载rpm

wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.8/rabbitmq-server-3.6.8-1.el7.noarch.rpm

或者直接在官网下载

https://www.rabbitmq.com/install-rpm.html

2. 安装rpm

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
# 安装socat
yum -y install socat 
##此时会报错没有socat包或是找不到socat包,解决方法安装centos的epel的扩展源
yum -y install epel-release  
##之后重新安装socat
yum -y install socat

紧接着执行:

yum install rabbitmq-server-3.6.8-1.el7.noarch.rpm

中途需要你输入”y”才能继续安装。

3 开启 web 管理插件

rabbitmq-plugins enable rabbitmq_management

可能会遇到如下错误:

image-20200523105232373

重启解决

service rabbitmq-server restart

4 设置开机启动

chkconfig rabbitmq-server on

5 启动服务

service rabbitmq-server start

6 查看服务状态

service rabbitmq-server status

7. 访问 RabbitMQ 控制台

浏览器访问:http://你的ip地址:15672/

默认用户名和密码: guest/guest;但是需要注意的是:guestuest用户只是被容许从localhost访问。官网文档描述如下:

“guest” user can only connect via localhost

解决远程访问 RabbitMQ 远程访问密码错误

新建用户并授权

[root@SnailClimb rabbitmq]# rabbitmqctl add_user root root
Creating user "root" ...
[root@SnailClimb rabbitmq]# rabbitmqctl set_user_tags root administrator

Setting tags for user "root" to [administrator] ...
[root@SnailClimb rabbitmq]# 
[root@SnailClimb rabbitmq]# rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
Setting permissions for user "root" in vhost "/" ...

再次访问:http://你的ip地址:15672/ ,输入用户名和密码:root root

RabbitMQ控制台

创建Virtual Hosts

这里写图片描述

选中Admin用户,设置权限:

这里写图片描述看到权限已加:

这里写图片描述

管理界面中的功能

这里写图片描述

学习五种队列

简单队列

image-20200523163811355

P:消息的生产者
C:消息的消费者
红色:队列

生产者将消息发送到队列,消费者从队列中获取消息。

生产者一一对应消费者

导入RabbitMQ的客户端依赖

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>

获取MQ的连接

package com.zpc.rabbitmq.util;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("testhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

生产者发送消息到队列

public class Send {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

管理工具中查看消息

这里写图片描述

点击上面的队列名称,查询具体的队列中的信息:

这里写图片描述

消费者从队列中获取消息

public class Recv {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明队列(队列连接到信道上)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 声明consumer
        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
            }
        };

        //  启动消费者监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

WorkQueue(轮询分发)

image-20200524085511793

一个生产者、2个消费者。一个消息只能被一个消费者获取。

生产者

向队列中发送50条消息。

public class WorkSend {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

消费者1

public class WorkRecv1 {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义一个消费者
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("[1] Recv msg : " + msg);

                try {
                    // 模拟处理消息,花费2S
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[1] done!");
                }
            }
        };

        //  启动消费者监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消费者2

public class WorkRecv2 {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义一个消费者
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("[2] Recv msg : " + msg);

                try {
                    // 模拟处理消息,花费1S
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("[2] done!");
                }
            }
        };

        //  启动消费者监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

测试及结果分析

测试结果:
1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。

其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。
RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

怎样才能做到按照每个消费者的能力分配消息呢?

​ 联合使用 Qos 和 Acknowledge 就可以做到。
basicQos方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

2个概念

轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

Work模式的“能者多劳”

打开上述代码的注释:

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

//开启这行 表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);

测试:
消费者1比消费者2获取的消息更多。

消息的确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

手动模式:

这里写图片描述

自动模式:

这里写图片描述

订阅模式(fanout Exchange)

这里写图片描述

解读:
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费

这里写图片描述

5.6.2.消息的生产者(看作是后台系统)
向交换机中发送消息。

package com.zpc.rabbitmq.subscribe;

import com.zpc.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
5.6.3.消费者1(看作是前台系统)

package com.zpc.rabbitmq.subscribe;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import com.zpc.rabbitmq.util.ConnectionUtil;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work1";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

5.6.4.消费者2(看作是搜索系统)

package com.zpc.rabbitmq.subscribe;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import com.zpc.rabbitmq.util.ConnectionUtil;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv2] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

测试
测试结果:
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

在管理工具中查看队列和交换机的绑定关系:

这里写图片描述

路由模式(diect Exchange)

交换机根据匹配规则路由到指定符合的队列中

这里写图片描述

这里写图片描述

生产者

这里写图片描述

消费者1(假设是前台系统)

这里写图片描述

消费2(假设是搜索系统)

这里写图片描述

Topic(主题\通配符模式)

这里写图片描述

这里写图片描述

同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

这里写图片描述

生产者

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息内容
        String message = "Hello World!!";
        channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

消费者1(前台系统)

public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_work_1";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv_x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消费者2(搜索系统)

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_work_2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv2_x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

Spring集成RabbitMQ

参考链接,这里不做演示:

https://blog.csdn.net/hellozpc/article/details/81436980

SpringBoot集成RabbitMQ

引入rabbitmq

1、配置pom文件,主要是添加spring-boot-starter-amqp的支持

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置application.properties文件,配置rabbitmq的安装地址、端口以及账户信息,数据库信息

spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=172.16.219.215
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=testhost

简单队列

1、配置队列

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfig {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello3");
    }

    @Bean
    public Queue neoQueue() {
        return new Queue("neo");
    }

    @Bean
    public Queue objectQueue() {
        return new Queue("object");
    }

}

2、发送者

package com.neo.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send()   {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello3", context);

    }

}

3、消费者

package com.neo.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(queues = "hello3")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
}

4、测试

package com.neo.rabbitmq;

import com.neo.rabbit.hello.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class HelloTest {

    @Autowired
    private HelloSender helloSender;

    @Test
    public void hello() throws Exception {
        helloSender.send();
        // 等待三秒,使消费者确定可以消费到数据
        Thread.sleep(3000);
        System.out.println("exit");
    }
}

其他见自己的工程spring-boot-rabbitmq

总结:

常用队列形式:

1、1对1

  • 简单队列: sender –> queue –> consumer
  • exchange的diect模式:sender –> exchange –> queue –> consumer

2、1对多

  • 简单队列: sender –> queue –> 多个consumer(默认轮询)
  • 简单队列: sender –> queue –> 多个consumer(能者多劳,关闭自动ack)
  • exchange的订阅模式(fanout Exchange)
  • exchange的Topic(主题\通配符模式)
文章作者: 简凡丶
文章链接: http://yoursite.com/2020/01/15/4.%20%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/RabbitMQ%E5%85%A5%E9%97%A8/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 BestBear

评论