1. 事务消息
1.1 概念
自 RocketMQ 4.3 版本起,正式支持分布式事务消息功能。该机制采用 “两阶段提交 + 补偿机制”(2PC + Compensation) 的设计思想,旨在解决生产者本地事务与消息发送的原子性问题,即:本地事务执行成功,则消息必须发送成功;本地事务失败,则消息不得投递。
核心思想:事务消息 ≠ 传统事务
- RocketMQ 的事务消息 不涉及消费者端,也不保证消费侧的事务性;
- 它聚焦于 生产者侧的消息发送与本地事务的一致性;
- 本质是将传统架构中的“本地消息表”方案 托管到消息中间件内部实现,由 RocketMQ 统一管理事务状态。
半事务消息(Half Message)机制
- 生产者首先发送一条 半事务消息(Half Message);
- 该消息被持久化到 RocketMQ 内部的特殊系统 Topic:
RMQ_SYS_TRANS_HALF_TOPIC
; - 此时消息对 普通消费者不可见,不会被消费;
- 发送成功后,生产者开始执行本地事务逻辑。
事务状态二次确认
- 本地事务执行完成后,生产者需显式向 Broker 提交 事务状态:
Commit
:表示事务成功,消息应转存至目标 Topic,对消费者可见;Rollback
:表示事务失败,消息应被丢弃;
- Broker 收到 Commit 后,会将该消息从
RMQ_SYS_TRANS_HALF_TOPIC
复制或移动到目标业务 Topic; - 此时消息才可被消费者正常拉取。
信息回查
- 若因网络抖动、生产者宕机等原因,Broker 长时间未收到二次确认;
- RocketMQ 的 事务状态服务(TransactionCoordinator) 会定期扫描
RMQ_SYS_TRANS_HALF_TOPIC
中处于“待确认”状态的消息; - 对超过设定超时时间(
默认 60s
)的消息,Broker 会主动发起 事务状态回查(Transaction Check); - 回查请求发送给原始生产者,询问其本地事务的实际状态;
- 生产者通过实现
org.apache.rocketmq.client.producer.TransactionListener
的checkLocalTransaction
方法,返回COMMIT
、ROLLBACK
或UNKNOW
; - Broker 根据回查结果决定最终动作。
生产者需实现的接口
java
new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行数据库操作等
if (success) return LocalTransactionState.COMMIT_MESSAGE;
if (fail) return LocalTransactionState.ROLLBACK_MESSAGE;
else return LocalTransactionState.UNKNOW;
}
// 事务状态回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 查询本地事务日志或数据库状态
return queryTransactionState(msg.getTransactionId());
}
}
1.2 使用限制与回查机制
功能限制
- 事务消息 不支持延迟消息:无法设置 delayTimeLevel,因为延迟投递会干扰事务状态的控制流程。
- 事务消息 不支持批量消息(Batch Message):每条事务消息必须独立发送,以确保每条消息的事务状态可追踪。
回查次数限制
- Broker 通过参数 transactionCheckMax(默认值为 15)控制最大回查次数;
- 若某条半事务消息被回查次数超过该阈值,Broker 将放弃处理并丢弃该消息;
- 默认情况下,系统会打印错误日志(如 discard this transaction check request);
- 用户可通过继承并重写 AbstractTransactionCheckListener 类,自定义超次后的处理逻辑(如记录到监控系统、告警等)。
回查时间控制
- 参数 transactionMsgTimeout 控制首次回查的最短等待时间(默认 60 秒);
- 表示从半事务消息发送成功到 Broker 第一次发起回查之间的最小间隔;
- 用户可以通过设置参数来覆盖服务器的参数,以适应不同业务的事务执行耗时。
java
Message message = new Message();
message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds, "60");
- 回查是指数退避式扫描,并非严格按 transactionMsgTimeout 执行,但首次检查不会早于该时间。
生产者 ID 的特殊要求
- 事务消息的 Producer Group(生产者 ID)不能与其他非事务消息的生产者共享;
- 原因:RocketMQ 的事务机制依赖 Broker 能通过 Producer Group 反向定位到具体的生产者实例,用于发起事务状态回查;
- 如果多个生产者共用同一个 Group ID,可能导致回查请求路由错误或失败;
- 因此,建议为事务消息单独分配独立的 Producer Group,避免混用。
事务消息的幂等性与重复性
- 由于网络抖动、回查机制的存在,同一条事务消息可能被多次回查或最终被多次投递;
- 生产者在实现 checkLocalTransaction 方法时,必须保证幂等性;
- 消费者也应做好消息幂等消费的准备,避免重复处理造成数据异常。
1.3 补偿流程
- 发送半消息失败,此时事务也没有提交,是一致性的;
- 发送半消息成功了,本地事务执行失败,会回滚事务,可以投递
UNKNOW
消息等待回查; - 投递
UNKNOW
失败了,一段时间之后,MQ会对半消息进行回查,发现事务并没有提交成功,会投递rollback
消息; - 本地事务执行成功,但是投递二次确认消息给MQ的时候,出现了异常,MQ依旧可以使用回查机制来保证已知悉。
1.4 先执行本地事务,将发送消息嵌入到事务范围,根据消息是否成功来判断提交/回滚事务,这样也能保证原子性?
- 消息已“确认”,但实际在响应传输中丢失,导致数据库回滚;
- 本地事务提交了,但 JVM 在返回前宕机,消息未真正发出;
- 把它放在 @Transactional 内,会拉长数据库事务持有时间
1.5 代码案例
java
public class TransactionProducer {
public static void main(String[] args) throws Exception{
TransactionMQProducer product = new TransactionMQProducer ("TransactionProducer");
product.setNamesrvAddr("192.168.204.130:9876;192.168.204.131:9876;192.168.204.132:9876");
product.setSendMsgTimeout(1000*60);
//设置了本地事务执行逻辑和回查逻辑
TransactionListener listener = new TransactionListenerImpl();
product.setTransactionListener(listener);
product.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD"};
for (int i = 0; i < 4; i++) {
Message msg = new Message("transactionTopic",tags[i%4],"key"+i,("hi,rocketmq"+i).getBytes());
TransactionSendResult sendResult = product.sendMessageInTransaction(msg, null);
System.out.println(sendResult);
}
}
}
java
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("开始事务");
// int a = 1/0;
System.out.println("提交事务");
//提交半事务消息
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("开始回查");
System.out.println("在数据库没有找到落库的数据");
//回滚半事务消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}