背景

redis cluster集群模式下3主3从,用redisson,scan命令会漏扫
非cluster集群下没有问题,lettuce也没有问题,只有redisson+cluster+scan才有会有问题

发现的历程

搞库存重构,存放在redis的库存数据要更改名称,肯定不能用keys命令,keys会阻塞,时间复杂度是O(n)
所以用scan,但发现scan总是漏扫数据,用Lettuce就没有问题,所以断定是redisson源码出现问题

找问题的历程

分析源码,从scan开始,源码如下

建议debug阅读源码,不然容易混乱
本地没有环境建议远程debug,在jvm启动脚本加上-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=16770 用idea连接即可远程debug

分析源码

scan原理

调用scan会返回一个迭代器,通过迭代器scan会从头扫到尾,直至cursorId=0
源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public abstract class ScanCursor<T> implements Cursor<T> {

private void scan(long cursorId) {
ScanIteration<T> result = doScan(cursorId, this.scanOptions);
processScanResult(result);
}

// 由子类实现scan即可
protected abstract ScanIteration<T> doScan(long cursorId, ScanOptions options);

// 迭代器扫描
public final ScanCursor<T> open() {
doOpen(cursorId);
return this;
}

// 迭代器扫描
protected void doOpen(long cursorId) {
scan(cursorId);
}

// 处理扫描的结果
private void processScanResult(ScanIteration<T> result) {
// 扫描完成(没有任何结果)
if (result == null) {
resetDelegate();
state = CursorState.FINISHED;
return;
}

// 记录本次cursorId,下次继续扫
cursorId = Long.valueOf(result.getCursorId());

if (cursorId == 0) {
// 扫描完成
state = CursorState.FINISHED;
}

// 记录当前的结果
if (!CollectionUtils.isEmpty(result.getItems())) {
currentResult = result.iterator();
} else {
currentResult = Collections.<T>emptyList().iterator();
}
}

@Override
public boolean hasNext() {

// 如果当前没有结果,并且还没扫描完,则从上一次扫描的cursorId继续scan
while (!currentResult.hasNext() && !CursorState.FINISHED.equals(state)) {
scan(cursorId);
}

// 没有就真的没有了
return currentResult.hasNext();
}

@Override
public T next() {
T next = moveNext(currentResult);
position++;
return next;
}
}

scan-redisson的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Override
public Cursor<byte[]> scan(ScanOptions options) {
// 以匿名内部类的方式实现
return new ScanCursor<byte[]>(0, options) {

// 就是这个引发的bug,目前没发现有什么作用
private RedisClient client;

// 所有节点
private Iterator<MasterSlaveEntry> entries = redisson.getConnectionManager().getEntrySet().iterator();
// 当前遍历的节点
private MasterSlaveEntry entry = entries.next();

// 扫描
@Override
protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
// 如果是队列或者pipelined则直接报异常,因为scan要立马返回结果
if (isQueueing() || isPipelined()) {
throw new UnsupportedOperationException("'SSCAN' cannot be called in pipeline / transaction mode.");
}

// entry为当前操作的服务器节点。cluser集群模式下,会遍历所有的节点
if (entry == null) {
return null;
}

// 组装redisson命令
List<Object> args = new ArrayList<Object>();
cursorId = Math.max(cursorId, 0);
args.add(cursorId);
if (options.getPattern() != null) {
args.add("MATCH");
args.add(options.getPattern());
}
if (options.getCount() != null) {
args.add("COUNT");
args.add(options.getCount());
}

// 执行
RFuture<ListScanResult<byte[]>> f = executorService.readAsync(client, entry, ByteArrayCodec.INSTANCE, RedisCommands.SCAN, args.toArray());
ListScanResult<byte[]> res = syncFuture(f);
long pos = res.getPos();

// 记录本次在哪个节点上执行的,下次scan时会把此当做一个参数传递进去
client = res.getRedisClient();

// 如果当前节点全部扫描完成了,则会取下一个节点,直至所有的节点结束
if (pos == 0) {
if (entries.hasNext()) {
pos = -1;
entry = entries.next();
} else {
entry = null;
}
}

return new ScanIteration<byte[]>(pos, res.getValues());
}
}.open();

}

看看如何根据entry获取connection连接的,这个是重点

核心源码是readAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
// NodeSource包含了entry和client
return async(true, new NodeSource(entry, client), codec, command, params, false, false);
}

public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
...
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
// 调用执行
executor.execute();
...
}

public void execute() {
...
CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();
...
}

protected CompletableFuture<RedisConnection> getConnection() {
// scan为读模式
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command);
} else {
...
}
return connectionFuture;
}
public CompletableFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry entry = getEntry(source);
...
return entry.connectionReadOp(command);
}

private MasterSlaveEntry getEntry(NodeSource source) {
...
MasterSlaveEntry entry = source.getEntry();

// 罪魁祸首的就是他
if (source.getRedisClient() != null) {
entry = getEntry(source.getRedisClient());
}
...
return entry;
}

public MasterSlaveEntry getEntry(RedisClient redisClient) {
// 伪代码
return redisClient;
}

结论

通过阅读源码,我们发现通过迭代器ScanCursor在scan时client字段,就是个多余的存在,在scan时,client是记录当前操作的节点,在下一次scan时传递进去,以至于一直会用这个client获取connection,导致无法切换节点扫描

验证

由于我们暂时还不能提交修改源码,而且还需要一定的时间,那么怎么才能快速的验证呢?
修改源码,通过插桩的方式使其client失效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
public class ModifyRedisson implements ApplicationListener<ApplicationPreparedEvent> {

@Override
public void onApplicationEvent(ApplicationPreparedEvent event) {
try {
ClassLoader classLoader = RedissonConnection.class.getClassLoader();
ClassPool pool = ClassPool.getDefault();
pool.insertClassPath(new LoaderClassPath(classLoader));
CtClass wrapper = pool.get(RedissonConnection.class.getName());

List<CtClass> scanCursor = Stream.of(wrapper.getDeclaredClasses())
.filter(t -> t.getClassFile().getSuperclass().contains(ScanCursor.class.getSimpleName()))
.collect(Collectors.toList());

Assert.isTrue(scanCursor.size() == 1, "ScanCursor匿名类只能有一个");

wrapper = scanCursor.get(0);
wrapper.getDeclaredMethods()[0].insertAfter("{this.client = null;}", true);
wrapper.toClass(classLoader, null);
log.warn("修改redisson源码,使client一直为null");
} catch (Throwable e) {
log.warn("修改源码异常", e);
}

}
}

meta-inf资源目录下新增spring.factories文件,内容如下,使其上面的类生效

1
2
# Run Listeners
org.springframework.context.ApplicationListener=ModifyRedisson

通过验证,以及打断点,发现果真如此,这次scan可以循环所有的节点执行scan了
在redisson没有发布新版本修复此问题值,可以用这种简陋的方式

开始我的第一次源码贡献

  1. 首先提issue,提交代码时可关联issue
    https://github.com/redisson/redisson/issues/4238
  2. 在github找到源码所在的仓库
    https://github.com/redisson/redisson
  3. fork到自己仓库
  4. pull到本地修改
  5. 提交并Push
  6. 提交mergeRequest
    https://github.com/redisson/redisson/pull/4242
  7. 等结果吧

后续

官方采纳了我的反馈并视为一个bug,但是没有采用我的代码(已经很满意了)
原文如下

Thanks for suggested changes! I'm afraid they aren't correct. 
Since the client object should be used the same for the same entry. 
The issue is that if position == 0 then client should be set to null. 
Will fix that.

意思就是client不能一直为null(我提交的) ,在切换节点的时候变为null(官方答复) ,这样比较符合redisson的设计模式,但是修复的方式没变,都是client = null,只是时机不一样,还是需要在多多学习redisson源码呀!继续学习吧~!

差一点就能与java开源著名的中间件框架做贡献啊,失之交臂啊,不给重新提交merge的机会。。。回复此邮件时人家顺便就给修复了

新问题

由于官网不能及时发布新的版本,自己在项目当中热热修复之后(字节码插桩的方式),发现还存在别的问题

scan命令扫描集群时,如果读模式配置的是主从都可以读read,在主从之间来回扫的话,会出现数据不一致
经测试,从节点返回的游标id和主节点的不一样,不管是每次扫1000个还是100个,甚至是一个(COUNT 1)
主从节点返回的游标id都不一样,目前看来是redis自身的问题

主节点:scan 0 COUNT 1 --> 16384 --> 变为二进制:100000000000000
从节点:scan 0 COUNT 1 --> 12288 --> 变为二进制:10000000000000
初步推测是redis的主节点和从节点的扩容机制不同,导致主从的scan命令返回的游标不同

解决方案是redisson源码做scan命令时,禁止主从切换,要么全部主读,要么全部从读即可。这个就先提个(issue)吧。
咱们后续观摩观摩redisson官方的修复