消息队列
消息队列简介
消息队列(MessageQueue)简称MQ,是一种在应用系统之间发送消息、存储消息并异步处理消息的技术
常见消息队列协议
- MQTT(Message Queuing Telemetry Transport):是一种轻量级的发布/订阅协议,特别适用于物联网和低带宽、不稳定网络环境下的通信
- JMS(Java Message Service):是Java平台上定义的消息队列API标准,它提供了发送、接收和处理消息的功能,JMS本身并不是一种协议,而是一种API规范,可以与多种消息中间件的协议进行交互,如ActiveMQ、RabbitMQ等
- AMQP(Advanced Message Queuing Protocol):是一种开放的、标准化的消息队列协议,它提供了一套统一的机制来进行异步通信
常见的MQ实现
比较常见的MQ实现:ActiveMQ、RabbitMQ、RocketMQ、Kafka
| 对比 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
|---|
| 公司/社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | Java | Java | Scala&Java |
| 协议支持 | AMQP,XMPP,SMTP等 | AMQP,XMPP,REST等 | 自定义协议 | 自定义协议 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 单机吞吐量 | 万级(一般) | 万级(最差) | 十万级(最好) | 十万级(次之) |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
| 消息可靠性 | 高 | 一般 | 高 | 一般 |
对于不同的消息队列有不同的优缺点,根据具体业务需求选择
| 需求 | 可选 |
|---|
| 追求可用性 | Kafka、RocketMQ、RabbitMQ |
| 追求可靠性 | RabbitMQ、RocketMQ |
| 追求吞吐能力 | RocketMQ、Kafka |
| 追求消息低延迟 | RabbitMQ、Kafka |
消息队列应用场景
异步提速
场景需求:用户注册后,需要发送注册邮件、发送注册短信
(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,三个任务全部完成后返回给客户端
存在问题:邮件、短信只是一个通知,并不是必须的,这种做法让客户端等待没有必要等待的东西。

(2)并行方式:将注册信息写入数据库后,同时发送邮件、发送短信,三个任务完成后返回给客户端
存在问题:邮件、短信只是一个通知,并不是必须的,虽然并行的方式能提高处理的时间,但仍需要等待三个任务完成

(3)异步消息队列:将注册信息写入数据库后,把发送邮件,短信交给消息队列处理,直接返回客户端

应用解耦
场景需求:分布式系统之中,不同的微服务需要进行通信,以实现各种业务功能,常见的两种通信方式有:远程调用、使用MQ
(1)远程调用:A系统远程调用B系统的接口
存在问题:A系统和B系统高耦合,当A系统出现故障时,B系统就无法正常工作,导致整个系统的稳定性受到严重影响

(2)消息队列解耦:A系统将消息发布到队列中,B系统从队列中订阅并消费消息
好处:使得应用间解耦,就算A系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

流量削峰
场景需求:在秒杀活动中,会有大批量的请求
(1)瞬时请求:在某个短时间内,系统接收到大量的请求,可能导致系统过载崩溃

(2)流量削峰:将请求排队到消息队列中,消费者再从队列中取出任务进行处理,避免系统过载

消息队列的劣势
- 系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响
- 系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用
- 一致性问题:在某些情况下,消息队列可能无法保证完全一致性。例如消息发送失败或消息消费者出现故障,可能会导致消息丢失或消息重复消费的情况
RabbitMQ基础
简介
RabbitMQ是由Erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛
基础架构

| 相关概念 | 描述 |
|---|
| 消息队列(Message Queue) | 用于存储消息的缓冲区,生产者将消息发送到队列中,消费者从队列中获取消息后进行处理 |
| 连接(Connection) | 客户端与消息队列服务器之间的网络连接。它提供了建立和管理通信会话的功能 |
| 通道(Channel) | 建立在连接之上的逻辑通信路径,用于在客户端和消息队列服务器之间传输消息。一个连接可以包含多个通道,提供了多路复用的能力。 |
| 生产者(Producer) | 消息的发送方,用于将消息发送到队列中等待被消费者获取 |
| 消费者(Consumer) | 消息的接收方,用于监听和处理队列中的消息 |
| 交换机(Exchange) | 接收生产者发送的消息,并将消息路由到对应的队列中 |
| 绑定(Binding) | 将队列和交换机联系在一起的过程,规定了消息应该如何路由到队列中 |
| 路由键(Routing Key) | 用于匹配交换机和队列之间绑定关系的关键字 |
工作模式
RabbitMQ提供了7种模式,官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
- 简单模式(Simple Mode):一个生产者发送消息到一个消费者。
- 工作队列模式(Work Queue Mode):一个生产者发送消息到多个消费者,每个消费者接收到的消息是唯一的。
- 发布与订阅模式(Publish/Subscribe Mode):一个生产者发送消息到一个交换机,多个消费者绑定到该交换机并同时接收消息。
- 路由模式(Routing Mode):一个生产者发送消息到一个交换机,消费者通过设置绑定键来选择性地接收消息。
- 主题模式(Topics Mode):一个生产者发送消息到一个交换机,消费者通过设置匹配模式(使用通配符)来接收感兴趣的消息。
- 远程调用模式(RPC Mode):一个客户端发送请求消息到一个服务端,服务端处理请求后返回响应消息给客户端。
- Publisher Confirms模式:生产者发送消息后,等待消息被确认后再进行下一步操作,确保消息可靠性。
单机部署
在线拉取镜像
1
| docker pull rabbitmq:3-management
|
使用命令加载镜像
运行MQ容器
默认账号认密码guest,当前改变了账号密码
1 2 3 4 5 6 7 8 9
| docker run \ -e RABBITMQ_DEFAULT_USER=wen \ -e RABBITMQ_DEFAULT_PASS=123456 \ --name mq \ --hostname localhost \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
|
| 命令 | 解释 |
|---|
-e RABBITMQ_DEFAULT_USER=wen | 设置 RabbitMQ 默认用户名为 wen |
-e RABBITMQ_DEFAULT_PASS=123456 | 设置 RabbitMQ 默认密码为 123456 |
--name mq | 设置容器名为 mq |
--hostname localhost | 设置容器主机名为 localhost |
-p 15672:15672 | 映射容器中 RabbitMQ 管理页面的访问端口15672到 Docker 宿主机的同一端口15672 |
-p 5672:5672 | 映射容器中 RabbitMQ 的 AMQP 协议访问端口5672到 Docker 宿主机的同一端口5672 |
-d | 在后台运行容器 |
rabbitmq:3-management | 使用 RabbitMQ 官方提供的带有管理插件的 3 版本镜像启动容器 |
浏览器访问
打开浏览器,访问http://主机地址:15672/,输入命令设置的账号wen密码123456进入管理平台
常见消息模型
简单模式(Basic Queue)
简单模式是 RabbitMQ 中最简单的模式,仅涉及一个生产者和一个消费者。
- 生产者发送消息到队列
- 消费者从队列中取出消息进行处理

| 图解 | 简介 |
|---|
| producer | 生产者:也就是要发送消息的程序 |
| consumer | 消费者:消息的接收者,会一直等待消息到来 |
| queue | 消息队列:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息 |
工作队列模式(Work Queues)
工作队列模式也称为任务分发模式,主要应用于任务耗时较长的场景下。
- 生产者将任务发送到队列中
- 多个消费者可以同时从队列中取出任务进行处理,每个任务只会被一个消费者消费一次

| 图解 | 简介 |
|---|
| producer | 生产者:也就是要发送消息的程序 |
| consumer | 消费者1:消息的接收者,会一直等待消息到来 |
| consumer | 消费者2:消息的接收者,会一直等待消息到来 |
| queue | 消息队列:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息 |
发布订阅模式(Publish/Subscribe)
发布/订阅模式包含一个生产者、多个消费者和多个队列。
- 生产者将消息发布到交换机
- 交换机将消息传递给与其绑定的所有队列
- 每个消费者从自己订阅的队列中取出消息进行处理

| 图解 | 简介 |
|---|
| producer | 生产者:也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机) |
| consumer | 消费者:消息的接收者,会一直等待消息到来 |
| queue | 消息队列:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息 |
| Exchange | 交换机:一方面,接收生产者发送的消息。另一方面,知道如何处理消息 例如递交给某个特别队列、递交给所有队列、或是将消息丢弃,到底如何操作,取决于Exchange的类型 Fanout广播模式 :不需要路由键,广播到所有队列, 将消息转发到所有与该交换器绑定的队列中 Direct直接匹配:消息携带的路由键, 根据消息携带的路由键将消息转发到对应的队列中 Topic主题匹配:路由键与 binding key 匹配,将消息转发到消息携带的路由键与 binding key 匹配的队列中 Headers首部匹配:首部信息匹配,通过判断消息携带的首部信息(headers)来决定将消息发送到哪些队列中 |
广播模式(Fanout)
广播模式是发布/订阅模式的一种特殊情况,它会将消息路由到所有与之绑定的队列中,无需指定路由键
路由模式(Routing)
路由模式是根据消息携带的路由键将消息路由到对应的队列中。
- 生产者将消息发送到交换机
- 交换机根据绑定的路由键将消息路由到对应的队列中
主题模式(Topic)
主题模式是根据消息携带的路由键进行匹配,但它比路由模式更加灵活,因为它支持通配符匹配。
- 生产者将消息发送到交换机
- 交换机将消息根据路由键和通配符进行匹配,并将消息路由到对应的队列中
头部模式是通过判断消息携带的自定义头部信息来决定将消息发送到哪些队列中。头部模式相比于其他模式更加灵活,但同时也增加了配置的复杂度
- 生产者在发送消息时可以在消息头部添加自定义属性和值
- 交换机根据这些属性和值来匹配并路由消息至对应的队列
RabbitMQ客户端的使用
准备工作
创建Maven工程引入相关依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.32</version> </dependency> </dependencies>
|
简单模式(Basic Queue)
(1)创建名为BasicQueue的队列,向BasicQueue队列发送一条消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
public class Producer {
private static Connection connection; private static Channel channel; private static String QueueName = "BasicQueue";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException {
channel.queueDeclare(QueueName, true, false, false, null);
channel.basicPublish("", QueueName, null, "这是发送的一条消息".getBytes());
channel.close(); connection.close(); System.out.println("发送消息成功!"); }
}
|
(2)消费者接收BasicQueue队列的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
public class Consumer {
private static Connection connection; private static Channel channel; private static String QueueName = "BasicQueue";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException {
channel.queueDeclare(QueueName, true, false, false, null);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("消费者标签:" + consumerTag); System.out.println("消息信封" + envelope); System.out.println("消息属性:" + properties); System.out.println("消息主体:" + new String(body)); } };
channel.basicConsume(QueueName, true, consumer);
}
}
|
(3)控制台

工作队列模式(Work Queues)
(1)生产者创建一个名为WorkQueues的队列,一次发送10条消息到WorkQueues消息队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
|
public class Producer {
private static Connection connection; private static Channel channel; private static String QueueName = "WorkQueues";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException {
channel.queueDeclare(QueueName, true, false, false, null);
for (int i = 1; i <= 10; i++) { String body = "这是第" + i + "条消息"; channel.basicPublish("", QueueName, null, body.getBytes()); }
channel.close(); connection.close(); System.out.println("发送消息成功!");
}
}
|
(2)消费者1绑定WorkQueues消息队列接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
public class Consumer1 {
private static Connection connection; private static Channel channel; private static String QueueName = "WorkQueues";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException { Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息的内容体:" + new String(body)); } };
channel.basicConsume(QueueName, true, consumer);
}
}
|
(3)消费者2绑定WorkQueues消息队列接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
public class Consumer2 {
private static Connection connection; private static Channel channel; private static String QueueName = "WorkQueues";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException { Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息的内容体:" + new String(body)); } };
channel.basicConsume(QueueName, true, consumer);
} }
|
(4)启动消费者1和消费者2接收消息,控制台发现,两个消费者竞争消费消息

发布订阅模式(Publish/Subscribe)
广播模式(Fanout)
(1)生产者
- 创建广播模式类型的交换机,名称为FanoutExchange
- 创建两个队列,名称分别为FanoutQueue1、FanoutQueue2
- 将交换机FanoutExchange与队列FanoutQueue1、FanoutQueue2绑定
- 发送一条消息到交换机FanoutExchange
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
public class Producer {
private static Connection connection; private static Channel channel; private static String ExchangeName = "FanoutExchange"; private static String QueueName1 = "FanoutQueue1"; private static String QueueName2 = "FanoutQueue2";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException {
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
channel.queueDeclare(QueueName1, true, false, false, null); channel.queueDeclare(QueueName2, true, false, false, null);
channel.queueBind(QueueName1, ExchangeName, "", null); channel.queueBind(QueueName2, ExchangeName, "", null);
channel.basicPublish(ExchangeName, "", null, "这是发送的一条消息".getBytes());
channel.close(); connection.close(); System.out.println("发送消息成功!");
} }
|
(2)消费者1绑定队列FanoutQueue1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
|
public class Consumer1 {
private static Connection connection; private static Channel channel; private static String ExchangeName = "FanoutExchange"; private static String QueueName1 = "FanoutQueue1"; private static String QueueName2 = "FanoutQueue2";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException { Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息的内容体:" + new String(body)); } };
channel.basicConsume(QueueName1, true, consumer);
} }
|
(3)消费者2绑定队列FanoutQueue2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
public class Consumer2 {
private static Connection connection; private static Channel channel; private static String ExchangeName = "FanoutExchange"; private static String QueueName1 = "FanoutQueue1"; private static String QueueName2 = "FanoutQueue2";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException { Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息的内容体:" + new String(body)); } };
channel.basicConsume(QueueName2, true, consumer);
} }
|
(4)启动消费者1和消费者2接收消息,虽然生产者只发送了一条消息,但两个消费者都可以收到

路由模式(Routing)
(1)生产者
- 创建创建直接匹配类型的交换机,名称为RoutingExchange
- 创建两个队列,名称分别为RoutingQueue1、RoutingQueue2
- 将交换机RoutingExchange与队列RoutingQueue1绑定并指定路由键为error
- 将交换机RoutingExchange与队列RoutingQueue2绑定并指定路由键为info、error、warning
- 发送三种路由消息info、error、warning到交换机RoutingExchange
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
public class Producer { private static Connection connection; private static Channel channel; private static String ExchangeName = "RoutingExchange"; private static String QueueName1 = "RoutingQueue1"; private static String QueueName2 = "RoutingQueue2";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException {
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
channel.queueDeclare(QueueName1, true, false, false, null); channel.queueDeclare(QueueName2, true, false, false, null);
channel.queueBind(QueueName1, ExchangeName, "error", null); channel.queueBind(QueueName2, ExchangeName, "info", null); channel.queueBind(QueueName2, ExchangeName, "error", null); channel.queueBind(QueueName2, ExchangeName, "warning", null);
channel.basicPublish(ExchangeName, "info", null, "日志级别为info".getBytes()); channel.basicPublish(ExchangeName, "error", null, "日志级别为error".getBytes()); channel.basicPublish(ExchangeName, "warning", null, "日志级别为warning".getBytes());
channel.close(); connection.close(); System.out.println("发送消息成功!"); } }
|
(2)消费者1绑定队列RoutingQueue1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
public class Consumer1 {
private static Connection connection; private static Channel channel; private static String ExchangeName = "RoutingExchange"; private static String QueueName1 = "RoutingQueue1"; private static String QueueName2 = "RoutingQueue2";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException { Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息的内容体:" + new String(body)); } };
channel.basicConsume(QueueName1, true, consumer);
} }
|
(3)消费者2绑定队列RoutingQueue2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
|
public class Consumer2 {
private static Connection connection; private static Channel channel; private static String ExchangeName = "RoutingExchange"; private static String QueueName1 = "RoutingQueue1"; private static String QueueName2 = "RoutingQueue2";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException { Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息的内容体:" + new String(body)); } };
channel.basicConsume(QueueName2, true, consumer);
} }
|
(4)启动消费者1和消费者2接收消息
- 消费者1只会收到路由键为error的消息
- 消费者2会收到路由键为info、error、warning的消息

主题模式(Topic)
(1)生产者
- 创建创建主题匹配类型的交换机,名称为TopicExchange
- 创建两个队列,名称分别为TopicQueue1、TopicQueue2
- 将交换机TopicExchange与队列TopicQueue1绑定并指定路由键为 a.* 和 #.png
- 将交换机TopicExchange与队列TopicQueue2绑定并指定路由键为 #.#
- 发送三种路由消息a.jpg、b.png、c.mp4到交换机TopicExchange
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
public class Producer {
private static Connection connection; private static Channel channel; private static String ExchangeName = "TopicExchange"; private static String QueueName1 = "TopicQueue1"; private static String QueueName2 = "TopicQueue2";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException {
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
channel.queueDeclare(QueueName1, true, false, false, null); channel.queueDeclare(QueueName2, true, false, false, null);
channel.queueBind(QueueName1, ExchangeName, "a.*"); channel.queueBind(QueueName1, ExchangeName, "#.png"); channel.queueBind(QueueName2, ExchangeName, "#.#");
channel.basicPublish(ExchangeName, "a.jpg", null, "图片a.jpg".getBytes()); channel.basicPublish(ExchangeName, "b.png", null, "图片b.png".getBytes()); channel.basicPublish(ExchangeName, "c.mp4", null, "视频c.mp4".getBytes());
channel.close(); connection.close(); System.out.println("发送消息成功!");
} }
|
(2)消费者1绑定队列TopicQueue1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
public class Consumer1 {
private static Connection connection; private static Channel channel; private static String ExchangeName = "TopicExchange"; private static String QueueName1 = "TopicQueue1"; private static String QueueName2 = "TopicQueue2";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException { Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息的内容体:" + new String(body));
} };
channel.basicConsume(QueueName1, true, consumer);
} }
|
(3)消费者2绑定队列TopicQueue2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
public class Consumer2 {
private static Connection connection; private static Channel channel; private static String ExchangeName = "TopicExchange"; private static String QueueName1 = "TopicQueue1"; private static String QueueName2 = "TopicQueue2";
static { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("localhost"); factory.setUsername("wen"); factory.setPassword("123456"); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
public static void main(String[] args) throws IOException, TimeoutException { Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息的内容体:" + new String(body)); } };
channel.basicConsume(QueueName2, true, consumer);
} }
|
(4)启动消费者1和消费者2接收消息
- 消费者1只会收到路由键以a开头、png结尾的消息
- 消费者2会收到所有的消息

SpringBoot整合RabbitMQ
Spring AMQP简介
(1)Spring AMQP是基于AMQP协议实现的一个轻量级的消息中间件框架,原本是为了简化RabbitMQ的使用而开发的。
(2)Spring AMQP提供了一组抽象API,能够更容易地创建AMQP客户端,处理收发消息的逻辑
(3)Spring AMQP提供了面向对象的方式来发送和接收消息,支持广泛的消息传递模式,如点对点、发布/订阅、路由器和主题等。
(4)Spring AMQP还提供了一些附加功能,如事务、并发处理和消息序列化/反序列化等
(5)Spring AMQP关键部分
| 关键部分 | 简介 |
|---|
| ConnectionFactory | 连接到消息传递代理的工厂 |
| RabbitTemplate | 用于发送和接收消息 |
| MessageListenerContainer | 用于创建和管理消费者 |
| SimpleMessageListenerContainer | 一个方便的容器,用于为每个队列创建消费者 |
| MessageConverter | 用于将Java对象转换为可传输的消息格式和将消息转换回Java对象 |
| RabbitAdmin | 用于管理RabbitMQ中的队列、交换器和绑定等 |
| AmqpTemplate | 发送和接收消息的抽象接口 |
案例准备工作
(1)引入AMQP依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency> </dependencies>
|
(2)配置文件
1 2 3 4 5 6 7 8 9 10
| spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: wen password: 123456 listener: simple: prefetch: 1
|
(3)主启动类
1 2 3 4 5 6
| @SpringBootApplication public class MqApplication { public static void main(String[] args) { SpringApplication.run(MqApplication.class, args); } }
|
简单模式(Basic Queue)
(1)创建一个配置类,声明一个队列BasicQueue
1 2 3 4 5 6 7 8 9 10
| @Configuration public class BasicConfig {
@Bean public Queue basicQueue() { return new Queue("BasicQueue", true, false, false); } }
|
(2)创建监听器,准备接收BasicQueue队列的消息
1 2 3 4 5 6 7 8 9
| @Component public class BasicQueueListener {
@RabbitListener(queues = "BasicQueue") public void listener(String msg) throws InterruptedException { System.out.println("简单模式(Basic Queue)消费者接收到消息:【" + msg + "】"); }
}
|
(3)编写控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@RestController public class BasicQueueController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/BasicQueue") public String send() { rabbitTemplate.convertAndSend("BasicQueue", "这是发送的一条消息"); return "发送消息成功!"; } }
|
工作队列模式(Work Queues)
(1)创建一个配置类,声明一个队列WorkQueues
1 2 3 4 5 6 7 8 9 10
| @Configuration public class WorkConfig {
@Bean public Queue workQueues() { return new Queue("WorkQueues", true, false, false); } }
|
(2)创建两个监听器,准备接收消息
- 消费者1绑定WorkQueues消息队列
- 消费者2绑定WorkQueues消息队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component public class WorkQueuesListener {
@RabbitListener(queues = "WorkQueues") public void listen1(String msg) throws InterruptedException { System.out.println("工作队列模式(WorkQueues)消费者1接收到消息:【" + msg + "】"); }
@RabbitListener(queues = "WorkQueues") public void listen2(String msg) throws InterruptedException { System.out.println("工作队列模式(WorkQueues)消费者2接收到消息:【" + msg + "】"); }
}
|
(3)编写控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
@RestController public class WorkQueuesController { @Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/WorkQueues") public String send() throws InterruptedException { for (int i = 1; i <= 10; i++) { String message = "这是第" + i + "条消息"; rabbitTemplate.convertAndSend("WorkQueues", message); } return "发送消息成功!"; } }
|
发布订阅模式(Publish/Subscribe)
广播模式(Fanout)
(1)创建一个配置类,声明 广播模式类型 的交换机FanoutExchange,两个队列FanoutQueue1、FanoutQueue2
- 将交换机FanoutExchange与队列FanoutQueue1绑定
- 将交换机FanoutExchange与队列FanoutQueue2绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Configuration public class FanoutConfig {
private static String ExchangeName = "FanoutExchange"; private static String QueueName1 = "FanoutQueue1"; private static String QueueName2 = "FanoutQueue2";
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(ExchangeName, true, false); }
@Bean public Queue fanoutQueue1() { return new Queue(QueueName1, true, false, false); }
@Bean public Queue fanoutQueue2() { return new Queue(QueueName2, true, false, false); }
@Bean public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder .bind(fanoutQueue1) .to(fanoutExchange); }
@Bean public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder .bind(fanoutQueue2) .to(fanoutExchange); } }
|
(2)创建两个监听器,准备接收消息
- 消费者1绑定队列FanoutQueue1
- 消费者2绑定队列FanoutQueue2
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component public class FanoutListener {
@RabbitListener(queues = "FanoutQueue1") public void listen1(String msg) throws InterruptedException { System.out.println("广播模式(Fanout)消费者1接收到消息:【" + msg + "】"); }
@RabbitListener(queues = "FanoutQueue2") public void listen2(String msg) throws InterruptedException { System.out.println("广播模式(Fanout)消费者2接收到消息:【" + msg + "】"); }
}
|
(3)编写控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@RestController public class FanoutController { @Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/Fanout") public String SendMessage() { rabbitTemplate.convertAndSend("FanoutExchange", "", "这是发送的一条消息"); return "发送消息成功!"; } }
|
路由模式(Routing)
(1)创建一个配置类,声明 直接匹配类型 的交换机DirectExchange,两个队列FanoutQueue1、FanoutQueue2
- 将交换机RoutingExchange与队列RoutingQueue1绑定并指定路由键为red
- 将交换机RoutingExchange与队列RoutingQueue2绑定并指定路由键为blue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| @Configuration public class RoutingConfig { private static String ExchangeName = "RoutingExchange"; private static String QueueName1 = "RoutingQueue1"; private static String QueueName2 = "RoutingQueue2";
@Bean public DirectExchange directExchange() { return new DirectExchange(ExchangeName, true, false); }
@Bean public Queue routingQueue1() { return new Queue(QueueName1, true, false, false); }
@Bean public Queue routingQueue2() { return new Queue(QueueName2, true, false, false); }
@Bean public Binding bindingRoutingQueue1(Queue routingQueue1, DirectExchange directExchange) { return BindingBuilder .bind(routingQueue1) .to(directExchange) .with("red"); }
@Bean public Binding bindingRoutingQueue2(Queue routingQueue2, DirectExchange directExchange) { return BindingBuilder .bind(routingQueue2) .to(directExchange) .with("blue"); } }
|
(2)创建两个监听器,准备接收消息
- 消费者1绑定队列FanoutQueue1
- 消费者2绑定队列FanoutQueue2
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class RoutingListener {
@RabbitListener(queues ="RoutingQueue1") public void listen1(String msg) throws InterruptedException { System.out.println("路由模式(Routing)消费者1接收到消息:【" + msg + "】"); }
@RabbitListener(queues ="RoutingQueue2") public void listen2(String msg) throws InterruptedException { System.out.println("路由模式(Routing)消费者2接收到消息:【" + msg + "】"); } }
|
(3)编写控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
@RestController public class RoutingController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/Routing") public String SendMessage() { rabbitTemplate.convertAndSend("RoutingExchange", "red", "红色"); rabbitTemplate.convertAndSend("RoutingExchange", "red", "红色"); rabbitTemplate.convertAndSend("RoutingExchange", "red", "红色"); rabbitTemplate.convertAndSend("RoutingExchange", "blue", "蓝色"); rabbitTemplate.convertAndSend("RoutingExchange", "blue", "蓝色"); rabbitTemplate.convertAndSend("RoutingExchange", "blue", "蓝色"); return "发送消息成功!"; }
}
|
主题模式(Topic)
(1)创建一个配置类,声明 主题匹配类型 的交换机TopicExchange,两个队列TopicQueue1、TopicQueue2
- 将交换机TopicExchange与队列TopicQueue1绑定并指定路由键为
*.png - 将交换机TopicExchange与队列TopicQueue2绑定并指定路由键为
#.#
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| @Configuration public class TopicConfig {
private static String ExchangeName = "TopicExchange"; private static String QueueName1 = "TopicQueue1"; private static String QueueName2 = "TopicQueue2";
@Bean public TopicExchange topicExchange() { return new TopicExchange(ExchangeName, true, false); }
@Bean public Queue topicQueue1() { return new Queue(QueueName1, true, false, false); }
@Bean public Queue topicQueue2() { return new Queue(QueueName2, true, false, false); }
@Bean public Binding bindingTopicQueue1(Queue topicQueue1, TopicExchange topicExchange) { return BindingBuilder .bind(topicQueue1) .to(topicExchange) .with("*.png"); }
@Bean public Binding bindingTopicQueue2(Queue topicQueue2, TopicExchange topicExchange) { return BindingBuilder .bind(topicQueue2) .to(topicExchange) .with("#.#"); } }
|
(2)创建两个监听器,准备接收消息
- 消费者1绑定队列TopicQueue1
- 消费者2绑定队列TopicQueue2
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class TopicListener {
@RabbitListener(queues ="TopicQueue1") public void listen1(String msg) throws InterruptedException { System.out.println("主题模式(Topic)消费者1接收到消息:【" + msg + "】"); }
@RabbitListener(queues ="TopicQueue2") public void listen2(String msg) throws InterruptedException { System.out.println("主题模式(Topic)消费者2接收到消息:【" + msg + "】"); } }
|
(3)编写控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
@RestController public class TopicController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/Topic") public String SendMessage() { rabbitTemplate.convertAndSend("TopicExchange", "a.jpg", "jpg图片"); rabbitTemplate.convertAndSend("TopicExchange", "b.png", "png图片"); rabbitTemplate.convertAndSend("TopicExchange", "c.mp4", "mp4视频"); return "发送消息成功!"; }
}
|