我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:admin@137x.com
因公众号更改推送规则,请点“在看”并加“星标”第一时间获取精彩技术分享
电商当项目经验已经非常普遍了,不管你是包装的还是真实的,起码要能讲清楚电商中常见的问题,比如库存的操作怎么防止商品被超卖。
在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。
解决方案
分析
在上面的第一种和第二种方式都是基于数据来扣减库存。
update number set x=x-1 where x > 0
MySQL自身对于高并发的处理性能就会出现问题,一般来说,MySQL的处理性能会随着并发thread上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单thread的性能还要差。
当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢InnoDB行锁的问题,导致出现互相等待甚至死锁,从而大大降低MySQL的处理性能,最终导致前端页面出现超时异常。
基于redis实现扣减库存的具体实现
我们使用redis的lua脚本来实现扣减库存
由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存
需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存
/**
* 获取库存回调
* @author yuhao.wang
*/
public interface IStockCallback {
/**
* 获取库存
* @return
*/
int getStock();
}
/**
* 扣库存
*
* @author yuhao.wang
*/
@Service
public class StockService {
Logger logger = LoggerFactory.getLogger(StockService.class);
/**
* 不限库存
*/
public static final long UNINITIALIZED_STOCK = -3L;
/**
* Redis 客户端
*/
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 执行扣库存的脚本
*/
public static final String STOCK_LUA;
static {
/**
*
* @desc 扣减库存Lua脚本
* 库存(stock)-1:表示不限库存
* 库存(stock)0:表示没有库存
* 库存(stock)大于0:表示剩余库存
*
* @params 库存key
* @return
* -3:库存未初始化
* -2:库存不足
* -1:不限库存
* 大于等于0:剩余库存(扣减之后剩余的库存)
* redis缓存的库存(value)是-1表示不限库存,直接返回1
*/
StringBuilder sb = new StringBuilder();
sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));");
sb.append(" local num = tonumber(ARGV[1]);");
sb.append(" if (stock == -1) then");
sb.append(" return -1;");
sb.append(" end;");
sb.append(" if (stock >= num) then");
sb.append(" return redis.call('incrby', KEYS[1], 0 - num);");
sb.append(" end;");
sb.append(" return -2;");
sb.append("end;");
sb.append("return -3;");
STOCK_LUA = sb.toString();
}
/**
* @param key 库存key
* @param expire 库存有效时间,单位秒
* @param num 扣减数量
* @param stockCallback 初始化库存回调函数
* @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
*/
public long stock(String key, long expire, int num, IStockCallback stockCallback) {
long stock = stock(key, num);
// 初始化库存
if (stock == UNINITIALIZED_STOCK) {
RedisLock redisLock = new RedisLock(redisTemplate, key);
try {
// 获取锁
if (redisLock.tryLock()) {
// 双重验证,避免并发时重复回源到数据库
stock = stock(key, num);
if (stock == UNINITIALIZED_STOCK) {
// 获取初始化库存
final int initStock = stockCallback.getStock();
// 将库存设置到redis
redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
// 调一次扣库存的操作
stock = stock(key, num);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
redisLock.unlock();
}
}
return stock;
}
/**
* 加库存(还原库存)
*
* @param key 库存key
* @param num 库存数量
* @return
*/
public long addStock(String key, int num) {
return addStock(key, null, num);
}
/**
* 加库存
*
* @param key 库存key
* @param expire 过期时间(秒)
* @param num 库存数量
* @return
*/
public long addStock(String key, Long expire, int num) {
boolean hasKey = redisTemplate.hasKey(key);
// 判断key是否存在,存在就直接更新
if (hasKey) {
return redisTemplate.opsForValue().increment(key, num);
}
Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
RedisLock redisLock = new RedisLock(redisTemplate, key);
try {
if (redisLock.tryLock()) {
// 获取到锁后再次判断一下是否有key
hasKey = redisTemplate.hasKey(key);
if (!hasKey) {
// 初始化库存
redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
redisLock.unlock();
}
return num;
}
/**
* 获取库存
*
* @param key 库存key
* @return -1:不限库存; 大于等于0:剩余库存
*/
public int getStock(String key) {
Integer stock = (Integer) redisTemplate.opsForValue().get(key);
return stock == null ? -1 : stock;
}
/**
* 扣库存
*
* @param key 库存key
* @param num 扣减库存数量
* @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
*/
private Long stock(String key, int num) {
// 脚本里的KEYS参数
List<String> keys = new ArrayList<>();
keys.add(key);
// 脚本里的ARGV参数
List<String> args = new ArrayList<>();
args.add(Integer.toString(num));
long result = redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
Object nativeConnection = connection.getNativeConnection();
// 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
// 集群模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
}
// 单机模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
}
return UNINITIALIZED_STOCK;
}
});
return result;
}
}
/**
* @author yuhao.wang
*/
@RestController
public class StockController {
@Autowired
private StockService stockService;
@RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public Object stock() {
// 商品ID
long commodityId = 1;
// 库存ID
String redisKey = "redis_key:stock:" + commodityId;
long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
return stock >= 0;
}
/**
* 获取初始的库存
*
* @return
*/
private int initStock(long commodityId) {
// TODO 这里做一些初始化库存的操作
return 1000;
}
@RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public Object getStock() {
// 商品ID
long commodityId = 1;
// 库存ID
String redisKey = "redis_key:stock:" + commodityId;
return stockService.getStock(redisKey);
}
@RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public Object addStock() {
// 商品ID
long commodityId = 2;
// 库存ID
String redisKey = "redis_key:stock:" + commodityId;
return stockService.addStock(redisKey, 2);
}
}
如喜欢本文,请点击右上角,把文章分享到朋友圈
如有想了解学习的技术点,请留言给若飞安排分享
·END·
相关阅读:
作者:xiaolyuh
来源:my.oschina.net/xiaolyuh/blog/1615639
版权申明:内容来源网络,仅供分享学习,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢!
我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:admin@137x.com