业务背景
在上一节中,我们介绍了通过数据库扣减完成用户兑换优惠券的逻辑,这种方式虽然稳妥,但性能有所不足,因为主流程的操作是同步执行的,导致响应时间变长,吞吐量下降。在本章节中,我们通过引入消息队列进行异步解耦,主流程仅同步操作 Redis,后续的数据库耗时操作则交由消息队列消费者来执行,从而提升整体性能。
开发基于消息队列秒杀逻辑
1. 编写兑换优惠券 v2 接口
保持原有代码不变,我们开发一个 v2 版本的方法。前置校验部分可以直接复用 v1 版本的通用逻辑。
代码如下所示:
java">@Override
public void redeemUserCouponByMQ(CouponTemplateRedeemReqDTO requestParam) {
// 验证缓存是否存在,保障数据存在并且缓存中存在
CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplate(BeanUtil.toBean(requestParam, CouponTemplateQueryReqDTO.class));
// 验证领取的优惠券是否在活动有效时间
boolean isInTime = DateUtil.isIn(new Date(), couponTemplate.getValidStartTime(), couponTemplate.getValidEndTime());
if (!isInTime) {
// 一般来说优惠券领取时间不到的时候,前端不会放开调用请求,可以理解这是用户调用接口在“攻击”
throw new ClientException("不满足优惠券领取时间");
}
// 获取 LUA 脚本,并保存到 Hutool 的单例管理容器,下次直接获取不需要加载
DefaultRedisScript<Long> buildLuaScript = Singleton.get(STOCK_DECREMENT_AND_SAVE_USER_RECEIVE_LUA_PATH, () -> {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(STOCK_DECREMENT_AND_SAVE_USER_RECEIVE_LUA_PATH)));
redisScript.setResultType(Long.class);
return redisScript;
});
// 验证用户是否符合优惠券领取条件
JSONObject receiveRule = JSON.parseObject(couponTemplate.getReceiveRule());
String limitPerPerson = receiveRule.getString("limitPerPerson");
// 执行 LUA 脚本进行扣减库存以及增加 Redis 用户领券记录次数
String couponTemplateCacheKey = String.format(EngineRedisConstant.COUPON_TEMPLATE_KEY, requestParam.getCouponTemplateId());
String userCouponTemplateLimitCacheKey = String.format(EngineRedisConstant.USER_COUPON_TEMPLATE_LIMIT_KEY, UserContext.getUserId(), requestParam.getCouponTemplateId());
Long stockDecrementLuaResult = stringRedisTemplate.execute(
buildLuaScript,
ListUtil.of(couponTemplateCacheKey, userCouponTemplateLimitCacheKey),
String.valueOf(couponTemplate.getValidEndTime().getTime()), limitPerPerson
);
// 判断 LUA 脚本执行返回类,如果失败根据类型返回报错提示
long firstField = StockDecrementReturnCombinedUtil.extractFirstField(stockDecrementLuaResult);
if (RedisStockDecrementErrorEnum.isFail(firstField)) {
throw new ServiceException(RedisStockDecrementErrorEnum.fromType(firstField));
}
UserCouponRedeemEvent userCouponRedeemEvent = UserCouponRedeemEvent.builder()
.requestParam(requestParam)
.receiveCount((int) StockDecrementReturnCombinedUtil.extractSecondField(stockDecrementLuaResult))
.couponTemplate(couponTemplate)
.userId(UserContext.getUserId())
.build();
SendResult sendResult = userCouponRedeemProducer.sendMessage(userCouponRedeemEvent);
// 发送消息失败解决方案简单且高效的逻辑之一:打印日志并报警,通过日志搜集并重新投递
if (ObjectUtil.notEqual(sendResult.getSendStatus().name(), "SEND_OK")) {
log.warn("发送优惠券兑换消息失败,消息参数:{}", JSON.toJSONString(userCouponRedeemEvent));
}
}
2. 消息消费者
开发用户兑换优惠券消息消费者,并通过幂等注解避免消息重复消费。
代码如下所示:
java">@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = EngineRockerMQConstant.COUPON_TEMPLATE_REDEEM_TOPIC_KEY,
consumerGroup = EngineRockerMQConstant.COUPON_TEMPLATE_REDEEM_CG_KEY
)
@Slf4j(topic = "UserCouponRedeemConsumer")
public class UserCouponRedeemConsumer implements RocketMQListener<MessageWrapper<UserCouponRedeemEvent>> {
private final UserCouponMapper userCouponMapper;
private final CouponTemplateMapper couponTemplateMapper;
private final UserCouponDelayCloseProducer couponDelayCloseProducer;
private final StringRedisTemplate stringRedisTemplate;
@NoMQDuplicateConsume(
keyPrefix = "user-coupon-redeem:",
key = "#messageWrapper.keys",
keyTimeout = 600
)
@Transactional(rollbackFor = Exception.class)
@Override
public void onMessage(MessageWrapper<UserCouponRedeemEvent> messageWrapper) {
// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)
log.info("[消费者] 用户兑换优惠券 - 执行消费逻辑,消息体:{}", JSON.toJSONString(messageWrapper));
CouponTemplateRedeemReqDTO requestParam = messageWrapper.getMessage().getRequestParam();
CouponTemplateQueryRespDTO couponTemplate = messageWrapper.getMessage().getCouponTemplate();
String userId = messageWrapper.getMessage().getUserId();
int decremented = couponTemplateMapper.decrementCouponTemplateStock(Long.parseLong(requestParam.getShopNumber()), Long.parseLong(requestParam.getCouponTemplateId()), 1L);
if (!SqlHelper.retBool(decremented)) {
log.warn("[消费者] 用户兑换优惠券 - 执行消费逻辑,扣减优惠券数据库库存失败,消息体:{}", JSON.toJSONString(messageWrapper));
return;
}
// 添加 Redis 用户领取的优惠券记录列表
Date now = new Date();
DateTime validEndTime = DateUtil.offsetHour(now, JSON.parseObject(couponTemplate.getConsumeRule()).getInteger("validityPeriod"));
UserCouponDO userCouponDO = UserCouponDO.builder()
.couponTemplateId(Long.parseLong(requestParam.getCouponTemplateId()))
.userId(Long.parseLong(userId))
.source(requestParam.getSource())
.receiveCount(messageWrapper.getMessage().getReceiveCount())
.status(UserCouponStatusEnum.UNUSED.getCode())
.receiveTime(now)
.validStartTime(now)
.validEndTime(validEndTime)
.build();
userCouponMapper.insert(userCouponDO);
// 添加用户领取优惠券模板缓存记录
String userCouponListCacheKey = String.format(EngineRedisConstant.USER_COUPON_TEMPLATE_LIST_KEY, UserContext.getUserId());
String userCouponItemCacheKey = StrUtil.builder()
.append(requestParam.getCouponTemplateId())
.append("_")
.append(userCouponDO.getId())
.toString();
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());
// 由于 Redis 在持久化或主从复制的极端情况下可能会出现数据丢失,而我们对指令丢失几乎无法容忍,因此我们采用经典的写后查询策略来应对这一问题
Double scored;
try {
scored = stringRedisTemplate.opsForZSet().score(userCouponListCacheKey, userCouponItemCacheKey);
// scored 为空意味着可能 Redis Cluster 主从同步丢失了数据,比如 Redis 主节点还没有同步到从节点就宕机了,解决方案就是再新增一次
if (scored == null) {
// 如果这里也新增失败了怎么办?我们大概率做不到绝对的万无一失,只能尽可能增加成功率
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());
}
} catch (Throwable ex) {
log.warn("[消费者] 用户兑换优惠券 - 执行消费逻辑,查询Redis用户优惠券记录为空或抛异常,可能Redis宕机或主从复制数据丢失,基础错误信息:{}", ex.getMessage());
// 如果直接抛异常大概率 Redis 宕机了,所以应该写个延时队列向 Redis 重试放入值。为了避免代码复杂性,这里直接写新增,大家知道最优解决方案即可
stringRedisTemplate.opsForZSet().add(userCouponListCacheKey, userCouponItemCacheKey, now.getTime());
}
// 发送延时消息队列,等待优惠券到期后,将优惠券信息从缓存中删除
UserCouponDelayCloseEvent userCouponDelayCloseEvent = UserCouponDelayCloseEvent.builder()
.couponTemplateId(requestParam.getCouponTemplateId())
.userCouponId(String.valueOf(userCouponDO.getId()))
.userId(userId)
.delayTime(validEndTime.getTime())
.build();
SendResult sendResult = couponDelayCloseProducer.sendMessage(userCouponDelayCloseEvent);
// 发送消息失败解决方案简单且高效的逻辑之一:打印日志并报警,通过日志搜集并重新投递
if (ObjectUtil.notEqual(sendResult.getSendStatus().name(), "SEND_OK")) {
log.warn("[消费者] 用户兑换优惠券 - 执行消费逻辑,发送优惠券关闭延时队列失败,消息参数:{}", JSON.toJSONString(userCouponDelayCloseEvent));
}
}
}
本章总结
主要就是通过消息队列进行重构,创建了新的用户优惠券兑换消费者:UserCouponRedeemConsumer和用户优惠券生产者:UserCouponRedeemProducer。并且消费者也使用了之前的Spring AOP环绕通知面向切面思想用于幂等性判断,总体流程图如下:
方案存在的问题
1. Redis 极端场景
Redis 提供了两套持久化机制,RDB 快照和 AOF 日志文件追加。
-
RDB 它会根据情况定期的 Fork 出一个子进程,生成当前数据库的全量快照。对于 RDB 快照,假如我们在 RDB 快照生成后宕机,那么会丢失快照生成期间全部增量数据,如果在连快照都没成功生成,那么就会丢掉全部数据。
-
另一个是 AOF,它通过向 AOF 日志文件追加每一条执行过的指令实现。而当我们仅开启了 AOF 时,丢失数据的多少取决于我们设置的刷盘策略:当设置为每条指令执行后都刷盘
Always
,我们最多丢失一条指令;当设置为每秒刷一次盘的Eversec
时,最多丢失一秒内的数据;当设置为非主动刷盘的No
时,则可能丢失上次刷盘后到现在的全部数据。
2. 库存扣减的几种场景
在应对于企业中不同场景的库存扣减需求,这里分析下:
-
在商品流量较低的情况下,通常不会出现大量请求同时访问单个商品进行库存扣减。此时,可以使用 Redis 进行防护,并直接同步到 MySQL 进行库存扣减,以防止商品超卖。虽然在此场景中涉及多个商品的数据扣减,可能会出现锁竞争,但竞争程度通常不会很激烈。
-
对于秒杀商品,通常会在短时间内出现大量请求同时访问单个商品进行库存扣减。为此,可以使用 Redis 进行防护,并直接将库存扣减同步到 MySQL,以防止商品超卖。由于秒杀商品的库存一般较少,因此造成的锁竞争相对可控。假设库存扣减采用串行方式,每次扣减耗时 5 毫秒,处理 100 个库存也仅需 500 毫秒。
-
某些秒杀商品的库存较多,或同时进行多个热门商品的秒杀(如直播间商品)。在这种情况下,直接扣减数据库库存会给系统带来较大压力,导致接口响应延迟。为应对这种场景,我们设计了优惠券秒杀 v2 接口。虽然基于 Redis 扣减库存和消息队列异步处理的方案可能会引发前后不一致的问题,但它能显著提升性能。此外,Redis 的持久化和主从宕机的风险相对较小。即使发生宕机,对平台或商家来说,也不会造成直接的损失。
不存在绝对的银弹。Redis 之所以能快速响应,是因为它直接与内存交互,作为缓存中间件,如果每次都为了数据一致性而与磁盘交互,那就本末倒置了。市场上的云 Redis,包括腾讯 Redis 和阿里云 Tair,它们的持久化和主从复制本质上都是异步的。