• 72827

    文章

  • 665

    评论

  • 17

    友链

  • 最近新加了换肤功能,大家多来逛逛吧~~~~
  • 喜欢这个网站的朋友可以加一下QQ群,我们一起交流技术。

RabbitMQ延迟队列设置

撸了今年阿里、腾讯和美团的面试,我有一个重要发现.......>>

延迟消费。比如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单;用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

rabbitmq的消息TTL和死信Exchange结合

1.消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。

2.Dead Letter Exchanges

Exchage的概念在这里就不在赘述。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

①.一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

②. 上面的消息的TTL到了,消息过期了。

③. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

3.实现延迟队列

我们先设置好各个配置的字符串

public interface TestMq {
    /**
     * 队列名
     */
    String TEST_QUEUE = "test";;
    /**
     * 服务添加routing key
     */
    String ROUTING_KEY_TEST = "post.test";

    /**
     * 死信队列
     */
    String DEAD_QUEUE = "dead";
    String ROURING_KEY_DEAD = "dead.routing.key";
    String MQ_EXCHANGE_DEAD = "dead.exchange";
}

配置信息

/**
 * rabbitmq配置
 *
 */
@Configuration
public class RabbitmqConfig {

   /**
    * 死信队列
    * @return
    */
   @Bean
   public Queue deadQueue() {
      Map<String,Object> arguments = new HashMap<>();
      //此处填入死信交换机
      arguments.put("x-dead-letter-exchange", TestMq.MQ_EXCHANGE_DEAD);
      //此处填入消息队列的路由,而非死信队列自己的路由
      arguments.put("x-dead-letter-routing-key", TestMq.ROUTING_KEY_TEST);
      return new Queue(TestMq.DEAD_QUEUE,true,false,false,arguments);
   }

   /**
    * 死信交换机
    * @return
    */
   @Bean
   public DirectExchange deadExchange() {
      return new DirectExchange(TestMq.MQ_EXCHANGE_DEAD);
   }

   /**
    * 绑定死信队列到死信交换机
    * @return
    */
   @Bean
   public Binding bindingDeadExchange() {
      return BindingBuilder.bind(deadQueue()).to(deadExchange())
            .with(TestMq.ROURING_KEY_DEAD);
   }

   /**
    * 被消费者侦听的获取消息的队列
    * @return
    */
   @Bean
   public Queue testQueue() {
      return new Queue(TestMq.TEST_QUEUE,true,false,false);
   }

   /**
    * 将消息队列绑定到死信交换机,跟死信队列的路由不同
    * @return
    */
   @Bean
   public Binding bindingTest() {
      return BindingBuilder.bind(testQueue()).to(deadExchange())
            .with(TestMq.ROUTING_KEY_TEST);
   }

}

消息生产者

@Slf4j
@Component
public class TestSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange,String routingKey,Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        MessagePostProcessor processor = message -> {
            //给消息设置的过期时间,我们这里为10秒
            message.getMessageProperties().setExpiration(10000 + "");
            return message;
        };
        this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content),processor);
    }

    /**
     * 确认后回调:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

        /**
     * 失败后return回调:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /**
     * 对消息对象进行二进制序列化
     * @param o
     * @return
     */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}

消费者

@Slf4j
@Component
@RabbitListener(queues = TestMq.TEST_QUEUE)
public class TestConsumer {

    @RabbitHandler
    public void receice(byte[] data, Channel channel, Message message) throws IOException {
        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            Integer orderNo = unSerialize(data);
            log.info(orderNo + "为收到的消息");
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            log.info("receiver fail");
        }
    }

    /**
     * 反序列化
     * @param data
     * @return
     */
    private Integer unSerialize(byte[] data) {
        Input input = null;
        try {
            Kryo kryo = new Kryo();
            input = new Input(new ByteArrayInputStream(data));
            return kryo.readObject(input,Integer.class);
        }
        finally {
            input.close();
        }
    }
}

我们随便写个测试

@Service
public class TestService {
    @Autowired
    private TestSender sender;

    @PostConstruct
    public void test() {
        //此处顺序为死信交换机,死信队列路由,消息
        sender.send(TestMq.MQ_EXCHANGE_DEAD,TestMq.ROURING_KEY_DEAD,1);
    }
}

经测试

2019-10-11 17:26:18.079  INFO 879 --- [           main] c.g.rabbitdelay.config.TestSender        : send content=1
2019-10-11 17:26:18.098  INFO 879 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [xxx.xxx.xxx.xxx:5672]
2019-10-11 17:26:18.227  INFO 879 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2301b75:0/SimpleConnection@243f003c [delegate=amqp://admin@xxx.xxx.xxx.xxx:5672/, localPort= 52345]
2019-10-11 17:26:18.337  INFO 879 --- [39.9.225.2:5672] c.g.rabbitdelay.config.TestSender        : send ack success
2019-10-11 17:26:18.446  INFO 879 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2019-10-11 17:26:18.751  INFO 879 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-10-11 17:26:18.959  INFO 879 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-11 17:26:18.962  INFO 879 --- [           main] c.g.rabbitdelay.RabbitdelayApplication   : Started RabbitdelayApplication in 17.093 seconds (JVM running for 27.45)
2019-10-11 17:26:28.342  INFO 879 --- [ntContainer#0-1] c.g.rabbitdelay.consumer.TestConsumer    : 1为收到的消息

通过日志可以看到,发送消息是18秒,收到消息消费为28秒,中间隔了10秒钟。

 


695856371Web网页设计师②群 | 喜欢本站的朋友可以收藏本站,或者加入我们大家一起来交流技术!

欢迎来到梁钟霖个人博客网站。本个人博客网站提供最新的站长新闻,各种互联网资讯。 还提供个人博客模板,最新最全的java教程,java面试题。在此我将尽我最大所能将此个人博客网站做的最好! 谢谢大家,愿大家一起进步!

转载原创文章请注明出处,转载至: 梁钟霖个人博客www.liangzl.com

0条评论

Loading...


发表评论

电子邮件地址不会被公开。 必填项已用*标注

自定义皮肤
注册梁钟霖个人博客