Spring Boot 集成 RocketMQ 实战详解(含顺序消息、事务消息、消息过滤)作者:叶虽分类:Java · Spring Boot · 消息队列 · 中间件标签:RocketMQSpring Boot消息队列顺序消息事务消息延迟消息难度:⭐⭐⭐前言在分布式系统中,消息队列承担着异步解耦、削峰填谷、可靠传递三大核心职责。Apache RocketMQ 作为阿里巴巴开源的工业级消息中间件,具备以下突出优势:🚀高吞吐:单机可达百万级 TPS🔒强一致:支持事务消息,保证分布式事务最终一致性⏰延迟消息:内置 18 个延迟等级,支持定时投递🔁顺序消息:全局有序 / 分区有序两种模式💾消息持久化:基于 CommitLog 的顺序写,兼顾性能与可靠性本文将从零开始,基于Spring Boot 3.2.x + RocketMQ 5.x完整演示:环境搭建与依赖配置普通消息收发(同步 / 异步 / 单向)顺序消息(分区有序)延迟消息事务消息消息过滤(Tag / SQL92)消费者并发控制与重试策略常见问题与生产最佳实践一、环境准备1.1 版本说明组件版本JDK17+Spring Boot3.2.5RocketMQ Server5.1.4rocketmq-spring-boot-starter2.3.01.2 快速启动 RocketMQ(Docker Compose)新建docker-compose.yml:version:'3.8'services:namesrv:image:apache/rocketmq:5.1.4container_name:rmq-namesrvports:-"9876:9876"command:sh mqnamesrvenvironment:JAVA_OPT_EXT:"-Xms256m -Xmx256m"broker:image:apache/rocketmq:5.1.4container_name:rmq-brokerports:-"10909:10909"-"10911:10911"-"10912:10912"depends_on:-namesrvenvironment:NAMESRV_ADDR:"namesrv:9876"JAVA_OPT_EXT:"-Xms512m -Xmx512m"command:sh mqbroker-n namesrv:9876 autoCreateTopicEnable=truedashboard:image:apacherocketmq/rocketmq-dashboard:latestcontainer_name:rmq-dashboardports:-"8180:8080"depends_on:-namesrvenvironment:JAVA_OPTS:"-Drocketmq.namesrv.addr=namesrv:9876"docker-composeup-d启动后访问:NameServer:localhost:9876可视化控制台:http://localhost:8180二、Spring Boot 项目集成2.1 Maven 依赖propertiesjava.version17/java.versionspring-boot.version3.2.5/spring-boot.versionrocketmq-starter.version2.3.0/rocketmq-starter.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- RocketMQ Spring Boot Starter --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion${rocketmq-starter.version}/version/dependency!-- Lombok(可选,简化代码) --dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependencies2.2 application.yml 基础配置server:port:8080spring:application:name:rocketmq-demo# RocketMQ 核心配置rocketmq:# NameServer 地址(集群多个用分号分隔)name-server:127.0.0.1:9876# 生产者配置producer:group:demo-producer-group# 生产者组名(同一业务唯一)send-message-timeout:3000# 发送超时时间(毫秒)retry-times-when-send-failed:2# 同步发送失败重试次数retry-times-when-send-async-failed:2# 异步发送失败重试次数compress-message-body-threshold:4096# 消息体超过此阈值自动压缩(字节)max-message-size:4194304# 最大消息体积:4MB# 消费者公共配置(各 @RocketMQMessageListener 可独立覆盖)consumer:pull-batch-size:32# 批量拉取消息条数# 自定义 Topic 常量(便于统一管理)app:rocketmq:topic:order:ORDER_TOPICpayment:PAYMENT_TOPICdelay:DELAY_TOPICtransaction:TRANSACTION_TOPIC2.3 Topic 常量类packagecom.example.rocketmq.constant;/** * RocketMQ Topic 常量定义 * p统一管理所有 Topic 名称,避免硬编码散落各处/p * * @author 叶虽 */publicfinalclassMqTopics{privateMqTopics(){}/** 订单消息 Topic */publicstaticfinalStringORDER_TOPIC="ORDER_TOPIC";/** 支付消息 Topic */publicstaticfinalStringPAYMENT_TOPIC="PAYMENT_TOPIC";/** 延迟消息 Topic */publicstaticfinalStringDELAY_TOPIC="DELAY_TOPIC";/** 事务消息 Topic */publicstaticfinalStringTRANSACTION_TOPIC="TRANSACTION_TOPIC";// ---- 消费者组 ----publicstaticfinalStringORDER_CONSUMER_GROUP="order-consumer-group";publicstaticfinalStringPAYMENT_CONSUMER_GROUP="payment-consumer-group";publicstaticfinalStringDELAY_CONSUMER_GROUP="delay-consumer-group";}三、消息体定义packagecom.example.rocketmq.dto;importlombok.AllArgsConstructor;importlombok.Builder;importlombok.Data;importlombok.NoArgsConstructor;importjava.io.Serializable;importjava.math.BigDecimal;importjava.time.LocalDateTime;/** * 订单消息体 * * @author 叶虽 */@Data@Builder@NoArgsConstructor@AllArgsConstructorpublicclassOrderMessageimplementsSerializable{/** 订单 ID */privateStringorderId;/** 用户 ID */privateLonguserId;/** 订单金额 */privateBigDecimalamount;/** 订单状态:CREATED / PAID / SHIPPED / COMPLETED / CANCELLED */privateStringstatus;/** 消息创建时间 */privateLocalDateTimecreateTime;/** 备注 */privateStringremark;}四、普通消息:同步 / 异步 / 单向4.1 生产者服务packagecom.example.rocketmq.producer;importcom.example.rocketmq.constant.MqTopics;importcom.example.rocketmq.dto.OrderMessage;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.client.producer.SendCallback;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.messaging.Message;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Service;/** * 普通消息生产者 * * @author 叶虽 */@Slf4j@Service@RequiredArgsConstructorpublicclassOrderProducer{privatefinalRocketMQTemplaterocketMQTemplate;// ----------------------------------------------------------------// 同步发送:等待 Broker 确认,适合对可靠性要求高的场景// ----------------------------------------------------------------/** * 同步发送订单消息 * * @param order 订单消息体 * @return 发送结果 */publicSendResultsendSync(OrderMessageorder){MessageOrderMessagemessage=MessageBuilder.withPayload(order).setHeader("orderId",order.