• 43687

    文章

  • 368

    评论

  • 30

    友链

  • 最近新加了换肤功能,大家多来逛逛吧~~~~
  • 喜欢这个网站的朋友可以加一下QQ群,我们一起交流技术。

七、消费消息与性能权衡(读数笔记与个人实践)

喜欢本站的朋友可以收藏本站,或者加入QQ群:172816590Web网页设计师,我们大家一起来交流技术!

欢迎来到梁钟霖个人博客网站。本个人博客网站提供最新的站长新闻,各种互联网资讯。 还提供个人博客模板,最新最全的java教程,java面试题。在此我将尽我最大所能将此个人博客网站做的最好! 谢谢大家,愿大家一起进步!




摘要

主要介绍消费消息时的几种方式:

  • 平衡消息消费的可靠性与性能;
  • 死信交换器;
  • 设置自动删除队列、持久化队列、TTL等;

消费性能

添加一个图

使用no-ack模式 

在消费消息时,负责消费的应用程序会发送一个Basic.Consumer请求,与该请求一起发送的还有一个no-ack标志。当这个标志启用时,它会告诉RabbitMQ消费者在接收到消息时不会进行确认,RabbitMQ只管尽快的发送消息。

使用no-ack标志消费消息是让RabbitMQ将消费投递给消费者的最快方式,但这也是最不可靠的方式。

如果使用no-ack,那么当有新的可用消息时,RabbitMQ将会发送该消息给消费者,而不用等待。实际上,如果有可用消息,RabbitMQ会持续向消费者发送它们,直到套接字缓冲区被填满为止。

目前没有找到RabbitTemplate如何开启no-ack的方法,如果有用过的朋友,请留言告诉我,谢谢。

消息确认模式

消息确认模式要求每次消费消息时,向RabbitMQ返回一个Basic.Ack,告知RabbitMQ消息已经成功消费,可以在服务器中删除该消息。

消息确认有三种确认方式:

  • Ack;
  • Reject;
  • Nack;

基于RabbitTemplate,下面这段代码,有对这几种确认方式的实现,在配置文件中开启手动确认模式,acknowledge-mode属性为manual(默认为自动确认):

spring:
  #消息队列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000ms
    listener:
      simple:
        acknowledge-mode: manual
/**
 * 消费者监听消息队列
 */
@Component
@Slf4j
@RabbitListener(queues = "DIRECT_QUEUE")
public class DirectQueueListener {

    @RabbitHandler
    public void process(String message,
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException {
        log.info("消费消息成功: {}", message);
        Thread.sleep(1000);
        switch (message) {
            case "nack":
                channel.basicNack(tag, true, false); // 第二个参数控制是否开启批量拒绝,第三个参数表示是否requeue
                break;
            case "nack-requeue":
                channel.basicNack(tag, true, true);
                break;
            case "reject":
                channel.basicReject(tag, false);
                break;
            case "reject-requeue": // 启用了requeue,如果只有一个消费者,容易造成死循环
                channel.basicReject(tag, true);
                break;
            default:
                channel.basicAck(tag, true);
                break;
        }
    }

}

channel.basicAck:当正常消费消息时,调用该方法。

我们看到除了basicAck,还有basicReject和basicNack。这两种,顾名思义,是用来拒绝消费的。

channel.basicReject:从协议层面上,reject是发送一个Basic.Reject响应,告知RabbitMQ无法对这条消息进行处理,当拒绝时,可以指定是否丢弃消息或使用requeue标志重新发送消息。当启用requeue时,RabbitMQ将会把这条消息重新放回到队列中。

不能使用basicReject一次拒绝多个消息。

channel.basicNack:Basic.Nack实现与Basic.Reject相同的行为,但添加了批量拒绝的功能。

设置multiple或requeue如图所示:

服务质量确认模式

AMQP规范要求信道要有服务质量设置,即在确认消息接收之前可以预先接收一定数量的消息。可以设置一个预取数量来实现高效的发送消息。

如果消费者应用程序在确认消息之前崩溃,在套接字关闭时,所有预取的消息将返回到队列。

如果设置了no-ack,那么预取大小将被忽略。

使用RabbitTemplate时,可以在消费者应用程序的配置文件中配置预取大小:

spring:
  #消息队列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000ms
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1000

其中prefetch就是预取大小,消费者应用程序运行后,可以在RabbitMQ的控制台看到这个设置:

如果熟悉抓包软件的朋友,可以试着抓包看看:

我预先发送了2条消息到RabbitMQ,可以看到上图中最后两行是两个Ack。

有一种方式可以一次确认多个消息,Basic.Ack响应具有一个multiple属性,当把它设置为true时就能确认以前未确认的消息。

如果使用multiple,当成功的接收了一些消息,并且应用程序在回复Ack之前就发生了异常,则所有为确认的消息将返回队列。

死信交换器

RabbitMQ的死信交换器是一种可以拒绝已投递消息的可选行为,一般有三种情况的消息会进入死信队列:

  • 当拒绝了一个不重新发送的消息时,会进入死信;
  • 当消息的TTL到期时,会进入死信;
  • 当队列已满时,会进入死信;
死信与备用交换器不同,过期或被拒绝的消息通过死信交换器进行投递,而备用交换器则路由那些无法由RabbitMQ路由的消息。

在RabbitMQ中,在声明队列时,指定死信交换器:

    /**
     * 声明队列。
     * 同时指定死信队列。
     *
     * @return Queue对象。
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

死信交换器还允许使用预先指定的值覆盖路由键,这样可以允许同一个交换器同时处理死信和非死信消息,但需要确保死信消息不被投递到相同的队列。设置预定义路由键的关键字是:x-dead-letter-routing-key。

测试死信队列,当消费者拒绝时,查看消息是否会进入死信队列:

图。

控制队列

定义队列时,有多个设置可以确定队列的行为:

  • 自动删除自己;
  • 只允许一个消费者进行消费;
  • 自动过期消息;
  • 保持有限数量的消息;
  • 将旧消息推出堆栈;

更改队列的设置,必须删除队列并重新创建它。

临时队列

也可以叫做自动删除的队列。

一旦消费者完成连接和检索消息,在断开连接时队列将被删除。

    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").autoDelete()
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

队列只会在没有消费者监听的时候自行删除。

只允许单个消费者

在需要确保只有单个消费者能够消费队列中的消息时,在创建队列时设置exclusive属性,启用后在消费者断开连接后,队列也会自动删除。

    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").exclusive()
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

声明exclusive的队列,只能被声明时所指定的同一个连接和信道所消费,当创建队列的信道关闭时,独占队列也将会删除。在信道关闭之间,可以多次声明和取消exclusive队列的消费者。

自动过期队列

如果一段时间没有使用该队列,就删除它。

创建一个自动过期的队列非常简单,要做的事情就是使用x-expires参数声明一个队列。该参数以毫秒为单位设置队列的生存时间(Time To Live,TTL)。

自动过期队列有一些严格的约定:

  • 队列只有在没有消费者的情况下才会过期。如果有连接消费者,则只有发出Basic.Cancel或断开连接之后才自动删除;
  • 队列只有在TTL周期之内没有收到Basic.Get请求时才会到期。一旦一个Basic.Get请求中已经包含了一个具有过期值的队列,那么过期设置无效,队列不会被自动删除(不要使用Get);
  • 不能重新声明或更改x-expires属性;
  • RabbitMQ不保证过期删除队列这一过程的时效性;

永久队列

使用durable标志告诉RabbitMQ希望队列配置被保存在服务器:

    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

消息级别的TTL

消息级别的TTL设置允许服务器对消息的最大生存时间进行限制。声明队列的同时指定死信交换器和TTL值将导致该队列中已到期的消息成为死信消息。

可以使用x-message-ttl设置队列的消息TTL时间。

最大长度队列

从RabbitMQ3.1.0开始,可以在声明队列时指定最大长度。如果在队列上设置列x-max-length参数,一旦消息到达最大值,RabbitMQ会在添加新消息时删除位于队列前端的消息,如果声明队列时候,指定列死信交换器,则从队列前端删除的消息会进入死信队列。


 转载至链接:https://my.oschina.net/u/2450666/blog/3037305。




您觉喜欢本网站,或者觉得本文章对您有帮助,那么可以选择打赏。
打赏多少,您高兴就行,谢谢您对梁钟霖这小子的支持! ~(@^_^@)~

  • 微信扫一扫

  • 支付宝扫一扫

    支付宝打赏
转载原创文章请注明出处,转载至: 梁钟霖个人博客www.liangzl.com

0条评论

Loading...


发表评论

电子邮件地址不会被公开。 必填项已用*标注

自定义皮肤