RocketMQ生产者与消费者完整封装技术文档

概述

本文档提供了基于Spring Boot的RocketMQ完整解决方案,包括消息生产者和消费者的封装实现。支持同步发送、异步发送、延迟消息、定时消息、有序消息以及并发消费、顺序消费等多种模式。

依赖配置

在项目的 pom.xml中添加RocketMQ Spring Boot Starter依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.3</version>
</dependency>

配置文件

application.yml配置

application.yml中添加RocketMQ配置:

rocketmq:
  # NameServer地址,多个用分号分隔
  name-server: localhost:9876
  
  producer:
    # 生产者组名,同一个组内的生产者发送相同Topic的消息
    group: prodProduceGroup
  
    # 是否启用消息轨迹,用于消息的发送和消费轨迹追踪
    # 开启后可在控制台查看消息的完整链路,但会影响性能
    enable-msg-trace: false
  
    # 访问密钥,用于RocketMQ的权限验证
    access-key: '00phgp34'
  
    # 秘密密钥,与access-key配对使用
    secret-key: 'errtteeet3!'
  
    # 发送消息超时时间(毫秒),默认3000ms
    send-message-timeout: 10000
  
    # 发送失败重试次数,默认2次
    retry-times-when-send-failed: 3
  
    # 消息最大大小(字节),默认4MB
    max-message-size: 4194304
  
  consumer:
    # 是否启用消息轨迹
    enable-msg-trace: false
  
    # 消费者访问密钥
    access-key: '00phgp34'
  
    # 消费者秘密密钥  
    secret-key: 'errtteeet3!'
  
    # 拉取消息超时时间(毫秒)
    pull-timeout: 10000

broker.conf配置

RocketMQ服务端参数大全

RocketMQ Broker服务端配置文件:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# Broker对外服务的IP地址,客户端连接的IP
brokerIP1=公网IP

# NameServer的地址,如果有多个的话,使用分号分隔开
namesrvAddr=公网IP:30001

# Broker端口号,默认10911
listenPort=30002

# store的存储路径
storePathRootDir=/home/rocketmq/data/store

# commitLog的存储路径 
storePathCommitLog=/home/rocketmq/data/store/commitlog

# 消费队列的存储路径
storePathConsumeQueue=/home/rocketmq/data/store/consumequeue

# 消息索引的存储路径
storePathIndex=/home/rocketmq/data/store/index

# checkpoint文件的存储路径
storeCheckpoint=/home/rocketmq/data/store/checkpoint

# abort文件的存储路径
abortFile=/home/rocketmq/data/store/abort

# 集群名称
brokerClusterName = DefaultCluster

# Broker名称
brokerName = broker-a

# BrokerID,0表示Master,大于0表示Slave
brokerId = 0

# 该时间清理过期数据
deleteWhen = 04

# 文件超过N小时没改变算是过期(小时)
fileReservedTime = 72

# Broker角色:ASYNC_MASTER异步复制Master、SYNC_MASTER同步双写Master、SLAVE
brokerRole = ASYNC_MASTER

# 延迟消息时间级别(delayLevel 1对应1s, 2对应5s,依此类推)
# 默认18个级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

# 发送队列等待时间
waitTimeMillsInSendQueue=999999

# 刷盘方式:SYNC_FLUSH同步刷盘、ASYNC_FLUSH异步刷盘
flushDiskType = SYNC_FLUSH

# 存储消息时使用可重入锁
useReentrantLockWhenPutMessage=true

# 消息最大大小,默认4MB,这里设置为10MB
maxMessageSize=4194304

# 消息属性最大大小,默认32KB,这里设置为64KB
maxPropertySize=65536

# 启用访问控制列表(ACL)权限验证
aclEnable=true

# ACL配置文件路径
aclConfig=/home/rocketmq/rocketmq-all-5.1.2-bin-release/conf/plain_acl.yml

plain_acl.yml配置

RocketMQ访问控制列表(ACL)权限配置文件:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# 全局白名单IP地址,这些IP可以无需认证访问RocketMQ
globalWhiteRemoteAddresses:
  - 127.0.0.1                    # 本地回环地址
  - 公网IP                       # RocketMQ服务器公网IP
  - 192.168.1.100               # RocketMQ服务器内网IP

# 用户账户配置
accounts:
  - accessKey: 00phgp34          # 访问密钥,与客户端配置保持一致
    secretKey: errtteeet3!       # 秘密密钥,与客户端配置保持一致
    whiteRemoteAddress: 192.168.1.*  # 该用户允许访问的IP范围
    admin: true                  # 是否为管理员账户

部署脚本

启动脚本 start.sh

#!/bin/bash
logsDir=/home/rocketmq/logs
installDir=/home/rocketmq/rocketmq-all-5.1.2-bin-release
mkdir -p $logsDir

# 设置 JVM 启动参数,仅输出 ERROR 日志
export JAVA_OPT="$JAVA_OPT -Drocketmq.log.level=ERROR"

# 启动 NameServer
nohup sh $installDir/bin/mqnamesrv -c $installDir/conf/namesrv.properties > $logsDir/mqnamesrv.out 2>&1 &

# 启动 Broker,指定 NameServer 地址
nohup sh $installDir/bin/mqbroker -n 公网IP:9876 -c $installDir/conf/broker.conf > $logsDir/broker.out 2>&1 &

# 启动管理控制台(可选)
#nohup java -jar /home/rocketmq/console/rocketmq-dashboard-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=公网IP:9876 > $logsDir/dashboard.out 2>&1 &

停止脚本 stop.sh

#!/bin/bash
cd /home/rocketmq/rocketmq-all-5.1.2-bin-release/bin

# 停止 Broker
sh mqshutdown broker

# 停止 NameServer  
sh mqshutdown namesrv

# 停止管理控制台
jarname='rocketmq-dashboard-1.0.0.jar'
pid=`ps aux | grep $jarname | grep -v grep | awk '{print $2}'`
echo $pid
kill -9 $pid

脚本说明

启动脚本功能

  • 日志目录创建:自动创建日志存储目录
  • JVM参数设置:配置日志级别为ERROR,减少日志输出
  • NameServer启动:使用默认9876端口启动注册中心
  • Broker启动:指定NameServer地址启动消息服务
  • 控制台启动:可选的Web管理界面(注释状态)

停止脚本功能

  • 服务停止:按顺序停止Broker和NameServer
  • 进程清理:强制杀死管理控制台进程
  • 安全关闭:确保所有RocketMQ相关进程完全退出

使用方法

# 赋予执行权限
chmod +x start.sh stop.sh

# 启动RocketMQ
./start.sh

# 停止RocketMQ  
./stop.sh

注意事项

  • 确保脚本中的路径与实际安装路径一致
  • NameServer端口9876与broker.conf中的配置保持一致
  • 日志文件会输出到指定的logs目录便于排查问题

消息生产者封装

接口定义

IMQProducer 接口

import org.apache.rocketmq.client.producer.SendResult;

/**
 * 消息队列生产者
 */
public interface IMQProducer<T> {
  
    /**
     * 异步发送消息 如果要发送指定TAG,topic参数格式TOPIC:TAG
     */
    void asyncSend(String topic, T msg, int delayLevel);

    /**
     * 同步发送消息 如果要发送指定TAG,topic参数格式TOPIC:TAG
     */
    SendResult syncSend(String topic, T msg, int delayLevel);

    /**
     * 同步发送消息 如果要发送指定TAG,topic参数格式TOPIC:TAG
     */
    SendResult syncSend(String topic, T msg);

    /**
     * 定时投递消息 如果要发送指定TAG,topic参数格式TOPIC:TAG
     */
    SendResult syncSendDeliverTimeMills(String topic, T msg, long pushTime);

    /**
     * 有序发送消息
     */
    SendResult syncSendOrderly(String topic, T msg, String hashKey);
}

生产者实现

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import jakarta.annotation.Resource;

@Service
@Slf4j
public class RocketMQProducer<T> implements IMQProducer<T> {
  
    private final String LOG_SIGN = "生产者消息";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void asyncSend(String topic, T msg, int delayLevel) {
        try {
            Message<T> message = MessageBuilder.withPayload(msg).build();
            rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("【{}】【{}】发送成功, 状态码【{}】", LOG_SIGN, message, sendResult.getSendStatus());
                }

                @Override
                public void onException(Throwable throwable) {
                    log.warn("【{}】【{}】发送异常", LOG_SIGN, message, throwable);
                }
            }, 10000, delayLevel);
        } catch (Exception e) {
            log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e);
        }
    }

    @Override
    public SendResult syncSend(String topic, T msg, int delayLevel) {
        try {
            Message<T> message = MessageBuilder.withPayload(msg).build();
            SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 10000, delayLevel);
            log.info("【{}】【{}】发送结果 状态码【{}】 MsgId【{}】", 
                    LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e);
        }
        return null;
    }

    @Override
    public SendResult syncSend(String topic, T msg) {
        try {
            Message<T> message = MessageBuilder.withPayload(msg).build();
            SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 10000);
            log.info("【{}】【{}】发送结果 状态码【{}】 MsgId【{}】", 
                    LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e);
        }
        return null;
    }

    @Override
    public SendResult syncSendDeliverTimeMills(String topic, T msg, long pushTime) {
        try {
            Message<T> message = MessageBuilder.withPayload(msg).build();
            SendResult sendResult = rocketMQTemplate.syncSendDeliverTimeMills(topic, message, pushTime);
            log.info("【{}】【{}】发送定时消息结果 状态码【{}】 MsgId【{}】", 
                    LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("【{}】【{}】发送定时消息异常", LOG_SIGN, msg, e);
        }
        return null;
    }

    @Override
    public SendResult syncSendOrderly(String topic, T msg, String hashKey) {
        try {
            Message<T> message = MessageBuilder.withPayload(msg).build();
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
            log.info("【{}】【{}】发送结果 状态码【{}】 MsgId【{}】", 
                    LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e);
        }
        return null;
    }
}

延迟消息级别说明

RocketMQ默认支持18个延迟级别:

Level延迟时间Level延迟时间Level延迟时间
11s73m139m
25s84m1410m
310s95m1520m
430s106m1630m
51m117m171h
62m128m182h

使用示例:

// 发送延迟5秒的消息(delayLevel = 2)
mqProducer.syncSend("TOPIC", "message", 2);

消息消费者封装

1. 并发消费者(高吞吐量场景)

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * 并发消费者 - 高吞吐量场景
 * 适用于: 日志处理、数据统计、通知发送等对顺序不敏感的场景
 */
@Profile({"prod"})
@Slf4j
@Service
@RocketMQMessageListener(
    consumeThreadMax = 10, 
    consumeMode = ConsumeMode.CONCURRENTLY, 
    topic = "CONCURRENT_TOPIC", 
    consumerGroup = "CONCURRENT_CONSUMER_GROUP"
)
public class ConcurrentMQConsumer implements RocketMQListener<MessageExt> {
  
    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            // 获取消息基本信息
            String topic = messageExt.getTopic();
            String tags = messageExt.getTags();
            String keys = messageExt.getKeys();
            String msgId = messageExt.getMsgId();
            int reconsumeTimes = messageExt.getReconsumeTimes();
          
            // 获取消息体
            String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
          
            log.info("并发消费消息 - Topic: {}, Tags: {}, Keys: {}, MsgId: {}, ReconsumeTimes: {}", 
                    topic, tags, keys, msgId, reconsumeTimes);
          
            // 业务处理逻辑
            processMessage(messageBody, messageExt);
          
            log.info("并发消费成功 - MsgId: {}", msgId);
          
        } catch (Exception e) {
            log.error("并发消费失败 - MsgId: {}, 错误信息: {}", 
                    messageExt.getMsgId(), e.getMessage(), e);
          
            // 抛出异常会触发重试机制
            throw new RuntimeException("消息处理失败", e);
        }
    }
  
    /**
     * 业务消息处理
     */
    private void processMessage(String messageBody, MessageExt messageExt) {
        // 根据消息内容进行业务处理
        // 例如:数据入库、调用外部API、发送通知等
        log.info("处理消息内容: {}", messageBody);
      
        // 具体业务逻辑
        // ...
    }
}

2. 顺序消费者(严格顺序场景)

图解RocketMQ顺序发送消息

图解RocketMQ顺序消费消息

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * 顺序消费者 - 严格顺序场景
 * 适用于: 订单状态变更、账户余额变动、状态机流转等需要严格顺序的场景
 */
@Profile({"prod"})
@Slf4j
@Service
@RocketMQMessageListener(
    consumeThreadMax = 10, 
    consumeMode = ConsumeMode.ORDERLY, 
    topic = "ORDERLY_TOPIC", 
    consumerGroup = "ORDERLY_CONSUMER_GROUP"
)
public class OrderlyMQConsumer implements RocketMQListener<MessageExt> {
  
    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            // 获取消息基本信息
            String topic = messageExt.getTopic();
            String tags = messageExt.getTags();
            String keys = messageExt.getKeys();
            String msgId = messageExt.getMsgId();
            int queueId = messageExt.getQueueId();
            long queueOffset = messageExt.getQueueOffset();
          
            // 获取消息体
            String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
          
            log.info("顺序消费消息 - Topic: {}, Tags: {}, Keys: {}, MsgId: {}, QueueId: {}, QueueOffset: {}", 
                    topic, tags, keys, msgId, queueId, queueOffset);
          
            // 业务处理逻辑
            processOrderlyMessage(messageBody, messageExt);
          
            log.info("顺序消费成功 - MsgId: {}, QueueId: {}, QueueOffset: {}", 
                    msgId, queueId, queueOffset);
          
        } catch (Exception e) {
            log.error("顺序消费失败 - MsgId: {}, QueueId: {}, 错误信息: {}", 
                    messageExt.getMsgId(), messageExt.getQueueId(), e.getMessage(), e);
          
            // 顺序消费失败时,当前队列会被暂停消费
            throw new RuntimeException("顺序消息处理失败", e);
        }
    }
  
    /**
     * 顺序业务消息处理
     */
    private void processOrderlyMessage(String messageBody, MessageExt messageExt) {
        // 顺序消费的业务处理
        // 例如:订单状态流转、账户变动等需要严格顺序的操作
      
        String orderKey = messageExt.getKeys();
        log.info("处理订单消息 - OrderKey: {}, Content: {}", orderKey, messageBody);
      
        // 模拟状态机处理
        processOrderStatus(orderKey, messageBody);
    }
  
    /**
     * 订单状态处理示例
     */
    private void processOrderStatus(String orderKey, String messageBody) {
        // 解析消息内容,处理订单状态变更
        // 保证同一订单的消息按顺序处理
        log.info("订单状态更新完成 - OrderKey: {}", orderKey);
      
        // 具体业务逻辑
        // ...
    }
}

3. 带TAG过滤的消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * 带TAG过滤的消费者
 * 只消费指定TAG的消息
 */
@Profile({"prod"})
@Slf4j
@Service
@RocketMQMessageListener(
    consumeThreadMax = 10, 
    consumeMode = ConsumeMode.CONCURRENTLY, 
    topic = "USER_EVENT_TOPIC", 
    consumerGroup = "USER_EVENT_CONSUMER_GROUP",
    selectorType = SelectorType.TAG,
    selectorExpression = "LOGIN || LOGOUT || REGISTER"
)
public class UserEventMQConsumer implements RocketMQListener<MessageExt> {
  
    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            String topic = messageExt.getTopic();
            String tags = messageExt.getTags();
            String msgId = messageExt.getMsgId();
            String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
          
            log.info("用户事件消费 - Topic: {}, Tags: {}, MsgId: {}", topic, tags, msgId);
          
            // 根据不同TAG处理不同业务
            switch (tags) {
                case "LOGIN":
                    handleUserLogin(messageBody);
                    break;
                case "LOGOUT":
                    handleUserLogout(messageBody);
                    break;
                case "REGISTER":
                    handleUserRegister(messageBody);
                    break;
                default:
                    log.warn("未知的用户事件类型: {}", tags);
            }
          
            log.info("用户事件处理成功 - MsgId: {}, Tags: {}", msgId, tags);
          
        } catch (Exception e) {
            log.error("用户事件处理失败 - MsgId: {}, 错误信息: {}", 
                    messageExt.getMsgId(), e.getMessage(), e);
            throw new RuntimeException("用户事件处理失败", e);
        }
    }
  
    private void handleUserLogin(String messageBody) {
        log.info("处理用户登录事件: {}", messageBody);
        // 用户登录相关业务处理
    }
  
    private void handleUserLogout(String messageBody) {
        log.info("处理用户登出事件: {}", messageBody);
        // 用户登出相关业务处理
    }
  
    private void handleUserRegister(String messageBody) {
        log.info("处理用户注册事件: {}", messageBody);
        // 用户注册相关业务处理
    }
}

@RocketMQMessageListener 注解参数详解

核心参数说明

参数类型必填说明示例值
consumerGroupString消费者组名,同组消费者共同消费Topic"ORDER_CONSUMER_GROUP"
topicString要消费的Topic名称"ORDER_TOPIC"
consumeModeConsumeMode消费模式:CONCURRENTLY(并发)、ORDERLY(顺序)ConsumeMode.CONCURRENTLY
consumeThreadMaxint最大消费线程数,默认6410

过滤参数

参数类型默认值说明示例值
selectorTypeSelectorTypeTAG过滤类型:TAG、SQL92SelectorType.TAG
selectorExpressionString"*"过滤表达式"TagA|| TagB"

重试和超时参数

参数类型默认值说明最佳实践
maxReconsumeTimesint-1(16次)最大重试次数3-5次
consumeTimeoutlong15消费超时时间(分钟)10-20分钟

性能优化参数

参数类型默认值说明建议值
consumeMessageBatchMaxSizeint1批量消费大小1-32
pullIntervalint0拉取间隔(毫秒)0(实时)
pullBatchSizeint32拉取批次大小16-64

使用示例

生产者使用示例

@Service
public class MessageService {
  
    @Autowired
    private IMQProducer<String> mqProducer;
  
    public void sendMessage() {
        // 同步发送普通消息
        SendResult result = mqProducer.syncSend("ORDER_TOPIC", "订单创建消息");
      
        // 异步发送延迟消息
        mqProducer.asyncSend("ORDER_TOPIC", "延迟处理消息", 3);
      
        // 发送定时消息(1小时后执行)
        long deliverTime = System.currentTimeMillis() + 3600 * 1000;
        mqProducer.syncSendDeliverTimeMills("ORDER_TOPIC", "定时处理消息", deliverTime);
      
        // 发送有序消息
        mqProducer.syncSendOrderly("ORDER_TOPIC", "订单状态变更", "orderKey123");
      
        // 发送带TAG的消息
        mqProducer.syncSend("ORDER_TOPIC:CREATE", "订单创建消息");
    }
}

消费者使用示例

// 并发消费示例
@Profile({"prod"})
@Slf4j
@Service
@RocketMQMessageListener(
    consumeThreadMax = 10, 
    consumeMode = ConsumeMode.CONCURRENTLY, 
    topic = "ORDER_TOPIC", 
    consumerGroup = "ORDER_CONSUMER_GROUP"
)
public class OrderMQConsumer implements RocketMQListener<MessageExt> {
  
    @Override
    public void onMessage(MessageExt messageExt) {
        String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        log.info("处理订单消息: {}", messageBody);
        // 业务处理逻辑
    }
}

使用场景对比

并发消费 vs 顺序消费

特性并发消费顺序消费
吞吐量相对较低
顺序性不保证严格保证
适用场景日志、通知、统计订单、账户、状态机
线程模型多线程并发单线程顺序
重试影响不影响其他消息阻塞后续消息
参数设置consumeThreadMax = 10consumeThreadMax = 10

最佳实践建议

1. 生产者最佳实践

  • 异常处理: 完善的异常处理和日志记录
  • 重试机制: 合理设置重试次数,避免无效重试
  • 消息大小: 单条消息不超过4MB
  • 批量发送: 高并发场景考虑批量发送

2. 消费者最佳实践

  • 幂等处理: 确保消息处理的幂等性
  • 异常处理: 根据业务需要决定是否重试
  • 线程数配置: 根据业务特点合理设置线程数
  • 监控告警: 监控消费延迟和积压情况

3. Topic和Tag设计

  • Topic设计: 按业务模块划分Topic
  • TAG使用: 用于消息分类和过滤
  • 命名规范: 使用清晰的命名规范

总结

本文档提供了完整的RocketMQ消息队列解决方案,包含生产者和消费者的封装实现。通过合理的参数配置和代码设计,可以满足不同业务场景的需求:

  • 生产者: 支持同步、异步、延迟、定时、有序等多种发送模式
  • 消费者: 支持并发和顺序两种消费模式,参数配置灵活
  • 配置: 详细的yml配置说明,包含权限认证等高级特性

选择使用建议:

  • 高吞吐量场景: 使用并发消费,设置 consumeThreadMax = 10
  • 严格顺序场景: 使用顺序消费,确保消息按序处理
  • 业务隔离: 不同业务使用不同的Topic和ConsumerGroup