侧边栏壁纸
博主头像
梦幻世界博主等级

行动起来,活在当下

  • 累计撰写 23 篇文章
  • 累计创建 2 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

RocketMQ延时消息4.0和5.0对比

梦幻世界
2024-05-31 / 0 评论 / 0 点赞 / 159 阅读 / 5693 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2024-05-31,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

延迟消息与定时消息

首先需要明确延迟消息与定时消息虽然意思不同,但在体现的效果上确实相同的,都是在消息生产到 Broker 之一段时间之后才会被投递(消费者可以消费到)。只不过在使用的 API 上,延迟消息指定延迟的时间,而定时消息指定确切的投递时间。实际上它们可以实现相同的效果。

在 Rocketmq 4.x 中只支持通过设定延迟等级来支持 18 个固定延迟时间。具体的原理可以看 RocketMQ 延迟消息(定时消息)源码解析

4.x 的延迟消息有很大的局限性,它无法支持任意时间的定时,而且最大的定时时间也只有 2 小时,它的性能也达不到普通消息(后来 4.x 的延迟消息性能被优化,详见 RocketMQ 延迟消息(定时消息)4.9.3 版本优化 异步投递支持

许多公司不满足于它的能力,自研了任意时间定时消息,扩展了最大定时时长。

在 Rocketmq 5.x 中终于开源了支持任意时间的定时消息(以下简称定时消息)。它与 4.x 的延迟消息是两套实现机制,互相之间几乎不影响。

RocketMQ5.0 API 任意时间定时消息的使用

在 Rocketmq 5.x 的客户端中,在构造消息时提供了 3 个 API 来指定延迟时间或定时时间。

Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));
// 延迟 10s 后投递
message.setDelayTimeSec(10);
// 延迟 10000ms 后投递
message.setDelayTimeMs(10_000L);
// 定时投递,定时时间为当前时间 + 10000ms
message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);
// 发送消息
SendResult result = producer.send(message);

公共平台封装

基础平台封装的API中调用的方法

/**
 * 发送延时消息
 * @param topic 主题
 * @param baseMessage 消息
 * @param delayTime 延时时间(毫秒, 最长不超过24小时)
 */
public void publishDelayTimeMills(String topic, BaseMessage baseMessage, long delayTime) {
    rocketProducer.sendDelayTimeMills(topic, baseMessage, delayTime);
}

进入sendDelayTimeMills()

public SendMsgResult sendDelayTimeMills(String topic, BaseMessage message, long delayTime) {
    Message<BaseMessage> sendMessage = this.putMessage(message);
    SendResult sendResult = this.template.syncSendDelayTimeMills(topic, sendMessage, delayTime);
    log.info("send delay message end({})! messageId:{}, topic:{}, delayTime:{}, message:{}", new Object[]{sendResult.getSendStatus(), sendResult.getMsgId(), topic, delayTime, message});
    return this.convertMsgResult(sendResult);
}

进入 this.template.syncSendDelayTimeMills()

public SendResult syncSendDelayTimeMills(String destination, Message<?> message, long delayTime) {
    return this.syncSend(destination, message, (long)this.producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_MILLISECONDS);
}

注意这个参数 DelayMode.DELAY_MILLISECONDS后面有用到

RocketMQ提供了三个枚举

DELAY_SECONDS,
DELAY_MILLISECONDS,
DELIVER_TIME_MILLISECONDS

进入syncSend()

private SendResult syncSend(String destination, Message<?> message, long timeout, long delayTime, DelayMode mode) {
    if (!Objects.isNull(message) && !Objects.isNull(message.getPayload())) {
        try {
            long now = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            if (delayTime > 0L && Objects.nonNull(mode)) {
                switch (mode) {
                    case DELAY_SECONDS:
                        rocketMsg.setDelayTimeSec(delayTime);
                        break;
                    case DELAY_MILLISECONDS:
                        rocketMsg.setDelayTimeMs(delayTime);
                        break;
                    case DELIVER_TIME_MILLISECONDS:
                        rocketMsg.setDeliverTimeMs(delayTime);
                        break;
                    default:
                        log.warn("delay mode: {} not support", mode);
                }
            }

            SendResult sendResult = this.producer.send(rocketMsg, timeout);
            long costTime = System.currentTimeMillis() - now;
            if (log.isDebugEnabled()) {
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            }

            return sendResult;
        } catch (Exception var14) {
            log.error("syncSend failed. destination:{}, message:{}, detail exception info: ", new Object[]{destination, message, var14});
            throw new MessagingException(var14.getMessage(), var14);
        }
    } else {
        log.error("syncSend failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
}

看到switch中最终调用的API,也就是上方最开始的三个

switch (mode) {
    case DELAY_SECONDS:
        rocketMsg.setDelayTimeSec(delayTime);
        break;
    case DELAY_MILLISECONDS:
        rocketMsg.setDelayTimeMs(delayTime);
        break;
    case DELIVER_TIME_MILLISECONDS:
        rocketMsg.setDeliverTimeMs(delayTime);
        break;
    default:
        log.warn("delay mode: {} not support", mode);
}

DELAY_MILLISECONDS为基础平台封装方法的最终调用,也就是延迟xxx毫秒

参考

Rocketmq 5.0 任意时间定时消息(RIP-43) 原理详解 & 源码解析

0

评论区