RocketMQ生产者与消费者完整封装技术文档
概述
本文档提供了基于Spring Boot的RocketMQ完整解决方案,包括消息生产者和消费者的封装实现。支持同步发送、异步发送、延迟消息、定时消息、有序消息以及并发消费、顺序消费等多种模式。
依赖配置
在项目的 pom.xml中添加RocketMQ Spring Boot Starter依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.3</version>
</dependency>
配置文件
application.yml配置
在 application.yml中添加RocketMQ配置:
rocketmq:
# NameServer地址,多个用分号分隔
name-server: localhost:9876
producer:
# 生产者组名,同一个组内的生产者发送相同Topic的消息
group: prodProduceGroup
# 是否启用消息轨迹,用于消息的发送和消费轨迹追踪
# 开启后可在控制台查看消息的完整链路,但会影响性能
enable-msg-trace: false
# 访问密钥,用于RocketMQ的权限验证
access-key: '00phgp34'
# 秘密密钥,与access-key配对使用
secret-key: 'errtteeet3!'
# 发送消息超时时间(毫秒),默认3000ms
send-message-timeout: 10000
# 发送失败重试次数,默认2次
retry-times-when-send-failed: 3
# 消息最大大小(字节),默认4MB
max-message-size: 4194304
consumer:
# 是否启用消息轨迹
enable-msg-trace: false
# 消费者访问密钥
access-key: '00phgp34'
# 消费者秘密密钥
secret-key: 'errtteeet3!'
# 拉取消息超时时间(毫秒)
pull-timeout: 10000
broker.conf配置
RocketMQ Broker服务端配置文件:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Broker对外服务的IP地址,客户端连接的IP
brokerIP1=公网IP
# NameServer的地址,如果有多个的话,使用分号分隔开
namesrvAddr=公网IP:30001
# Broker端口号,默认10911
listenPort=30002
# store的存储路径
storePathRootDir=/home/rocketmq/data/store
# commitLog的存储路径
storePathCommitLog=/home/rocketmq/data/store/commitlog
# 消费队列的存储路径
storePathConsumeQueue=/home/rocketmq/data/store/consumequeue
# 消息索引的存储路径
storePathIndex=/home/rocketmq/data/store/index
# checkpoint文件的存储路径
storeCheckpoint=/home/rocketmq/data/store/checkpoint
# abort文件的存储路径
abortFile=/home/rocketmq/data/store/abort
# 集群名称
brokerClusterName = DefaultCluster
# Broker名称
brokerName = broker-a
# BrokerID,0表示Master,大于0表示Slave
brokerId = 0
# 该时间清理过期数据
deleteWhen = 04
# 文件超过N小时没改变算是过期(小时)
fileReservedTime = 72
# Broker角色:ASYNC_MASTER异步复制Master、SYNC_MASTER同步双写Master、SLAVE
brokerRole = ASYNC_MASTER
# 延迟消息时间级别(delayLevel 1对应1s, 2对应5s,依此类推)
# 默认18个级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
# 发送队列等待时间
waitTimeMillsInSendQueue=999999
# 刷盘方式:SYNC_FLUSH同步刷盘、ASYNC_FLUSH异步刷盘
flushDiskType = SYNC_FLUSH
# 存储消息时使用可重入锁
useReentrantLockWhenPutMessage=true
# 消息最大大小,默认4MB,这里设置为10MB
maxMessageSize=4194304
# 消息属性最大大小,默认32KB,这里设置为64KB
maxPropertySize=65536
# 启用访问控制列表(ACL)权限验证
aclEnable=true
# ACL配置文件路径
aclConfig=/home/rocketmq/rocketmq-all-5.1.2-bin-release/conf/plain_acl.yml
plain_acl.yml配置
RocketMQ访问控制列表(ACL)权限配置文件:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 全局白名单IP地址,这些IP可以无需认证访问RocketMQ
globalWhiteRemoteAddresses:
- 127.0.0.1 # 本地回环地址
- 公网IP # RocketMQ服务器公网IP
- 192.168.1.100 # RocketMQ服务器内网IP
# 用户账户配置
accounts:
- accessKey: 00phgp34 # 访问密钥,与客户端配置保持一致
secretKey: errtteeet3! # 秘密密钥,与客户端配置保持一致
whiteRemoteAddress: 192.168.1.* # 该用户允许访问的IP范围
admin: true # 是否为管理员账户
部署脚本
启动脚本 start.sh
#!/bin/bash
logsDir=/home/rocketmq/logs
installDir=/home/rocketmq/rocketmq-all-5.1.2-bin-release
mkdir -p $logsDir
# 设置 JVM 启动参数,仅输出 ERROR 日志
export JAVA_OPT="$JAVA_OPT -Drocketmq.log.level=ERROR"
# 启动 NameServer
nohup sh $installDir/bin/mqnamesrv -c $installDir/conf/namesrv.properties > $logsDir/mqnamesrv.out 2>&1 &
# 启动 Broker,指定 NameServer 地址
nohup sh $installDir/bin/mqbroker -n 公网IP:9876 -c $installDir/conf/broker.conf > $logsDir/broker.out 2>&1 &
# 启动管理控制台(可选)
#nohup java -jar /home/rocketmq/console/rocketmq-dashboard-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=公网IP:9876 > $logsDir/dashboard.out 2>&1 &
停止脚本 stop.sh
#!/bin/bash
cd /home/rocketmq/rocketmq-all-5.1.2-bin-release/bin
# 停止 Broker
sh mqshutdown broker
# 停止 NameServer
sh mqshutdown namesrv
# 停止管理控制台
jarname='rocketmq-dashboard-1.0.0.jar'
pid=`ps aux | grep $jarname | grep -v grep | awk '{print $2}'`
echo $pid
kill -9 $pid
脚本说明
启动脚本功能:
- 日志目录创建:自动创建日志存储目录
- JVM参数设置:配置日志级别为ERROR,减少日志输出
- NameServer启动:使用默认9876端口启动注册中心
- Broker启动:指定NameServer地址启动消息服务
- 控制台启动:可选的Web管理界面(注释状态)
停止脚本功能:
- 服务停止:按顺序停止Broker和NameServer
- 进程清理:强制杀死管理控制台进程
- 安全关闭:确保所有RocketMQ相关进程完全退出
使用方法:
# 赋予执行权限
chmod +x start.sh stop.sh
# 启动RocketMQ
./start.sh
# 停止RocketMQ
./stop.sh
注意事项:
- 确保脚本中的路径与实际安装路径一致
- NameServer端口9876与broker.conf中的配置保持一致
- 日志文件会输出到指定的logs目录便于排查问题
消息生产者封装
接口定义
IMQProducer 接口
import org.apache.rocketmq.client.producer.SendResult;
/**
* 消息队列生产者
*/
public interface IMQProducer<T> {
/**
* 异步发送消息 如果要发送指定TAG,topic参数格式TOPIC:TAG
*/
void asyncSend(String topic, T msg, int delayLevel);
/**
* 同步发送消息 如果要发送指定TAG,topic参数格式TOPIC:TAG
*/
SendResult syncSend(String topic, T msg, int delayLevel);
/**
* 同步发送消息 如果要发送指定TAG,topic参数格式TOPIC:TAG
*/
SendResult syncSend(String topic, T msg);
/**
* 定时投递消息 如果要发送指定TAG,topic参数格式TOPIC:TAG
*/
SendResult syncSendDeliverTimeMills(String topic, T msg, long pushTime);
/**
* 有序发送消息
*/
SendResult syncSendOrderly(String topic, T msg, String hashKey);
}
生产者实现
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import jakarta.annotation.Resource;
@Service
@Slf4j
public class RocketMQProducer<T> implements IMQProducer<T> {
private final String LOG_SIGN = "生产者消息";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public void asyncSend(String topic, T msg, int delayLevel) {
try {
Message<T> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【{}】【{}】发送成功, 状态码【{}】", LOG_SIGN, message, sendResult.getSendStatus());
}
@Override
public void onException(Throwable throwable) {
log.warn("【{}】【{}】发送异常", LOG_SIGN, message, throwable);
}
}, 10000, delayLevel);
} catch (Exception e) {
log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e);
}
}
@Override
public SendResult syncSend(String topic, T msg, int delayLevel) {
try {
Message<T> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 10000, delayLevel);
log.info("【{}】【{}】发送结果 状态码【{}】 MsgId【{}】",
LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId());
return sendResult;
} catch (Exception e) {
log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e);
}
return null;
}
@Override
public SendResult syncSend(String topic, T msg) {
try {
Message<T> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 10000);
log.info("【{}】【{}】发送结果 状态码【{}】 MsgId【{}】",
LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId());
return sendResult;
} catch (Exception e) {
log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e);
}
return null;
}
@Override
public SendResult syncSendDeliverTimeMills(String topic, T msg, long pushTime) {
try {
Message<T> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSendDeliverTimeMills(topic, message, pushTime);
log.info("【{}】【{}】发送定时消息结果 状态码【{}】 MsgId【{}】",
LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId());
return sendResult;
} catch (Exception e) {
log.error("【{}】【{}】发送定时消息异常", LOG_SIGN, msg, e);
}
return null;
}
@Override
public SendResult syncSendOrderly(String topic, T msg, String hashKey) {
try {
Message<T> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
log.info("【{}】【{}】发送结果 状态码【{}】 MsgId【{}】",
LOG_SIGN, msg, sendResult.getSendStatus(), sendResult.getMsgId());
return sendResult;
} catch (Exception e) {
log.error("【{}】【{}】发送异常", LOG_SIGN, msg, e);
}
return null;
}
}
延迟消息级别说明
RocketMQ默认支持18个延迟级别:
| Level | 延迟时间 | Level | 延迟时间 | Level | 延迟时间 |
|---|---|---|---|---|---|
| 1 | 1s | 7 | 3m | 13 | 9m |
| 2 | 5s | 8 | 4m | 14 | 10m |
| 3 | 10s | 9 | 5m | 15 | 20m |
| 4 | 30s | 10 | 6m | 16 | 30m |
| 5 | 1m | 11 | 7m | 17 | 1h |
| 6 | 2m | 12 | 8m | 18 | 2h |
使用示例:
// 发送延迟5秒的消息(delayLevel = 2)
mqProducer.syncSend("TOPIC", "message", 2);
消息消费者封装
1. 并发消费者(高吞吐量场景)
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
* 并发消费者 - 高吞吐量场景
* 适用于: 日志处理、数据统计、通知发送等对顺序不敏感的场景
*/
@Profile({"prod"})
@Slf4j
@Service
@RocketMQMessageListener(
consumeThreadMax = 10,
consumeMode = ConsumeMode.CONCURRENTLY,
topic = "CONCURRENT_TOPIC",
consumerGroup = "CONCURRENT_CONSUMER_GROUP"
)
public class ConcurrentMQConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
try {
// 获取消息基本信息
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
String msgId = messageExt.getMsgId();
int reconsumeTimes = messageExt.getReconsumeTimes();
// 获取消息体
String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("并发消费消息 - Topic: {}, Tags: {}, Keys: {}, MsgId: {}, ReconsumeTimes: {}",
topic, tags, keys, msgId, reconsumeTimes);
// 业务处理逻辑
processMessage(messageBody, messageExt);
log.info("并发消费成功 - MsgId: {}", msgId);
} catch (Exception e) {
log.error("并发消费失败 - MsgId: {}, 错误信息: {}",
messageExt.getMsgId(), e.getMessage(), e);
// 抛出异常会触发重试机制
throw new RuntimeException("消息处理失败", e);
}
}
/**
* 业务消息处理
*/
private void processMessage(String messageBody, MessageExt messageExt) {
// 根据消息内容进行业务处理
// 例如:数据入库、调用外部API、发送通知等
log.info("处理消息内容: {}", messageBody);
// 具体业务逻辑
// ...
}
}
2. 顺序消费者(严格顺序场景)
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
* 顺序消费者 - 严格顺序场景
* 适用于: 订单状态变更、账户余额变动、状态机流转等需要严格顺序的场景
*/
@Profile({"prod"})
@Slf4j
@Service
@RocketMQMessageListener(
consumeThreadMax = 10,
consumeMode = ConsumeMode.ORDERLY,
topic = "ORDERLY_TOPIC",
consumerGroup = "ORDERLY_CONSUMER_GROUP"
)
public class OrderlyMQConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
try {
// 获取消息基本信息
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
String msgId = messageExt.getMsgId();
int queueId = messageExt.getQueueId();
long queueOffset = messageExt.getQueueOffset();
// 获取消息体
String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("顺序消费消息 - Topic: {}, Tags: {}, Keys: {}, MsgId: {}, QueueId: {}, QueueOffset: {}",
topic, tags, keys, msgId, queueId, queueOffset);
// 业务处理逻辑
processOrderlyMessage(messageBody, messageExt);
log.info("顺序消费成功 - MsgId: {}, QueueId: {}, QueueOffset: {}",
msgId, queueId, queueOffset);
} catch (Exception e) {
log.error("顺序消费失败 - MsgId: {}, QueueId: {}, 错误信息: {}",
messageExt.getMsgId(), messageExt.getQueueId(), e.getMessage(), e);
// 顺序消费失败时,当前队列会被暂停消费
throw new RuntimeException("顺序消息处理失败", e);
}
}
/**
* 顺序业务消息处理
*/
private void processOrderlyMessage(String messageBody, MessageExt messageExt) {
// 顺序消费的业务处理
// 例如:订单状态流转、账户变动等需要严格顺序的操作
String orderKey = messageExt.getKeys();
log.info("处理订单消息 - OrderKey: {}, Content: {}", orderKey, messageBody);
// 模拟状态机处理
processOrderStatus(orderKey, messageBody);
}
/**
* 订单状态处理示例
*/
private void processOrderStatus(String orderKey, String messageBody) {
// 解析消息内容,处理订单状态变更
// 保证同一订单的消息按顺序处理
log.info("订单状态更新完成 - OrderKey: {}", orderKey);
// 具体业务逻辑
// ...
}
}
3. 带TAG过滤的消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
* 带TAG过滤的消费者
* 只消费指定TAG的消息
*/
@Profile({"prod"})
@Slf4j
@Service
@RocketMQMessageListener(
consumeThreadMax = 10,
consumeMode = ConsumeMode.CONCURRENTLY,
topic = "USER_EVENT_TOPIC",
consumerGroup = "USER_EVENT_CONSUMER_GROUP",
selectorType = SelectorType.TAG,
selectorExpression = "LOGIN || LOGOUT || REGISTER"
)
public class UserEventMQConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String msgId = messageExt.getMsgId();
String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("用户事件消费 - Topic: {}, Tags: {}, MsgId: {}", topic, tags, msgId);
// 根据不同TAG处理不同业务
switch (tags) {
case "LOGIN":
handleUserLogin(messageBody);
break;
case "LOGOUT":
handleUserLogout(messageBody);
break;
case "REGISTER":
handleUserRegister(messageBody);
break;
default:
log.warn("未知的用户事件类型: {}", tags);
}
log.info("用户事件处理成功 - MsgId: {}, Tags: {}", msgId, tags);
} catch (Exception e) {
log.error("用户事件处理失败 - MsgId: {}, 错误信息: {}",
messageExt.getMsgId(), e.getMessage(), e);
throw new RuntimeException("用户事件处理失败", e);
}
}
private void handleUserLogin(String messageBody) {
log.info("处理用户登录事件: {}", messageBody);
// 用户登录相关业务处理
}
private void handleUserLogout(String messageBody) {
log.info("处理用户登出事件: {}", messageBody);
// 用户登出相关业务处理
}
private void handleUserRegister(String messageBody) {
log.info("处理用户注册事件: {}", messageBody);
// 用户注册相关业务处理
}
}
@RocketMQMessageListener 注解参数详解
核心参数说明
| 参数 | 类型 | 必填 | 说明 | 示例值 |
|---|---|---|---|---|
consumerGroup | String | ✓ | 消费者组名,同组消费者共同消费Topic | "ORDER_CONSUMER_GROUP" |
topic | String | ✓ | 要消费的Topic名称 | "ORDER_TOPIC" |
consumeMode | ConsumeMode | ✗ | 消费模式:CONCURRENTLY(并发)、ORDERLY(顺序) | ConsumeMode.CONCURRENTLY |
consumeThreadMax | int | ✗ | 最大消费线程数,默认64 | 10 |
过滤参数
| 参数 | 类型 | 默认值 | 说明 | 示例值 |
|---|---|---|---|---|
selectorType | SelectorType | TAG | 过滤类型:TAG、SQL92 | SelectorType.TAG |
selectorExpression | String | "*" | 过滤表达式 | "TagA|| TagB" |
重试和超时参数
| 参数 | 类型 | 默认值 | 说明 | 最佳实践 |
|---|---|---|---|---|
maxReconsumeTimes | int | -1(16次) | 最大重试次数 | 3-5次 |
consumeTimeout | long | 15 | 消费超时时间(分钟) | 10-20分钟 |
性能优化参数
| 参数 | 类型 | 默认值 | 说明 | 建议值 |
|---|---|---|---|---|
consumeMessageBatchMaxSize | int | 1 | 批量消费大小 | 1-32 |
pullInterval | int | 0 | 拉取间隔(毫秒) | 0(实时) |
pullBatchSize | int | 32 | 拉取批次大小 | 16-64 |
使用示例
生产者使用示例
@Service
public class MessageService {
@Autowired
private IMQProducer<String> mqProducer;
public void sendMessage() {
// 同步发送普通消息
SendResult result = mqProducer.syncSend("ORDER_TOPIC", "订单创建消息");
// 异步发送延迟消息
mqProducer.asyncSend("ORDER_TOPIC", "延迟处理消息", 3);
// 发送定时消息(1小时后执行)
long deliverTime = System.currentTimeMillis() + 3600 * 1000;
mqProducer.syncSendDeliverTimeMills("ORDER_TOPIC", "定时处理消息", deliverTime);
// 发送有序消息
mqProducer.syncSendOrderly("ORDER_TOPIC", "订单状态变更", "orderKey123");
// 发送带TAG的消息
mqProducer.syncSend("ORDER_TOPIC:CREATE", "订单创建消息");
}
}
消费者使用示例
// 并发消费示例
@Profile({"prod"})
@Slf4j
@Service
@RocketMQMessageListener(
consumeThreadMax = 10,
consumeMode = ConsumeMode.CONCURRENTLY,
topic = "ORDER_TOPIC",
consumerGroup = "ORDER_CONSUMER_GROUP"
)
public class OrderMQConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("处理订单消息: {}", messageBody);
// 业务处理逻辑
}
}
使用场景对比
并发消费 vs 顺序消费
| 特性 | 并发消费 | 顺序消费 |
|---|---|---|
| 吞吐量 | 高 | 相对较低 |
| 顺序性 | 不保证 | 严格保证 |
| 适用场景 | 日志、通知、统计 | 订单、账户、状态机 |
| 线程模型 | 多线程并发 | 单线程顺序 |
| 重试影响 | 不影响其他消息 | 阻塞后续消息 |
| 参数设置 | consumeThreadMax = 10 | consumeThreadMax = 10 |
最佳实践建议
1. 生产者最佳实践
- 异常处理: 完善的异常处理和日志记录
- 重试机制: 合理设置重试次数,避免无效重试
- 消息大小: 单条消息不超过4MB
- 批量发送: 高并发场景考虑批量发送
2. 消费者最佳实践
- 幂等处理: 确保消息处理的幂等性
- 异常处理: 根据业务需要决定是否重试
- 线程数配置: 根据业务特点合理设置线程数
- 监控告警: 监控消费延迟和积压情况
3. Topic和Tag设计
- Topic设计: 按业务模块划分Topic
- TAG使用: 用于消息分类和过滤
- 命名规范: 使用清晰的命名规范
总结
本文档提供了完整的RocketMQ消息队列解决方案,包含生产者和消费者的封装实现。通过合理的参数配置和代码设计,可以满足不同业务场景的需求:
- 生产者: 支持同步、异步、延迟、定时、有序等多种发送模式
- 消费者: 支持并发和顺序两种消费模式,参数配置灵活
- 配置: 详细的yml配置说明,包含权限认证等高级特性
选择使用建议:
- 高吞吐量场景: 使用并发消费,设置
consumeThreadMax = 10 - 严格顺序场景: 使用顺序消费,确保消息按序处理
- 业务隔离: 不同业务使用不同的Topic和ConsumerGroup