1. 消息的有序性
1.1 生产者方面
使用“业务键”路由到固定队列
java
SendResult sendResult = producer.send(
message,
(mqs, msg, arg) -> {
String bizKey = (String) arg; // 如 orderId
// 使用“稳定哈希”或“取模”,确保路由不变
int queueIndex = Math.abs(bizKey.hashCode()) % mqs.size();
return mqs.get(queueIndex);
},
orderId
);
前提是需要使用像Dledger这种高可用的集群,保证短时间内可以让从节点恢复运作,这样宕机的节点的MessageQueue不会被NameServer剔除,保证消息发送的顺序性。
如果是Dledger这种高可用的集群,是可以开启重试机制的,重生机制相当于等待主从切换成功。
java
// 关闭发送失败自动重试到其他队列
producer.setRetryTimesWhenSendFailed(0); // 失败即抛异常
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
1.2 消费者方面
使用 MessageListenerOrderly 保证串行。
java
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
Message msg = msgs.get(0);
String orderId = msg.getUserProperty("orderId");
try {
// 业务处理(必须串行)
orderService.handleOrderEvent(orderId, msg.getBody());
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
// 处理失败,延迟重试
context.setSuspendCurrentQueueTimeMillis(1000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
});