Skip to content

1. 事务消息

1.1 概念

自 RocketMQ 4.3 版本起,正式支持分布式事务消息功能。该机制采用 “两阶段提交 + 补偿机制”(2PC + Compensation) 的设计思想,旨在解决生产者本地事务与消息发送的原子性问题,即:本地事务执行成功,则消息必须发送成功;本地事务失败,则消息不得投递。

核心思想:事务消息 ≠ 传统事务

  • RocketMQ 的事务消息 不涉及消费者端,也不保证消费侧的事务性;
  • 它聚焦于 生产者侧的消息发送与本地事务的一致性;
  • 本质是将传统架构中的“本地消息表”方案 托管到消息中间件内部实现,由 RocketMQ 统一管理事务状态。

半事务消息(Half Message)机制

  • 生产者首先发送一条 半事务消息(Half Message);
  • 该消息被持久化到 RocketMQ 内部的特殊系统 Topic:RMQ_SYS_TRANS_HALF_TOPIC
  • 此时消息对 普通消费者不可见,不会被消费;
  • 发送成功后,生产者开始执行本地事务逻辑。

事务状态二次确认

  • 本地事务执行完成后,生产者需显式向 Broker 提交 事务状态:
    • Commit:表示事务成功,消息应转存至目标 Topic,对消费者可见;
    • Rollback:表示事务失败,消息应被丢弃;
  • Broker 收到 Commit 后,会将该消息从 RMQ_SYS_TRANS_HALF_TOPIC 复制或移动到目标业务 Topic;
  • 此时消息才可被消费者正常拉取。

信息回查

  • 若因网络抖动、生产者宕机等原因,Broker 长时间未收到二次确认;
  • RocketMQ 的 事务状态服务(TransactionCoordinator) 会定期扫描 RMQ_SYS_TRANS_HALF_TOPIC 中处于“待确认”状态的消息;
  • 对超过设定超时时间(默认 60s)的消息,Broker 会主动发起 事务状态回查(Transaction Check);
  • 回查请求发送给原始生产者,询问其本地事务的实际状态;
  • 生产者通过实现 org.apache.rocketmq.client.producer.TransactionListener checkLocalTransaction 方法,返回 COMMITROLLBACKUNKNOW
  • Broker 根据回查结果决定最终动作。

生产者需实现的接口

java
new TransactionListener() {
    // 执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行数据库操作等
        if (success) return LocalTransactionState.COMMIT_MESSAGE;
        if (fail)    return LocalTransactionState.ROLLBACK_MESSAGE;
        else         return LocalTransactionState.UNKNOW;
    }

    // 事务状态回查
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 查询本地事务日志或数据库状态
        return queryTransactionState(msg.getTransactionId());
    }
}

1.2 使用限制与回查机制

功能限制

  • 事务消息 不支持延迟消息:无法设置 delayTimeLevel,因为延迟投递会干扰事务状态的控制流程。
  • 事务消息 不支持批量消息(Batch Message):每条事务消息必须独立发送,以确保每条消息的事务状态可追踪。

回查次数限制

  • Broker 通过参数 transactionCheckMax(默认值为 15)控制最大回查次数;
  • 若某条半事务消息被回查次数超过该阈值,Broker 将放弃处理并丢弃该消息;
  • 默认情况下,系统会打印错误日志(如 discard this transaction check request);
  • 用户可通过继承并重写 AbstractTransactionCheckListener 类,自定义超次后的处理逻辑(如记录到监控系统、告警等)。

回查时间控制

  • 参数 transactionMsgTimeout 控制首次回查的最短等待时间(默认 60 秒);
  • 表示从半事务消息发送成功到 Broker 第一次发起回查之间的最小间隔;
  • 用户可以通过设置参数来覆盖服务器的参数,以适应不同业务的事务执行耗时。
java
Message message = new Message();
message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds, "60");
  • 回查是指数退避式扫描,并非严格按 transactionMsgTimeout 执行,但首次检查不会早于该时间。

生产者 ID 的特殊要求

  • 事务消息的 Producer Group(生产者 ID)不能与其他非事务消息的生产者共享;
  • 原因:RocketMQ 的事务机制依赖 Broker 能通过 Producer Group 反向定位到具体的生产者实例,用于发起事务状态回查;
  • 如果多个生产者共用同一个 Group ID,可能导致回查请求路由错误或失败;
  • 因此,建议为事务消息单独分配独立的 Producer Group,避免混用。

事务消息的幂等性与重复性

  • 由于网络抖动、回查机制的存在,同一条事务消息可能被多次回查或最终被多次投递;
  • 生产者在实现 checkLocalTransaction 方法时,必须保证幂等性;
  • 消费者也应做好消息幂等消费的准备,避免重复处理造成数据异常。

1.3 补偿流程

  1. 发送半消息失败,此时事务也没有提交,是一致性的;
  2. 发送半消息成功了,本地事务执行失败,会回滚事务,可以投递UNKNOW消息等待回查;
  3. 投递UNKNOW失败了,一段时间之后,MQ会对半消息进行回查,发现事务并没有提交成功,会投递rollback消息;
  4. 本地事务执行成功,但是投递二次确认消息给MQ的时候,出现了异常,MQ依旧可以使用回查机制来保证已知悉。

1.4 先执行本地事务,将发送消息嵌入到事务范围,根据消息是否成功来判断提交/回滚事务,这样也能保证原子性?

  • 消息已“确认”,但实际在响应传输中丢失,导致数据库回滚;
  • 本地事务提交了,但 JVM 在返回前宕机,消息未真正发出;
  • 把它放在 @Transactional 内,会拉长数据库事务持有时间

1.5 代码案例

java
public class TransactionProducer {
    public static void main(String[] args) throws Exception{
        TransactionMQProducer product = new TransactionMQProducer ("TransactionProducer");
        product.setNamesrvAddr("192.168.204.130:9876;192.168.204.131:9876;192.168.204.132:9876");
        product.setSendMsgTimeout(1000*60);
        //设置了本地事务执行逻辑和回查逻辑
        TransactionListener listener = new TransactionListenerImpl();
        product.setTransactionListener(listener);
        product.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD"};
        for (int i = 0; i < 4; i++) {
            Message msg = new Message("transactionTopic",tags[i%4],"key"+i,("hi,rocketmq"+i).getBytes());
            TransactionSendResult sendResult = product.sendMessageInTransaction(msg, null);
            System.out.println(sendResult);
        }
    }
}
java
public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("开始事务");
//        int a = 1/0;
        System.out.println("提交事务");
        //提交半事务消息
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("开始回查");
        System.out.println("在数据库没有找到落库的数据");
        //回滚半事务消息
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

Released under the MIT License.