Background

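Because index, update, and delete operations in es only take effect after a short delay, some patterns that work in a db do not carry over to es. Delete-then-insert is one of them: a db has transactions to guarantee the result, but deleting in es and then inserting has no such guarantee. At the es level, deleteByQuery is the only suitable tool for this case.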

Code scenario

canal subscribes to data changes in mysql and publishes them to mq; an mq consumer then syncs them to es. The es deleteByQuery code is as follows:

DeleteByQueryRequest request = new DeleteByQueryRequest("your-index");
request.types("_doc");
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.filter(QueryBuilders.termQuery("promotionId", promotionId)); // condition 1: same promotion
builder.filter(QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery("shopId", shopId))); // condition 2: shop is no longer in the list
request.setQuery(builder);
highLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);

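The business case is roughly this: one promotion applies to multiple shops (in the db this is multiple rows with the same promotion id and different shop ids). When the applicable shops are changed, the db does a delete followed by an insert. In es, a separate delete and insert would each take effect with a delay, so deleteByQuery is the best choice.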

Problem analysis

org.elasticsearch.ElasticsearchStatusException: Unable to parse response body

org.elasticsearch.ElasticsearchStatusException: Unable to parse response body
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2033)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1777)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1734)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1696)
at org.elasticsearch.client.RestHighLevelClient.deleteByQuery(RestHighLevelClient.java:585)
at cn.ce.omo.promotion.consumer.service.impl.PromotionSyncSerivceImpl.lambda$sync$0(PromotionSyncSerivceImpl.java:143)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at cn.ce.omo.promotion.consumer.service.impl.PromotionSyncSerivceImpl.sync(PromotionSyncSerivceImpl.java:121)
at cn.ce.omo.promotion.consumer.sync.PromotionSyncConsumer.onMessage(PromotionSyncConsumer.java:65)
at cn.ce.omo.promotion.consumer.sync.PromotionSyncConsumer.onMessage(PromotionSyncConsumer.java:30)
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:330)
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:411)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: org.elasticsearch.common.ParsingException: Failed to parse object: expecting field with name [error] but found [took]
at org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50)
at org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:605)
at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2053)
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2030)
... 17 common frames omitted
Caused by: org.elasticsearch.client.ResponseException: method [POST], host [http://ip:port], URI [/indexName/_doc/_delete_by_query?requests_per_second=-1&ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true&ignore_throttled=true&timeout=1m], status line [HTTP/1.1 409 Conflict]
{"took":28,"timed_out":false,"total":1,"deleted":0,"batches":1,"version_conflicts":1,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[{"index":"promotion","type":"_doc","id":"1513878196196659287-1476148670121377838","cause":{"type":"version_conflict_engine_exception","reason":"[_doc][1513878196196659287-1476148670121377838]: version conflict, required seqNo [2524949], primary term [4]. current document has seqNo [2524950] and primary term [4]","index_uuid":"Lvf_rUoPTKaTdI-38t6TFw","shard":"2","index":"promotion"},"status":409}]}
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:552)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:537)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
... 1 common frames omitted

Failed to parse object: expecting field with name [error] but found [took]

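Why does this error show up only occasionally?
Once you understand how deleteByQuery works, the reason becomes clear. Note that the real cause sits at the bottom of the trace: the body of the 409 Conflict response is an ordinary deleteByQuery result (it starts with took and reports version_conflicts: 1) rather than an error object, which is why the client fails to parse it.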

How the es server handles the request

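  1. After receiving the command, the es server first searches for the documents that match the query; each hit carries the document's version (in the trace above, its seqNo and primary term).
  2. During this time the es server keeps serving index, delete, update, and search requests. In other words, deleteByQuery is not an atomic operation.
  3. If a new request happens to modify a document that the deleteByQuery has already matched, that document's version changes.
  4. When es then performs the delete, it finds that the version has changed (the document has been updated) and, by default, aborts the remaining operations.
    The matched copy is stale, and deleting it could have unexpected consequences, so es does not delete it.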
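
The four steps above can be sketched with a toy in-memory model. This is not Elasticsearch code: the class DeleteByQuerySimulation and all of its methods are hypothetical names made up for illustration, and a single counter stands in for seqNo and primary term. It also simplifies the abort behaviour by stopping at the first conflicting document, whereas the real server works through the hits in batches.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of search-then-delete with an optimistic version check (hypothetical, not ES code). */
class DeleteByQuerySimulation {

    /** Counters named after the fields in the real response body. */
    static final class Result {
        int deleted;
        int versionConflicts;
        boolean aborted;
    }

    // document id -> sequence number of its latest write
    private final Map<String, Long> seqNoById = new LinkedHashMap<>();
    private long nextSeqNo = 0;

    /** Index or update a document; every write gives it a new sequence number. */
    void index(String id) {
        seqNoById.put(id, nextSeqNo++);
    }

    boolean contains(String id) {
        return seqNoById.containsKey(id);
    }

    /** Step 1: search, remembering the sequence number each hit had at search time. */
    Map<String, Long> snapshot() {
        return new LinkedHashMap<>(seqNoById);
    }

    /** Step 4: delete each hit only if its sequence number is still the one seen in step 1. */
    Result deleteSnapshot(Map<String, Long> hits, boolean abortOnVersionConflict) {
        Result result = new Result();
        for (Map.Entry<String, Long> hit : hits.entrySet()) {
            Long current = seqNoById.get(hit.getKey());
            if (current == null) {
                continue; // already gone, nothing to delete
            }
            if (!current.equals(hit.getValue())) {
                // Steps 2 and 3 happened: someone wrote to this document after the search.
                result.versionConflicts++;
                if (abortOnVersionConflict) {
                    result.aborted = true;
                    return result; // default behaviour: stop here
                }
                continue; // "proceed": skip the stale hit and carry on
            }
            seqNoById.remove(hit.getKey());
            result.deleted++;
        }
        return result;
    }
}
```

In this model, calling index("b") between snapshot() and deleteSnapshot(...) plays the role of the concurrent update from the mq consumer. With abortOnVersionConflict set to true the run stops at the conflict, and with false the stale hit is skipped and only counted in versionConflicts.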

Solution

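This problem actually has little impact and can be ignored. The error occurs because the document to be deleted has already been updated by another request, so es's optimistic locking makes the delete fail.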

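  1. request.setAbortOnVersionConflict(false);
    Ignore version conflicts instead of throwing; in other words, do not abort. The default is true.
  2. highLevelClient.deleteByQueryAsync(...)
    Delete with the asynchronous deleteByQuery, so that the outcome of the delete does not affect the operations that follow.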
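
A minimal sketch of how the two options might look together, assuming the 7.x RestHighLevelClient that appears in the stack trace. The class name PromotionShopCleaner, its method names, and the parameters indexName, promotionId and shopIds are hypothetical. The types("_doc") call from the original snippet is left out here and may need to be added back, depending on the cluster version. Even when conflicts are ignored, it is probably worth looking at getVersionConflicts() on the response so that skipped documents are not lost silently.

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

import java.io.IOException;
import java.util.Collection;

/** Hypothetical helper; names are illustrative and not taken from the original project. */
class PromotionShopCleaner {

    private final RestHighLevelClient highLevelClient;
    private final String indexName;

    PromotionShopCleaner(RestHighLevelClient highLevelClient, String indexName) {
        this.highLevelClient = highLevelClient;
        this.indexName = indexName;
    }

    /** Same query as the original snippet, plus option 1: do not abort on version conflicts. */
    private DeleteByQueryRequest buildRequest(long promotionId, Collection<Long> shopIds) {
        BoolQueryBuilder query = QueryBuilders.boolQuery()
                .filter(QueryBuilders.termQuery("promotionId", promotionId))
                .mustNot(QueryBuilders.termsQuery("shopId", shopIds));
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
        request.setQuery(query);
        request.setAbortOnVersionConflict(false); // default is true
        return request;
    }

    /** Option 1, synchronous: conflicts are counted in the response instead of raising a 409. */
    long deleteRemovedShops(long promotionId, Collection<Long> shopIds) throws IOException {
        BulkByScrollResponse response =
                highLevelClient.deleteByQuery(buildRequest(promotionId, shopIds), RequestOptions.DEFAULT);
        if (response.getVersionConflicts() > 0) {
            // These documents were updated concurrently and were skipped, not deleted.
            System.err.println("skipped due to version conflict: " + response.getVersionConflicts());
        }
        return response.getDeleted();
    }

    /** Option 2, asynchronous: the caller continues without waiting for the result. */
    void deleteRemovedShopsAsync(long promotionId, Collection<Long> shopIds) {
        highLevelClient.deleteByQueryAsync(buildRequest(promotionId, shopIds), RequestOptions.DEFAULT,
                new ActionListener<BulkByScrollResponse>() {
                    @Override
                    public void onResponse(BulkByScrollResponse response) {
                        System.out.println("deleted=" + response.getDeleted()
                                + ", versionConflicts=" + response.getVersionConflicts());
                    }

                    @Override
                    public void onFailure(Exception e) {
                        // Failures no longer reach the caller, so they must be logged or handled here.
                        System.err.println("deleteByQuery failed: " + e.getMessage());
                    }
                });
    }
}
```

Note that the asynchronous variant only changes who sees the failure. Unless setAbortOnVersionConflict(false) is also set, a conflict still aborts the delete and is reported through onFailure.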