延迟消息与定时消息
首先需要明确延迟消息与定时消息虽然意思不同,但在体现的效果上确实相同的,都是在消息生产到 Broker 之一段时间之后才会被投递(消费者可以消费到)。只不过在使用的 API 上,延迟消息指定延迟的时间,而定时消息指定确切的投递时间。实际上它们可以实现相同的效果。
在 Rocketmq 4.x 中只支持通过设定延迟等级来支持 18 个固定延迟时间。具体的原理可以看 RocketMQ 延迟消息(定时消息)源码解析。
4.x 的延迟消息有很大的局限性,它无法支持任意时间的定时,而且最大的定时时间也只有 2 小时,它的性能也达不到普通消息(后来 4.x 的延迟消息性能被优化,详见 RocketMQ 延迟消息(定时消息)4.9.3 版本优化 异步投递支持。
许多公司不满足于它的能力,自研了任意时间定时消息,扩展了最大定时时长。
在 Rocketmq 5.x 中终于开源了支持任意时间的定时消息(以下简称定时消息)。它与 4.x 的延迟消息是两套实现机制,互相之间几乎不影响。
RocketMQ5.0 API 任意时间定时消息的使用
在 Rocketmq 5.x 的客户端中,在构造消息时提供了 3 个 API 来指定延迟时间或定时时间。
Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));
// 延迟 10s 后投递
message.setDelayTimeSec(10);
// 延迟 10000ms 后投递
message.setDelayTimeMs(10_000L);
// 定时投递,定时时间为当前时间 + 10000ms
message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);
// 发送消息
SendResult result = producer.send(message);
公共平台封装
基础平台封装的API中调用的方法
/**
* 发送延时消息
* @param topic 主题
* @param baseMessage 消息
* @param delayTime 延时时间(毫秒, 最长不超过24小时)
*/
public void publishDelayTimeMills(String topic, BaseMessage baseMessage, long delayTime) {
rocketProducer.sendDelayTimeMills(topic, baseMessage, delayTime);
}
进入sendDelayTimeMills()
public SendMsgResult sendDelayTimeMills(String topic, BaseMessage message, long delayTime) {
Message<BaseMessage> sendMessage = this.putMessage(message);
SendResult sendResult = this.template.syncSendDelayTimeMills(topic, sendMessage, delayTime);
log.info("send delay message end({})! messageId:{}, topic:{}, delayTime:{}, message:{}", new Object[]{sendResult.getSendStatus(), sendResult.getMsgId(), topic, delayTime, message});
return this.convertMsgResult(sendResult);
}
进入 this.template.syncSendDelayTimeMills()
public SendResult syncSendDelayTimeMills(String destination, Message<?> message, long delayTime) {
return this.syncSend(destination, message, (long)this.producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_MILLISECONDS);
}
注意这个参数 DelayMode.DELAY_MILLISECONDS后面有用到
RocketMQ提供了三个枚举
DELAY_SECONDS,
DELAY_MILLISECONDS,
DELIVER_TIME_MILLISECONDS
进入syncSend()
private SendResult syncSend(String destination, Message<?> message, long timeout, long delayTime, DelayMode mode) {
if (!Objects.isNull(message) && !Objects.isNull(message.getPayload())) {
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
if (delayTime > 0L && Objects.nonNull(mode)) {
switch (mode) {
case DELAY_SECONDS:
rocketMsg.setDelayTimeSec(delayTime);
break;
case DELAY_MILLISECONDS:
rocketMsg.setDelayTimeMs(delayTime);
break;
case DELIVER_TIME_MILLISECONDS:
rocketMsg.setDeliverTimeMs(delayTime);
break;
default:
log.warn("delay mode: {} not support", mode);
}
}
SendResult sendResult = this.producer.send(rocketMsg, timeout);
long costTime = System.currentTimeMillis() - now;
if (log.isDebugEnabled()) {
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
} catch (Exception var14) {
log.error("syncSend failed. destination:{}, message:{}, detail exception info: ", new Object[]{destination, message, var14});
throw new MessagingException(var14.getMessage(), var14);
}
} else {
log.error("syncSend failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
}
看到switch中最终调用的API,也就是上方最开始的三个
switch (mode) {
case DELAY_SECONDS:
rocketMsg.setDelayTimeSec(delayTime);
break;
case DELAY_MILLISECONDS:
rocketMsg.setDelayTimeMs(delayTime);
break;
case DELIVER_TIME_MILLISECONDS:
rocketMsg.setDeliverTimeMs(delayTime);
break;
default:
log.warn("delay mode: {} not support", mode);
}
DELAY_MILLISECONDS为基础平台封装方法的最终调用,也就是延迟xxx毫秒
评论区