一、前置准备安装并启动 RabbitMQ默认端口 5672JDK 8、Maven、IDEA所有项目通用工具类 通用 pom直接复制二、全局统一配置所有项目必用1. 公共连接工具类 ConnectionUtil.javajava运行package com.mq.util; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * RabbitMQ 公共连接工具类所有项目通用 */ public class ConnectionUtil { // 获取RabbitMQ连接 public static Connection getConnection() throws Exception { ConnectionFactory factory new ConnectionFactory(); factory.setHost(localhost); factory.setPort(5672); factory.setUsername(guest); factory.setPassword(guest); return factory.newConnection(); } // 关闭连接和通道 public static void close(Connection conn, Channel channel) { try { if (channel ! null) channel.close(); if (conn ! null) conn.close(); } catch (Exception e) { e.printStackTrace(); } } }2. 通用 pom.xml原生 Java 项目xml?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion groupIdcom.mq/groupId !-- 替换为项目名simple-demo / work-demo 等 -- artifactId项目名/artifactId version1.0/version dependencies !-- RabbitMQ客户端依赖 -- dependency groupIdcom.rabbitmq/groupId artifactIdamqp-client/artifactId version5.18.0/version /dependency /dependencies /project三、项目 1simple-demo 简单模式项目结构plaintextsimple-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ └── Consumer.java └── pom.xml完整代码生产者 Producer.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE simple_queue; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); String msg 简单模式消息; channel.basicPublish(, QUEUE, null, msg.getBytes()); System.out.println(发送 msg); ConnectionUtil.close(conn, channel); } }消费者 Consumer.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer { private static final String QUEUE simple_queue; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); DeliverCallback callback (tag, msg) - { System.out.println(接收 new String(msg.getBody())); }; channel.basicConsume(QUEUE, true, callback, tag - {}); System.out.println(等待消息...); } }✅ 运行顺序必须先开消费者运行Consumer保持运行运行Producer消费者控制台打印消息四、项目 2work-demo 工作队列模式项目结构plaintextwork-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ ├── Consumer1.java │ └── Consumer2.java └── pom.xml完整代码生产者 Producer.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE work_queue; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); // 发送10条消息 for (int i 1; i 10; i) { String msg 任务 i; channel.basicPublish(, QUEUE, null, msg.getBytes()); System.out.println(发送 msg); } ConnectionUtil.close(conn, channel); } }消费者 1 Consumer1.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer1 { private static final String QUEUE work_queue; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); DeliverCallback callback (tag, msg) - { System.out.println(消费者1接收 new String(msg.getBody())); }; channel.basicConsume(QUEUE, true, callback, tag - {}); } }消费者 2 Consumer2.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer2 { private static final String QUEUE work_queue; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); DeliverCallback callback (tag, msg) - { System.out.println(消费者2接收 new String(msg.getBody())); }; channel.basicConsume(QUEUE, true, callback, tag - {}); } }✅ 运行顺序运行Consumer1运行Consumer2运行Producer两个消费者轮流接收消息轮询分发五、项目 3fanout-demo 发布订阅模式项目结构plaintextfanout-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ ├── Consumer1.java │ └── Consumer2.java └── pom.xml完整代码生产者 Producer.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String EXCHANGE fanout_exchange; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(EXCHANGE, fanout); String msg 广播消息; channel.basicPublish(EXCHANGE, , null, msg.getBytes()); System.out.println(发送 msg); ConnectionUtil.close(conn, channel); } }消费者 1 Consumer1.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer1 { private static final String EXCHANGE fanout_exchange; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(EXCHANGE, fanout); String queue channel.queueDeclare().getQueue(); channel.queueBind(queue, EXCHANGE, ); DeliverCallback callback (tag, msg) - { System.out.println(消费者1接收 new String(msg.getBody())); }; channel.basicConsume(queue, true, callback, tag - {}); } }消费者 2 Consumer2.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer2 { private static final String EXCHANGE fanout_exchange; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(EXCHANGE, fanout); String queue channel.queueDeclare().getQueue(); channel.queueBind(queue, EXCHANGE, ); DeliverCallback callback (tag, msg) - { System.out.println(消费者2接收 new String(msg.getBody())); }; channel.basicConsume(queue, true, callback, tag - {}); } }✅ 运行顺序运行Consumer1运行Consumer2运行Producer两个消费者都收到同一条消息广播六、项目 4direct-demo 路由模式项目结构plaintextdirect-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ ├── ConsumerInfo.java │ └── ConsumerError.java └── pom.xml完整代码生产者 Producer.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String EXCHANGE direct_exchange; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(EXCHANGE, direct); // 发送不同路由键消息 channel.basicPublish(EXCHANGE, info, null, info日志.getBytes()); channel.basicPublish(EXCHANGE, error, null, error日志.getBytes()); ConnectionUtil.close(conn, channel); } }消费者 ConsumerInfo.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class ConsumerInfo { public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(direct_exchange, direct); String queue channel.queueDeclare().getQueue(); channel.queueBind(queue, direct_exchange, info); channel.basicConsume(queue, true, (t, m) - { System.out.println(info接收 new String(m.getBody())); }, t - {}); } }消费者 ConsumerError.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class ConsumerError { public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(direct_exchange, direct); String queue channel.queueDeclare().getQueue(); channel.queueBind(queue, direct_exchange, error); channel.basicConsume(queue, true, (t, m) - { System.out.println(error接收 new String(m.getBody())); }, t - {}); } }✅ 运行顺序运行ConsumerInfo运行ConsumerError运行Producer各自只收到对应路由键的消息七、项目 5topic-demo 主题模式完整代码项目结构plaintexttopic-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ ├── Consumer1.java │ └── Consumer2.java └── pom.xmlProducer.java生产者java运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String EXCHANGE topic_exchange; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(EXCHANGE, topic); // 发送两条带路由键的消息 channel.basicPublish(EXCHANGE, user.save, null, 用户新增.getBytes()); channel.basicPublish(EXCHANGE, order.pay, null, 订单支付.getBytes()); System.out.println(主题模式消息发送完成); ConnectionUtil.close(conn, channel); } }Consumer1.java匹配 user.#java运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer1 { private static final String EXCHANGE topic_exchange; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(EXCHANGE, topic); String queue channel.queueDeclare().getQueue(); // 绑定通配符匹配所有 user 开头的路由 channel.queueBind(queue, EXCHANGE, user.#); channel.basicConsume(queue, true, (tag, msg) - { System.out.println(消费者1(user.#) 接收 new String(msg.getBody())); }, tag - {}); } }Consumer2.java匹配 *.payjava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer2 { private static final String EXCHANGE topic_exchange; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(EXCHANGE, topic); String queue channel.queueDeclare().getQueue(); // 绑定通配符匹配所有以 .pay 结尾的路由 channel.queueBind(queue, EXCHANGE, *.pay); channel.basicConsume(queue, true, (tag, msg) - { System.out.println(消费者2(*.pay) 接收 new String(msg.getBody())); }, tag - {}); } }✅ 运行顺序运行Consumer1运行Consumer2运行Producer结果消费者 1 收到用户新增消费者 2 收到订单支付八、项目 6headers-demo 首部匹配模式完整代码项目结构plaintextheaders-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ └── Consumer.java └── pom.xmlProducer.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.Map; public class Producer { private static final String EXCHANGE headers_exchange; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(EXCHANGE, headers); // 设置消息头 AMQP.BasicProperties props new AMQP.BasicProperties.Builder() .headers(Map.of(type, sms)).build(); channel.basicPublish(EXCHANGE, , props, 短信消息.getBytes()); System.out.println(首部模式消息发送完成); ConnectionUtil.close(conn, channel); } }Consumer.javajava运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.util.Map; public class Consumer { private static final String EXCHANGE headers_exchange; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); channel.exchangeDeclare(EXCHANGE, headers); String queue channel.queueDeclare().getQueue(); // 匹配headers任意一个满足即可 MapString, Object args Map.of(x-match, any, type, sms); channel.queueBind(queue, EXCHANGE, , args); channel.basicConsume(queue, true, (tag, msg) - { System.out.println(首部匹配消费者接收 new String(msg.getBody())); }, tag - {}); } }✅ 运行顺序运行Consumer运行Producer消费者收到短信消息九、项目 7delay-demo 延迟队列完整代码项目结构plaintextdelay-demo ├── src/main/java/com/mq │ ├── util/ConnectionUtil.java │ ├── Producer.java │ └── Consumer.java └── pom.xmlProducer.java生产者 —— 发送到过期队列java运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.HashMap; import java.util.Map; public class Producer { // 过期队列消息在这里等待5秒 private static final String WAIT_QUEUE wait_queue; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); // 核心过期队列配置死信转发 MapString, Object params new HashMap(); // 消息过期时间 5秒 params.put(x-message-ttl, 5000); // 过期后转发到死信交换机 params.put(x-dead-letter-exchange, ); // 过期后路由到真正队列 params.put(x-dead-letter-routing-key, real_delay_queue); // 声明过期等待队列 channel.queueDeclare(WAIT_QUEUE, true, false, false, params); // 发送消息不设置expiration队列统一过期 String msg 我是延迟5秒的消息; channel.basicPublish(, WAIT_QUEUE, null, msg.getBytes()); System.out.println(已发送延迟消息等待5秒后到达消费者...); ConnectionUtil.close(conn, channel); } }Consumer.java消费者 —— 监听真正队列java运行package com.mq; import com.mq.util.ConnectionUtil; import com.rabbitmq.client.*; public class Consumer { // 真正消费的队列5秒后消息才会来 private static final String REAL_QUEUE real_delay_queue; public static void main(String[] args) throws Exception { Connection conn ConnectionUtil.getConnection(); Channel channel conn.createChannel(); // 声明真正的消费队列 channel.queueDeclare(REAL_QUEUE, true, false, false, null); System.out.println(延迟消费者已启动等待5秒后收到消息...); // 监听消费 channel.basicConsume(REAL_QUEUE, true, (tag, msg) - { System.out.println(✅ 收到延迟消息 new String(msg.getBody())); }, tag - {}); } }十、项目 8springboot-rabbitmq-demo完整代码pom.xmlxml?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd parent groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-parent/artifactId version2.7.15/version /parent modelVersion4.0.0/modelVersion groupIdcom.mq/groupId artifactIdspringboot-rabbitmq-demo/artifactId version1.0/version dependencies dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-web/artifactId /dependency /dependencies /projectapplication.ymlyamlspring: rabbitmq: host: localhost port: 5672 username: guest password: guestRabbitApplication.java启动类java运行package com.mq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication public class RabbitApplication { public static void main(String[] args) { SpringApplication.run(RabbitApplication.class, args); } }RabbitConfig.javajava运行package com.mq.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; Configuration public class RabbitConfig { Bean public Queue queue(){ return new Queue(boot_queue); } }ProducerController.javajava运行package com.mq.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; RestController public class ProducerController { Autowired private RabbitTemplate template; GetMapping(/send) public String send(){ template.convertAndSend(boot_queue, SpringBoot集成RabbitMQ消息); return 消息发送成功; } }Consumer.javajava运行package com.mq.consumer; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; Component public class Consumer { RabbitListener(queues boot_queue) public void receive(String msg){ System.out.println(SpringBoot消费者接收msg); } }✅ 运行顺序启动RabbitApplication浏览器访问http://localhost:8080/send控制台打印消息十一、项目 9rabbitmq-microservice-demo 微服务异步通信完整代码启动类 MicroApplication.javajava运行package com.mq.micro; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication public class MicroApplication { public static void main(String[] args) { SpringApplication.run(MicroApplication.class,args); } }配置类 RabbitMicroConfig.javajava运行package com.mq.micro.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; Configuration public class RabbitMicroConfig { public static final String ORDER_EXCHANGE order_business_exchange; public static final String STOCK_QUEUE stock_reduce_queue; public static final String SMS_QUEUE sms_send_queue; public static final String ROUTING_STOCK order.stock; public static final String ROUTING_SMS order.sms; Bean public DirectExchange orderExchange(){ return new DirectExchange(ORDER_EXCHANGE,true,false); } Bean public Queue stockQueue(){ return new Queue(STOCK_QUEUE,true); } Bean public Queue smsQueue(){ return new Queue(SMS_QUEUE,true); } Bean public Binding stockBinding(){ return BindingBuilder.bind(stockQueue()).to(orderExchange()).with(ROUTING_STOCK); } Bean public Binding smsBinding(){ return BindingBuilder.bind(smsQueue()).to(orderExchange()).with(ROUTING_SMS); } }订单生产者 OrderProducer.javajava运行package com.mq.micro.order; import com.mq.micro.config.RabbitMicroConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; RestController public class OrderProducer { Resource private RabbitTemplate rabbitTemplate; GetMapping(/create/order/{orderNo}) public String createOrder(PathVariable String orderNo){ String orderMsg 订单orderNo 支付完成; rabbitTemplate.convertAndSend(RabbitMicroConfig.ORDER_EXCHANGE, RabbitMicroConfig.ROUTING_STOCK,orderMsg); rabbitTemplate.convertAndSend(RabbitMicroConfig.ORDER_EXCHANGE, RabbitMicroConfig.ROUTING_SMS,orderMsg); return 订单创建成功消息已推送; } }库存消费者 StockConsumer.javajava运行package com.mq.micro.stock; import com.mq.micro.config.RabbitMicroConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; Component public class StockConsumer { RabbitListener(queues RabbitMicroConfig.STOCK_QUEUE) public void reduceStock(String msg){ System.out.println(【库存微服务】收到msg); System.out.println(【库存微服务】执行商品库存扣减...); } }短信消费者 SmsConsumer.javajava运行package com.mq.micro.sms; import com.mq.micro.config.RabbitMicroConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; Component public class SmsConsumer { RabbitListener(queues RabbitMicroConfig.SMS_QUEUE) public void sendSms(String msg){ System.out.println(【短信微服务】收到msg); System.out.println(【短信微服务】执行发送下单成功短信...); } }✅ 运行顺序启动RabbitMQ启动MicroApplication浏览器访问plaintexthttp://localhost:8080/create/order/ORDER20260520控制台输出plaintext【库存微服务】收到订单ORDER20260520 支付完成 【库存微服务】执行商品库存扣减... 【短信微服务】收到订单ORDER20260520 支付完成 【短信微服务】执行发送下单成功短信...十二、所有项目通用运行口诀零基础必背原生 Java 项目先运行消费者再运行生产者SpringBoot 项目直接启动访问接口即可多个消费者全部先启动再发消息交换机模式消费者必须先绑定队列