大营销项目 | {RabbitMQ在Big-Market中的应用}

RabbitMQ在大营销项目中的应用笔记,理解生产者、队列、消费者各自的作用与实现。

一、消息队列(MQ)基础概念

1.1 什么是消息队列?

消息队列(Message Queue)是一种异步通信的中间件。你可以把它想象成一个”邮局”:

  • 生产者(Producer):写信的人,把消息投到邮局
  • 队列(Queue):邮局中的信箱,暂存消息
  • 消费者(Consumer):收信的人,从信箱取出消息处理

核心特点:生产者和消费者不需要同时在线,解耦了发送方和接收方

1.2 为什么需要消息队列?

应用场景说明举例
异步处理耗时操作不阻塞主流程扣减库存后,异步通知数据库更新
解耦系统间不直接调用,降低依赖库存扣完发消息,监听者各自处理
削峰填谷突发流量先缓冲到队列,慢慢消费秒杀时大量请求入队,后端按自身节奏处理
可靠投递消息不丢失,支持重试即使消费者暂时宕机,消息仍在队列中等待

1.3 本项目使用的 MQ

本项目使用 RabbitMQ,通过 Spring Boot 的 spring-boot-starter-amqp 集成。


二、项目中 RabbitMQ 的整体架构

在本项目中,RabbitMQ 用于一个具体场景:活动 SKU 库存耗尽后,异步通知数据库清空库存

整体流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
用户抽奖请求


活动责任链 → SKU库存扣减(Redis decr 原子操作)

├── 库存 > 0 → 写入 Redis 延迟队列(等待定时任务慢慢同步到DB)

└── 库存 == 0 → ★ 发送 RabbitMQ 消息 ★


MQ 消费者监听


直接清空数据库库存 + 清空 Redis 延迟队列

三、核心代码逐层解析

3.1 配置层:application-dev.yml

1
2
3
4
5
6
7
8
9
10
11
12
# Spring 配置;rabbitmq
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: admin
password: admin
listener:
simple:
prefetch: 1 # 每次投递1个消息,消费完再投递下一个
topic:
activity_sku_stock_zero: activity_sku_stock_zero # 自定义的 topic 名称

要点:

  • prefetch: 1:保证消费者一次只处理一条消息,处理完才拉取下一条,避免消费者过载
  • topic.activity_sku_stock_zero:自定义配置项,统一管理 topic 名称,生产者和消费者通过 @Value 注入同一个 topic,避免硬编码

3.2 Types层:消息基类 BaseEvent

文件:big-market-types/.../event/BaseEvent.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class BaseEvent<T> {

// 构建消息体(子类实现)
public abstract EventMessage<T> buildEventMessage(T data);

// 获取 topic(子类实现)
public abstract String topic();

// 统一的消息结构
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class EventMessage<T> {
private String id; // 消息唯一ID
private Date timestamp; // 时间戳
private T data; // 消息内容(泛型,灵活承载不同业务数据)
}
}

设计意图:

  • 这是一个抽象模板类,定义了所有消息事件的通用结构
  • 每种业务消息只需继承 BaseEvent,实现 buildEventMessage()topic() 即可
  • EventMessage 是统一的消息信封,包含 id(用于幂等/追踪)、timestamp、data

3.3 领域层:具体消息事件 ActivitySkuStockZeroMessageEvent

文件:big-market-domain/.../event/ActivitySkuStockZeroMessageEvent.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class ActivitySkuStockZeroMessageEvent extends BaseEvent<Long> {

@Value("${spring.rabbitmq.topic.activity_sku_stock_zero}")
private String topic;

@Override
public EventMessage<Long> buildEventMessage(Long sku) {
return EventMessage.<Long>builder()
.id(RandomStringUtils.randomNumeric(11)) // 生成11位随机数作为消息ID
.timestamp(new Date())
.data(sku) // 消息体就是 sku 编号
.build();
}

@Override
public String topic() {
return topic;
}
}

要点:

  • 泛型 <Long> 表示这个事件的消息体是 Long 类型(即 SKU 编号)
  • @Value 注入 topic 名称,保证与配置文件一致
  • @Component 注册为 Spring Bean,供其他类注入使用

3.4 基础设施层:消息发布者 EventPublisher(生产者)

文件:big-market-infrastructure/.../event/EventPublisher.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Component
public class EventPublisher {

@Autowired
private RabbitTemplate rabbitTemplate; // Spring AMQP 提供的消息发送模板

public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {
try {
String messageJson = JSON.toJSONString(eventMessage); // 序列化为 JSON
rabbitTemplate.convertAndSend(topic, messageJson); // 发送到指定 topic
log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);
} catch (Exception e) {
log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);
throw e;
}
}
}

要点:

  • RabbitTemplate 是 Spring AMQP 封装的核心类,用于发送消息到 RabbitMQ
  • convertAndSend(topic, messageJson):将 JSON 字符串发送到名为 topic 的队列
  • 这是一个通用的消息发送器,任何业务消息都可以通过它发送

3.5 触发层:消息监听者 ActivitySkuStockZeroCustomer(消费者)

文件:big-market-trigger/.../listener/ActivitySkuStockZeroCustomer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
@Component
public class ActivitySkuStockZeroCustomer {

@Value("${spring.rabbitmq.topic.activity_sku_stock_zero}")
private String topic;

@Resource
private ISkuStock skuStock;

@RabbitListener(queuesToDeclare = @Queue(value = "activity_sku_stock_zero"))
public void listener(String message) {
try {
log.info("监听活动sku库存消耗为0消息 topic: {} message: {}", topic, message);
// 1. 转换对象:JSON → EventMessage<Long>
BaseEvent.EventMessage<Long> eventMessage = JSON.parseObject(message,
new TypeReference<BaseEvent.EventMessage<Long>>() {}.getType());
Long sku = eventMessage.getData();
// 2. 更新数据库:将 DB 中该 sku 的库存直接设为 0
skuStock.clearActivitySkuStock(sku);
// 3. 清空 Redis 延迟队列(库存都为0了,不再需要延迟更新)
skuStock.clearQueueValue();
} catch (Exception e) {
log.error("监听活动sku库存消耗为0消息,消费失败 topic: {} message: {}", topic, message);
throw e; // 抛出异常 → RabbitMQ 会重试投递
}
}
}

要点:

  • @RabbitListener(queuesToDeclare = @Queue(value = "activity_sku_stock_zero"))

  • 声明监听的队列名为 activity_sku_stock_zero

  • queuesToDeclare 表示如果队列不存在则自动创建

  • 消费逻辑:收到”库存为0”的消息后,立刻清空数据库库存 + 清空延迟队列

  • throw e:抛出异常后 RabbitMQ 会自动重试,保证消息不丢失


四、消息发送的触发点:库存扣减流程

4.1 责任链中触发库存扣减

文件:ActivitySkuStockActionChain.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public boolean action(ActivitySkuEntity activitySkuEntity, ActivityEntity activityEntity,
ActivityCountEntity activityCountEntity) {
// 1. 通过 Redis decr 扣减库存
boolean status = activityDispatch.subtractionActivitySkuStock(
activitySkuEntity.getSku(), activityEntity.getEndDateTime());

if (status) {
// 2. 扣减成功 → 写入 Redis 延迟队列(等定时任务慢慢同步到 DB)
activityRepository.activitySkuStockConsumeSendQueue(
ActivitySkuStockKeyVO.builder()
.sku(activitySkuEntity.getSku())
.activityId(activityEntity.getActivityId())
.build());
return true;
}

throw new AppException(ResponseCode.ACTIVITY_SKU_STOCK_ERROR.getCode(), ...);
}

4.2 实际的库存扣减逻辑(发送 MQ 消息的地方)

文件:ActivityRepository.javasubtractionActivitySkuStock 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public boolean subtractionActivitySkuStock(Long sku, String cacheKey, Date endDateTime) {
long surplus = redisService.decr(cacheKey); // ★ Redis 原子递减

if (surplus == 0) {
// ★★★ 库存刚好扣减为0 → 发送 MQ 消息通知清空数据库 ★★★
eventPublisher.publish(
activitySkuStockZeroMessageEvent.topic(),
activitySkuStockZeroMessageEvent.buildEventMessage(sku)
);
return false;
} else if (surplus < 0) {
// 超卖了,恢复为0
redisService.setAtomicLong(cacheKey, 0);
return false;
}

// 库存 > 0,加锁防止库存恢复后超卖
String lockKey = cacheKey + Constants.UNDERLINE + surplus;
long expireMillis = endDateTime.getTime() - System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1);
Boolean lock = redisService.setNx(lockKey, expireMillis, TimeUnit.MILLISECONDS);
if (!lock) {
log.info("活动sku库存加锁失败 {}", lockKey);
}
return lock;
}

这里就是 MQ 消息的触发源头! 当 Redis decr 操作后发现库存为 0,立即通过 EventPublisher 发送一条消息到 RabbitMQ。


五、定时任务:延迟队列的消费者

文件:UpdateActivitySkuStockJob.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@Component
public class UpdateActivitySkuStockJob {

@Resource
private ISkuStock skuStock;

@Scheduled(cron = "0/5 * * * * ?") // 每5秒执行一次
public void exec() {
try {
// 从 Redis 延迟队列中获取待更新的 sku 信息
ActivitySkuStockKeyVO activitySkuStockKeyVO = skuStock.takeQueueValue();
if (null == activitySkuStockKeyVO) return;
// 更新数据库中的库存(逐条递减,而非批量)
skuStock.updateActivitySkuStock(activitySkuStockKeyVO.getSku());
} catch (Exception e) {
log.error("定时任务,更新活动sku库存失败", e);
}
}
}

定时任务 vs MQ 消费者的分工:

角色触发条件职责
定时任务 UpdateActivitySkuStockJob每5秒执行从 Redis 延迟队列取数据,逐步同步库存到 DB(渐进式更新)
MQ 消费者 ActivitySkuStockZeroCustomer库存为0时触发一步到位,直接清空 DB 库存 + 清空延迟队列

六、完整数据流图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
                ┌────────────────────────────┐
│ 用户抽奖请求 │
└──────────┬─────────────────┘


┌────────────────────────────┐
│ 活动责任链校验 │
│ ActivitySkuStockActionChain│
└──────────┬─────────────────┘


┌────────────────────────────┐
│ Redis decr 原子扣减库存 │
│ ActivityRepository │
.subtractionActivitySkuStock
└──────┬───────────┬─────────┘
│ │
surplus > 0│ │surplus == 0
│ │
┌──────────▼──┐ ┌────▼─────────────────┐
setNx 加锁 │ │ 发送 MQ 消息 │
│ 防止超卖 │ │ EventPublisher.publish│
└──────┬──────┘ └────┬─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌──────────────────────────┐
│ Redis 延迟队列 │ │ RabbitMQ │
│ DelayedQueue │ │ activity_sku_stock_zero │
└──────┬──────────┘ └──────┬───────────────────┘
│ │
▼ ▼
┌────────────────────┐ ┌──────────────────────────────┐
│ 定时任务(每5秒) │ │ MQ 消费者 │
│ UpdateActivity │ │ ActivitySkuStockZeroCustomer │
│ SkuStockJob │ │ │
│ │ │ ① clearActivitySkuStock(sku)
│ updateActivitySku │ │ ② clearQueueValue()
│ Stock(sku) │ │ │
(DB: stock - 1) │ │ (DB: stock = 0, 清空延迟队列)
└────────────────────┘ └──────────────────────────────┘

七、各层代码职责速查表

层次类名所属模块职责
基础类型BaseEvent<T>big-market-types消息事件抽象基类,定义消息结构
领域事件ActivitySkuStockZeroMessageEventbig-market-domain具体的”库存清零”消息事件,构建消息体
消息发布EventPublisherbig-market-infrastructure通用消息发送器,封装 RabbitTemplate
仓储实现ActivityRepositorybig-market-infrastructure库存扣减逻辑 + 触发 MQ 发送
消息消费ActivitySkuStockZeroCustomerbig-market-trigger监听 MQ 消息,执行数据库清零
定时任务UpdateActivitySkuStockJobbig-market-trigger每5秒从延迟队列取数据,渐进更新 DB
领域服务RaffleActivityServicebig-market-domain实现 ISkuStock 接口,桥接仓储方法
活动装配ActivityArmorybig-market-domain预热 SKU 库存到 Redis 缓存
责任链节点ActivitySkuStockActionChainbig-market-domain责任链中执行库存扣减 + 写延迟队列

八、为什么要用 MQ 而不直接操作数据库?

8.1 直接操作数据库的问题

1
2
3
用户A 抽奖 → 锁表 → UPDATE stock = stock - 1 → 释放锁
用户B 抽奖 → 等待锁... → 锁表 → UPDATE stock = stock - 1 → 释放锁
用户C 抽奖 → 等待锁... → 等待锁......
  • 大量线程阻塞等待数据库锁,每个线程持有一个数据库连接
  • 数据库连接是有限资源(通常几十到几百个),很快耗尽
  • 后续请求全部排队,响应时间飙升到分钟级 → 系统崩溃

8.2 项目的解决方案

1
2
3
4
5
6
7
用户A 抽奖 → Redis decr (原子操作,纳秒级) → 立即返回结果
用户B 抽奖 → Redis decr (原子操作,纳秒级) → 立即返回结果
...

后台异步:
- 定时任务每5秒 → 从延迟队列取1条 → 更新 DB(无竞争)
- 库存归0时 → MQ 消息 → 消费者直接清空 DB 库存

MQ 在这里的核心作用:

  1. 解耦:库存扣减(Redis)和数据库更新(DB)完全分离
  2. 异步:用户请求不等待 DB 操作,瞬间返回
  3. 最终一致性:Redis 是实时的,DB 是最终一致的,通过 MQ + 定时任务保证数据最终同步

九、Docker 环境配置

文件:docs/dev-ops/docker-compose-environment.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
rabbitmq:
image: rabbitmq:3.12.9
container_name: rabbitmq
restart: always
ports:
- "5672:5672" # AMQP 协议端口(程序连接用)
- "15672:15672" # 管理界面端口(浏览器访问用)
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin
command: rabbitmq-server
volumes:
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins # 启用管理插件

启动后:

  • 程序连接地址:127.0.0.1:5672
  • 管理界面:浏览器访问 http://127.0.0.1:15672,账号 admin/admin

十、关键注解速查

注解作用
@RabbitListener标注一个方法为 MQ 消息消费者
queuesToDeclare = @Queue(value = "xxx")自动声明队列,队列不存在时自动创建
@Value("${spring.rabbitmq.topic.xxx}")从配置文件注入 topic 名称
@Scheduled(cron = "0/5 * * * * ?")定时任务,每5秒执行一次

十一、总结

本项目中 RabbitMQ 的使用是一个经典的”库存扣减异步通知”场景

  1. 高性能:使用 Redis decr 原子操作处理实时库存扣减,无锁高效
  2. 渐进同步:每次扣减后将信息写入 Redis 延迟队列,定时任务缓慢同步到 DB,降低 DB 压力
  3. 兜底清零:当库存耗尽时,通过 RabbitMQ 消息即时通知,消费者直接清空 DB 库存,并清空延迟队列(因为已经没有意义了)

一句话概括:Redis 负责”快”,MQ 负责”通知”,定时任务负责”慢慢搬”,三者协同保证了高并发下库存操作的高性能与数据一致性。


大营销项目 | {RabbitMQ在Big-Market中的应用}
http://paopaotangzu.xyz/cn/big_market_rabbit_mq/
作者
PROTON TANG
发布于
2026年3月2日
许可协议