RabbitMQ在大营销项目中的应用笔记,理解生产者、队列、消费者各自的作用与实现。
一、消息队列(MQ)基础概念
1.1 什么是消息队列?
消息队列(Message Queue)是一种异步通信的中间件。你可以把它想象成一个”邮局”:
- 生产者(Producer):写信的人,把消息投到邮局
- 队列(Queue):邮局中的信箱,暂存消息
- 消费者(Consumer):收信的人,从信箱取出消息处理
核心特点:生产者和消费者不需要同时在线,解耦了发送方和接收方。
1.2 为什么需要消息队列?
| 应用场景 | 说明 | 举例 |
|---|
| 异步处理 | 耗时操作不阻塞主流程 | 扣减库存后,异步通知数据库更新 |
| 解耦 | 系统间不直接调用,降低依赖 | 库存扣完发消息,监听者各自处理 |
| 削峰填谷 | 突发流量先缓冲到队列,慢慢消费 | 秒杀时大量请求入队,后端按自身节奏处理 |
| 可靠投递 | 消息不丢失,支持重试 | 即使消费者暂时宕机,消息仍在队列中等待 |
1.3 本项目使用的 MQ
本项目使用 RabbitMQ,通过 Spring Boot 的 spring-boot-starter-amqp 集成。
二、项目中 RabbitMQ 的整体架构
在本项目中,RabbitMQ 用于一个具体场景:活动 SKU 库存耗尽后,异步通知数据库清空库存。
整体流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 用户抽奖请求 │ ▼ 活动责任链 → SKU库存扣减(Redis decr 原子操作) │ ├── 库存 > 0 → 写入 Redis 延迟队列(等待定时任务慢慢同步到DB) │ └── 库存 == 0 → ★ 发送 RabbitMQ 消息 ★ │ ▼ MQ 消费者监听 │ ▼ 直接清空数据库库存 + 清空 Redis 延迟队列
|
三、核心代码逐层解析
3.1 配置层:application-dev.yml
1 2 3 4 5 6 7 8 9 10 11 12
| spring: rabbitmq: addresses: 127.0.0.1 port: 5672 username: admin password: admin listener: simple: prefetch: 1 topic: activity_sku_stock_zero: activity_sku_stock_zero
|
要点:
prefetch: 1:保证消费者一次只处理一条消息,处理完才拉取下一条,避免消费者过载topic.activity_sku_stock_zero:自定义配置项,统一管理 topic 名称,生产者和消费者通过 @Value 注入同一个 topic,避免硬编码
3.2 Types层:消息基类 BaseEvent
文件:big-market-types/.../event/BaseEvent.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public abstract class BaseEvent<T> {
public abstract EventMessage<T> buildEventMessage(T data);
public abstract String topic();
@Data @Builder @AllArgsConstructor @NoArgsConstructor public static class EventMessage<T> { private String id; private Date timestamp; private T data; } }
|
设计意图:
- 这是一个抽象模板类,定义了所有消息事件的通用结构
- 每种业务消息只需继承
BaseEvent,实现 buildEventMessage() 和 topic() 即可 EventMessage 是统一的消息信封,包含 id(用于幂等/追踪)、timestamp、data
3.3 领域层:具体消息事件 ActivitySkuStockZeroMessageEvent
文件:big-market-domain/.../event/ActivitySkuStockZeroMessageEvent.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Component public class ActivitySkuStockZeroMessageEvent extends BaseEvent<Long> {
@Value("${spring.rabbitmq.topic.activity_sku_stock_zero}") private String topic;
@Override public EventMessage<Long> buildEventMessage(Long sku) { return EventMessage.<Long>builder() .id(RandomStringUtils.randomNumeric(11)) .timestamp(new Date()) .data(sku) .build(); }
@Override public String topic() { return topic; } }
|
要点:
- 泛型
<Long> 表示这个事件的消息体是 Long 类型(即 SKU 编号) @Value 注入 topic 名称,保证与配置文件一致@Component 注册为 Spring Bean,供其他类注入使用
3.4 基础设施层:消息发布者 EventPublisher(生产者)
文件:big-market-infrastructure/.../event/EventPublisher.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Slf4j @Component public class EventPublisher {
@Autowired private RabbitTemplate rabbitTemplate;
public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) { try { String messageJson = JSON.toJSONString(eventMessage); rabbitTemplate.convertAndSend(topic, messageJson); log.info("发送MQ消息 topic:{} message:{}", topic, messageJson); } catch (Exception e) { log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e); throw e; } } }
|
要点:
RabbitTemplate 是 Spring AMQP 封装的核心类,用于发送消息到 RabbitMQconvertAndSend(topic, messageJson):将 JSON 字符串发送到名为 topic 的队列- 这是一个通用的消息发送器,任何业务消息都可以通过它发送
3.5 触发层:消息监听者 ActivitySkuStockZeroCustomer(消费者)
文件:big-market-trigger/.../listener/ActivitySkuStockZeroCustomer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Slf4j @Component public class ActivitySkuStockZeroCustomer {
@Value("${spring.rabbitmq.topic.activity_sku_stock_zero}") private String topic;
@Resource private ISkuStock skuStock;
@RabbitListener(queuesToDeclare = @Queue(value = "activity_sku_stock_zero")) public void listener(String message) { try { log.info("监听活动sku库存消耗为0消息 topic: {} message: {}", topic, message); BaseEvent.EventMessage<Long> eventMessage = JSON.parseObject(message, new TypeReference<BaseEvent.EventMessage<Long>>() {}.getType()); Long sku = eventMessage.getData(); skuStock.clearActivitySkuStock(sku); skuStock.clearQueueValue(); } catch (Exception e) { log.error("监听活动sku库存消耗为0消息,消费失败 topic: {} message: {}", topic, message); throw e; } } }
|
要点:
@RabbitListener(queuesToDeclare = @Queue(value = "activity_sku_stock_zero"))
声明监听的队列名为 activity_sku_stock_zero
queuesToDeclare 表示如果队列不存在则自动创建
消费逻辑:收到”库存为0”的消息后,立刻清空数据库库存 + 清空延迟队列
throw e:抛出异常后 RabbitMQ 会自动重试,保证消息不丢失
四、消息发送的触发点:库存扣减流程
4.1 责任链中触发库存扣减
文件:ActivitySkuStockActionChain.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Override public boolean action(ActivitySkuEntity activitySkuEntity, ActivityEntity activityEntity, ActivityCountEntity activityCountEntity) { boolean status = activityDispatch.subtractionActivitySkuStock( activitySkuEntity.getSku(), activityEntity.getEndDateTime());
if (status) { activityRepository.activitySkuStockConsumeSendQueue( ActivitySkuStockKeyVO.builder() .sku(activitySkuEntity.getSku()) .activityId(activityEntity.getActivityId()) .build()); return true; }
throw new AppException(ResponseCode.ACTIVITY_SKU_STOCK_ERROR.getCode(), ...); }
|
4.2 实际的库存扣减逻辑(发送 MQ 消息的地方)
文件:ActivityRepository.java → subtractionActivitySkuStock 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Override public boolean subtractionActivitySkuStock(Long sku, String cacheKey, Date endDateTime) { long surplus = redisService.decr(cacheKey);
if (surplus == 0) { eventPublisher.publish( activitySkuStockZeroMessageEvent.topic(), activitySkuStockZeroMessageEvent.buildEventMessage(sku) ); return false; } else if (surplus < 0) { redisService.setAtomicLong(cacheKey, 0); return false; }
String lockKey = cacheKey + Constants.UNDERLINE + surplus; long expireMillis = endDateTime.getTime() - System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1); Boolean lock = redisService.setNx(lockKey, expireMillis, TimeUnit.MILLISECONDS); if (!lock) { log.info("活动sku库存加锁失败 {}", lockKey); } return lock; }
|
这里就是 MQ 消息的触发源头! 当 Redis decr 操作后发现库存为 0,立即通过 EventPublisher 发送一条消息到 RabbitMQ。
五、定时任务:延迟队列的消费者
文件:UpdateActivitySkuStockJob.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Slf4j @Component public class UpdateActivitySkuStockJob {
@Resource private ISkuStock skuStock;
@Scheduled(cron = "0/5 * * * * ?") public void exec() { try { ActivitySkuStockKeyVO activitySkuStockKeyVO = skuStock.takeQueueValue(); if (null == activitySkuStockKeyVO) return; skuStock.updateActivitySkuStock(activitySkuStockKeyVO.getSku()); } catch (Exception e) { log.error("定时任务,更新活动sku库存失败", e); } } }
|
定时任务 vs MQ 消费者的分工:
| 角色 | 触发条件 | 职责 |
|---|
定时任务 UpdateActivitySkuStockJob | 每5秒执行 | 从 Redis 延迟队列取数据,逐步同步库存到 DB(渐进式更新) |
MQ 消费者 ActivitySkuStockZeroCustomer | 库存为0时触发 | 一步到位,直接清空 DB 库存 + 清空延迟队列 |
六、完整数据流图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| ┌────────────────────────────┐ │ 用户抽奖请求 │ └──────────┬─────────────────┘ │ ▼ ┌────────────────────────────┐ │ 活动责任链校验 │ │ ActivitySkuStockActionChain│ └──────────┬─────────────────┘ │ ▼ ┌────────────────────────────┐ │ Redis decr 原子扣减库存 │ │ ActivityRepository │ │ .subtractionActivitySkuStock│ └──────┬───────────┬─────────┘ │ │ surplus > 0│ │surplus == 0 │ │ ┌──────────▼──┐ ┌────▼─────────────────┐ │ setNx 加锁 │ │ 发送 MQ 消息 │ │ 防止超卖 │ │ EventPublisher.publish│ └──────┬──────┘ └────┬─────────────────┘ │ │ ▼ ▼ ┌─────────────────┐ ┌──────────────────────────┐ │ Redis 延迟队列 │ │ RabbitMQ │ │ DelayedQueue │ │ activity_sku_stock_zero │ └──────┬──────────┘ └──────┬───────────────────┘ │ │ ▼ ▼ ┌────────────────────┐ ┌──────────────────────────────┐ │ 定时任务(每5秒) │ │ MQ 消费者 │ │ UpdateActivity │ │ ActivitySkuStockZeroCustomer │ │ SkuStockJob │ │ │ │ │ │ ① clearActivitySkuStock(sku) │ │ updateActivitySku │ │ ② clearQueueValue() │ │ Stock(sku) │ │ │ │ (DB: stock - 1) │ │ (DB: stock = 0, 清空延迟队列) │ └────────────────────┘ └──────────────────────────────┘
|
七、各层代码职责速查表
| 层次 | 类名 | 所属模块 | 职责 |
|---|
| 基础类型 | BaseEvent<T> | big-market-types | 消息事件抽象基类,定义消息结构 |
| 领域事件 | ActivitySkuStockZeroMessageEvent | big-market-domain | 具体的”库存清零”消息事件,构建消息体 |
| 消息发布 | EventPublisher | big-market-infrastructure | 通用消息发送器,封装 RabbitTemplate |
| 仓储实现 | ActivityRepository | big-market-infrastructure | 库存扣减逻辑 + 触发 MQ 发送 |
| 消息消费 | ActivitySkuStockZeroCustomer | big-market-trigger | 监听 MQ 消息,执行数据库清零 |
| 定时任务 | UpdateActivitySkuStockJob | big-market-trigger | 每5秒从延迟队列取数据,渐进更新 DB |
| 领域服务 | RaffleActivityService | big-market-domain | 实现 ISkuStock 接口,桥接仓储方法 |
| 活动装配 | ActivityArmory | big-market-domain | 预热 SKU 库存到 Redis 缓存 |
| 责任链节点 | ActivitySkuStockActionChain | big-market-domain | 责任链中执行库存扣减 + 写延迟队列 |
八、为什么要用 MQ 而不直接操作数据库?
8.1 直接操作数据库的问题
1 2 3
| 用户A 抽奖 → 锁表 → UPDATE stock = stock - 1 → 释放锁 用户B 抽奖 → 等待锁... → 锁表 → UPDATE stock = stock - 1 → 释放锁 用户C 抽奖 → 等待锁... → 等待锁... → ...
|
- 大量线程阻塞等待数据库锁,每个线程持有一个数据库连接
- 数据库连接是有限资源(通常几十到几百个),很快耗尽
- 后续请求全部排队,响应时间飙升到分钟级 → 系统崩溃
8.2 项目的解决方案
1 2 3 4 5 6 7
| 用户A 抽奖 → Redis decr (原子操作,纳秒级) → 立即返回结果 用户B 抽奖 → Redis decr (原子操作,纳秒级) → 立即返回结果 ...
后台异步: - 定时任务每5秒 → 从延迟队列取1条 → 更新 DB(无竞争) - 库存归0时 → MQ 消息 → 消费者直接清空 DB 库存
|
MQ 在这里的核心作用:
- 解耦:库存扣减(Redis)和数据库更新(DB)完全分离
- 异步:用户请求不等待 DB 操作,瞬间返回
- 最终一致性:Redis 是实时的,DB 是最终一致的,通过 MQ + 定时任务保证数据最终同步
九、Docker 环境配置
文件:docs/dev-ops/docker-compose-environment.yml
1 2 3 4 5 6 7 8 9 10 11 12 13
| rabbitmq: image: rabbitmq:3.12.9 container_name: rabbitmq restart: always ports: - "5672:5672" - "15672:15672" environment: RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: admin command: rabbitmq-server volumes: - ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
|
启动后:
- 程序连接地址:
127.0.0.1:5672 - 管理界面:浏览器访问
http://127.0.0.1:15672,账号 admin/admin
十、关键注解速查
| 注解 | 作用 |
|---|
@RabbitListener | 标注一个方法为 MQ 消息消费者 |
queuesToDeclare = @Queue(value = "xxx") | 自动声明队列,队列不存在时自动创建 |
@Value("${spring.rabbitmq.topic.xxx}") | 从配置文件注入 topic 名称 |
@Scheduled(cron = "0/5 * * * * ?") | 定时任务,每5秒执行一次 |
十一、总结
本项目中 RabbitMQ 的使用是一个经典的”库存扣减异步通知”场景:
- 高性能:使用 Redis
decr 原子操作处理实时库存扣减,无锁高效 - 渐进同步:每次扣减后将信息写入 Redis 延迟队列,定时任务缓慢同步到 DB,降低 DB 压力
- 兜底清零:当库存耗尽时,通过 RabbitMQ 消息即时通知,消费者直接清空 DB 库存,并清空延迟队列(因为已经没有意义了)
一句话概括:Redis 负责”快”,MQ 负责”通知”,定时任务负责”慢慢搬”,三者协同保证了高并发下库存操作的高性能与数据一致性。