Skip to content

1. 消息拉取问题

1.1 问题

生产者发送消息太快了,导致消息堆积,消费者可以采用拉取消息的模式,这样可以自由控制自己消费消息的多少。下面的代码虽然表面是推模式,但是推模式就是设置一个频率不断的拉取,所以下面的代码可以设置参数。

1.2 maxTransferCountOnMessageInMemory(Broker 参数)

  • 位置:MessageStoreConfig.maxTransferCountOnMessageInMemory
  • 默认值:32
  • 作用:控制从内存中单次传输给消费者的最大消息数。
  • 影响:即使客户端设置 pullBatchSize > 32,实际拉取仍被限制为 32。
  • 结论:若想让 pullBatchSize > 32 生效,必须修改 Broker 配置并重启 Broker。

1.3 pullBatchSize:单次从每个队列拉取的消息数

  • 定义:消费者每次调用 pull 时,从每个 MessageQueue 拉取的最大消息条数。
  • 常见误解:不是“一次拉取的总消息数”,而是“每个队列拉取的数量”。
  • 注意:pullBatchSize 受 Broker 端参数限制。

1.4 pullInterval:拉取间隔(毫秒)

  • 定义:两次 pull 请求之间的等待时间。
  • 默认值:通常为 0(连续拉取)

1.5 consumeMessageBatchMaxSize:批量消费最大消息数

定义:在 MessageListenerConcurrently 或 MessageListenerOrderly 中,一次回调可处理的最大消息条数。

默认值:1(即默认逐条消费)

设置方式:

java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setConsumeMessageBatchMaxSize(32); // 每次最多批量消费32条

consumeMessageBatchMaxSize 的实际生效值受限于 pullBatchSize。

例如:

  • consumeMessageBatchMaxSize = 32
  • pullBatchSize = 12
  • → 实际每次最多消费 12 条消息。

1.6 消费者配置

rocketmq-spring-boot-starter 默认不支持批量消费,原因如下:

  • 其内部封装的 DefaultMQPushConsumer 默认 consumeMessageBatchMaxSize = 1。
  • 不暴露 pullBatchSize 和 pullInterval 的配置项。

需自定义 DefaultMQPushConsumer Bean,手动设置参数:

java
@Bean
public DefaultMQPushConsumer customPushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myGroup");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.subscribe("myTopic", "*");

    // 关键配置
    consumer.setPullBatchSize(32); // 每个队列拉取数
    consumer.setPullInterval(0);   // 拉取间隔(ms)
    consumer.setConsumeMessageBatchMaxSize(32); // 批量消费最大数

    consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
        // 批量处理逻辑
        System.out.println("批量消费 " + msgs.size() + " 条消息");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });

    consumer.start();
    return consumer;
}

Released under the MIT License.