消息队列

消息队列简介

消息队列(MessageQueue)简称MQ,是一种在应用系统之间发送消息、存储消息并异步处理消息的技术

常见消息队列协议

  • MQTT(Message Queuing Telemetry Transport):是一种轻量级的发布/订阅协议,特别适用于物联网和低带宽、不稳定网络环境下的通信
  • JMS(Java Message Service):是Java平台上定义的消息队列API标准,它提供了发送、接收和处理消息的功能,JMS本身并不是一种协议,而是一种API规范,可以与多种消息中间件的协议进行交互,如ActiveMQ、RabbitMQ等
  • AMQP(Advanced Message Queuing Protocol):是一种开放的、标准化的消息队列协议,它提供了一套统一的机制来进行异步通信

常见的MQ实现

比较常见的MQ实现:ActiveMQ、RabbitMQ、RocketMQ、Kafka

对比RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP等AMQP,XMPP,REST等自定义协议自定义协议
可用性一般
单机吞吐量万级(一般)万级(最差)十万级(最好)十万级(次之)
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

对于不同的消息队列有不同的优缺点,根据具体业务需求选择

需求可选
追求可用性Kafka、RocketMQ、RabbitMQ
追求可靠性RabbitMQ、RocketMQ
追求吞吐能力RocketMQ、Kafka
追求消息低延迟RabbitMQ、Kafka

消息队列应用场景

异步提速

场景需求:用户注册后,需要发送注册邮件、发送注册短信

(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,三个任务全部完成后返回给客户端

存在问题:邮件、短信只是一个通知,并不是必须的,这种做法让客户端等待没有必要等待的东西。

(2)并行方式:将注册信息写入数据库后,同时发送邮件、发送短信,三个任务完成后返回给客户端

存在问题:邮件、短信只是一个通知,并不是必须的,虽然并行的方式能提高处理的时间,但仍需要等待三个任务完成

(3)异步消息队列:将注册信息写入数据库后,把发送邮件,短信交给消息队列处理,直接返回客户端

应用解耦

场景需求:分布式系统之中,不同的微服务需要进行通信,以实现各种业务功能,常见的两种通信方式有:远程调用、使用MQ

(1)远程调用:A系统远程调用B系统的接口

存在问题:A系统和B系统高耦合,当A系统出现故障时,B系统就无法正常工作,导致整个系统的稳定性受到严重影响

(2)消息队列解耦:A系统将消息发布到队列中,B系统从队列中订阅并消费消息

好处:使得应用间解耦,就算A系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

流量削峰

场景需求:在秒杀活动中,会有大批量的请求

(1)瞬时请求:在某个短时间内,系统接收到大量的请求,可能导致系统过载崩溃

(2)流量削峰:将请求排队到消息队列中,消费者再从队列中取出任务进行处理,避免系统过载

消息队列的劣势

  • 系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响
  • 系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用
  • 一致性问题:在某些情况下,消息队列可能无法保证完全一致性。例如消息发送失败或消息消费者出现故障,可能会导致消息丢失或消息重复消费的情况

RabbitMQ基础

简介

RabbitMQ是由Erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛

基础架构

相关概念描述
消息队列(Message Queue)用于存储消息的缓冲区,生产者将消息发送到队列中,消费者从队列中获取消息后进行处理
连接(Connection)客户端与消息队列服务器之间的网络连接。它提供了建立和管理通信会话的功能
通道(Channel)建立在连接之上的逻辑通信路径,用于在客户端和消息队列服务器之间传输消息。一个连接可以包含多个通道,提供了多路复用的能力。
生产者(Producer)消息的发送方,用于将消息发送到队列中等待被消费者获取
消费者(Consumer)消息的接收方,用于监听和处理队列中的消息
交换机(Exchange)接收生产者发送的消息,并将消息路由到对应的队列中
绑定(Binding)将队列和交换机联系在一起的过程,规定了消息应该如何路由到队列中
路由键(Routing Key)用于匹配交换机和队列之间绑定关系的关键字

工作模式

RabbitMQ提供了7种模式,官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

  1. 简单模式(Simple Mode):一个生产者发送消息到一个消费者。
  2. 工作队列模式(Work Queue Mode):一个生产者发送消息到多个消费者,每个消费者接收到的消息是唯一的。
  3. 发布与订阅模式(Publish/Subscribe Mode):一个生产者发送消息到一个交换机,多个消费者绑定到该交换机并同时接收消息。
  4. 路由模式(Routing Mode):一个生产者发送消息到一个交换机,消费者通过设置绑定键来选择性地接收消息。
  5. 主题模式(Topics Mode):一个生产者发送消息到一个交换机,消费者通过设置匹配模式(使用通配符)来接收感兴趣的消息。
  6. 远程调用模式(RPC Mode):一个客户端发送请求消息到一个服务端,服务端处理请求后返回响应消息给客户端。
  7. Publisher Confirms模式:生产者发送消息后,等待消息被确认后再进行下一步操作,确保消息可靠性。

单机部署

在线拉取镜像

1
docker pull rabbitmq:3-management

使用命令加载镜像

1
docker load -i mq.tar

运行MQ容器

默认账号认密码guest,当前改变了账号密码

1
2
3
4
5
6
7
8
9
docker run \
-e RABBITMQ_DEFAULT_USER=wen \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname localhost \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
命令解释
-e RABBITMQ_DEFAULT_USER=wen设置 RabbitMQ 默认用户名为 wen
-e RABBITMQ_DEFAULT_PASS=123456设置 RabbitMQ 默认密码为 123456
--name mq设置容器名为 mq
--hostname localhost设置容器主机名为 localhost
-p 15672:15672映射容器中 RabbitMQ 管理页面的访问端口15672到 Docker 宿主机的同一端口15672
-p 5672:5672映射容器中 RabbitMQ 的 AMQP 协议访问端口5672到 Docker 宿主机的同一端口5672
-d在后台运行容器
rabbitmq:3-management使用 RabbitMQ 官方提供的带有管理插件的 3 版本镜像启动容器

浏览器访问

打开浏览器,访问http://主机地址:15672/,输入命令设置的账号wen密码123456进入管理平台

常见消息模型

简单模式(Basic Queue)

简单模式是 RabbitMQ 中最简单的模式,仅涉及一个生产者和一个消费者。

  • 生产者发送消息到队列
  • 消费者从队列中取出消息进行处理
图解简介
producer生产者:也就是要发送消息的程序
consumer消费者:消息的接收者,会一直等待消息到来
queue消息队列:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

工作队列模式(Work Queues)

工作队列模式也称为任务分发模式,主要应用于任务耗时较长的场景下。

  • 生产者将任务发送到队列中
  • 多个消费者可以同时从队列中取出任务进行处理,每个任务只会被一个消费者消费一次
图解简介
producer生产者:也就是要发送消息的程序
consumer消费者1:消息的接收者,会一直等待消息到来
consumer消费者2:消息的接收者,会一直等待消息到来
queue消息队列:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

发布订阅模式(Publish/Subscribe)

发布/订阅模式包含一个生产者、多个消费者和多个队列。

  • 生产者将消息发布到交换机
  • 交换机将消息传递给与其绑定的所有队列
  • 每个消费者从自己订阅的队列中取出消息进行处理
图解简介
producer生产者:也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
consumer消费者:消息的接收者,会一直等待消息到来
queue消息队列:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
Exchange交换机:一方面,接收生产者发送的消息。另一方面,知道如何处理消息
例如递交给某个特别队列、递交给所有队列、或是将消息丢弃,到底如何操作,取决于Exchange的类型
Fanout广播模式 :不需要路由键,广播到所有队列, 将消息转发到所有与该交换器绑定的队列中
Direct直接匹配:消息携带的路由键, 根据消息携带的路由键将消息转发到对应的队列中
Topic主题匹配:路由键与 binding key 匹配,将消息转发到消息携带的路由键与 binding key 匹配的队列中
Headers首部匹配:首部信息匹配,通过判断消息携带的首部信息(headers)来决定将消息发送到哪些队列中

广播模式(Fanout)

广播模式是发布/订阅模式的一种特殊情况,它会将消息路由到所有与之绑定的队列中,无需指定路由键

路由模式(Routing)

路由模式是根据消息携带的路由键将消息路由到对应的队列中。

  • 生产者将消息发送到交换机
  • 交换机根据绑定的路由键将消息路由到对应的队列中

主题模式(Topic)

主题模式是根据消息携带的路由键进行匹配,但它比路由模式更加灵活,因为它支持通配符匹配。

  • 生产者将消息发送到交换机
  • 交换机将消息根据路由键和通配符进行匹配,并将消息路由到对应的队列中

头部模式(Header)

头部模式是通过判断消息携带的自定义头部信息来决定将消息发送到哪些队列中。头部模式相比于其他模式更加灵活,但同时也增加了配置的复杂度

  • 生产者在发送消息时可以在消息头部添加自定义属性和值
  • 交换机根据这些属性和值来匹配并路由消息至对应的队列

RabbitMQ客户端的使用

准备工作

创建Maven工程引入相关依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependencies>
<!-- RabbitMQ客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<!-- 将所有日志记录静默丢弃,方便看效果 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.32</version>
</dependency>
</dependencies>

简单模式(Basic Queue)

(1)创建名为BasicQueue的队列,向BasicQueue队列发送一条消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* 生产者:发送消息
*/
public class Producer {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String QueueName = "BasicQueue";// 队列名称

/**
* 创建与RabbitMQ的连接
* Connection对象代表了生产者或消费者与消息中间件之间的一条TCP连接。
* 每个Connection对象都包含了一个或多个独立的信道(Channel),可以在这些信道之间并行地发送和接收消息
*/
static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

/**
* 创建队列,发送消息
*/
public static void main(String[] args) throws IOException, TimeoutException {
/**
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称,可选参数,如果不指定则RabbitMQ会随机生成一个名称
2. durable:是否持久化,默认为false。如果设置为true,消息会被持久化到磁盘上,服务器重启之后队列仍然存在
3. exclusive:是否独占,默认为 false。若设置为 true,则表示只有当前连接创建的消费者才能访问该队列;其他连接或客户端无法访问该队列。
4. autoDelete:当所有消费者都断开连接之后,是否自动删除队列。默认为false。若设置为 true,则表示当没有任何消费者连接到该队列时,该队列将自动被删除。
5. arguments:额外参数,可以传递一些键值对作为队列的附加参数,比如队列的过期时间、最大长度等。
*/
//(1)创建名为BasicQueue的队列(Queue)没有则会创建
channel.queueDeclare(QueueName, true, false, false, null);

/**
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。指定发布消息到哪个交换机(exchange),未指定会使用默认的 DirectExchange
2. routingKey:消息的路由键,即消息要发送到的队列名称
3. props:消息的属性,默认为null,可以指定消息的优先级、过期时间等
4. body:消息体的字节数组,即要发送的实际消息内容
*/
//(2)发送消息到指定队列(Queue)
channel.basicPublish("", QueueName, null, "这是发送的一条消息".getBytes());

//(3)断开RabbitMQ连接,释放资源
channel.close();
connection.close();
System.out.println("发送消息成功!");
}

}

(2)消费者接收BasicQueue队列的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
* 消费者:接收消息
*/
public class Consumer {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String QueueName = "BasicQueue";// 队列名称

/**
* 创建与RabbitMQ的连接
* Connection对象代表了生产者或消费者与消息中间件之间的一条TCP连接。
* 每个Connection对象都包含了一个或多个独立的信道(Channel),可以在这些信道之间并行地发送和接收消息
*/
static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

/**
* 创建队列,接收消息
*/
public static void main(String[] args) throws IOException {

/**
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称,可选参数,如果不指定则RabbitMQ会随机生成一个名称
2. durable:是否持久化,默认为false。如果设置为true,则服务器重启之后队列仍然存在
3. exclusive:是否独占。只能有一个消费者监听这队列,当Connection连接关闭时,是否删除该队列
4. autoDelete:当所有消费者都断开连接之后,是否自动删除队列。默认为false。
5. arguments:额外参数,可以传递一些键值对作为队列的附加参数。
*/
//(1)创建名为BasicQueue的队列(Queue)没有则会创建
channel.queueDeclare(QueueName, true, false, false, null);

//(2)回调对象,用于处理接收到的消息
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
/**
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:消费者标签,用于识别不同的消费者。
2. envelope:消息信封,包括消息的路由键、交换机名称、消息 ID 等信息。
3. properties:消息属性,包括消息的持久性、优先级、过期时间等信息。
4. body:消息主体,即发送方发送的具体内容。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("消费者标签:" + consumerTag);
System.out.println("消息信封" + envelope);
System.out.println("消息属性:" + properties);
System.out.println("消息主体:" + new String(body));
}
};

/**
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,表示要从哪个队列中消费消息
2. autoAck:是否自动确认消息,默认为 true,即自动应答,设置为false需要手动调用channel.basicAck()确认接收到消息
3. callback:回调对象,用于处理接收到的消息
*/
//(3)接收消息
channel.basicConsume(QueueName, true, consumer);

/**
* 不需要关闭连接
* 在 RabbitMQ 中,消费者在接收到消息并完成处理后,会自动发送确认(ACK)给 RabbitMQ 服务器,告知服务器已经成功接收并处理了该消息。
* 如果消费者在 basicConsume 执行完成之后关闭连接,由于还没有机会发送 ACK 给服务器,RabbitMQ 会认为该消息并未被消费,重新将其投递到队列中,从而导致消息重复消费。
* 为了避免消息重复消费,在消费者处理完消息后,需要让消费者线程一直保持运行状态,等待下一条消息的到来,直到程序退出或手动停止该消费者
*/
}

}

(3)控制台

工作队列模式(Work Queues)

(1)生产者创建一个名为WorkQueues的队列,一次发送10条消息到WorkQueues消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 生产者:发送消息
*/
public class Producer {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String QueueName = "WorkQueues";// 队列名称

/**
* 创建与RabbitMQ的连接
* Connection对象代表了生产者或消费者与消息中间件之间的一条TCP连接。
* 每个Connection对象都包含了一个或多个独立的信道(Channel),可以在这些信道之间并行地发送和接收消息
*/
static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
/**
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称,可选参数,如果不指定则RabbitMQ会随机生成一个名称
2. durable:是否持久化,默认为false。如果设置为true,消息会被持久化到磁盘上,服务器重启之后队列仍然存在
3. exclusive:是否独占,默认为 false。若设置为 true,则表示只有当前连接创建的消费者才能访问该队列;其他连接或客户端无法访问该队列。
4. autoDelete:当所有消费者都断开连接之后,是否自动删除队列。默认为false。若设置为 true,则表示当没有任何消费者连接到该队列时,该队列将自动被删除。
5. arguments:额外参数,可以传递一些键值对作为队列的附加参数,比如队列的过期时间、最大长度等。
*/
//(1)创建名为WorkQueues的队列(Queue)没有则会创建
channel.queueDeclare(QueueName, true, false, false, null);

/**
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。指定发布消息到哪个交换机(exchange),未指定会使用默认的 DirectExchange
2. routingKey:消息的路由键,即消息要发送到的队列名称
3. props:消息的属性,默认为null,可以指定消息的优先级、过期时间等
4. body:消息体的字节数组,即要发送的实际消息内容
*/
//(2)发送消息到指定队列(Queue)
for (int i = 1; i <= 10; i++) {
String body = "这是第" + i + "条消息";
channel.basicPublish("", QueueName, null, body.getBytes());
}

//(3)断开RabbitMQ连接,释放资源
channel.close();
connection.close();
System.out.println("发送消息成功!");

}

}

(2)消费者1绑定WorkQueues消息队列接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* 消费者1:接收消息
*/
public class Consumer1 {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String QueueName = "WorkQueues";// 队列名称

/**
* 创建与RabbitMQ的连接
* Connection对象代表了生产者或消费者与消息中间件之间的一条TCP连接。
* 每个Connection对象都包含了一个或多个独立的信道(Channel),可以在这些信道之间并行地发送和接收消息
*/
static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

/**
* 创建队列,接收消息
*/
public static void main(String[] args) throws IOException {
//(1)创建回调对象,用于处理接收到的消息
Consumer consumer = new DefaultConsumer(channel) {
/**
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:消费者标签,用于识别不同的消费者。
2. envelope:消息信封,包括消息的路由键、交换机名称、消息 ID 等信息。
3. properties:消息属性,包括消息的持久性、优先级、过期时间等信息。
4. body:消息主体,即发送方发送的具体内容。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息的内容体:" + new String(body));
}
};

/**
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,表示要从哪个队列中消费消息
2. autoAck:是否自动确认消息,默认为 true,即自动应答,设置为false需要手动调用channel.basicAck()确认接收到消息
3. callback:回调对象,用于处理接收到的消息
*/
//(2)接收消息
channel.basicConsume(QueueName, true, consumer);

/**
* 不需要关闭连接
* 在 RabbitMQ 中,消费者在接收到消息并完成处理后,会自动发送确认(ACK)给 RabbitMQ 服务器,告知服务器已经成功接收并处理了该消息。
* 如果消费者在 basicConsume 执行完成之后关闭连接,由于还没有机会发送 ACK 给服务器,RabbitMQ 会认为该消息并未被消费,重新将其投递到队列中,从而导致消息重复消费。
* 为了避免消息重复消费,在消费者处理完消息后,需要让消费者线程一直保持运行状态,等待下一条消息的到来,直到程序退出或手动停止该消费者
*/
}

}

(3)消费者2绑定WorkQueues消息队列接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 消费者2:接收消息
*/
public class Consumer2 {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String QueueName = "WorkQueues";// 队列名称

/**
* 创建与RabbitMQ的连接
* Connection对象代表了生产者或消费者与消息中间件之间的一条TCP连接。
* 每个Connection对象都包含了一个或多个独立的信道(Channel),可以在这些信道之间并行地发送和接收消息
*/
static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

/**
* 创建队列,接收消息
*/
public static void main(String[] args) throws IOException {
//(1)创建回调对象,用于处理接收到的消息
Consumer consumer = new DefaultConsumer(channel) {
/**
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:消费者标签,用于识别不同的消费者。
2. envelope:消息信封,包括消息的路由键、交换机名称、消息 ID 等信息。
3. properties:消息属性,包括消息的持久性、优先级、过期时间等信息。
4. body:消息主体,即发送方发送的具体内容。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息的内容体:" + new String(body));
}
};

/**
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,表示要从哪个队列中消费消息
2. autoAck:是否自动确认消息,默认为 true,即自动应答,设置为false需要手动调用channel.basicAck()确认接收到消息
3. callback:回调对象,用于处理接收到的消息
*/
//(2)接收消息
channel.basicConsume(QueueName, true, consumer);

/**
* 不需要关闭连接
* 在 RabbitMQ 中,消费者在接收到消息并完成处理后,会自动发送确认(ACK)给 RabbitMQ 服务器,告知服务器已经成功接收并处理了该消息。
* 如果消费者在 basicConsume 执行完成之后关闭连接,由于还没有机会发送 ACK 给服务器,RabbitMQ 会认为该消息并未被消费,重新将其投递到队列中,从而导致消息重复消费。
* 为了避免消息重复消费,在消费者处理完消息后,需要让消费者线程一直保持运行状态,等待下一条消息的到来,直到程序退出或手动停止该消费者
*/
}
}

(4)启动消费者1和消费者2接收消息,控制台发现,两个消费者竞争消费消息

发布订阅模式(Publish/Subscribe)

广播模式(Fanout)

(1)生产者

  • 创建广播模式类型的交换机,名称为FanoutExchange
  • 创建两个队列,名称分别为FanoutQueue1、FanoutQueue2
  • 将交换机FanoutExchange与队列FanoutQueue1、FanoutQueue2绑定
  • 发送一条消息到交换机FanoutExchange
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* 生产者:发送消息
*/
public class Producer {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String ExchangeName = "FanoutExchange";// 交换机名称
private static String QueueName1 = "FanoutQueue1";// 队列1名称
private static String QueueName2 = "FanoutQueue2";// 队列2名称

/**
* 创建与RabbitMQ的连接
* Connection对象代表了生产者或消费者与消息中间件之间的一条TCP连接。
* 每个Connection对象都包含了一个或多个独立的信道(Channel),可以在这些信道之间并行地发送和接收消息
*/
static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
/**
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1.exchange:交换器名称。
2.type:交换器类型,常见的有以下几种。
direct:直接匹配,根据消息携带的路由键(routing key)将消息转发到对应的队列中。
fanout:广播模式,将消息转发到所有与该交换器绑定的队列中。
topic:主题匹配,将消息转发到消息携带的路由键与 binding key 匹配的队列中。
headers:首部匹配,通过判断消息携带的首部信息(headers)来决定将消息发送到哪些队列中。
3.durable:是否持久化,如果设置为 true,表示在 RabbitMQ 服务器重启后该 Exchange 仍然存在。
4.autoDelete:是否自动删除,如果设置为 true,表示当该 Exchange 没有与任何队列绑定时,将自动从 RabbitMQ 服务器中删除。
5.internal:是否是内部的,如果设置为 true,表示该 Exchange 不允许通过发布消息的方式进行发送,只能被 Exchange 到 Exchange 绑定使用。
6.arguments:其他属性,可以设置一些额外的参数,比如过期时间、备份 Exchange 等等,它们作为一个键值对存放在该 Map 中。
*/
//(1)创建 广播模式类型 的交换机
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);

/**
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称,可选参数,如果不指定则RabbitMQ会随机生成一个名称
2. durable:是否持久化,默认为false。如果设置为true,消息会被持久化到磁盘上,服务器重启之后队列仍然存在
3. exclusive:是否独占,默认为 false。若设置为 true,则表示只有当前连接创建的消费者才能访问该队列;其他连接或客户端无法访问该队列。
4. autoDelete:当所有消费者都断开连接之后,是否自动删除队列。默认为false。若设置为 true,则表示当没有任何消费者连接到该队列时,该队列将自动被删除。
5. arguments:额外参数,可以传递一些键值对作为队列的附加参数,比如队列的过期时间、最大长度等。
*/
//(2)创建两个队列
channel.queueDeclare(QueueName1, true, false, false, null);
channel.queueDeclare(QueueName2, true, false, false, null);

/**
queueBind(String queue, String exchange, String routingKey)
参数:
1.queue:队列的名称,需要事先创建好。
2.exchange:交换器的名称,需要事先创建好。
3.routingKey:用于绑定的路由键,它是一个字符串,可以为空字符串。如果交换机的类型为fanout,routingKey设置为""
4.arguments:用于绑定的参数,它们作为一个键值对存放在该 Map 中
*/
//(3)绑定队列和交换机
channel.queueBind(QueueName1, ExchangeName, "", null);
channel.queueBind(QueueName2, ExchangeName, "", null);

/**
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。指定发布消息到哪个交换机(exchange),未指定会使用默认的 DirectExchange
2. routingKey:消息的路由键,即消息要发送到的队列名称
3. props:消息的属性,默认为null,可以指定消息的优先级、过期时间等
4. body:消息体的字节数组,即要发送的实际消息内容
*/
//(4)发送消息
channel.basicPublish(ExchangeName, "", null, "这是发送的一条消息".getBytes());

//(5)断开RabbitMQ连接,释放资源
channel.close();
connection.close();
System.out.println("发送消息成功!");

}
}

(2)消费者1绑定队列FanoutQueue1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* 消费者1:接收消息
*/
public class Consumer1 {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String ExchangeName = "FanoutExchange";// 交换机名称
private static String QueueName1 = "FanoutQueue1";// 队列1名称
private static String QueueName2 = "FanoutQueue2";// 队列2名称

/**
* 创建与RabbitMQ的连接
* Connection对象代表了生产者或消费者与消息中间件之间的一条TCP连接。
* 每个Connection对象都包含了一个或多个独立的信道(Channel),可以在这些信道之间并行地发送和接收消息
*/
static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
//(1)创建回调对象,用于处理接收到的消息
Consumer consumer = new DefaultConsumer(channel) {
/**
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:消费者标签,用于识别不同的消费者。
2. envelope:消息信封,包括消息的路由键、交换机名称、消息 ID 等信息。
3. properties:消息属性,包括消息的持久性、优先级、过期时间等信息。
4. body:消息主体,即发送方发送的具体内容。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息的内容体:" + new String(body));
}
};
/**
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,表示要从哪个队列中消费消息
2. autoAck:是否自动确认消息,默认为 true,即自动应答,设置为false需要手动调用channel.basicAck()确认接收到消息
3. callback:回调对象,用于处理接收到的消息
*/
//(2)接收消息
channel.basicConsume(QueueName1, true, consumer);

/**
* 不需要关闭连接
* 在 RabbitMQ 中,消费者在接收到消息并完成处理后,会自动发送确认(ACK)给 RabbitMQ 服务器,告知服务器已经成功接收并处理了该消息。
* 如果消费者在 basicConsume 执行完成之后关闭连接,由于还没有机会发送 ACK 给服务器,RabbitMQ 会认为该消息并未被消费,重新将其投递到队列中,从而导致消息重复消费。
* 为了避免消息重复消费,在消费者处理完消息后,需要让消费者线程一直保持运行状态,等待下一条消息的到来,直到程序退出或手动停止该消费者
*/

}
}

(3)消费者2绑定队列FanoutQueue2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 消费者2:接收消息
*/
public class Consumer2 {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String ExchangeName = "FanoutExchange";// 交换机名称
private static String QueueName1 = "FanoutQueue1";// 队列1名称
private static String QueueName2 = "FanoutQueue2";// 队列2名称

/**
* 创建与RabbitMQ的连接
* Connection对象代表了生产者或消费者与消息中间件之间的一条TCP连接。
* 每个Connection对象都包含了一个或多个独立的信道(Channel),可以在这些信道之间并行地发送和接收消息
*/
static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
//(1)创建回调对象,用于处理接收到的消息
Consumer consumer = new DefaultConsumer(channel) {
/**
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:消费者标签,用于识别不同的消费者。
2. envelope:消息信封,包括消息的路由键、交换机名称、消息 ID 等信息。
3. properties:消息属性,包括消息的持久性、优先级、过期时间等信息。
4. body:消息主体,即发送方发送的具体内容。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息的内容体:" + new String(body));
}
};

/**
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,表示要从哪个队列中消费消息
2. autoAck:是否自动确认消息,默认为 true,即自动应答,设置为false需要手动调用channel.basicAck()确认接收到消息
3. callback:回调对象,用于处理接收到的消息
*/
//(2)接收消息
channel.basicConsume(QueueName2, true, consumer);

/**
* 不需要关闭连接
* 在 RabbitMQ 中,消费者在接收到消息并完成处理后,会自动发送确认(ACK)给 RabbitMQ 服务器,告知服务器已经成功接收并处理了该消息。
* 如果消费者在 basicConsume 执行完成之后关闭连接,由于还没有机会发送 ACK 给服务器,RabbitMQ 会认为该消息并未被消费,重新将其投递到队列中,从而导致消息重复消费。
* 为了避免消息重复消费,在消费者处理完消息后,需要让消费者线程一直保持运行状态,等待下一条消息的到来,直到程序退出或手动停止该消费者
*/

}
}

(4)启动消费者1和消费者2接收消息,虽然生产者只发送了一条消息,但两个消费者都可以收到

路由模式(Routing)

(1)生产者

  • 创建创建直接匹配类型的交换机,名称为RoutingExchange
  • 创建两个队列,名称分别为RoutingQueue1、RoutingQueue2
  • 将交换机RoutingExchange与队列RoutingQueue1绑定并指定路由键为error
  • 将交换机RoutingExchange与队列RoutingQueue2绑定并指定路由键为info、error、warning
  • 发送三种路由消息info、error、warning到交换机RoutingExchange
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* 生产者:发送消息
*/
public class Producer {
private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String ExchangeName = "RoutingExchange";// 交换机名称
private static String QueueName1 = "RoutingQueue1";// 队列1名称
private static String QueueName2 = "RoutingQueue2";// 队列2名称

static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
/**
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1.exchange:交换器名称。
2.type:交换器类型,常见的有以下几种。
direct:直接匹配,根据消息携带的路由键(routing key)将消息转发到对应的队列中。
fanout:广播模式,将消息转发到所有与该交换器绑定的队列中。
topic:主题匹配,将消息转发到消息携带的路由键与 binding key 匹配的队列中。
headers:首部匹配,通过判断消息携带的首部信息(headers)来决定将消息发送到哪些队列中。
3.durable:是否持久化,如果设置为 true,表示在 RabbitMQ 服务器重启后该 Exchange 仍然存在。
4.autoDelete:是否自动删除,如果设置为 true,表示当该 Exchange 没有与任何队列绑定时,将自动从 RabbitMQ 服务器中删除。
5.internal:是否是内部的,如果设置为 true,表示该 Exchange 不允许通过发布消息的方式进行发送,只能被 Exchange 到 Exchange 绑定使用。
6.arguments:其他属性,可以设置一些额外的参数,比如过期时间、备份 Exchange 等等,它们作为一个键值对存放在该 Map 中。
*/
//(1)创建 直接匹配类型 的交换机
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);

/**
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称,可选参数,如果不指定则RabbitMQ会随机生成一个名称
2. durable:是否持久化,默认为false。如果设置为true,消息会被持久化到磁盘上,服务器重启之后队列仍然存在
3. exclusive:是否独占,默认为 false。若设置为 true,则表示只有当前连接创建的消费者才能访问该队列;其他连接或客户端无法访问该队列。
4. autoDelete:当所有消费者都断开连接之后,是否自动删除队列。默认为false。若设置为 true,则表示当没有任何消费者连接到该队列时,该队列将自动被删除。
5. arguments:额外参数,可以传递一些键值对作为队列的附加参数,比如队列的过期时间、最大长度等。
*/
//(2)创建两个队列
channel.queueDeclare(QueueName1, true, false, false, null);
channel.queueDeclare(QueueName2, true, false, false, null);

/**
queueBind(String queue, String exchange, String routingKey)
参数:
1.queue:队列的名称,需要事先创建好。
2.exchange:交换器的名称,需要事先创建好。
3.routingKey:用于绑定的路由键,用来决定该消息应该发送到哪个队列(交换机的类型为fanout,routingKey要设置为"")
4.arguments:用于绑定的参数,它们作为一个键值对存放在该 Map 中
*/
//(3)绑定队列和交换机
// 队列1绑定 error 路由键
channel.queueBind(QueueName1, ExchangeName, "error", null);
// 队列2绑定 info、error、warning 路由键
channel.queueBind(QueueName2, ExchangeName, "info", null);
channel.queueBind(QueueName2, ExchangeName, "error", null);
channel.queueBind(QueueName2, ExchangeName, "warning", null);

/**
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。指定发布消息到哪个交换机(exchange),未指定会使用默认的 DirectExchange
2. routingKey:消息的路由键,即消息要发送到的队列名称
3. props:消息的属性,默认为null,可以指定消息的优先级、过期时间等
4. body:消息体的字节数组,即要发送的实际消息内容
*/
//(4)发送三种路由消息到交换机
channel.basicPublish(ExchangeName, "info", null, "日志级别为info".getBytes());
channel.basicPublish(ExchangeName, "error", null, "日志级别为error".getBytes());
channel.basicPublish(ExchangeName, "warning", null, "日志级别为warning".getBytes());

//(5)断开RabbitMQ连接,释放资源
channel.close();
connection.close();
System.out.println("发送消息成功!");
}
}

(2)消费者1绑定队列RoutingQueue1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 消费者1:接收消息
*/
public class Consumer1 {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String ExchangeName = "RoutingExchange";// 交换机名称
private static String QueueName1 = "RoutingQueue1";// 队列1名称
private static String QueueName2 = "RoutingQueue2";// 队列2名称

static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
//(1)创建回调对象,用于处理接收到的消息
Consumer consumer = new DefaultConsumer(channel) {
/**
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:消费者标签,用于识别不同的消费者。
2. envelope:消息信封,包括消息的路由键、交换机名称、消息 ID 等信息。
3. properties:消息属性,包括消息的持久性、优先级、过期时间等信息。
4. body:消息主体,即发送方发送的具体内容。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息的内容体:" + new String(body));
}
};

/**
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,表示要从哪个队列中消费消息
2. autoAck:是否自动确认消息,默认为 true,即自动应答,设置为false需要手动调用channel.basicAck()确认接收到消息
3. callback:回调对象,用于处理接收到的消息
*/
//(2)接收消息
channel.basicConsume(QueueName1, true, consumer);

/**
* 不需要关闭连接
* 在 RabbitMQ 中,消费者在接收到消息并完成处理后,会自动发送确认(ACK)给 RabbitMQ 服务器,告知服务器已经成功接收并处理了该消息。
* 如果消费者在 basicConsume 执行完成之后关闭连接,由于还没有机会发送 ACK 给服务器,RabbitMQ 会认为该消息并未被消费,重新将其投递到队列中,从而导致消息重复消费。
* 为了避免消息重复消费,在消费者处理完消息后,需要让消费者线程一直保持运行状态,等待下一条消息的到来,直到程序退出或手动停止该消费者
*/

}
}

(3)消费者2绑定队列RoutingQueue2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 消费者2:接收消息
*/
public class Consumer2 {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String ExchangeName = "RoutingExchange";// 交换机名称
private static String QueueName1 = "RoutingQueue1";// 队列1名称
private static String QueueName2 = "RoutingQueue2";// 队列2名称

static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
//(1)创建回调对象,用于处理接收到的消息
Consumer consumer = new DefaultConsumer(channel) {
/**
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:消费者标签,用于识别不同的消费者。
2. envelope:消息信封,包括消息的路由键、交换机名称、消息 ID 等信息。
3. properties:消息属性,包括消息的持久性、优先级、过期时间等信息。
4. body:消息主体,即发送方发送的具体内容。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息的内容体:" + new String(body));
}
};

/**
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,表示要从哪个队列中消费消息
2. autoAck:是否自动确认消息,默认为 true,即自动应答,设置为false需要手动调用channel.basicAck()确认接收到消息
3. callback:回调对象,用于处理接收到的消息
*/
//(2)接收消息
channel.basicConsume(QueueName2, true, consumer);

/**
* 不需要关闭连接
* 在 RabbitMQ 中,消费者在接收到消息并完成处理后,会自动发送确认(ACK)给 RabbitMQ 服务器,告知服务器已经成功接收并处理了该消息。
* 如果消费者在 basicConsume 执行完成之后关闭连接,由于还没有机会发送 ACK 给服务器,RabbitMQ 会认为该消息并未被消费,重新将其投递到队列中,从而导致消息重复消费。
* 为了避免消息重复消费,在消费者处理完消息后,需要让消费者线程一直保持运行状态,等待下一条消息的到来,直到程序退出或手动停止该消费者
*/
}
}

(4)启动消费者1和消费者2接收消息

  • 消费者1只会收到路由键为error的消息
  • 消费者2会收到路由键为info、error、warning的消息

主题模式(Topic)

(1)生产者

  • 创建创建主题匹配类型的交换机,名称为TopicExchange
  • 创建两个队列,名称分别为TopicQueue1、TopicQueue2
  • 将交换机TopicExchange与队列TopicQueue1绑定并指定路由键为 a.* 和 #.png
  • 将交换机TopicExchange与队列TopicQueue2绑定并指定路由键为 #.#
  • 发送三种路由消息a.jpg、b.png、c.mp4到交换机TopicExchange
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* 生产者:发送消息
*/
public class Producer {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String ExchangeName = "TopicExchange";// 交换机名称
private static String QueueName1 = "TopicQueue1";// 队列1名称
private static String QueueName2 = "TopicQueue2";// 队列2名称

static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
/**
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1.exchange:交换器名称。
2.type:交换器类型,常见的有以下几种。
direct:直接匹配,根据消息携带的路由键(routing key)将消息转发到对应的队列中。
fanout:广播模式,将消息转发到所有与该交换器绑定的队列中。
topic:主题匹配,将消息转发到消息携带的路由键与 binding key 匹配的队列中。
headers:首部匹配,通过判断消息携带的首部信息(headers)来决定将消息发送到哪些队列中。
3.durable:是否持久化,如果设置为 true,表示在 RabbitMQ 服务器重启后该 Exchange 仍然存在。
4.autoDelete:是否自动删除,如果设置为 true,表示当该 Exchange 没有与任何队列绑定时,将自动从 RabbitMQ 服务器中删除。
5.internal:是否是内部的,如果设置为 true,表示该 Exchange 不允许通过发布消息的方式进行发送,只能被 Exchange 到 Exchange 绑定使用。
6.arguments:其他属性,可以设置一些额外的参数,比如过期时间、备份 Exchange 等等,它们作为一个键值对存放在该 Map 中。
*/
//(1)创建 主题匹配类型 的交换机
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);

/**
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称,可选参数,如果不指定则RabbitMQ会随机生成一个名称
2. durable:是否持久化,默认为false。如果设置为true,消息会被持久化到磁盘上,服务器重启之后队列仍然存在
3. exclusive:是否独占,默认为 false。若设置为 true,则表示只有当前连接创建的消费者才能访问该队列;其他连接或客户端无法访问该队列。
4. autoDelete:当所有消费者都断开连接之后,是否自动删除队列。默认为false。若设置为 true,则表示当没有任何消费者连接到该队列时,该队列将自动被删除。
5. arguments:额外参数,可以传递一些键值对作为队列的附加参数,比如队列的过期时间、最大长度等。
*/
//(2)创建两个队列
channel.queueDeclare(QueueName1, true, false, false, null);
channel.queueDeclare(QueueName2, true, false, false, null);

/**
queueBind(String queue, String exchange, String routingKey)
参数:
1.queue:队列的名称,需要事先创建好。
2.exchange:交换器的名称,需要事先创建好。
3.routingKey:用于绑定的路由键,用来决定该消息应该发送到哪个队列(交换机的类型为fanout,routingKey要设置为"")
4.arguments:用于绑定的参数,它们作为一个键值对存放在该 Map 中
*/
//(3)绑定队列和交换机
// 队列1绑定 #.png 和 xxx.* 路由键
channel.queueBind(QueueName1, ExchangeName, "a.*");// 匹配以a开头的路由字符
channel.queueBind(QueueName1, ExchangeName, "#.png");// 匹配以png结尾的路由字符
// 队列2绑定 #.# 路由键
channel.queueBind(QueueName2, ExchangeName, "#.#");// 匹配任意字符

/**
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。指定发布消息到哪个交换机(exchange),未指定会使用默认的 DirectExchange
2. routingKey:消息的路由键,即消息要发送到的队列名称
3. props:消息的属性,默认为null,可以指定消息的优先级、过期时间等
4. body:消息体的字节数组,即要发送的实际消息内容
*/
//(4)发送消息到交换机
channel.basicPublish(ExchangeName, "a.jpg", null, "图片a.jpg".getBytes());
channel.basicPublish(ExchangeName, "b.png", null, "图片b.png".getBytes());
channel.basicPublish(ExchangeName, "c.mp4", null, "视频c.mp4".getBytes());

//(5)断开RabbitMQ连接,释放资源
channel.close();
connection.close();
System.out.println("发送消息成功!");

}
}

(2)消费者1绑定队列TopicQueue1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* 消费者1:接收消息
*/
public class Consumer1 {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String ExchangeName = "TopicExchange";// 交换机名称
private static String QueueName1 = "TopicQueue1";// 队列1名称
private static String QueueName2 = "TopicQueue2";// 队列2名称

static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
//(1)创建回调对象,用于处理接收到的消息
Consumer consumer = new DefaultConsumer(channel) {
/**
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:消费者的标签,在多个消费者竞争同一个队列时可以用来区分不同的消费者
2. envelope:表示消息的信封,包含一些元数据信息,如交换机、路由键等
3. properties:配置信息,包括消息头和其他自定义属性
4. body:表示消息的内容体,即要消费的消息内容
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息的内容体:" + new String(body));

}
};

/**
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,表示要从哪个队列中消费消息
2. autoAck:是否自动确认,如果设置为false,表示需要手动调用channel.basicAck()方法来确认接收到消息,否则消息会一直保持未被确认状态,直到消费者断开连接
3. callback:回调对象,用于处理接收到的消息
*/
//(2)接收消息
channel.basicConsume(QueueName1, true, consumer);

/**
* 不需要关闭连接
* 在 RabbitMQ 中,消费者在接收到消息并完成处理后,会自动发送确认(ACK)给 RabbitMQ 服务器,告知服务器已经成功接收并处理了该消息。
* 如果消费者在 basicConsume 执行完成之后关闭连接,由于还没有机会发送 ACK 给服务器,RabbitMQ 会认为该消息并未被消费,重新将其投递到队列中,从而导致消息重复消费。
* 为了避免消息重复消费,在消费者处理完消息后,需要让消费者线程一直保持运行状态,等待下一条消息的到来,直到程序退出或手动停止该消费者
*/

}
}

(3)消费者2绑定队列TopicQueue2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 消费者2:接收消息
*/
public class Consumer2 {

private static Connection connection; // 与 RabbitMQ 服务器之间的 TCP 连接对象,每个连接可以拥有多个 Channel
private static Channel channel;// 通过 Connection 创建的轻量级连接对象,主要用于发送和接收消息
private static String ExchangeName = "TopicExchange";// 交换机名称
private static String QueueName1 = "TopicQueue1";// 队列1名称
private static String QueueName2 = "TopicQueue2";// 队列2名称

static {
//(1)创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//(2)设置连接参数
factory.setHost("localhost");// 设置RabbitMQ服务器地址(默认值 localhost)
factory.setPort(5672); // 设置RabbitMQ服务器端口号 默认值 5672
factory.setVirtualHost("localhost");// 设置虚拟主机名 默认值/
factory.setUsername("wen");// 设置用户名 默认 guest
factory.setPassword("123456");// 设置密码 默认值 guest
try {
//(3)创建应用程序与RabbitMQ消息中间件之间通信的入口connection
connection = factory.newConnection();
//(4)创建虚拟通道 Channel 用于发送和接收消息
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, TimeoutException {
//(1)创建回调对象,用于处理接收到的消息
Consumer consumer = new DefaultConsumer(channel) {
/**
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:消费者的标签,在多个消费者竞争同一个队列时可以用来区分不同的消费者
2. envelope:表示消息的信封,包含一些元数据信息,如交换机、路由键等
3. properties:配置信息,包括消息头和其他自定义属性
4. body:表示消息的内容体,即要消费的消息内容
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息的内容体:" + new String(body));
}
};

/**
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,表示要从哪个队列中消费消息
2. autoAck:是否自动确认,如果设置为false,表示需要手动调用channel.basicAck()方法来确认接收到消息,否则消息会一直保持未被确认状态,直到消费者断开连接
3. callback:回调对象,用于处理接收到的消息
*/
//(2)接收消息
channel.basicConsume(QueueName2, true, consumer);

/**
* 不需要关闭连接
* 在 RabbitMQ 中,消费者在接收到消息并完成处理后,会自动发送确认(ACK)给 RabbitMQ 服务器,告知服务器已经成功接收并处理了该消息。
* 如果消费者在 basicConsume 执行完成之后关闭连接,由于还没有机会发送 ACK 给服务器,RabbitMQ 会认为该消息并未被消费,重新将其投递到队列中,从而导致消息重复消费。
* 为了避免消息重复消费,在消费者处理完消息后,需要让消费者线程一直保持运行状态,等待下一条消息的到来,直到程序退出或手动停止该消费者
*/

}
}

(4)启动消费者1和消费者2接收消息

  • 消费者1只会收到路由键以a开头、png结尾的消息
  • 消费者2会收到所有的消息

SpringBoot整合RabbitMQ

Spring AMQP简介

(1)Spring AMQP是基于AMQP协议实现的一个轻量级的消息中间件框架,原本是为了简化RabbitMQ的使用而开发的。

(2)Spring AMQP提供了一组抽象API,能够更容易地创建AMQP客户端,处理收发消息的逻辑

(3)Spring AMQP提供了面向对象的方式来发送和接收消息,支持广泛的消息传递模式,如点对点、发布/订阅、路由器和主题等。

(4)Spring AMQP还提供了一些附加功能,如事务、并发处理和消息序列化/反序列化等

(5)Spring AMQP关键部分

关键部分简介
ConnectionFactory连接到消息传递代理的工厂
RabbitTemplate用于发送和接收消息
MessageListenerContainer用于创建和管理消费者
SimpleMessageListenerContainer一个方便的容器,用于为每个队列创建消费者
MessageConverter用于将Java对象转换为可传输的消息格式和将消息转换回Java对象
RabbitAdmin用于管理RabbitMQ中的队列、交换器和绑定等
AmqpTemplate发送和接收消息的抽象接口

案例准备工作

(1)引入AMQP依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependencies>
<!-- AMQP依赖,包含RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 序列化依赖 -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
</dependencies>

(2)配置文件

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: localhost # 设置RabbitMQ服务器地址(默认值 localhost)
port: 5672 # 设置RabbitMQ服务器端口号(默认值 5672)
virtual-host: / # 设置虚拟主机名(默认值/)
username: wen # 设置用户名(默认 guest)
password: 123456 # 设置密码(默认 guest)
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

(3)主启动类

1
2
3
4
5
6
@SpringBootApplication
public class MqApplication {
public static void main(String[] args) {
SpringApplication.run(MqApplication.class, args);
}
}

简单模式(Basic Queue)

(1)创建一个配置类,声明一个队列BasicQueue

1
2
3
4
5
6
7
8
9
10
@Configuration
public class BasicConfig {
/**
* 声明队列BasicQueue
*/
@Bean
public Queue basicQueue() {
return new Queue("BasicQueue", true, false, false);
}
}

(2)创建监听器,准备接收BasicQueue队列的消息

1
2
3
4
5
6
7
8
9
@Component
public class BasicQueueListener {

@RabbitListener(queues = "BasicQueue")// 表示消费者监听器,通过queues属性指定监听的队列名称
public void listener(String msg) throws InterruptedException {
System.out.println("简单模式(Basic Queue)消费者接收到消息:【" + msg + "】");
}

}

(3)编写控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 简单模式测试(BasicQueue)
*/
@RestController
public class BasicQueueController {

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/BasicQueue")
public String send() {
// 执行发送信息,参数依次为:队列名称、发送的消息
rabbitTemplate.convertAndSend("BasicQueue", "这是发送的一条消息");
return "发送消息成功!";
}
}

工作队列模式(Work Queues)

(1)创建一个配置类,声明一个队列WorkQueues

1
2
3
4
5
6
7
8
9
10
@Configuration
public class WorkConfig {
/**
* 声明队列WorkQueues
*/
@Bean
public Queue workQueues() {
return new Queue("WorkQueues", true, false, false);
}
}

(2)创建两个监听器,准备接收消息

  • 消费者1绑定WorkQueues消息队列
  • 消费者2绑定WorkQueues消息队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class WorkQueuesListener {

@RabbitListener(queues = "WorkQueues")// 表示消费者监听器,通过queues属性指定监听的队列名称
public void listen1(String msg) throws InterruptedException {
System.out.println("工作队列模式(WorkQueues)消费者1接收到消息:【" + msg + "】");
}

@RabbitListener(queues = "WorkQueues")// 表示消费者监听器,通过queues属性指定监听的队列名称
public void listen2(String msg) throws InterruptedException {
System.out.println("工作队列模式(WorkQueues)消费者2接收到消息:【" + msg + "】");
}

}

(3)编写控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 工作队列模式测试(WorkQueues)
*/
@RestController
public class WorkQueuesController {
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/WorkQueues")
public String send() throws InterruptedException {
// 发送信息,参数一是队列名称,参数二是发送的消息
for (int i = 1; i <= 10; i++) {
String message = "这是第" + i + "条消息";
// 发送信息,参数一是队列名称,参数二是发送的消息
rabbitTemplate.convertAndSend("WorkQueues", message);
}
return "发送消息成功!";
}
}

发布订阅模式(Publish/Subscribe)

广播模式(Fanout)

(1)创建一个配置类,声明 广播模式类型 的交换机FanoutExchange,两个队列FanoutQueue1、FanoutQueue2

  • 将交换机FanoutExchange与队列FanoutQueue1绑定
  • 将交换机FanoutExchange与队列FanoutQueue2绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Configuration
public class FanoutConfig {

private static String ExchangeName = "FanoutExchange";
private static String QueueName1 = "FanoutQueue1";
private static String QueueName2 = "FanoutQueue2";

/**
* 声明交换机FanoutExchange
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(ExchangeName, true, false);
}

/**
* 声明队列FanoutQueue1
*/
@Bean
public Queue fanoutQueue1() {
return new Queue(QueueName1, true, false, false);
}


/**
* 声明队列FanoutQueue2
*/
@Bean
public Queue fanoutQueue2() {
return new Queue(QueueName2, true, false, false);
}

/**
* 绑定队列FanoutQueue1和交换机FanoutExchange
* Queue queue1 为声明队列的方法名
* FanoutExchange exchange 为声明交换机的方法名
*/
@Bean
public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}

/**
* 绑定队列FanoutQueue2和交换机FanoutExchange
* Queue queue2 为声明队列的方法名
* FanoutExchange exchange 为声明交换机的方法名
*/
@Bean
public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}

(2)创建两个监听器,准备接收消息

  • 消费者1绑定队列FanoutQueue1
  • 消费者2绑定队列FanoutQueue2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class FanoutListener {

@RabbitListener(queues = "FanoutQueue1")// 表示消费者监听器,通过queues属性指定监听的队列名称
public void listen1(String msg) throws InterruptedException {
System.out.println("广播模式(Fanout)消费者1接收到消息:【" + msg + "】");
}

@RabbitListener(queues = "FanoutQueue2")// 表示消费者监听器,通过queues属性指定监听的队列名称
public void listen2(String msg) throws InterruptedException {
System.out.println("广播模式(Fanout)消费者2接收到消息:【" + msg + "】");
}

}

(3)编写控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 广播模式测试(Fanout)
*/
@RestController
public class FanoutController {
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/Fanout")
public String SendMessage() {
// 执行发送信息,参数依次为:交换机名、路由、发送的消息
rabbitTemplate.convertAndSend("FanoutExchange", "", "这是发送的一条消息");
return "发送消息成功!";
}
}

路由模式(Routing)

(1)创建一个配置类,声明 直接匹配类型 的交换机DirectExchange,两个队列FanoutQueue1、FanoutQueue2

  • 将交换机RoutingExchange与队列RoutingQueue1绑定并指定路由键为red
  • 将交换机RoutingExchange与队列RoutingQueue2绑定并指定路由键为blue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Configuration
public class RoutingConfig {
private static String ExchangeName = "RoutingExchange";
private static String QueueName1 = "RoutingQueue1";
private static String QueueName2 = "RoutingQueue2";

/**
* 声明交换机RoutingExchange
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(ExchangeName, true, false);
}

/**
* 声明队列RoutingQueue1
*/
@Bean
public Queue routingQueue1() {
return new Queue(QueueName1, true, false, false);
}


/**
* 声明队列RoutingQueue2
*/
@Bean
public Queue routingQueue2() {
return new Queue(QueueName2, true, false, false);
}

/**
* 绑定队列RoutingQueue1和交换机RoutingExchange,使用路由键 red
*
* @param routingQueue1 声明队列的方法名
* @param directExchange 声明交换机的方法名
*/
@Bean
public Binding bindingRoutingQueue1(Queue routingQueue1, DirectExchange directExchange) {
return BindingBuilder
.bind(routingQueue1)
.to(directExchange)
.with("red");
}

/**
* 绑定队列RoutingQueue2和交换机RoutingExchange,使用路由键 blue
*
* @param routingQueue2 声明队列的方法名
* @param directExchange 声明交换机的方法名
*/
@Bean
public Binding bindingRoutingQueue2(Queue routingQueue2, DirectExchange directExchange) {
return BindingBuilder
.bind(routingQueue2)
.to(directExchange)
.with("blue");
}
}

(2)创建两个监听器,准备接收消息

  • 消费者1绑定队列FanoutQueue1
  • 消费者2绑定队列FanoutQueue2
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class RoutingListener {

@RabbitListener(queues ="RoutingQueue1")
public void listen1(String msg) throws InterruptedException {
System.out.println("路由模式(Routing)消费者1接收到消息:【" + msg + "】");
}

@RabbitListener(queues ="RoutingQueue2")
public void listen2(String msg) throws InterruptedException {
System.out.println("路由模式(Routing)消费者2接收到消息:【" + msg + "】");
}
}

(3)编写控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 路由模式测试(Routing)
*/
@RestController
public class RoutingController {

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/Routing")
public String SendMessage() {
// 执行发送信息,参数依次为:交换机名、路由、发送的消息
rabbitTemplate.convertAndSend("RoutingExchange", "red", "红色");
rabbitTemplate.convertAndSend("RoutingExchange", "red", "红色");
rabbitTemplate.convertAndSend("RoutingExchange", "red", "红色");
rabbitTemplate.convertAndSend("RoutingExchange", "blue", "蓝色");
rabbitTemplate.convertAndSend("RoutingExchange", "blue", "蓝色");
rabbitTemplate.convertAndSend("RoutingExchange", "blue", "蓝色");
return "发送消息成功!";
}

}

主题模式(Topic)

(1)创建一个配置类,声明 主题匹配类型 的交换机TopicExchange,两个队列TopicQueue1、TopicQueue2

  • 将交换机TopicExchange与队列TopicQueue1绑定并指定路由键为 *.png
  • 将交换机TopicExchange与队列TopicQueue2绑定并指定路由键为 #.#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Configuration
public class TopicConfig {

private static String ExchangeName = "TopicExchange";
private static String QueueName1 = "TopicQueue1";
private static String QueueName2 = "TopicQueue2";

/**
* 声明交换机TopicExchange
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(ExchangeName, true, false);
}

/**
* 声明队列TopicQueue1
*/
@Bean
public Queue topicQueue1() {
return new Queue(QueueName1, true, false, false);
}


/**
* 声明队列TopicQueue2
*/
@Bean
public Queue topicQueue2() {
return new Queue(QueueName2, true, false, false);
}

/**
* 绑定队列TopicQueue1和交换机TopicExchange,使用通配符路由键 *.png
* Queue topicQueue1 为声明队列的方法名
* TopicExchange exchange 为声明交换机的方法名
*/
@Bean
public Binding bindingTopicQueue1(Queue topicQueue1, TopicExchange topicExchange) {
return BindingBuilder
.bind(topicQueue1)
.to(topicExchange)
.with("*.png");
}

/**
* 绑定队列TopicQueue2和交换机TopicExchange,使用通配符路由键 #.#
* Queue topicQueue2 为声明队列的方法名
* TopicExchange exchange 为声明交换机的方法名
*/
@Bean
public Binding bindingTopicQueue2(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder
.bind(topicQueue2)
.to(topicExchange)
.with("#.#");
}
}

(2)创建两个监听器,准备接收消息

  • 消费者1绑定队列TopicQueue1
  • 消费者2绑定队列TopicQueue2
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class TopicListener {

@RabbitListener(queues ="TopicQueue1")
public void listen1(String msg) throws InterruptedException {
System.out.println("主题模式(Topic)消费者1接收到消息:【" + msg + "】");
}

@RabbitListener(queues ="TopicQueue2")
public void listen2(String msg) throws InterruptedException {
System.out.println("主题模式(Topic)消费者2接收到消息:【" + msg + "】");
}
}

(3)编写控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 主题模式测试(Topic)
*/
@RestController
public class TopicController {

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/Topic")
public String SendMessage() {
// 执行发送信息,参数依次为:交换机名、路由、发送的消息
rabbitTemplate.convertAndSend("TopicExchange", "a.jpg", "jpg图片");
rabbitTemplate.convertAndSend("TopicExchange", "b.png", "png图片");
rabbitTemplate.convertAndSend("TopicExchange", "c.mp4", "mp4视频");
return "发送消息成功!";
}

}