查看原文
其他

ElasticSearch聚合实战+优化

elsef SpringForAll社区 2021-05-27

点击上方☝SpringForAll社区 轻松关注!

及时获取有趣有料的技术文章

本文来源:http://r6d.cn/abkNh


elseif

读完需要

5分钟

速读仅需 3 分钟

聚合功能为 ES 注入了统计分析的血统,使用户在面对大数据提取统计指标时变得游刃有余。同样的工作,你在 Hadoop 中可能需要写 mapreduce 或 Hive,在 mongo 中你必须得用大段的 mapreduce 脚本,而在 ES 中仅仅调用一个 API 就能实现了。




   

按时间聚合

从官网找一个例子 ( https://www.elastic.co/guide/cn/elasticsearch/guide/current/_returning_empty_buckets.html ),最基本的按照时间进行聚合的:


{ "size" : 0, "aggs": { "sales": { "date_histogram": { "field": "sold", "interval": "month", "format": "yyyy-MM-dd", "min_doc_count" : 0, "extended_bounds" : { "min" : "2014-01-01", "max" : "2014-12-31" } } } }}


如根据sole字段来进行按日期的聚合,其中每个桶都是按照一个month的维度进行,另外我们还对具体的结果进行了格式化以便于阅读。同时考虑到可能某些月份的结果为空,也需要返回来只做完整的报表,即min_doc_count : 0,最关键的,date_histogram (和 histogram 一样)默认只会返回文档数目非零的 buckets。所以需要使用extended_bounds来控制上下界限,防止被默认过滤掉空结果的部分。



   

多层聚合

一个双层聚合的例子 ( https://www.elastic.co/guide/cn/elasticsearch/guide/current/_extended_example.html ):


{ "size" : 0, "aggs": { "sales": { "date_histogram": { "field": "sold", "interval": "quarter", "format": "yyyy-MM-dd", "min_doc_count" : 0, "extended_bounds" : { "min" : "2014-01-01", "max" : "2014-12-31" } }, "aggs": { "per_make_sum": { "terms": { "field": "make" }, "aggs": { "sum_price": { "sum": { "field": "price" } } } }, "total_sum": { "sum": { "field": "price" } } } } }}


最终按照每个季度再按照生产商的维度计算了每个生产商的销售额per_make_sum,同时还有一个所有生产商的总销售额total_sum, 注意这两个聚合的结果都是在第一个聚合结果下,即每个季度的sales下的。

如果有了具体的聚合结果,生成报表就方便多了,我们可以对不同的聚合结果用不同的样式展示,如上图的柱状图和折线图, 用类似 Kibana 和 Grafana 这种工具很方便生成。



{ "size" : 0, "query" : { "constant_score": { "filter": { "range": { "price": { "gte": 10000 } } } } }, "aggs" : { "single_avg_price": { "avg" : { "field" : "price" } } }}


这里使用了 constant_score 查询和 filter 约束:从根本上讲,使用 non-scoring 查询和使用 match 查询没有任何区别。查询(包括了一个过滤器)返回一组文档的子集,聚合正是操作这些文档。使用 filtering query 会忽略评分,并有可能会缓存结果数据等等。



   

Filter Bucket

我们可以指定一个过滤桶,当文档满足过滤桶的条件时,我们将其加入到桶内,例子 ( https://www.elastic.co/guide/cn/elasticsearch/guide/current/_filter_bucket.html ):


{ "size" : 0, "query":{ "match": { "make": "ford" } }, "aggs":{ "recent_sales": { "filter": { "range": { "sold": { "from": "now-1M" } } }, "aggs": { "average_price":{ "avg": { "field": "price" } } } } }}


使用过滤桶在查询范围基础上应用过滤器,avg 度量只会对 ford 和上个月售出的文档计算平均售价。

因为 filter 桶和其他桶的操作方式一样,所以可以随意将其他桶和度量嵌入其中。所有嵌套的组件都会 “继承” 这个过滤,这使我们可以按需针对聚合过滤出选择部分。




   

全局桶


{ "size" : 0, "query" : { "match" : { "make" : "ford" } }, "aggs" : { "single_avg_price": { "avg" : { "field" : "price" } }, "all": { "global" : {}, "aggs" : { "avg_price": { "avg" : { "field" : "price" } }
} } }}


single_avg_price 度量计算是基于查询范围内所有文档,即所有 福特 汽车。avg_price 度量是嵌套在 全局 桶下的,这意味着它完全忽略了范围并对所有文档进行计算。聚合返回的平均值是所有汽车的平均售价。



   

实战

下面的例子来自线上,做了一定的脱敏,这是一个scoping aggregation的例子:


{ "size": 0, "query": { "bool": { "filter": [ { "term": { "msgText": "hi" } }, { "bool": { "should": [ { "bool": { "filter": [ { "term": { "to": "22222222" } }, { "range": { "timestamp": { "from": 1589795975534, "to": null, "include_lower": true, "include_upper": true } } } ] } }, { "bool": { "filter": [ { "term": { "to": "11111111" } }, { "range": { "timestamp": { "from": 1589771519612, "to": null, "include_lower": true, "include_upper": true } } } ] } } ] } } ] } }, "from": 0, "aggs": { "sessions": { "terms": { "field": "sessionId", "execution_hint": "map", "size": 100 }, "aggs": { "top_ten_hits": { "top_hits": { "size": 10, "sort": { "timestamp": { "order": "desc" } } } } } } }}


结果如下:


{ "took": 4117, "timed_out": false, "_shards": { "total": 5, "successful": 5, "failed": 0 }, "hits": { "total": 3659354, "max_score": 0.0, "hits": [ ] }, "aggregations": { "sessions": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "102162", "doc_count": 1606188, "top_ten_hits": { "hits": { "total": 1606188, "max_score": null, "hits": [ { "_index": "message_v2", "_type": "message", "_id": "id4", "_score": null, "sort": [ 1577615548758 ] }, { "_index": "message_v2", "_type": "message", "_id": "id3", "_score": null, "sort": [ 1577615548690 ] } ] } } }, { "key": "102273", "doc_count": 47396, "top_ten_hits": { "hits": { "total": 47396, "max_score": null, "hits": [ { "_index": "message_v2", "_type": "message", "_id": "id2", "_score": null, "sort": [ 1578917789916 ] } ] } } } ] } }}


由于我们的 mapping 设置禁止掉了_source,所以结果中没有展示hits(不是aggregations里的)的内容。

再举一个例子,查询全部 Nginx 的请求 URI 在时间维度的聚合的个数:


{ "size": 0, "query": { "bool": { "filter": [ { "range": { "@timestamp": { "gte": "1589990400000", "lte": "1590058860000", "format": "epoch_millis" } } }, { "query_string": { "analyze_wildcard": true, "query": "host: download.xxx.com AND tag: access AND status: >399" } } ] } }, "aggs": { "3": { "terms": { "field": "urlpath.keyword", "size": 10, "order": { "_count": "desc" }, "min_doc_count": 100 }, "aggs": { "2": { "date_histogram": { "interval": "4h", "field": "@timestamp", "min_doc_count": 0, "extended_bounds": { "min": "1589990400000", "max": "1590058860000" }, "format": "epoch_millis" }, "aggs": { } } } } }}


结果如下:


{ "took": 46, "timed_out": false, "_shards": { "total": 105, "successful": 105, "skipped": 95, "failed": 0 }, "hits": { "total": 8669, "max_score": 0, "hits": [ ] }, "aggregations": { "3": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "2": { "buckets": [ { "key_as_string": "1589990400000", "key": 1589990400000, "doc_count": 440 }, { "key_as_string": "1590004800000", "key": 1590004800000, "doc_count": 428 }, { "key_as_string": "1590019200000", "key": 1590019200000, "doc_count": 243 }, { "key_as_string": "1590033600000", "key": 1590033600000, "doc_count": 4238 }, { "key_as_string": "1590048000000", "key": 1590048000000, "doc_count": 3121 } ] }, "key": "/rest/v2/app/download", "doc_count": 8470 }, { "2": { "buckets": [ { "key_as_string": "1589990400000", "key": 1589990400000, "doc_count": 15 }, { "key_as_string": "1590004800000", "key": 1590004800000, "doc_count": 1 }, { "key_as_string": "1590019200000", "key": 1590019200000, "doc_count": 4 }, { "key_as_string": "1590033600000", "key": 1590033600000, "doc_count": 67 }, { "key_as_string": "1590048000000", "key": 1590048000000, "doc_count": 98 } ] }, "key": "/rest/v2/app/upload", "doc_count": 185 } ] } }




   

聚合优化

大多数时候对单个字段的聚合查询还是非常快的, 但是当需要同时聚合多个字段时,就可能会产生大量的分组,最终结果就是占用 es 大量内存,从而导致 OOM 的情况发生。

实践应用发现,以下情况都会比较慢:1)待聚合文档数比较多(千万、亿、十亿甚至更多);2)聚合条件比较复杂(多重条件聚合);3)全量聚合(翻页的场景用)

注意到上面的 ( https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html#search-aggregations-bucket-terms-aggregation-execution-hint ) 属性了吗?


There are different mechanisms by which terms aggregations can be executed:
by using field values directly in order to aggregate data per-bucket (map)by using global ordinals of the field and allocating one bucket per global ordinal (global_ordinals)


1)global_ordinals 是关键字字段( keyword field )的默认选项,它使用 全局顺序(global ordinals) 来动态分配存储区,因此内存使用情况与作为聚合作用域一部分的文档值的数量成线性关系。

2)只有极少数文档与查询匹配匹配时才应考虑使用 map 方式。默认情况下,只有在脚本上运行聚合时才会使用 map,因为它们没有序号( ordinals )。

否则,基于顺序(ordinals) 的执行模式会相对更快。

我们的一个场景是按会话的维度聚合前 N 条符合条件的命中词的消息,所以选择了使用map的方式,即直接使用了具体的 value 内容来进行 per-bucket 的聚合,效果比默认方式耗时降低了一个数量级。


当然,最终还是要根据实际场景亲自测试一下,这里有一个测试数据 ( https://blog.csdn.net/laoyang360/article/details/79253294 )。



   

缓存

ES 中经常使用到的聚合结果集可以被缓存起来,以便更快速的系统响应。这些缓存的结果集和你掠过缓存直接查询的结果是一样的。因为,第一次聚合的条件与结果缓存起来后,ES 会判断你后续使用的聚合条件,如果聚合条件不变,并且检索的数据块未增更新,ES 会自动返回缓存的结果。

注意聚合结果的缓存只针对 size=0 的请求(参考 3.10 章节),还有在聚合请求中使用了动态参数的比如 Date Range 中的 now(参考 3.5 章节),ES 同样不会缓存结果,因为聚合条件是动态的,即使缓存了结果也没用了。



   

数据的不确定性

使用 terms 聚合,结果可能带有一定的偏差与错误性。

比如:

我们想要获取 name 字段中出现频率最高的前 5 个。

此时,客户端向 ES 发送聚合请求,主节点接收到请求后,会向每个独立的分片发送该请求。分片独立的计算自己分片上的前 5 个 name,然后返回。当所有的分片结果都返回后,在主节点进行结果的合并,再求出频率最高的前 5 个,返回给客户端。

这样就会造成一定的误差,比如最后返回的前 5 个中,有一个叫 A 的,有 50 个文档;B 有 49。但是由于每个分片独立的保存信息,信息的分布也是不确定的。有可能第一个分片中 B 的信息有 2 个,但是没有排到前 5, 所以没有在最后合并的结果中出现。这就导致 B 的总数少计算了 2,本来可能排到第一位,却排到了 A 的后面。

为了改善上面的问题,就可以使用 size 和 shard_size 参数。

  • size 参数规定了最后返回的 term 个数(默认是 10 个)

  • shard_size 参数规定了每个分片上返回的个数

如果 shard_size 小于 size,那么分片也会按照 size 指定的个数计算 通过这两个参数,如果我们想要返回前 5 个,size=5;shard_size 可以设置大于 5,这样每个分片返回的词条信息就会增多,相应的误差几率也会减小。



   

References

  • 聚合 ( https://www.elastic.co/guide/cn/elasticsearch/guide/current/aggregations.html )

  • aggs-high-level ( https://www.elastic.co/guide/cn/elasticsearch/guide/current/aggs-high-level.html )

  • elasticsearch 深入 —— Top Hits Aggregation ( https://blog.csdn.net/ctwy291314/article/details/82773180 )

  • ES 之五:ElasticSearch 聚合 ( https://www.cnblogs.com/duanxz/p/6528161.html )




2021Java深入资料领取方式回复“20210112”

墙裂推荐

【深度】互联网技术人的社群,点击了解!




 合格的后端Coder都应该写好UT和Mock测试

 Java8中Stream原理分析

 Rate Limiter深度剖析

 Java泛型相关的深入总结


关注公众号,回复“spring”有惊喜!!!

如果资源对你有帮助的话

❤️给个在看,是最大的支持❤️

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存