【Redis实战】Redis在分布式系统下秒杀业务中的应用
前后端项目
前后端项目基于之前的项目,详细看这里:
添加数据表
新增三张数据库表,优惠券表、特价优惠券表、订单表,其中优惠券表与特价优惠券表之间有关联,分别如下:
tb_voucher
:优惠券的基本信息,优惠金额、使用规则等。
CREATE TABLE IF NOT EXISTS `tb_voucher` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`shop_id` bigint(20) unsigned DEFAULT NULL COMMENT '商铺id',
`title` varchar(255) NOT NULL COMMENT '代金券标题',
`sub_title` varchar(255) DEFAULT NULL COMMENT '副标题',
`rules` varchar(1024) DEFAULT NULL COMMENT '使用规则',
`pay_value` bigint(10) unsigned NOT NULL COMMENT '支付金额,单位是分。例如200代表2元',
`actual_value` bigint(10) NOT NULL COMMENT '抵扣金额,单位是分。例如200代表2元',
`type` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '0,普通券;1,秒杀券',
`status` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '1,上架; 2,下架; 3,过期',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;
:优惠券的库存、开始抢购时间,结束抢购时间。
CREATE TABLE IF NOT EXISTS `tb_seckill_voucher` (
`voucher_id` bigint(20) unsigned NOT NULL COMMENT '关联的优惠券的id',
`stock` int(8) NOT NULL COMMENT '库存',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`begin_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '生效时间',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '失效时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`voucher_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT COMMENT='秒杀优惠券表,与优惠券是一对一关系';
tb_voucher_order
:订单信息表。
CREATE TABLE IF NOT EXISTS `tb_voucher_order` (
`id` bigint(20) NOT NULL COMMENT '主键',
`user_id` bigint(20) unsigned NOT NULL COMMENT '下单的用户id',
`voucher_id` bigint(20) unsigned NOT NULL COMMENT '购买的代金券id',
`pay_type` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '支付方式 1:余额支付;2:支付宝;3:微信',
`status` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下单时间',
`pay_time` timestamp NULL DEFAULT NULL COMMENT '支付时间',
`use_time` timestamp NULL DEFAULT NULL COMMENT '核销时间',
`refund_time` timestamp NULL DEFAULT NULL COMMENT '退款时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;
使用MP代码生成器完成三张表对应的三层代码,其中,需要修改Voucher实体类,添加以下属性。
/**
* 库存
*/
@TableField(exist = false)
private Integer stock;
/**
* 生效时间
*/
@TableField(exist = false)
private LocalDateTime beginTime;
/**
* 失效时间
*/
@TableField(exist = false)
private LocalDateTime endTime;
// VoucherController.java
/**
* 新增秒杀券
* @param voucher 优惠券信息,包含秒杀信息
* @return 优惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}
// VoucherServiceImpl.java
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
}
调用接口添加秒杀券
// 发送的json数据模板
{
"shopId": 1,
"title": "100元代金券",
"subTitle": "周一至周五可用",
"rules": "全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食",
"payValue": 8000,
"actualValue": "10000",
"type": "1",
"status": "1",
"beginTime": "2022-10-27T21:05:06.950",
"endTime": "2022-11-27T21:05:06.950",
"stock": 100
}
// VoucherOrderController.java
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@PostMapping("/seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return Result.fail("功能待开发...");
}
}
1. 分布式系统全局唯一ID
为什么需要全局唯一ID?
在说秒杀业务之前,我们应该首先了解一个概念,分布式系统中的全局唯一ID。因为在秒杀业务进行时,会很容易就产生大数据量的订单,此时如果还是用基于数据库的ID自增的话,由于单表ID数据量的一个限制,很容易会出现问题,除此之外,因为订单会牵扯到一些敏感数据,所以还要考虑做到安全性,不能被用户轻易就发现其中的规律等,比如以下场景。
场景分析一:如果订单ID具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。
场景分析二:随着我们商城规模越来越大,MySQL的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。
所以,我们需要去有一个全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
-
唯一性
-
递增性
-
安全性
-
高可用
-
高性能
如何实现全局唯一ID?
实现分布式全局唯一ID的方式有很多,比如UUID、Redis自增、snowflake算法、数据库自增等,几种方式各有优缺点,在这里呢,我们肯定是选择基于Redis
的方式实现一个全局ID生成器。
同样需要参考需要保证以上几个基本特性,首先对于高可用来说,由于Redis中的主从复制、哨兵机制以及天然支持的集群模式的特点,可以保证一定程度的高可用;其次是高性能,因为Redis是一款内存级别的缓存数据库,还有其他的特点,本身就以性能好著称,所以对于高性能方面也可以轻松保证;然后就是递增性和唯一性,虽然Redis中对于Key有一种incr的自增命令,但是出于安全性考虑,这里就需要我们去想出一种生成策略来保证这两点,可以使用时间戳+时间日期+计数器生成的序号进行拼接来完成;当以上四个特性保证后,安全性也就可以做到了。
ID的组成部分:
-
符号位:1bit,永远为0
-
时间戳:31bit,以秒为单位,可以使用69年
-
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
基于Redis代码实现全局唯一ID器
/** * @Classname RedisIdWorker * @Description 基于Redis代码实现全局唯一ID器 * @Date 2022/11/1 9:03 * @Created by YJS * @WebSite www.imyjs.cn */ @Component // 作为一个bean加载到容器,方便各种Service的后期的使用 public class RedisIdWorker { // 自定义开始时间戳 《确保安全》 private static final Long BEGIN_TIMESTAMP = 1667293533L; // 注入 StringRedisTemplate 操作Redis @Resource private StringRedisTemplate stringRedisTemplate; // 时间戳左移位数 用于与自增值进行或运算合并 private static final int BIT_COUNTS = 32; /** * 成的全局唯一ID * @param keyPrefix Redis中Key的前缀 主要用于区分不同的服务 * @return 生成的全局唯一ID */ public Long nextId(String keyPrefix){ // 1. 生成当前时间戳 LocalDateTime now = LocalDateTime.now(); // 使用当前时间戳减去自定义开始时间戳 《确保安全》 long timestamp = now.toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP; // 2. 获取当前日期用于做KEY String date = now.format(DateTimeFormatter.ofPattern(":yyyy:MM:dd")); // 3. 拼接KEY 并使用 Redis 自增长操作,值从1开始,后期还可以分时段获取当年、当月、当日的订单量 Long count = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + date); assert count != null; // 4.返回生成的ID 取时间戳的后32位 与 序列值进行按位与操作 return timestamp << BIT_COUNTS | count; } }
测试生成器的使用
// 注入基于Redis生成的全局唯一ID器
@Autowired
private RedisIdWorker redisIdWorker;
// 用于测试模拟生成全局唯一ID的线程池
private static final ExecutorService EXECUTORSERVICE = Executors.newFixedThreadPool(100);
@Test
public void testRedisIdWorker() throws InterruptedException {
// 开始计时
long beginTime = System.currentTimeMillis();
/*
* 由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,
* 然后我们期望的是分线程全部走完之后,主线程再走,
* 所以我们此时需要使用到`CountDownLatch`。
*/
// 这里一个300个线程,所以count为300 每一个线程执行完任务进行countDown减一
CountDownLatch countDownLatch = new CountDownLatch(300);
// 定义一个线程任务用于操作全局唯一ID器生成ID并打印到控制台
Runnable task = (() -> {
// 一个线程任务测试生成100个ID
for (int i = 0; i < 100; i++) {
System.out.println(redisIdWorker.nextId("order"));
}
//
countDownLatch.countDown();
});
// 提交任务到线程池执行
for (int i = 0; i < 300; i++) {
// 模拟300个线程执行任务 共计生成30000个ID
EXECUTORSERVICE.submit(task);
}
// 线程全部启动后,countDownLatch在这里进行await,等待全部线程的全部任务执行完后,再往下执行主线程。
// 注意避坑:CountDownLatch类的是`await()`方法!!!不是`wait()`!!!否则会报异常!!!
countDownLatch.await();
// 结束计时
long endTime = System.currentTimeMillis();
// 打印耗时
System.out.println("生成30000个ID共计用时:" + (endTime - beginTime));
}
由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,然后我们期望的是分线程全部走完之后,主线程再走,所以我们此时需要使用到CountDownLatch
。
CountDownLatch
CountDownLatch
允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。
CountDownLatch
用给定的计数初始化。 方法阻塞,直到由于方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await
调用立即返回。 这是一个一次性的现象 - 计数无法重置。
CountDownLatch
是一种通用的同步工具,可用于多种用途。 一个CountDownLatch
为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用在门口等待,直到被调用的线程打开。 一个CountDownLatch
初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。
CountDownLatch
一个有用的属性是,它不要求调用countDown
线程等待计数到达零之前继续,它只是阻止任何线程通过 ,直到所有线程可以通过。
CountDownLatch 中有两个最重要的方法:countDown()
、await()
。
await()
方法!!!不是wait()
!!!否则会报异常!!!await方法:如果当前计数为零,则此方法立即返回。
如果当前计数大于零,则当前线程将被禁用以进行线程调度,并处于休眠状态,直至发生两件事情之一:
-
由于
countDown()
方法的调用,计数达到零; -
要么一些其他线程interrupts当前线程。
如果当前线程:
-
在进入该方法时设置了中断状态;
-
要么是interrupted等待,
然后InterruptedException
被关上,当前线程的中断状态被清除。
await
方法是阻塞方法,我们担心分线程没有执行完时,main线程就先执行结束,所以使用await
可以让main
线程阻塞,那么什么时候main线程不再阻塞呢?当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行,那么什么时候CountDownLatch 维护的变量变为0 呢,我们只需要调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是0,此时await就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。秒杀基础代码实现
秒杀下单核心逻辑分析:
-
当用户开始进行下单,携带请求秒杀券ID发送至后端
-
后端根据请求秒杀的优惠券ID去查询数据库,判断请求优惠券是否存在
-
如果请求优惠券不存在,直接返回非法请求!
-
如果请求优惠券存在,判断秒杀是否开始以及是否结束
-
未开始或者已结束直接返回错误信息
-
已开始且未结束,则判断对应优惠券是否有库存
-
库存不足,直接返回错误信息
-
库存充足,开始扣减库存,新增订单
-
首先获取全局唯一ID
-
创建新增订单:设置订单ID为全局唯一ID、设置抢购用户ID、设置优惠券ID
-
保存订单至数据库
-
返回订单号
-
-
-
-
代码实现:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private IVoucherOrderService voucherOrderService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
@Transactional
public Result seckill(Long voucherId) {
// 根据请求秒杀的优惠券ID去查询数据库
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 请求优惠券不存在,直接返回非法请求!
if (Objects.isNull(seckillVoucher)){
return Result.fail("非法请求!");
}
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
// 请求优惠券存在,判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(now)){
// 未开始直接返回
return Result.fail("秒杀还未开始!");
}
// 请求优惠券存在,判断秒杀是否已经结束
if (seckillVoucher.getEndTime().isBefore(now)){
// 已经结束直接返回
return Result.fail("秒杀已经结束!");
}
// 请求优惠券存在且在秒杀时间范围内,判断是否有库存
if (seckillVoucher.getStock() < 1){
// 库存不足,直接返回
return Result.fail("已经被抢光了...");
}
// 请求优惠券存在且在秒杀时间范围内,且有库存
// 开始扣减库存,新增订单
boolean updateResult = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId)
.update();
// 如果扣减库存异常直接返回
if (!updateResult) {
return Result.fail("更新库存异常!");
}
// 如果扣减库存正常,开始生成订单
// 首先获取全局唯一ID
long orderId = redisIdWorker.nextId("voucher");
// 创建新增订单
VoucherOrder voucherOrder = new VoucherOrder();
// 设置订单ID为全局唯一ID
voucherOrder.setId(orderId);
// 设置抢购用户ID
voucherOrder.setUserId(UserHolder.getUser().getId());
// 设置优惠券ID
voucherOrder.setVoucherId(voucherId);
// 保存订单至数据库
voucherOrderService.save(voucherOrder);
// 返回结果
return Result.ok(orderId);
}
}
测试并分析超卖问题
问题说明
通过使用 jmeter 测试工具模拟秒杀业务的实现,创建 200 个请求同时去访问秒杀接口模拟秒杀下单,期待正常结果应该为秒杀券库存为零,创建一百个订单,然而结果发现数据库中秒杀券的库存量余额竟然成了负数,订单数量超过了100,也就是发生了超卖问题。
问题分析
通过观察源代码发现在检查库存是否不足时,是这样写的:
// 请求优惠券存在且在秒杀时间范围内,判断是否有库存
if (seckillVoucher.getStock() < 1){
// 库存不足,直接返回
return Result.fail("已经被抢光了...");
}
// 请求优惠券存在且在秒杀时间范围内,且有库存
// 开始扣减库存,新增订单
boolean updateResult = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId)
.update();
// 如果扣减库存异常直接返回
if (!updateResult) {
return Result.fail("更新库存异常!");
}
可以发现在检查库存和扣减库存时是两个单独的操作,并不是原子操作,也就是说有可能在同一瞬间若干个请求去检查库存时,因为都还没来的及扣减库存,导致查询库存都是充足的,然后这样若干个请求后续都去扣减了库存,导致了超卖问题。
问题解决
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:
-
悲观锁:悲观锁在操作数据时比较悲观,认为别人会同时修改数据。悲观锁可以实现对于数据的串行化执行,比如使用
synchronized
、lock
都是悲观锁的代表。因此操作数据时直接把数据锁住,直到操作完成后才会释放锁;上锁期间其他人不能修改数据。 -
乐观锁:乐观锁在操作数据时非常乐观,认为别人不会同时修改数据。因此乐观锁不会上锁,只是在执行更新的时候判断一下在此期间别人是否修改了数据:如果别人修改了数据则放弃操作,否则执行操作。
检查是否被修改的方式可以是有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,当然乐观锁还有一些变种的处理方式比如CAS。乐观锁的典型代表:就是CAS,利用CAS进行无锁化机制加锁。
使用悲观锁解决问题的实现比较简单,只需要给这个方法添加 synchronized 关键字,也就是添加一个同步锁即可,也就是保证在同一时刻只有一个线程进入该方法:
@Override
@Transactional
public synchronized Result seckill(Long voucherId) {
// 省略代码...
}
因为给方法添加同步锁的方式,会影响一定的性能,所以我们采用乐观锁的方式来解决超卖问题,具体代码实现如下:
// 请求优惠券存在且在秒杀时间范围内,且有库存
// 开始扣减库存,新增订单
boolean updateResult = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId)
// .eq("stock", seckillVoucher.getStock())
.gt("stock", 0)
.update();
以上代码表示在进行更新库存时,会去判断当前的库存是否大于0,如果大于0表示可以创建订单,否则直接返回false。
eq("stock", seckillVoucher.getStock())
方式去判断只要扣减库存时的库存和之前查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,因为这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败实现一人一单需求
需求说明
通过以上代码的编写实现,基本上已经做到了保证秒杀业务的需求,但是,仍然还有一个不符合实际的问题是:优惠卷是为了引流,但是目前的情况是,一个人可以无限制的抢这个优惠卷,所以我们应当增加一层逻辑,让一个用户只能下一个单,而不是让一个用户下多个单,也就是实现一人一单的功能需求。
需求分析
为了实现一人一单的功能需求,我们可以只需要在上面的秒杀逻辑中,在进行扣减库存之前,进行根据优惠卷id和用户id查询判断该用户是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行扣减库存并且完成下单操作。
代码实现
// 实现一人一单功能
// 在扣减库存之前进行判断当前用户是否已经下过单
// 获取当前用户ID
Long userID = UserHolder.getUser().getId();
// 查询数据库判断是否有当前用户的订单信息
int count = query().eq("user_id", userID).eq("voucher_id", voucherId).count();
if (count > 0){
// 当前用户已经下过单,直接返回错误信息
return Result.fail("你已经下过一单了~");
}
问题分析
通过测试,发现此时这种方法并不能保证做到一人一单的功能,观察数据库发现,仍然被同一个用户抢购了若干订单。其实通过观察源码不难发现,其实问题和上边类似,就是在某一时刻若干线程同时执行查询该用户订单的操作,由于此时其他线程并未将订单创建保存到数据库,所以若干线程同时去进行了扣减库存完成下单的操作,最终导致了一个用户生成了多个订单。
解决办法仍然采用加锁的方式,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作。
问题解决
首先我们可以把创建订单的操作封装为一个方法,并且使用synchronized (userID.toString().intern())
添加同步代码块保证线程安全:
/**
* 请求优惠券存在且在秒杀时间范围内,且有库存, 且当前用户未下过单,则去创建订单
* @param voucherId 秒杀券ID
* @return Result
*/
@Transactional
public Result createOrder(Long voucherId){
// 实现一人一单功能
// 在扣减库存之前进行判断当前用户是否已经下过单
// 获取当前用户ID
Long userID = UserHolder.getUser().getId();
// 保证线程安全
synchronized (userID.toString().intern()){
// 查询数据库判断是否有当前用户的订单信息
int count = query().eq("user_id", userID).eq("voucher_id", voucherId).count();
if (count > 0){
// 当前用户已经下过单,直接返回错误信息
return Result.fail("你已经下过一单了~");
}
// 请求优惠券存在且在秒杀时间范围内,且有库存, 且当前用户未下过单
// 开始扣减库存,新增订单
boolean updateResult = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId)
// .eq("stock", seckillVoucher.getStock())
.gt("stock", 0)
.update();
// 如果扣减库存异常直接返回
if (!updateResult) {
return Result.fail("更新库存异常!");
}
// 如果扣减库存正常,开始生成订单
// 首先获取全局唯一ID
long orderId = redisIdWorker.nextId("voucher");
// 创建新增订单
VoucherOrder voucherOrder = new VoucherOrder();
// 设置订单ID为全局唯一ID
voucherOrder.setId(orderId);
// 设置抢购用户ID
voucherOrder.setUserId(UserHolder.getUser().getId());
// 设置优惠券ID
voucherOrder.setVoucherId(voucherId);
// 保存订单至数据库
voucherOrderService.save(voucherOrder);
// 返回结果
return Result.ok(orderId);
}
createOrder
方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题,在seckillVoucher
// 请求优惠券存在且在秒杀时间范围内,且有库存, 且当前用户未下过单,则去创建订单
Long userID = UserHolder.getUser().getId();
// 保证线程安全
synchronized (userID.toString().intern()){
return createOrder(voucherId);
}
但是以上做法依然有问题,因为调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务。
// 请求优惠券存在且在秒杀时间范围内,且有库存, 且当前用户未下过单,则去创建订单
Long userID = UserHolder.getUser().getId();
// 保证线程安全
synchronized (userID.toString().intern()){
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createOrder(voucherId);
}
重启服务后进行测试,发现项目报错了:
Cannot find current proxy: Set 'exposeProxy' property on Advised to 'true' to make it available, and ensure that AopContext.currentProxy() is invoked in the same thread as the AOP invocation context.
翻译:
解决方案:
-
添加POM依赖坐标
<dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency> <!-- 注意:不是这一个!! <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> </dependency> -->
- 在启动类上开启注解:
@EnableAspectJAutoProxy(exposeProxy = true)
集群环境下的并发问题
通过以上代码的编写并且以加锁的方式可以解决在单机情况下的一人一单的线程安全问题,但是在集群模式下就不行就同样会出现问题。我们可以通过更换端口的方式启动多个服务来模拟集群环境,然后测试同一账号用户去多次发送抢购请求,会发现仍然可以抢到多个订单。
有关锁失效原因分析
由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,synchronized 锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。
分布式锁
要介绍分布式锁,首先要提到与分布式锁相对应的是线程锁、进程锁。
1.线程锁
主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如Synchronized、Lock等。
2.进程锁
为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。
3.分布式锁
当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。
基本原理
满足分布式系统或集群模式下多进程可见并且互斥的锁就是分布式锁。
分布式锁的核心思想就是让多个机器上的进程都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
分布式锁的由来
在传统单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLcok或synchronized)进行互斥控制。
但是在分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁的由来。
当多个进程不在同一个系统中,就需要用分布式锁控制多个进程对资源的访问。
分布式锁的特点
首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
1、互斥性:任意时刻,只能有一个客户端获取锁,不能同时有两个客户端获取到锁。
2、安全性:锁只能被持有该锁的客户端删除,不能由其它客户端删除。
3、死锁:获取锁的客户端因为某些原因(如宕机等)而未能释放锁,其它客户端再也无法获取到该锁。
4、容错
常见分布式锁的实现方式
-
MySQL
:MySQL本身就带有锁机制,但是由于MySQL性能本身一般,所以采用分布式锁的情况下,其实使用MySQL作为分布式锁比较少见。 -
Redis
:Redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用Redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁。 -
Zookeeper
:zookeeper也是企业级开发中较好的一个实现分布式锁的方案。
Redis分布式锁的实现
核心思路
我们利用 Redis
的 setNx
方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的哥们,等待一定时间后重试即可
实现分布式锁时需要实现的两个基本方法:
-
获取锁:
-
互斥:确保只能有一个线程获取锁
-
非阻塞:尝试一次,成功返回true,失败返回false
-
-
释放锁:
-
手动释放
-
超时释放:获取锁时添加一个超时时间
-
代码实现
1.创建锁接口
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功; false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
2.创建锁的实现类
public class RedisLock implements ILock{
// 操作Redis
private StringRedisTemplate stringRedisTemplate;
// 锁的业务名称
private String name;
// 锁前缀
private static final String KEY_PREFIX="lock:";
public RedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
@Override
public boolean tryLock(long timeoutSec) {
String key = KEY_PREFIX + name;
String value = Thread.currentThread().getId() + "";
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(isLock);
}
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
3.修改业务代码
@Override
public Result seckill(Long voucherId) {
// 根据请求秒杀的优惠券ID去查询数据库
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 请求优惠券不存在,直接返回非法请求!
if (Objects.isNull(seckillVoucher)){
return Result.fail("非法请求!");
}
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
// 请求优惠券存在,判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(now)){
// 未开始直接返回
return Result.fail("秒杀还未开始!");
}
// 请求优惠券存在,判断秒杀是否已经结束
if (seckillVoucher.getEndTime().isBefore(now)){
// 已经结束直接返回
return Result.fail("秒杀已经结束!");
}
// 请求优惠券存在且在秒杀时间范围内,判断是否有库存
if (seckillVoucher.getStock() < 1){
// 库存不足,直接返回
return Result.fail("已经被抢光了...");
}
// 请求优惠券存在且在秒杀时间范围内,且有库存, 且当前用户未下过单,则去创建订单
Long userID = UserHolder.getUser().getId();
//创建锁对象(新增代码)
RedisLock lock = new RedisLock( stringRedisTemplate, "order:" + userID);
//获取锁对象
boolean isLock = lock.tryLock(1200);
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}
4.测试
Redis分布式锁误删问题
情况分析
当持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况。
解决方案
解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则进行锁的删除,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁,至此就解决了锁的误删情况。
代码实现
在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
-
如果一致则释放锁
-
如果不一致则不释放锁
核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。
public class RedisLock implements ILock{
// 操作Redis
private StringRedisTemplate stringRedisTemplate;
// 锁的业务名称
private String name;
// 锁前缀
private static final String KEY_PREFIX="lock:";
// 线程ID前缀
private static final String THREAD_ID_PREFIX = UUID.randomUUID().toString(true);
public RedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
@Override
public boolean tryLock(long timeoutSec) {
String key = KEY_PREFIX + name; // lock:order:2
String value = THREAD_ID_PREFIX + "-" + Thread.currentThread().getId(); // dafasdfsgfdsgdf-33
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(isLock);
}
@Override
public void unlock() {
// 获取线程标示
String threadId = THREAD_ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
情况分析
线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生。
解决方案
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作Redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了。
这里重点介绍Redis提供的调用函数,语法如下:
redis.call('命令名称', 'key', '其它参数', ...)
例如,我们要执行set name admin,则脚本是这样:
# 执行 set name admin
redis.call('set', 'name', 'admin')
例如,我们要先执行set name admin,再执行get name,则脚本如下:
# 先执行 set name admin
redis.call('set', 'name', 'admin')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
127.0.0.1:6379> help @scripting
EVAL script numkeys key [key ...] arg [arg ...]
summary: Execute a Lua script server side
since: 2.6.0
例如,我们要执行 redis.call('set', 'name', 'admin') 这个脚本,语法如下:
EVAL "redis.call('set', 'name', 'admin') " 0
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
EVAL "redis.call('set', KEYS[1], ARGV[1]) " 1 name admin
释放锁的业务流程是这样的:
1、获取锁中的线程标示
2、判断是否与指定的标示(当前线程标示)一致
3、如果一致则释放锁(删除)
4、如果不一致则什么都不做
如果用Lua脚本来表示则是这样的:
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
代码实现
利用Java代码调用Lua脚本改造分布式锁,首先在资源目录下新建lock.lua脚本文件:
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
然后修改RedisLock.java文件:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("lock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock() {
// // 获取线程标示
// String threadId = THREAD_ID_PREFIX + Thread.currentThread().getId();
// // 获取锁中的标示
// String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// // 判断标示是否一致
// if(threadId.equals(id)) {
// // 释放锁
// stringRedisTemplate.delete(KEY_PREFIX + name);
// }
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
THREAD_ID_PREFIX + Thread.currentThread().getId());
}
分布式锁Redission的简单使用
基于Redis 的setnx实现的分布式锁问题分析
基于Redis 的setnx实现的分布式锁存在下面的问题:
重入问题
不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
什么是Redission
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。Redission提供了分布式锁的多种多样的功能:
官网地址:
GitHub地址: https://github.com/redisson/redisson
快速入门
1.引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
2.配置Redisson客户端:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.149.145:6379").setPassword("3105501510");
return Redisson.create(config);
}
}
3.使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}
在 VoucherOrderServiceImpl
注入RedissonClient
@Autowired
private RedissonClient redissonClient;
@Override
public Result seckill(Long voucherId) {
// 根据请求秒杀的优惠券ID去查询数据库
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
// 请求优惠券不存在,直接返回非法请求!
if (Objects.isNull(seckillVoucher)){
return Result.fail("非法请求!");
}
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
// 请求优惠券存在,判断秒杀是否开始
if (seckillVoucher.getBeginTime().isAfter(now)){
// 未开始直接返回
return Result.fail("秒杀还未开始!");
}
// 请求优惠券存在,判断秒杀是否已经结束
if (seckillVoucher.getEndTime().isBefore(now)){
// 已经结束直接返回
return Result.fail("秒杀已经结束!");
}
// 请求优惠券存在且在秒杀时间范围内,判断是否有库存
if (seckillVoucher.getStock() < 1){
// 库存不足,直接返回
return Result.fail("已经被抢光了...");
}
// 请求优惠券存在且在秒杀时间范围内,且有库存, 且当前用户未下过单,则去创建订单
Long userID = UserHolder.getUser().getId();
// 创建锁对象
// 创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
// RedisLock lock = new RedisLock( stringRedisTemplate, "order:" + userID);
// 创建锁对象
RLock Lock = redissonClient.getLock("order:" + userID);
// 尝试获取锁
boolean isLock = Lock.tryLock();
// boolean isLock = lock.tryLock(1200);
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createOrder(voucherId);
} finally {
//释放锁
Lock.unlock();
}
}
4.优化秒杀业务实现
异步秒杀思路
当用户发起请求,此时会请求到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤
1、查询优惠卷
2、判断秒杀库存是否足够
3、查询订单
4、校验是否是一人一单
5、扣减库存
6、创建订单
在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的很慢,所以我们需要异步程序执行,那么如何加速呢?
优化方案:我们将耗时比较短的逻辑判断放入到Redis中,比如判断库存是否足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池,当然这里边有两个难点。
-
如何在Redis中去快速校验剩余库存以及一人一单
-
由于下单前的库存以及一人一单校验和下单是两个线程,那么如何知道到底哪个单最后是否成功,或者是下单完成
为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。
整体思路:当用户下单之后,判断库存是否充足只需要在 Redis
中去根据key找对应的value是否大于0即可,如果小于1,则直接结束,如果充足,继续在Redis
中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,并将userId和优惠卷存入到Redis
中,并且返回0,整个过程需要保证是原子性的,我们可以使用Lua脚本来操作。
当以上判断逻辑走完之后,我们可以判断当前Redis
中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue
中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。
Redis完成下单资格判断
-
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
-
基于Lua脚本,判断库存剩余、一人一单,决定用户是否抢购成功
-
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
-
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
1.新增秒杀优惠券的同时,将优惠券信息保存到Redis中
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
// private static final String SECKILL_STOCK_KEY ="seckill:stock:"
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
2.基于Lua脚本,判断库存剩余、一人一单,决定用户是否抢购成功
3.如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
-- redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
4.开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("redis.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckill(Long voucherId) {
// 获取用户ID
Long userID = UserHolder.getUser().getId();
// 生成订单ID
long orderId = redisIdWorker.nextId("order");
// 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userID.toString(), String.valueOf(orderId));
assert result != null;
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// TODO 保存阻塞队列
// 3.返回订单id
return Result.ok(orderId);
}
基于阻塞队列实现秒杀优化【完整代码】
修改下单动作,现在我们去下单时,是通过lua表达式去原子执行判断逻辑,如果判断我出来不为0 ,则要么是库存不足,要么是重复下单,返回错误信息,如果是0,则把下单的逻辑保存到队列中去,然后异步执行
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
// IVoucherOrderService的代理对象,为了保证事务
private IVoucherOrderService proxy;
// LUA脚本 实现判断库存剩余、一人一单,决定用户是否抢购成功
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
// 加载LUA脚本
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("redis.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckill(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本 注意:在LUA脚本里完成库存剩余判断、一人一单判断
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
assert result != null;
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 脚本执行返回0 则表示抢购成功生成订单
VoucherOrder voucherOrder = new VoucherOrder();
// 2.3.订单id
voucherOrder.setId(orderId);
// 2.4.用户id
voucherOrder.setUserId(userId);
// 2.5.代金券id
voucherOrder.setVoucherId(voucherId);
// 2.6.放入阻塞队列
orderTasks.add(voucherOrder);
//3.获取代理对象
proxy = (IVoucherOrderService)AopContext.currentProxy();
//4.返回订单id
return Result.ok(orderId);
}
// 异步处理订单线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
// 用于存放订单的阻塞队列
private final BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue<>(1024 * 1024);
// 在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 创建用于线程池处理的任务内部类
// 当初始化完毕后,就会去从对列中去拿信息
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2.创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
Long userId = voucherOrder.getUserId();
// 2.创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 3.尝试获取锁
boolean isLock = redisLock.tryLock();
// 4.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
log.error("不允许重复下单!");
return;
}
try {
// 注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
proxy.createOrder(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}
@Transactional
public void createOrder(VoucherOrder voucher){
// 获取当前用户ID
// Long userID = UserHolder.getUser().getId(); == null??
Long userId = voucher.getUserId();
// 查询数据库判断是否有当前用户的订单信息
int count = query().eq("user_id", userId).eq("voucher_id", voucher.getVoucherId()).count();
if (count > 0){
// 当前用户已经下过单,直接返回错误信息
log.warn("你已经下过一单了~");
}
// 请求优惠券存在且在秒杀时间范围内,且有库存, 且当前用户未下过单
// 开始扣减库存,新增订单
boolean updateResult = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucher.getVoucherId())
// .eq("stock", seckillVoucher.getStock())
.gt("stock", 0)
.update();
// 如果扣减库存异常直接返回
if (!updateResult) {
log.warn("更新库存异常!");
}
// 如果扣减库存正常,开始保存订单
save(voucher);
}
}
基于Redis的Stream结构作为消息队列,实现异步秒杀下单
需求:
-
创建一个Stream类型的消息队列,名为stream.orders
-
修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
-
项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单\
修改lua表达式,新增3.6
-- 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
-- redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
VoucherOrderServiceImpl.java
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有消息,继续下一次循环
continue;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
//处理异常消息
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createVoucherOrder(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理pendding订单异常", e);
try{
Thread.sleep(20);
}catch(Exception e){
e.printStackTrace();
}
}
}
}
}