1. 消息拉取问题
1.1 问题
生产者发送消息太快了,导致消息堆积,消费者可以采用拉取消息的模式,这样可以自由控制自己消费消息的多少。下面的代码虽然表面是推模式,但是推模式就是设置一个频率不断的拉取,所以下面的代码可以设置参数。
1.2 maxTransferCountOnMessageInMemory(Broker 参数)
- 位置:MessageStoreConfig.maxTransferCountOnMessageInMemory
- 默认值:32
- 作用:控制从内存中单次传输给消费者的最大消息数。
- 影响:即使客户端设置 pullBatchSize > 32,实际拉取仍被限制为 32。
- 结论:若想让 pullBatchSize > 32 生效,必须修改 Broker 配置并重启 Broker。
1.3 pullBatchSize:单次从每个队列拉取的消息数
- 定义:消费者每次调用 pull 时,从每个 MessageQueue 拉取的最大消息条数。
- 常见误解:不是“一次拉取的总消息数”,而是“每个队列拉取的数量”。
- 注意:pullBatchSize 受 Broker 端参数限制。
1.4 pullInterval:拉取间隔(毫秒)
- 定义:两次 pull 请求之间的等待时间。
- 默认值:通常为 0(连续拉取)
1.5 consumeMessageBatchMaxSize:批量消费最大消息数
定义:在 MessageListenerConcurrently 或 MessageListenerOrderly 中,一次回调可处理的最大消息条数。
默认值:1(即默认逐条消费)
设置方式:
java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setConsumeMessageBatchMaxSize(32); // 每次最多批量消费32条
consumeMessageBatchMaxSize 的实际生效值受限于 pullBatchSize。
例如:
- consumeMessageBatchMaxSize = 32
- pullBatchSize = 12
- → 实际每次最多消费 12 条消息。
1.6 消费者配置
rocketmq-spring-boot-starter 默认不支持批量消费,原因如下:
- 其内部封装的 DefaultMQPushConsumer 默认 consumeMessageBatchMaxSize = 1。
- 不暴露 pullBatchSize 和 pullInterval 的配置项。
需自定义 DefaultMQPushConsumer Bean,手动设置参数:
java
@Bean
public DefaultMQPushConsumer customPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("myTopic", "*");
// 关键配置
consumer.setPullBatchSize(32); // 每个队列拉取数
consumer.setPullInterval(0); // 拉取间隔(ms)
consumer.setConsumeMessageBatchMaxSize(32); // 批量消费最大数
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
// 批量处理逻辑
System.out.println("批量消费 " + msgs.size() + " 条消息");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
return consumer;
}