代码

先来看lua核销的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
local userId = ARGV[1]
local type = tostring(ARGV[2])
local invalid = {}
-- 循环所有的活动判断库存
for kIndex = 1, #KEYS do
repeat
local kName = KEYS[kIndex]

-- 不存在则忽略此key,并记录
if (redis.call('exists', kName) == 0) then
invalid[kName] = true
break
end
-- 不是incr则跳过
if ('incr' ~= type) then
break
end

local pId = string.sub(kName, string.find(kName, '}:') + 2)
-- 判断总参与数
local promotionTotal = tonumber(redis.call('hget', kName, 'promotion:total')) or -1
if (promotionTotal > -1) then
local soldPromotionTotal = tonumber(redis.call('hget', kName, 'sold:promotion:total')) or 0
if (soldPromotionTotal + 1 > promotionTotal) then
return '促销' .. pId .. '超出总限购';
end
end
-- 判断个人参与数
local perPromotionTotal = tonumber(redis.call('hget', kName, 'promotion:per')) or -1
if (perPromotionTotal > -1) then
local perSoldPromotionTotal = tonumber(redis.call('hget', kName, 'sold:promotion:per:' .. userId)) or 0
if (perSoldPromotionTotal + 1 > perPromotionTotal) then
return '会员参与促销' .. pId .. '超出限购';
end
end

-- 判断当前促销下的商品的库存
for skuId_skuNum in string.gmatch(ARGV[kIndex + 2], '[^' .. ';' .. ']+') do
local i = string.find(skuId_skuNum, ':');
local skuId = string.sub(skuId_skuNum, 0, i - 1)
local skuNum = tonumber(string.sub(skuId_skuNum, i + 1))
local skuTotal = tonumber(redis.call('hget', kName, 'sku:total:' .. skuId)) or -1
if (skuTotal > -1) then
local soldSkuTotal = tonumber(redis.call('hget', kName, 'sold:sku:total:' .. skuId)) or 0
if (skuNum + soldSkuTotal > skuTotal) then
return '促销' .. pId .. '中的商品' .. skuId .. '超出限购';
end
end
local skuPer = tonumber(redis.call('hget', kName, 'sku:per:' .. skuId)) or -1
if (skuPer > -1) then
local soldSkuTotal = tonumber(redis.call('hget', kName, 'sold:sku:per:' .. skuId .. ':' .. userId)) or 0
if (skuNum + soldSkuTotal > skuPer) then
return '会员参与促销' .. pId .. '中的商品' .. skuId .. '超出限购';
end
end
end
until true
end

-- 扣减库存
for kIndex = 1, #KEYS do
repeat
local kName = KEYS[kIndex]
-- 如果不存在则忽略此key
if (invalid[kName]) then
break
end

-- 活动总参与次数
local promotionNum = type == 'decr' and -1 or 1
redis.call('hincrby', kName, 'sold:promotion:total', promotionNum)
-- 活动个人参与次数
redis.call('hincrby', kName, 'sold:promotion:per:' .. userId, promotionNum)
-- sku库存
for skuId_skuNum in string.gmatch(ARGV[kIndex + 2], '[^' .. ';' .. ']+') do
local i = string.find(skuId_skuNum, ':');
local skuId = string.sub(skuId_skuNum, 0, i - 1)
local skuNum = tonumber(string.sub(skuId_skuNum, i + 1))
skuNum = type == 'decr' and -skuNum or skuNum
-- sku总库存
redis.call('hincrby', kName, 'sold:sku:total:' .. skuId, skuNum)
-- 活动总售卖sku数量(统计的时候用)
redis.call('hincrby', kName, 'sold:promotion:sku:total', skuNum)
-- sku个人库存
redis.call('hincrby', kName, 'sold:sku:per:' .. skuId .. ':' .. userId, skuNum)
end
until true
end
return '"1"'

看完后有点蒙?没关系,分析一下业务场景,为什么这样核销,你就明白啦

背景

电商促销模块,以下的业务的逻辑都集中在一块,只对外提供一个算价接口(根据skuIdList进行促销算价)

  1. 秒杀
    限时秒杀,直接指定金额
  2. 折扣
    在商品基础价上进行打折
  3. 拼团
    3人团、5人团,成团享受活动价
  4. 社区团购
    线下拼团,成团享受活动价
  5. 优惠套装
    指定n件,共m元

以上为单品促销
单品促销的限制:如果一个商品参加多个单品促销,那么该商品在算价的时候只会选择其中一个(优惠金额最高的)


以下为非单品促销,算价时以单品算价后的最优价格再次进行叠加算价
非单品促销是允许叠加单品促销的(如果单品促销的活动允许叠加,且非单品促销也允许叠加)

  1. n元m件
    指定mmmm件,任选m-1件,总共n元
  2. 满减折
    满元、满件,满足条件送赠品、送积分、优惠券码等
  3. 满包邮
    满元、满件,满足条件之后可以减免多少运费或者进行包邮
  4. 加价购
    满多少钱,在加少量的钱换购指定的商品
  5. ..等其他逻辑
  • 一个商品可以同时参加以上多种活动,每种活动都可以设置是否允许叠加其他活动的选项
  • 限购分为活动维度的设置(参与活动次数),以及商品维度的设置(商品的活动数量),另外还可以设置针对每个人参与限购的选项,总共分为以下四种促销限制
    1. 活动维度(每一个订单算参与1次)
    2. 活动上每个人的维度
    3. 商品维度(商品数量)
    4. 商品上每个人的维度

业务梳理

该业务难点

  1. 算价接口的响应时间
  2. 算价时活动限购的过滤
  3. 核销时库存的判断
    超卖则不允许核销

算价接口的响应时间

主要瓶颈在于io,所以我们就把活动数据存放在elasticSearch中,通过canal订阅mysql指定的表,使用rockerMq接收消息,同步到es中

canal-读音[kəˈnæl],原理是伪装为Mysql 的slave,然后向Mysql master发送dump协议,master收到请求后开始向slave同步binary log数据,然后canal解析并投递至指定的mq中

既减轻了mysql的压力,也保证了数据同步能达到最终一致性

算价时根据skuId从es查询有效的活动,什么是有效的呢?在活动时间的起止范围之内就是有效的
没有必要根据某个状态值判断,增加维护状态值的成本,而且es同步数据会有1秒左右的延迟,所以根据时间区间判断即可

1
2
3
4
String now = DateFormatUtils.ISO_8601_EXTENDED_DATETIME_FORMAT.format(System.currentTimeMillis());
BoolQueryBuilder starting = QueryBuilders.boolQuery();
starting.must(QueryBuilders.rangeQuery("startTime").lte(now));
starting.must(QueryBuilders.rangeQuery("endTime").gt(now));

算价时活动限购的过滤

库存数据结构

维护库存变动是一个频繁的增删改过程,所以redis缓存是最优的方案,我们把活动上的限购信息存放在redis中的hash结构中,为什么呢?
首先hash可以存多个field,而且每个field都可以做incr自增操作,在结合活动四种限购类型的考虑(活动上、活动上的每人、活动中商品的数量、活动中商品的每人的数量)
我们以活动的维度来维护库存,如果用hash结构,在写入或者读取时,根据活动id以setAll或者getAll做一次io即可读取或者修改,维护起来比较方便
其次是只需要给hash设定一个过期时间(活动的截止时间),如果为非hash结构比如说为key value结构,那么还需要每个key设置一个过期时间

结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// field字段
Map<String, Number> stockParam = new HashMap<>();
stockParam.put("pId", promotion.getId());

stockParam.put("promotion:total", promotion.getPromotionLimitTotal());// 活动维度
stockParam.put("promotion:perTotal", promotion.getPromotionPerLimit());// 活动上每个人的维度

// 循环该活动中的商品,有多少就Put多少
stockParam.put("sku:total:" + goods.getSkuId(), goods.getSkuTotalLimit()); // 活动中商品的数量
stockParam.put("sku:per:" + goods.getSkuId(), goods.getSkuPerLimit()); // 活动中商品每个人可购买的数量

// 在cluster集群的情况下,redis的key用花括号圈起来(只圈部分固定值即可,使其库存的信息都落在一个节点上),至于原因后面会有分析
String hashKey = "{promotion:stock}:" + promotion.getId();

// 放置redis中,开启事务,保证此hash肯定有过期时间
redisTemplate.execute(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
redisTemplate.multi();
redisTemplate.boundHashOps(hashKey)
.putAll(stockParam);
redisTemplate.expireAt(hashKey, promotion.getEndTime());
redisTemplate.exec();
return null;
}
});

// 如果是集群,不支持事务的情况下建议使用pipeline,一次io把所有的命令发给redis服务端
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
redisTemplate.boundHashOps(stockKey)
.putAll(stockParam);

redisTemplate.expireAt(stockKey, instant);
return null;
}
});

redis的Key前缀用"{}"花括号括起来,为什么呢?因为redis如果是cluster集群的话,lua操作的key必须都在一个机器上,否则报错(lua脚本不支持夸节点执行多个key)
"{}"花括号的意思为:redis只用花括号之间的值进行hashSlot运算,即都会在一个hashSlot中,那么lua脚本在核销的时候可以进行批量核销

lua脚本为何不支持夸节点执行多个key?
众所周知,redis是单线程执行命令,单线程意味着没有上下文切换的性能损耗
单线程的特性就是所有的命令都是顺序执行,所以Lua脚本也是顺序执行,再此期间其他命令不会穿插执行,所以只能操作当前的机器,无法操作其他节点的机器

算价时批量查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<Long> promotionIds = Lists.newArrayList(1L, 2L, 3L); // 活动id

// 通过pipeline批量获取
List<Object> objects = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
promotionIds.forEach(pId -> connection.hGetAll(("{promotion:stock}:" + pId).getBytes(StandardCharsets.UTF_8)));
return null;
});

// 活动id与对应的库存名称与对应的数量,该map就是批量查询活动设置的库存信息,算价时只需要从redis查询一次即可
Map<Long, Map<String, Long>> pId$stockName$stockValueMap_map = objects.stream()
.filter(t -> t instanceof Map)
.map(Map.class::cast)
.filter(MapUtils::isNotEmpty)
.map(map -> {
@SuppressWarnings("unchecked")
Set<Map.Entry<Object, Object>> set = map.entrySet();
return set.stream()
.collect(Collectors.toMap(t -> String.valueOf(t.getKey()), o -> Long.parseLong(String.valueOf(o.getValue()))));
}).collect(Collectors.toMap(t -> t.get("pId"), t -> t));

通过批量查询即可获取设置的库存信息以及已售卖数量的信息

核销时库存的判断

一个订单会有多个商品,且会参加多个活动,所以在核销时,是不能上分布式锁的,为什么呢?
比如:
x订单的商品参加了a1,b1两个活动
y订单的商品参加了b1,a1两个活动
如果用分布式锁上锁的话,可能会产生死锁,而且循环上锁,判断库存,然后在写入库存,效率极低。所以用开头的lua脚本直接判断并扣取库存是最优的选择

核销代码

执行lua执行需要组装redis的keys和args
举列:
redis的keys为{promotion:stock}: + pId1,{promotion:stock}: + pId2 共两个key
args为skuId1:1;skuId2:2,skuId3:3; 共两个value
含义如下:
pId1核销两个sku,一个为skuId1,数量为1,一个为skuId2,数量为2
pId2核销一个sku,skuId3,数量为3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// redisKey与旗下的sku对应核销的数量,redisKey为{promotion:stock}: + pId
Map<String, Map<Long, Long>> pId$skuId$skuNumMap_map = new HashMap<>();

List<Object> redisKey = new ArrayList<>(pId$skuId$skuNumMap_map.keySet());

@SuppressWarnings("SuspiciousMethodCalls")
List<String> redisArgs = redisKey.stream()
.map(pId -> pId$skuId$skuNumMap_map.getOrDefault(pId, Collections.emptyMap())
.entrySet()
.stream()
.map(skuId$num_entry -> skuId$num_entry.getKey() + ":" + skuId$num_entry.getValue())
.collect(Collectors.joining(";"))
).collect(Collectors.toList());

// 第一个value为会员id
redisArgs.add(0, userId);
// 第二个参数为当前操作的类型
redisArgs.add(1, 'incr');

String errorMsg = redisTemplate.execute('lua脚本', redisKey, (Object[]) redisArgs.toArray(new String[0]));
// errorMsg为1代表核销成功,否则就是核销失败的错误信息