Skip to content

1. 消息的有序性

1.1 生产者方面

使用“业务键”路由到固定队列

java
SendResult sendResult = producer.send(
    message,
    (mqs, msg, arg) -> {
        String bizKey = (String) arg; // 如 orderId
        // 使用“稳定哈希”或“取模”,确保路由不变
        int queueIndex = Math.abs(bizKey.hashCode()) % mqs.size();
        return mqs.get(queueIndex);
    },
    orderId
);

前提是需要使用像Dledger这种高可用的集群,保证短时间内可以让从节点恢复运作,这样宕机的节点的MessageQueue不会被NameServer剔除,保证消息发送的顺序性。

如果是Dledger这种高可用的集群,是可以开启重试机制的,重生机制相当于等待主从切换成功。

java
// 关闭发送失败自动重试到其他队列
producer.setRetryTimesWhenSendFailed(0); // 失败即抛异常
producer.setRetryAnotherBrokerWhenNotStoreOK(false);

1.2 消费者方面

使用 MessageListenerOrderly 保证串行。

java
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    Message msg = msgs.get(0);
    String orderId = msg.getUserProperty("orderId");

    try {
        // 业务处理(必须串行)
        orderService.handleOrderEvent(orderId, msg.getBody());
        return ConsumeOrderlyStatus.SUCCESS;
    } catch (Exception e) {
        // 处理失败,延迟重试
        context.setSuspendCurrentQueueTimeMillis(1000);
        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }
});

Released under the MIT License.