基本介绍
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
RabbitMQ是一个 Erlang 开发的AMQP(Advanced Message Queuing Protocol )的开源实现。
RabbitMQ安装和基本配置
1、安装RabbitMQ服务
1
| docker pull rabbitmq:3-management //注意docker pull rabbitmq如果安装这个是后面不能访问管理网页的
|
2、查看安装的RabbitMQ的镜像id
3、启动RabbitMQ
1
| docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 镜像id
|
4、访问RabbitMQ管理平台
1 2
| 通过:[服务器ip:15672访问] 默认账号密码为:guest
|

5、添加队列queues

目前只需要用到blue,blue.emps,blue.news,coderblue.news
6、添加Exchanges交换机,并绑定队列

注意:
Exchange一共有四种类型:direct、topic、headers 和fanout。
效率:fanout > direct > topic
Direct Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。(全路径匹配)
Topic Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。(与关联的对比匹配,可模糊匹配)。匹配规则如下:
- *:匹配一个单词
- #:匹配0个或多个字符
- *,#:只能写在.号左右,且不能挨着字符
- 单词和单词之间需要用.隔开
Fanout Exchange:直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key。(广播到所有)
Headers Exchange:将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中。
感谢:CSDN博主-做个好人好吗
RabbitMQ基本使用
下面以Springboot工程(版本2.3.3.RELEASE)为例,操作rabbitmq
1、配置properties
1 2 3 4
| spring.rabbitmq.host=服务器ip spring.rabbitmq.username=guest spring.rabbitmq.password=guest #spring.rabbitmq.port=5672
|
2、添加MQConfig配置文件,更改默认的序列化规则
1 2 3 4 5 6 7 8
| @Configuration public class MQConfig {
@Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
|
3、创建UserInfo实体类
1 2 3 4 5 6 7 8 9 10
|
@Data public class UserInfo {
private String userName;
private Integer age; }
|
4、使用Junit5测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| @SpringBootTest class RabbitmqApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
@Autowired AmqpAdmin amqpAdmin;
@Test void contextLoads() {
HashMap<String, Object> map = new HashMap<>(); map.put("msg", "这是converAndSend发送的消息"); map.put("data", Arrays.asList(2, "helloWorld", false)); rabbitTemplate.convertAndSend("exchange.direct", "blue.news", map); }
@Test void contextLoadsObject() { rabbitTemplate.convertAndSend("exchange.direct", "blue.users", new UserInfo("Kity", 20)); }
@Test void sendMsg() { rabbitTemplate.convertAndSend("exchanges.fanout", "", new UserInfo("fanout", 20)); }
@Test public void createExchange() { amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); System.out.println("创建完成"); amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true)); amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "创建绑定规则", null)); }
@Test public void deleteQueue() { amqpAdmin.deleteQueue("amqpadmin.queue"); amqpAdmin.deleteExchange("blue.users"); }
@Test public void receive() { Object o = rabbitTemplate.receiveAndConvert("blue.users"); System.out.println(o.getClass()); System.err.println(o); } }
|
5、监听接收对应匹配队列的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
@Service public class UserInfoService {
@RabbitListener(queues = "blue.users",containerFactory = "rabbitListenerContainerFactory") public void receive(UserInfo userInfo) { System.out.println("接收到的userInfo:" + userInfo); }
@RabbitListener(queues = "blue") public void receiveMsg(Message message) { System.out.println(Arrays.toString(message.getBody())); System.out.println(message.getMessageProperties()); }
}
|
常用工作模式
Spring Boot整合引入依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>版本号</version> </dependency>
|
第一种模型(直连)

在上图的模型中,有以下概念
1、定义消费者
1 2 3 4 5 6 7 8 9
| @Component @RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "false", autoDelete = "true")) public class HelloCustomer {
@RabbitHandler public void receive(String message) { System.out.println("message=" + message); } }
|
其中参数解释:
- queuesToDeclare:如果队列不存在就会先创建
- value:队列名
- durable:是否持久化,false为不支持
- autoDelete:是否自动删除,true为是
2、定义生产者
1 2 3 4 5 6 7
| @Resource private RabbitTemplate rabbitTemplate; public void testHello() { rabbitTemplate.convertAndSend("hello", "hello world"); }
|
第二种模型(work queue)

1、定义消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Component public class WorkCustomer {
@RabbitListener(queuesToDeclare = @Queue(value = "work")) public void receive1(String message) { System.out.println("work message1 = " + message); }
@RabbitListener(queuesToDeclare = @Queue(value = "work")) public void receive2(String message) { System.out.println("work message2 = " + message); } }
|
2、定义生产者
1 2 3 4 5
| public void testWork() { for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("work", "work模型" + i); } }
|
注意:ack为false,手动确认。防止宕机后,没有被消费的消息丢失,而且实现能者多劳。
第三种模型(fanout)

1、定义消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
@Component public class FanoutCustomer {
@RabbitListener(bindings = { @QueueBinding(value = @Queue, //创建的临时队列 exchange = @Exchange(value = "fanout_mine", type = "fanout") //绑定的交换机 ) }) public void receive1(String message) { System.out.println("message1=fanout - " + message); }
@RabbitListener(bindings = { @QueueBinding(value = @Queue, //创建的临时队列 exchange = @Exchange(value = "fanout_mine", type = "fanout") //绑定的交换机 ) }) public void receive2(String message) { System.out.println("message2=fanout - " + message); }
@RabbitListener(bindings = { @QueueBinding(value = @Queue("fanout_queue"), exchange = @Exchange(value = "fanout_mine", type = "fanout") //绑定的交换机 ) }) public void receive3(String message) { System.out.println("message3=fanout - " + message); } }
|
2、定义生产者
1 2 3
| public void testFanout() { rabbitTemplate.convertAndSend("fanout_mine", "", "Fanout的模型发送的消息"); }
|
如下图所示,会生成一个指定队列和两个临时队列

第四种模型(Routing)
Routing之订阅模型-Direct直连

1、定义消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Component public class RouteCustomer {
@RabbitListener(bindings = { @QueueBinding(value = @Queue, //创建临时队列 exchange = @Exchange(value = "directs", type = "direct"), //自定交换机名称和类型 key = {"info", "error", "warn"}) }) public void receive1(String message) { System.out.println("message1=" + message); }
@RabbitListener(bindings = { @QueueBinding(value = @Queue, //创建临时队列 exchange = @Exchange(value = "directs", type = "direct"), //自定交换机名称和类型 key = {"error"}) }) public void receive2(String message) { System.out.println("message2=" + message); } }
|
2、定义生产者
1 2 3 4
| public void testRoute() { rabbitTemplate.convertAndSend("directs", "info", "发送info的key的路由信息"); }
|
Routing之订阅模型-Topic

注意:
*(星号)可以代替一个单词。
#(哈希)可以替代零个或多个单词。
1、定义消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Component public class TopicCustomer {
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic", name = "topics"), key = {"user.save", "user.*"} ) }) public void receive1(String message) { System.out.println("message1=" + message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic", name = "topics"), key = {"order.#", "produce.#", "user.*"} ) }) public void receive2(String message) { System.out.println("message2=" + message); } }
|
2、定义生产者
1 2 3
| public void testTopic() { rabbitTemplate.convertAndSend("topics", "produce.use.hello", "user.save 路由消息"); }
|
基于配置绑定交换机
1 2 3 4 5 6
| sudo groupadd docker sudo usermod -aG docker $(whoami) 先退出一下再重新登录来保证有了正确的权限 sudo service docker start
vi /etc/sysconfig/docker
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| @Configuration public class RabbitMQConfig { public static final String MIAOSHA_QUEUE = "hello"; public static final String QUEUE = "queue";
public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String HEADER_QUEUE = "header.queue";
public static final String TOPIC_EXCHANGE = "topicExchage"; public static final String FANOUT_EXCHANGE = "fanoutExchage";
public static final String HEADERS_EXCHANGE = "headersExchage";
@Bean public Queue queue() { return new Queue(QUEUE, true); }
@Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE1, true); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE2, true); } @Bean public TopicExchange topicExchage(){ return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#"); }
@Bean public FanoutExchange fanoutExchage(){ return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding FanoutBinding1() { return BindingBuilder.bind(topicQueue1()).to(fanoutExchage()); } @Bean public Binding FanoutBinding2() { return BindingBuilder.bind(topicQueue2()).to(fanoutExchage()); }
@Bean public HeadersExchange headersExchage(){ return new HeadersExchange(HEADERS_EXCHANGE); } @Bean public Queue headerQueue1() { return new Queue(HEADER_QUEUE, true); } @Bean public Binding headerBinding() { Map<String, Object> map = new HashMap<String, Object>(); map.put("header1", "value1"); map.put("header2", "value2"); return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match(); }
}
|
死信队列
消息成为死信的三种情况
- 队列长度达到限制
- 消费者拒绝消费消息,basicNack/basicReject, 并且不把消息重回队列
- 源队列存在消息过期限制,消息超时未消费
config配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| @Configuration public class RabbitMQConfig {
public static final String NORMAL_EXCHANGE_BOOT = "normal_exchange_boot";
public static final String DLX_EXCHANGE_BOOT = "dlx_exchange-boot";
public static final String NORMAL_QUEUE_BOOT = "normal_queue_boot";
public static final String DLX_QUEUE_BOOT = "dlx_queue_boot";
@Bean("normalQueue") public Queue queueDeclare() { Map<String, Object> args = new HashMap<>(2); args.put("x-dead-letter-exchange", DLX_EXCHANGE_BOOT); args.put("x-dead-letter-routing-key", DLX_QUEUE_BOOT); args.put("x-message-ttl", 9000); args.put("x-max-length", 99); return QueueBuilder.durable(NORMAL_QUEUE_BOOT).withArguments(args).build(); }
@Bean("exchangeNormal") public Exchange exchangeNormal() { return ExchangeBuilder.directExchange(NORMAL_EXCHANGE_BOOT).durable(true).build(); }
@Bean public Binding queueBindExchange(@Qualifier("normalQueue") Queue queue, @Qualifier("exchangeNormal") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(NORMAL_QUEUE_BOOT).noargs(); }
@Bean("queueDlx") public Queue queueDlx() { return QueueBuilder.durable(DLX_QUEUE_BOOT).build(); }
@Bean("exchangeDlx") public Exchange exchangeDlx() { return ExchangeBuilder.directExchange(DLX_EXCHANGE_BOOT).durable(true).build(); }
@Bean public Binding dlxQueueBindDlxExchange(@Qualifier("queueDlx") Queue queue, @Qualifier("exchangeDlx") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DLX_QUEUE_BOOT).noargs(); }
}
|
启动后我们会发现mq中新增如上的交换机和队列:

使用junit发送消息测试:
1 2 3 4 5 6 7
| @Test void putMessageToNormalQueue() { HashMap<String, Object> map = new HashMap<>(); map.put("msg", "这是putMessageToNormalQueue发送的消息"); map.put("data", Arrays.asList((Math.random() * 100) + 1, "helloWorld", false)); rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_BOOT, NORMAL_QUEUE_BOOT, map); }
|
使用junit接收死信队列消息:
1 2 3 4
| @Test void getMessageDeadQueue() { Map map = (HashMap)rabbitTemplate.receiveAndConvert(DLX_QUEUE_BOOT); }
|
TTL配置
1 2 3 4 5 6 7 8
| MessagePostProcessor messagePostProcessor=message -> { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setContentEncoding("utf-8"); messageProperties.setExpiration("5000"); return message; };
|
模拟生产者发送消息
1 2 3 4 5 6
| @Test public void test() { for (int i = 0; i < 10000; i++) { rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_BOOT, NORMAL_QUEUE_BOOT, i + ""); } }
|
消费者监听消息
1 2 3 4 5 6 7 8 9 10 11
| @Component @RabbitListener(queues = "dlx_queue_boot") public class DeadQueueListen {
@RabbitHandler public void getMessage(String msg, Channel channel, Message message) throws IOException { System.out.println("接受到的消息:"+msg); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); }
}
|
消费流程

延迟队列
延迟队列故名思议:即消息进去队列后并不是立即被消费,而是到达某一时间后,才会被消费,其实RabbitMQ中并没有延迟队列,其本质是借助ttl和死信队列结合构成与延迟队列一样的效果

参考链接 - RabbitMQ高级特性之死信队列和延迟队列
完整代码仓库地址