先来看一段Spring官文对于SpringAMQP的介绍 SpringAMQP项目将Spring的核心概念应用于AMQP消息传递的解决方案。它提供了一个“模板”作为用于发送和接收消息的高级抽象。它还通过“侦听器容器”为消息驱动的POJO提供支持。这些库促进了AMQP资源的管理,同时促进了对依赖项注入和声明性配置的使用。在所有这些情况下,您将看到与Spring Framework中的JMS支持相似的地方。 该项目包括两个部分;spring-amqp是基础抽象,spring-rabbit是RabbitMQ实现。 可以看出,SpringRabbit可以帮我们简单高效的使用RabbitMQ 看一下需要的依赖 看一下配置文件 官方共有6种消息模型,但是常用的只有5种,下面简单介绍一下五种消息模型 RabbitMQ对消息进行接收、存储、转发。 生产者:一个发送消息的用户应用程序。 消费者:等待并接收应用程序发送的消息。 先来创建一个生产者。 再来创建一个消费者。 说一下几个注解和属性 然后调用生产者的接口,发送数据可以看到消费者很快就消费完了数据。 测试结果: 这种消费模型其实是基于基本消息模型的,只是对RabbitMQ的消息投递做了一个配置,给消费者投递时,一次不要投递过多的数据,以免造成性能浪费。 创建一个生产者 创建两个消费者,一个模拟性能比较差的服务器,一个模拟性能比较好的服务器 测试一下 我们需要加一行配置 然后加完配置之后再次看一下测试效果 可以看到,已经达到我们想要的效果了,FastCustomer消费者消费了9条消息,而SlowCustomer只消费了一条消息。 这个模型字面翻译是“扇出”的意思,其实可以理解为广播,也就是一个生产者发送一条消息,可以同时被多个消费者所收到。 这里就引入了一个新的概念: 交换机可以接收到生产者发送过来的数据,然后可以指派给指定队列。而本例中,生产者将消息发送给交换机,而交换机将消息投递给所有与本交换机绑定的队列中。 先来创建一个生产者 需要注意的是,如果使用Fanout消息模型,是不需要指定RoutingKey的,就算指定了也是不会生效的,具体是什么消息模型,是取决与消费者端交换机是如何定义的。 再来创建两个消费者 这里又有几个新的注解和属性,解释一下: 测试一下,看一下效果。 可以看到,生产者只指定了交换机并发送消息,而绑定了交换机的两个队列都收到了生产者的消息,这就是广播模型。 路由模型是交换机通过前面提到的RoutingKey进行消息投递的,每个队列都有自己专属的RoutingKey,生产者发送消息时,指定交换机和RoutingKey,消息到了交换机之后,交换机通过RoutingKey将消息投递到指定队列。 先创建一个生产者 再来创建两个消费者 可以看到,注解和属性还是前面已经说过的,很简单,但是要注意,这路由模型中key必须写,不然交换机不知该给那个队列投递数据,数据就丢失了。 还有一点,交换机类型默认就是 看一下测试效果。 我们先调用send1接口 可以看到,交换机只将数据投递到了routeQueue1队列。 再来调用send2接口 同样,routeQueue2队列收到了消息,而routeQueue1队列并没有收到消息。 还有最后一种消息模型,和路由模型类似,只不过是RoutingKey有些小变化。 在Topic模型中,RoutingKey不再是固定的字符,而是有了通配符,交换机可以模糊匹配队列。 有两个通配符,第一个是 举例说明 我这里没有用单词,而是用了AAA,只要是用 上代码,先来创建一个生产者。 再来创建两个消费者。 可以看到两个消费者的RoutingKey不一样,我们测试一下看看效果。 先来调用send1接口,可以猜想一下,这条信息会被两个消费者同时收到。 可以看到,确实如我们猜想一样,消息被两个队列都收到了。 再来调用send2接口。 因为send2接口中RoutingKey是三个单词,所以只有 我们用基本消息模型来说一下手动ACK。 在SpringRabbit中,ACK默认是自动的,也就是说消息队列将消息投递到消费者时,SpringRabbit自动帮我们进行了消息确认并通知消息队列,但是这样做有一定的问题。RabbitMQ收到ACK回执之后就会将消息删除,但是如果处理消息出异常呢,这条数据未能成功处理,但是RabbitMQ也将该条消息删除了。 所以我们需要手动调用ACK回执,确保在消息正常处理完成之后再告诉RabbitMQ我确实已经成功处理了这条消息,你可以删除这条消息了。 我们来看看代码 首先声明队列绑定交换机时,要配置 可以看到监听方法多了两个入参, 简单研究了一下SpringAMQP的实现SpringRabbit,发现确实比以往xml模式配置简单了许多,注解配置也清晰了许多。 邮箱:91907@163.com
一、SpringAMQP介绍
二、依赖和配置
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.wxx</groupId> <artifactId>springboot-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-rabbitmq</name> <properties> <java.version>1.8</java.version> </properties> <dependencies> <!-- web启动器,便于测试发送消息 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- SpringAMQP 主角 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
spring: rabbitmq: addresses: 127.0.0.1 #ip地址 username: admin # 账号 password: admin # 密码
三、消息模型
1. 基本消息模型
@RestController public class Producer { /** * SpringAMQP帮我们封装好操作RabbitMQ的对象模板 */ @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public void send() { for (int i = 0; i < 10; i++) { // 给指定routingKey发送消息 // arg0: routingKey // arg1: 消息数据 rabbitTemplate.convertAndSend("testQueue", "你好啊" + i); } } }
@Component public class Customer { int i = 0; /** * queuesToDeclare:支持多个队列,将队列绑定到默认交换机上,routeKey为队列名称。 * @param msg 接收到的消息 */ @RabbitListener(queuesToDeclare = @Queue(value = "testQueue")) public void listener(String msg) { System.out.println(msg); } }
@RabbitListener
:用于类上和方法上,用于类上时可以配合@RabbitHandler使用,本文不阐述;主要说说用于方法上,可以用于声明队列,用于绑定交换机和队列。queuesToDeclare
:将队列绑定到默认交换机上,routeKey为队列名称。@Queue
:队列注解,value为队列名称2.Work消息模型(能者多劳)
@RestController public class WorkProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/work/send") public void send() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("workQueue", "工作模式队列" + i); } } }
/** * @author 她爱微笑 * @date 2020/4/11 * 慢消费者,模拟性能比较差的服务器 */ @Component public class SlowCustomer { @RabbitListener(queuesToDeclare = @Queue(value = "workQueue")) public void listener(String msg) { try { // 模拟执行每次任务需要1秒 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("SlowCustomer:" + msg); } }
/** * @author 她爱微笑 * @date 2020/4/11 * 快消费者,模拟性能比较好的服务器 */ @Component public class FastCustomer { @RabbitListener(queuesToDeclare = @Queue(value = "workQueue")) public void listener(String msg) { System.out.println("FastCustomer:" + msg); } }
可以看到效果,两个消费者都分别获取到了5条消息,但是问题来了,FastCustomer消费者很快的消费完了自己的5条消息,然后就闲置了。但是FastCustomer执行很慢,执行5条消息,就需要大于5秒的时间,这样就造成了性能浪费。我们应该让RabbitMQ智能一些,给每个消费者每次只给一个消息,当确认消息完成之后再下发另一个消息,这样就可以能者多劳
了。spring: rabbitmq: listener: simple: prefetch: 1 # 每个消费者每次可以消费一个
3.Fanout消息模型(广播模型)
交换机(Exchange)
:如果把队列比作邮局,而交换机可以看做是集散中心,是负责把消息送到相应邮局的机构。@RestController public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/fanout/send") public void send() { // 广播模式,不需要指定队列 和 routingKey, // 直接指定交换机,交换机 会将消息发送到所有和该交换机绑定的队列中 // 就算指定Routingkey,在广播模式中也是不生效的,交换机还是会把消息推送到所有与之绑定的队列中 rabbitTemplate.convertAndSend("fanoutExchange", "", "广播模式"); } }
/** * @author 她爱微笑 * @date 2020/4/11 * 广播模式消费者1 */ @Component public class FanoutCustomer1 { @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "fanoutCustomer1"), exchange = @Exchange( value = "fanoutExchange", type = ExchangeTypes.FANOUT ) // key = "fanout1" 在广播模式中,写key也是不生效的,为了避免歧义还是不要写为好 ) ) public void listener(String msg) { System.out.println("FanoutCustomer1:" + msg); } }
/** * @author 她爱微笑 * @date 2020/4/11 * 广播模式消费者2 */ @Component public class FanoutCustomer2 { @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "fanoutCustomer2"), exchange = @Exchange( value = "fanoutExchange", type = ExchangeTypes.FANOUT ) ) ) public void listener(String msg) { System.out.println("FanoutCustomer2:" + msg); } }
bindings
:用于声明交换机和队列的绑定,可以接收@QueueBinding
类型数组。@QueueBinding
:声明交换机和队列绑定。属性有value,用于声明队列;还有exchange,用于声明与之绑定的交换机。@Queue
:前面说过了,用于声明队列并监听队列。@Exchange
:声明交换机。属性有value,为交换机名称;type为交换机类型,也就是消息模型。key
:是用于声明队列RoutingKey的,相当于队列的别名吧,交换机可以通过RoutingKey找到队列并投递消息,因为Fanout模型不需要,所以这里只是提一下。4.Direct消息模型(路由模型)
/** * @author 她爱微笑 * @date 2020/4/11 * 路由模式 */ @RestController public class RouteProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/route/send1") public void send1() { // arg0: 交换机名称 // arg1: routingKey // arg2: 需要发送的数据 Obejct类型 rabbitTemplate.convertAndSend("routeExchange", "route1", "路由模式消息1"); } @GetMapping("/route/send2") public void send2() { rabbitTemplate.convertAndSend("routeExchange", "route2", "路由模式消息2"); } }
@Component public class RouteCustomer1 { @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "routeQueue1"), exchange = @Exchange(value = "routeExchange", type = ExchangeTypes.DIRECT), key = "route1" // 在路由模式中,必须写key,不写key,默认为空字符串 ) ) public void listener(String msg) { System.out.println("RouteCustomer1:" + msg); } }
@Component public class RouteCustomer2 { @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "routeQueue2"), exchange = @Exchange(value = "routeExchange", type = ExchangeTypes.DIRECT), key = "route2" ) ) public void listener(String msg) { System.out.println("RouteCustomer2:" + msg); } }
DIRECT
类型,所以这里的type = ExchangeTypes.DIRECT
可以省略不写。5.Topic消息模型(通配符模型)
Routingkey
一般都是由一个或多个单词组成,多个单词之间以”.”分割。*
号,第二个是 #
号`*`:匹配一个单词,就只有一个单词 `#`:匹配一个或多个词
.
分隔都认为是单词topic.*:可以匹配topic.AAA,topic.BBB topic.#:可以匹配topic.AAA,topic.AAA.BBB
@RestController public class TopicProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/topic/send1") public void send1() { rabbitTemplate.convertAndSend("topicExchange", "topic.AAA", "通配符模式消息1"); } @GetMapping("/topic/send2") public void send2() { rabbitTemplate.convertAndSend("topicExchange", "topic.BBB.CCC", "通配符模式消息2"); } }
@Component public class TopicCustomer1 { @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "topicQueue1"), exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC), key = "topic.*" ) ) public void listener(String msg) { System.out.println("TopicCustomer1:" + msg); } }
@Component public class TopicCustomer2 { @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "topicQueue2"), exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC), key = "topic.#" ) ) public void listener(String msg) { System.out.println("TopicCustomer2:" + msg); } }
topicQueue2
队列收到了消息。四、手动ACK
ACK
:将消息处理结果通知消息队列的叫法。/** * ackMode:MANUAL 手动确认 */ @RabbitListener(queuesToDeclare = @Queue(value = "testQueue"), ackMode = "MANUAL") public void listener(String msg, Message message, Channel channel) throws IOException { // 该条消息的消息编号,Long类型,递增的 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 模拟处理消息 System.out.println(msg); // 处理成功 手动ACK回执 // arg0:消息编号 递增的 // arg1:true: 将一次性ACK回执成功所有小于消息编号(deliveryTag)的消息 // false:仅ACK回执成功传入的消息编号(deliveryTag) channel.basicAck(deliveryTag, false); } catch (Exception e) { // 当消息处理异常时,将消息重新放回队列,重新排队 // arg0:消息编号 递增的 // arg1:true: 将一次性拒绝所有小于消息编号(deliveryTag)的消息 // false:仅拒绝传入的消息编号(deliveryTag) // arg2:true: 让消息重新回到队列 // false:直接丢弃消息 channel.basicNack(deliveryTag, false, false); } }
ackMode = "MANUAL"
,配置为手动ACK。Message
和Channel
。Message是消息本体,包含字节类型的数据和消息号;Channel是管道对象,用于手动ACK确认或者拒绝消息。五、总结
联系我
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算