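Background: In a Redis Cluster with 3 masters and 3 replicas, SCAN through Redisson misses keys. Outside cluster mode there is no problem, and Lettuce has no problem either; only the combination Redisson + cluster + SCAN is affected.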
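How it was found: During an inventory refactoring, the inventory data stored in Redis had to be renamed. KEYS was out of the question, since it blocks the server and is O(n), so we used SCAN. But SCAN kept missing data while Lettuce did not, so we concluded the problem was in Redisson's source code.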
How the problem was tracked down: by reading the source, starting from scan.
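Reading the source in a debugger is recommended, otherwise it is easy to get lost. If you have no local environment, use remote debugging: add -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=16770 to the JVM startup script, then connect from IntelliJ IDEA.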
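Source analysis. How scan works: calling scan returns an iterator, and through that iterator SCAN walks the keyspace from start to finish until cursorId = 0. The source (Spring Data Redis's ScanCursor) is below: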
```java
// Spring Data Redis ScanCursor (excerpt; fields and unrelated methods omitted)
public abstract class ScanCursor<T> implements Cursor<T> {

    private void scan(long cursorId) {
        ScanIteration<T> result = doScan(cursorId, this.scanOptions);
        processScanResult(result);
    }

    // Implemented by each driver (Lettuce, Jedis, Redisson, ...)
    protected abstract ScanIteration<T> doScan(long cursorId, ScanOptions options);

    public final ScanCursor<T> open() {
        doOpen(cursorId);
        return this;
    }

    protected void doOpen(long cursorId) {
        scan(cursorId);
    }

    private void processScanResult(ScanIteration<T> result) {
        if (result == null) {
            resetDelegate();
            state = CursorState.FINISHED;
            return;
        }
        cursorId = Long.valueOf(result.getCursorId());
        // A returned cursor of 0 ends the whole iteration
        if (cursorId == 0) {
            state = CursorState.FINISHED;
        }
        if (!CollectionUtils.isEmpty(result.getItems())) {
            currentResult = result.iterator();
        } else {
            currentResult = Collections.<T>emptyList().iterator();
        }
    }

    @Override
    public boolean hasNext() {
        // Keep issuing SCAN while the current page is used up and the cursor is not finished
        while (!currentResult.hasNext() && !CursorState.FINISHED.equals(state)) {
            scan(cursorId);
        }
        return currentResult.hasNext();
    }

    @Override
    public T next() {
        T next = moveNext(currentResult);
        position++;
        return next;
    }
}
```
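To make the loop above concrete, here is a minimal sketch of the same control flow against a hypothetical in-memory node. NODE, Page and scanAll are illustrative names, not Spring or Redis APIs. The sketch shows the two rules ScanCursor relies on: an empty page does not end the iteration, and only a returned cursor of 0 does. It needs Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of ScanCursor's loop; NODE, Page and scanAll are illustrative, not Spring or Redis APIs. */
public class CursorLoopDemo {

    /** One SCAN reply: the next cursor plus the keys in this page. */
    record Page(long cursor, List<String> keys) {}

    /** Hypothetical node: maps the cursor sent to the reply it returns. */
    static final Map<Long, Page> NODE = Map.of(
            0L, new Page(3, List.of("a", "b")),
            3L, new Page(5, List.of()),        // empty page, but the cursor is not 0 yet
            5L, new Page(0, List.of("c")));    // cursor 0: iteration complete

    static List<String> scanAll() {
        List<String> out = new ArrayList<>();
        long cursor = 0;
        boolean finished = false;
        Iterator<String> current = Collections.emptyIterator();
        while (true) {
            // Same shape as ScanCursor.hasNext(): refill while the page is used up and not finished
            while (!current.hasNext() && !finished) {
                Page page = NODE.get(cursor);
                cursor = page.cursor();
                finished = cursor == 0;
                current = page.keys().iterator();
            }
            if (!current.hasNext()) {
                return out;
            }
            out.add(current.next());
        }
    }

    public static void main(String[] args) {
        System.out.println(scanAll()); // [a, b, c]
    }
}
```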
Redisson's scan implementation (RedissonConnection):

```java
@Override
public Cursor<byte[]> scan(ScanOptions options) {
    return new ScanCursor<byte[]>(0, options) {

        // Node that served the previous SCAN call; passed into the next call
        private RedisClient client;
        // One MasterSlaveEntry per shard (a master plus its replicas)
        private Iterator<MasterSlaveEntry> entries = redisson.getConnectionManager().getEntrySet().iterator();
        private MasterSlaveEntry entry = entries.next();

        @Override
        protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
            if (isQueueing() || isPipelined()) {
                throw new UnsupportedOperationException("'SSCAN' cannot be called in pipeline / transaction mode.");
            }
            // Every shard has been scanned
            if (entry == null) {
                return null;
            }

            List<Object> args = new ArrayList<Object>();
            // A -1 returned below becomes 0 here, so the next shard starts from the beginning
            cursorId = Math.max(cursorId, 0);
            args.add(cursorId);
            if (options.getPattern() != null) {
                args.add("MATCH");
                args.add(options.getPattern());
            }
            if (options.getCount() != null) {
                args.add("COUNT");
                args.add(options.getCount());
            }

            RFuture<ListScanResult<byte[]>> f = executorService.readAsync(client, entry, ByteArrayCodec.INSTANCE, RedisCommands.SCAN, args.toArray());
            ListScanResult<byte[]> res = syncFuture(f);
            long pos = res.getPos();
            client = res.getRedisClient();
            if (pos == 0) {
                // Current shard finished: move to the next entry.
                // Note: `client` still points to the previous shard's node at this point.
                if (entries.hasNext()) {
                    // -1 instead of 0 keeps ScanCursor from marking the iteration FINISHED
                    pos = -1;
                    entry = entries.next();
                } else {
                    entry = null;
                }
            }
            return new ScanIteration<byte[]>(pos, res.getValues());
        }
    }.open();
}
```
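The pos = -1 trick is easy to miss. ScanCursor stops as soon as doScan reports cursor 0, so when one shard is exhausted Redisson reports -1 instead, and Math.max(cursorId, 0) turns it back into 0 for the next shard. Below is a toy model with hypothetical in-memory shards (SHARDS, Page and scanCluster are illustrative, not Redisson classes). It shows what happens with and without the sentinel, and it deliberately leaves out client.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of Redisson's shard-by-shard doScan(); the shards are hypothetical in-memory maps. */
public class MultiNodeScanDemo {

    record Page(long cursor, List<String> keys) {}

    /** Each shard maps the cursor sent to the SCAN reply it returns. */
    static final List<Map<Long, Page>> SHARDS = List.of(
            Map.of(0L, new Page(7, List.of("s1-a")), 7L, new Page(0, List.of("s1-b"))),
            Map.of(0L, new Page(0, List.of("s2-a"))),
            Map.of(0L, new Page(4, List.of("s3-a")), 4L, new Page(0, List.of("s3-b"))));

    /** useSentinel = true mirrors Redisson (report -1 on a shard switch); false reports 0. */
    static List<String> scanCluster(boolean useSentinel) {
        List<String> out = new ArrayList<>();
        Iterator<Map<Long, Page>> entries = SHARDS.iterator();
        Map<Long, Page> entry = entries.next();
        long cursor = 0;
        while (true) {
            Page res = entry.get(Math.max(cursor, 0)); // like cursorId = Math.max(cursorId, 0)
            out.addAll(res.keys());
            long pos = res.cursor();
            if (pos == 0 && entries.hasNext()) {
                entry = entries.next();
                pos = useSentinel ? -1 : 0;
            }
            if (pos == 0) {
                return out; // the outer ScanCursor marks the iteration FINISHED on 0
            }
            cursor = pos;
        }
    }

    public static void main(String[] args) {
        System.out.println(scanCluster(true));  // [s1-a, s1-b, s2-a, s3-a, s3-b]
        System.out.println(scanCluster(false)); // [s1-a, s1-b]
    }
}
```

Without the sentinel, the first shard's final cursor of 0 would end the whole cluster scan.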
Next, look at how a connection is obtained from the entry. This is the key part, and the core method is readAsync:
```java
// Excerpt from Redisson internals; "..." marks omitted code

// 1. Entry point called from doScan: the entry and the remembered client both go into the NodeSource
public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec,
                                   RedisCommand<T> command, Object... params) {
    return async(true, new NodeSource(entry, client), codec, command, params, false, false);
}

public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec,
                               RedisCommand<V> command, Object[] params,
                               boolean ignoreRedirect, boolean noRetry) {
    ...
    RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params,
            mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
    executor.execute();
    ...
}

// 2. RedisExecutor: obtain a connection
public void execute() {
    ...
    CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();
    ...
}

protected CompletableFuture<RedisConnection> getConnection() {
    if (readOnlyMode) {
        connectionFuture = connectionManager.connectionReadOp(source, command);
    } else {
        ...
    }
    return connectionFuture;
}

// 3. Connection manager: resolve which entry the connection comes from
public CompletableFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
    MasterSlaveEntry entry = getEntry(source);
    ...
    return entry.connectionReadOp(command);
}

private MasterSlaveEntry getEntry(NodeSource source) {
    ...
    MasterSlaveEntry entry = source.getEntry();
    // A non-null client overrides the entry: the entry is re-resolved from the client's node
    if (source.getRedisClient() != null) {
        entry = getEntry(source.getRedisClient());
    }
    ...
    return entry;
}

public MasterSlaveEntry getEntry(RedisClient redisClient) {
    // Simplified: returns the MasterSlaveEntry that redisClient belongs to
    ...
}
```
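Conclusion: Reading the source shows that the client field of the ScanCursor subclass is effectively superfluous. During a scan, client records the node that served the current call and is passed into the next call. Because getEntry(NodeSource) lets a non-null client override the entry, every connection is obtained from that same node, and the scan can never move on to the next node.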
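A toy model of the failure follows. It assumes, as in the excerpt above, that routing uses the remembered client whenever one is set. The shards are hypothetical in-memory maps, and SHARDS, Page and scanCluster are illustrative names, not Redisson classes. The resetClientOnSwitch flag models clearing client when the scan moves to the next entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy model of the sticky-client bug; shards are hypothetical in-memory maps, not Redisson classes. */
public class StickyClientDemo {

    record Page(long cursor, List<String> keys) {}

    /** Three shards; each answers cursor 0 -> 5 -> 0 with two keys. */
    static final List<Map<Long, Page>> SHARDS = new ArrayList<>();

    static {
        for (int s = 1; s <= 3; s++) {
            SHARDS.add(Map.of(
                    0L, new Page(5, List.of("s" + s + "-a")),
                    5L, new Page(0, List.of("s" + s + "-b"))));
        }
    }

    static List<String> scanCluster(boolean resetClientOnSwitch) {
        List<String> out = new ArrayList<>();
        int entry = 0;          // shard we intend to scan (Redisson's `entry`)
        Integer client = null;  // shard that served the previous call (Redisson's `client`)
        long cursor = 0;
        while (true) {
            // Like getEntry(NodeSource): a non-null client overrides the entry
            int target = (client != null) ? client : entry;
            Page res = SHARDS.get(target).get(Math.max(cursor, 0));
            out.addAll(res.keys());
            client = target;    // like client = res.getRedisClient()
            long pos = res.cursor();
            if (pos == 0) {
                if (entry + 1 == SHARDS.size()) {
                    return out;
                }
                entry++;
                pos = -1;
                if (resetClientOnSwitch) {
                    client = null; // forget the old node when moving to the next entry
                }
            }
            cursor = pos;
        }
    }

    public static void main(String[] args) {
        System.out.println(scanCluster(false)); // [s1-a, s1-b, s1-a, s1-b, s1-a, s1-b]
        System.out.println(scanCluster(true));  // [s1-a, s1-b, s2-a, s2-b, s3-a, s3-b]
    }
}
```

With the sticky client, the first shard is scanned once per entry and the other shards are never reached, which matches the "missed keys" symptom.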
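Verification: We can't get a source change released right away, and that would take time anyway, so how can we verify this quickly? Patch the class with bytecode instrumentation so that client is effectively disabled: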
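```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javassist.ClassPool;
import javassist.CtClass;
import javassist.LoaderClassPath;
import lombok.extern.slf4j.Slf4j;
import org.redisson.spring.data.connection.RedissonConnection;
import org.springframework.boot.context.event.ApplicationPreparedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.ScanCursor;
import org.springframework.util.Assert;

/**
 * Temporary hot patch: rewrites the anonymous ScanCursor subclass in RedissonConnection.scan()
 * so that `client` is cleared after every doScan() call. It must run before that anonymous
 * class is loaded, hence the early ApplicationPreparedEvent.
 */
@Slf4j
public class ModifyRedisson implements ApplicationListener<ApplicationPreparedEvent> {

    @Override
    public void onApplicationEvent(ApplicationPreparedEvent event) {
        try {
            ClassLoader classLoader = RedissonConnection.class.getClassLoader();
            ClassPool pool = ClassPool.getDefault();
            pool.insertClassPath(new LoaderClassPath(classLoader));
            CtClass wrapper = pool.get(RedissonConnection.class.getName());
            // Find the anonymous inner class that extends ScanCursor
            List<CtClass> scanCursor = Stream.of(wrapper.getDeclaredClasses())
                    .filter(t -> t.getClassFile().getSuperclass().contains(ScanCursor.class.getSimpleName()))
                    .collect(Collectors.toList());
            Assert.isTrue(scanCursor.size() == 1, "Expected exactly one anonymous ScanCursor subclass");
            wrapper = scanCursor.get(0);
            // Look up doScan by name instead of relying on declaration order
            wrapper.getDeclaredMethod("doScan").insertAfter("{this.client = null;}", true);
            // Define the patched class in Redisson's class loader
            wrapper.toClass(classLoader, null);
            log.warn("Patched Redisson: ScanCursor client is now always null");
        } catch (Throwable e) {
            log.warn("Failed to patch Redisson", e);
        }
    }
}
```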
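Add a spring.factories file under the META-INF resource directory with the following content so the class above takes effect. It has to be registered this way: ApplicationPreparedEvent is published before the context is refreshed, so a listener declared as a bean would not receive it.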
```properties
# Use ModifyRedisson's fully qualified class name if it lives in a package
org.springframework.context.ApplicationListener=ModifyRedisson
```
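Testing, including stepping through with breakpoints, confirmed it: SCAN now iterates over every node. Until Redisson ships a release that fixes the issue, this crude approach will do.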
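My first open-source contribution
First, open an issue so the commit can reference it: https://github.com/redisson/redisson/issues/4238
Find the repository on GitHub: https://github.com/redisson/redisson
Fork it to your own account
Clone it locally and make the change
Commit and push
Open a pull request: https://github.com/redisson/redisson/pull/4242
Wait for the result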
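Follow-up: The maintainer accepted my report and treated it as a bug, but did not use my code (I was already quite happy with that). The reply was: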
Thanks for suggested changes! I'm afraid they aren't correct.
Since the client object should be used the same for the same entry.
The issue is that if position == 0 then client should be set to null.
Will fix that.
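In other words, client should not be null all the time (my change); it should become null when switching to the next node (the maintainer's answer). That fits Redisson's design better. The fix itself is the same, client = null; only the timing differs. I clearly need to study the Redisson source more. Onward!
I came so close to contributing to a well-known Java open-source middleware project, and just missed it. There was no chance to resubmit the PR, because the maintainer fixed it directly while replying to the email.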
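New problem
Since the official release wouldn't come out soon, I hot-fixed our project myself (with the bytecode instrumentation above) and then found another problem.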
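When scanning the cluster with the read mode set so that both masters and replicas serve reads (Redisson readMode MASTER_SLAVE), consecutive SCAN calls can bounce between master and replica, and the data becomes inconsistent. In testing, a replica returned a different cursor from its master whether each call scanned 1000 keys, 100 keys, or even one (COUNT 1). For now this looks like behavior of Redis itself.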
Master: scan 0 COUNT 1 -> 16384 -> binary 100000000000000
Replica: scan 0 COUNT 1 -> 12288 -> binary 11000000000000
Our initial guess is that the master and the replica resize their hash tables differently, so SCAN returns different cursors on each.
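A SCAN cursor is not an offset. Redis walks its hash-table buckets in reverse-binary order (dictScan() in dict.c), so a cursor encodes a bucket position whose meaning depends on the table size. The sketch below reimplements only that increment step. RedisCursorDemo and cursors are illustrative names. With COUNT 1, Redis also keeps stepping past empty buckets, so the first cursor returned also depends on which buckets are occupied. Under this model, 16384 is the first step on a 2^15-bucket table, while 12288 is the third step on 2^14 buckets and the sixth on 2^15, so the two numbers are consistent with different table sizes or bucket occupancy. One more factor is worth noting: Redis seeds its key hash function randomly at process startup, so a master and its replica generally place the same keys in different buckets even when the key sets are identical. Either way, a cursor is only meaningful on the node that returned it.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the reverse-binary cursor step Redis SCAN uses (modeled on dictScan() in dict.c). */
public class RedisCursorDemo {

    /** Next cursor after visiting bucket v of a table whose size is mask + 1 (a power of two). */
    static long nextCursor(long v, long mask) {
        v |= ~mask;            // set the bits outside the mask so the carry passes through them
        v = Long.reverse(v);
        v++;                   // increment the reversed value
        v = Long.reverse(v);
        return v;              // 0 means the whole table has been visited
    }

    /** Cursors produced by up to `steps` single-bucket steps starting from 0. */
    static List<Long> cursors(long tableSize, int steps) {
        long mask = tableSize - 1;
        List<Long> out = new ArrayList<>();
        long v = 0;
        for (int i = 0; i < steps; i++) {
            v = nextCursor(v, mask);
            out.add(v);
            if (v == 0) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(cursors(1 << 15, 6));         // [16384, 8192, 24576, 4096, 20480, 12288]
        System.out.println(cursors(1 << 14, 3));         // [8192, 4096, 12288]
        System.out.println(cursors(8, 100));             // [4, 2, 6, 1, 5, 3, 7, 0]
        System.out.println(Long.toBinaryString(12288));  // 11000000000000
    }
}
```

The 8-bucket run shows the property SCAN is built on: every bucket is visited exactly once before the cursor wraps back to 0.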
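The fix is for Redisson to forbid switching between master and replica during a SCAN: read either entirely from masters or entirely from replicas. For now I'll just file an issue for this and see how the Redisson maintainers fix it.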
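The proposed fix amounts to pinning one node per shard for the whole SCAN. The toy model below assumes a node's cursor is just an offset into that node's own key order, and that master and replica order the same keys differently. Real Redis cursors are hash-table positions, but the consequence is the same: a cursor is only meaningful on the node that issued it. PinnedNodeScanDemo, MASTER_ORDER, REPLICA_ORDER and scanShard are hypothetical names. At the configuration level, Redisson's readMode MASTER avoids mixing master and replica, at the cost of sending all reads to masters, not just SCAN.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model: one shard whose master and replica hold the same keys in different orders. */
public class PinnedNodeScanDemo {

    static final List<String> MASTER_ORDER = List.of("k0", "k1", "k2", "k3", "k4", "k5");
    static final List<String> REPLICA_ORDER = List.of("k3", "k4", "k5", "k0", "k1", "k2");

    /** Hypothetical SCAN on one node: collects up to `count` keys from `cursor`; returns 0 at the end. */
    static long scan(List<String> node, long cursor, int count, Set<String> sink) {
        int from = (int) cursor;
        int to = Math.min(from + count, node.size());
        sink.addAll(node.subList(from, to));
        return to == node.size() ? 0 : to;
    }

    /** alternate = true switches master/replica on every call (like MASTER_SLAVE); false pins the master. */
    static Set<String> scanShard(boolean alternate) {
        Set<String> seen = new TreeSet<>();
        long cursor = 0;
        int call = 0;
        do {
            List<String> node = (alternate && call % 2 == 1) ? REPLICA_ORDER : MASTER_ORDER;
            cursor = scan(node, cursor, 2, seen);
            call++;
        } while (cursor != 0);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(scanShard(true));  // [k0, k1, k4, k5]  (k2 and k3 are missed)
        System.out.println(scanShard(false)); // [k0, k1, k2, k3, k4, k5]
    }
}
```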