大道至简之- elastic
我正在参加「掘金·启航计划」亲爱的家人们朋友们时隔7个月,打工人又开始上线了,我将持续更新golang灵魂系列。其中断更原因有很多,一方面由于开启公司加班的暴走模式, 另一方面由于小编逐渐感觉个人江郎才尽对golang失去其新鲜感(ps:小编不是喜新厌旧的人儿^▽^)。但是近期在工作编写小工具过程中,将golang作为主力开发语言,发现golang带来的轻量化、简洁的代码逻辑,极大提升个人工作效率,使我迷恋golang的“糖衣炮弹”,无法自拔。回到本文正题,今天带给童鞋们的是golang es客户端的扛把子 - olivere, 废话不多说,show me your code 。ớ₃ờ)ھ
halo elastic
依赖包版本选择需要和elasticsearch的版本匹配,如果使用Go mudules管理依赖,就要确保你使用的是7.0版本以上的elastic即可。
es 版本 | Elastic 版本 | 依赖包地址 |
---|---|---|
7.x | 7.0 | github.com/olivere/ela… |
6.x | 6.0 | github.com/olivere/ela… |
5.x | 5.0 | gopkg.in/olivere/ela… |
进入正题,首先是我们要确保已经有个可以正常使用的elasticsearch 环境,可以直接通过客户端或者kibana访问,在确保这些前提下,我们开启cp模式(copy)动手写代码:
step 1:安装依赖包
go get gopkg.in/olivere/elastic/v7
step 2:引入依赖
import "github.com/olivere/elastic/v7"
step 3: 运行用例
package main
import (
"context"
"fmt"
"github.com/olivere/elastic"
)
func main() {
client, err := elastic.NewClient(elastic.SetErrorLog(errorlog))
info, code, err := client.Ping("http://127.0.0.1:9200").Do(context.Background())
if err != nil {
// Handle error
panic(err)
}
fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
}
这些只是初步建立与es的链接,其中elastic库还包含了document api、search api、aggregation、indices api、cat api、query dsl、sorting等api。
elastic 创建客户端,连接es
如何创建一个客户端?
client, err := elastic.NewClient(elastic.SetURL("http://192.168.2.10:9201"))
if err != nil {
// Handle error
panic(err)
}
defer client.Stop()
多个节点连接
client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200",
"http://127.0.0.1:9201"))
elastic 配置:
- 指定地址连接es
SetURL(...string) // 使用,逗号分隔
- 解决elastic连接es 时自动转换连接地址
SetSniff(bool) // 默认值:true
- 指定 Elastic 是否通过尝试定期连接到其节点来执行运行状况检查
SetHealthcheck(bool) //默认值:true
- 运行状况检查的超时
SetHealthcheckTimeout(time.Duration) // 默认值:1second
- 指定 HTTP 基本身份验证详细信息
SetBasicAuth(username, password string)
- 指定两次运行状况检查之间的间隔
SetHealthcheckInterval(time.Duration) // 默认值:60second
- 设置用于错误级别消息的记录器
SetErrorLog(*log.Logger)
- 设置用于消息级别消息的记录器
SetInfoLog(*log.Logger)
- 设置用于打印 HTTP 请求和响应的记录器
SetTraceLog(*log.Logger)
- 在请求端启用或禁用压缩
SetGzip(bool) // 默认值: false
- 指定每次调用 时要发送的 HTTP 标头列表。notice: 请求级 HTTP 标头优先于客户端级 HTTP 标头。
SetHeaders(http.Header{...}) //适用于7.0.7或更高版本
下面为配置客户端的示例:
client, err := elastic.NewClient(
elastic.SetURL("http://10.0.1.1:9200", "http://10.0.1.2:9200"),
elastic.SetSniff(false),
elastic.SetHealthcheckInterval(10*time.Second),
elastic.SetRetrier(NewCustomRetrier()),
elastic.SetGzip(true),
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
elastic.SetHeaders(http.Header{
"X-Caller-Id": []string{"..."},
}),
)
至此我们就创建完成客户端连接es的相关配置操作。接下来就是和操作数据库流程一样,创建索引、删除索引、 添加文档、获取文档、更新文档、删除文档等。
索引crud
判断是否存在指定索引
func (client *elastic.Client) checkIsExisted(index string) bool {
exists, err := client.IndexExists(index).Do(context.Background())
if err != nil {
// Handle error
}
return exists;
}
创建索引
// Create a new index.
mapping := `{
"settings":{
"number_of_shards":1,
"number_of_replicas":0
},
"mappings":{
"properties":{
"tags":{
"type":"keyword"
},
"location":{
"type":"geo_point"
},
"suggest_field":{
"type":"completion"
}
}
}
}`
func (client *elastic.Client) CreateIndex(idx string, body interface{}) bool {
createIndex, err := client.CreateIndex(idx).BodyString(body.(map[stirng]interface{})).Do(context.Background())
if err != nil {
// Handle error
panic(err)
}
if createIndex.Acknowledged {
// if the result was false,it is timeout that create a new index
// and update the status of clusters,but it may be finished the creating.
return true
}
return false
}
existed := client.CreateIndex("tweet", mapping)
对应的DSL:
PUT /tweet
{
"settings":{
"number_of_shards":1,
"number_of_replicas":0
},
"mappings":{
"properties":{
"tags":{
"type":"keyword"
},
"location":{
"type":"geo_point"
},
"suggest_field":{
"type":"completion"
}
}
}
}
删除索引
func (client *elastic.Client) deleteIndex(idx string) bool {
resp, err := client.DeleteIndex(idx).Do(context.Background())
if err != nil {
log.Println(err)
return false
}
log.Println(resp.Acknowledged) // true
return true
}
对应DSL:
DELETE /tweet
更改mapping
func (es *elastic.Client) PutIndexMapping(index string, mapping interface{}) bool {
var resp *elastic.PutMappingResponse
var err error
switch reflect.TypeOf(mapping).Kind() {
case reflect.String:
resp, err = client.PutMapping().Index(index).
Type("_doc").IgnoreUnavailable(true).
BodyString(mapping.(string)).Do(context.Background())
case reflect.Map:
resp, err = client.PutMapping().Index(index).
Type("_doc").IgnoreUnavailable(true).
BodyJson(mapping.(map[string]interface{})).Do(context.Background())
}
if err != nil {
log.Println(err)
return false
}
if resp.Acknowledged {
return true
}
return false
}
查询索引名称列表
names, err := client.IndexNames()
if err != nil {
// Handle error
panic(err)
}
for _, name := range names {
fmt.Printf("%s\n", name)
}
对应的DSL:
GET _cat/indices
设置索引别名
_, err = client.Alias().Action(
elastic.NewAliasAddAction("tweet").Index("twitter"),
).Do(context.Background())
对应的DSL:
POST _aliases
{
"actions" : [{"add" : {"index" : "tweet" , "alias" : "twitter"}}]
}
文档的crud
插入文档
func (client *elastic.Client) InsertDoc(index,type string, doc interface{}) bool {
resp, err := client.Index().Index(index).Type(type).
BodyJson(doc.(map[string]interface{})).Do(context.Background())
if err != nil {
log.Println(err)
}
if resp.Result == "created" || resp.Result == "updated" {
log.Println(resp.Result)
}
return true
}
对应的DSL:
POST /tweet/_doc/1
{
"id": 123456,
"name": "id1",
"createTime": "2023-06-14 00:42:00"
}
查询DSL
构建NewBoolQuery
query := elastic.NewBoolQuery()
query.Must(elastic.NewTermsQuery("name", "user1"))
同时,Must(queries ...Query)支持多个query作为参数传入。下面我们用代码来说明:
q := elastic.NewBoolQuery()
q = q.Must(elastic.NewTermQuery("tag", "wow"))
q = q.MustNot(elastic.NewRangeQuery("age").From(10).To(20))
q = q.Filter(elastic.NewTermQuery("account", "1"))
q = q.Should(elastic.NewTermQuery("tag", "sometag"),
elastic.NewTermQuery("tag", "sometagtag"))
q = q.Boost(10)
其中,Must 表示为必须匹配条件,相当于sql中的 and ;
q.Must(elastic.NewTermQuery("tag", "wow"))
MustNot 表示为必须不匹配条件, 相当于sql中的 not ;
q.MustNot(elastic.NewRangeQuery("age").From(10).To(20))
should 表示为当满足条件时,则增加score,相当于sql中的 or ;
q.Should(elastic.NewTermQuery("tag", "sometag"))
filter 表示根据过滤条件得到满足的文档;
q.Filter(elastic.NewTermQuery("account", "1"))
terms 表示为匹配指定字段的多个值 ,相当于sql中的 in ;
elastic.NewTermsQueryFromStrings("name", []string{"zhangsan", "lisi"})
range 表示获取指定字段值的范围,相当于sql中的 <> ,其中gt为 > , lt为 < ,gte为 >=, lte为 <=;
elastic.NewRangeQuery("msgTime").
Gte("2023-01-01 00:00:00").Lt("2023-12-30 23:59:59").
Format("yyyy-MM-dd HH:mm:ss").TimeZone("+08:00")
exists 表示判断是否存在, 既可以判断该字段是否在es中是否为空
elastic.NewExistsQuery("name")
构建条件的接口都是以链式编程来实现,以简洁的api调用实现业务需求。
上面的代码得到对应的DSL:
{
"bool": {
"must": { "term": { "tag": "wow" }},
"must_not": { "range": { "age": {"gte": 10, "lte": 20 }}},
"filter": {"term": { "account": "1"}},
"should": [
{ "term": { "tag": "sometag" }},
{ "term": { "tag": "sometagtag"}}
]
}
}
范围查询
elastic.NewRangeQuery("msgTime").
Gte(startTime).Lt(endTime).
Format("yyyy-MM-dd HH:mm:ss").TimeZone("+08:00")
对应的DSL:
{
"query": {
"range": {
"msgTime": {
"gte": "2023-06-01",
"lt": "2023-06-15",
"format": "yyyy-MM-dd HH:mm:ss",
"timezone": "+08:00"
}
}
}
}
模糊匹配 match、fuzzy、wildCard
match
分词匹配检索, 按照默认分词器将传入参数进行分词,如下面例子会分成老、板进行检索
elastic.NewMatchQuery("name", "老板")
得到的结果可能是:
老板
老板娘
老公
老人
模板
对应的DSL:
{
"query": {
"match": {
"name": "老板"
}
}
}
wildcard
通配符检索,即相当于sql的like,在前后拼接*,匹配0到多个任意字符;也可使用?匹配单个字符。
elastic.NewWildcardQuery("name", "*li*")
对应的DSL:
{
"query": {
"wildcard": {
"name.keyword": "*li*"
}
}
}
匹配到的结果:
lisi
lily
lisa
limon
注意:在业务场景中应该警惕使用wildcard检索。以*或?开头的检索,会增加查询匹配需要的迭代次数并降低搜索性能,增大带有通配符构造出来的DFA(Deterministic Finite Automaton)的复杂堵,导致CPU开销增大,从而导致集群宕机的风险。那有什么解决类似mysql like查询的方案吗?
方案一:
使用Ngram分词,写入时优化分词,以更小细粒度分词,有利于数据召回(什么是数据召回?即从海量数据中,快速找到一部分相关或近满足需求的数据)
Ngram:就是将文本内容以字节大小为N的长度字节片段进行滑动窗口操作,每个片段称之为gram,对gram进行评分统计,按照设置好的阈值进行过滤,得到一个gram列表,此时列表中的每个gram就是一个特征向量维度。
{
"settings": {
"index.max_ngram_diff": 10,
"number_of_shards": 128,
"number_of_replicas": 2,
"refresh_interval": "5s",
"analysis": {
"analyzer": {
"ngram_analyzer": {
"tokenizer": "ngram_tokenizer"
}
},
"tokenizer": {
"ngram_tokenizer": {
"type": "ngram",
"min_gram": 1,
"max_gram": 10,
"token_chars": [
"letter",
"digit"
]
}
}
}
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"analyzer": "ngram_analyzer",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"createTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
}
}
}
}
使用上面数据结构来创建或更新索引,注意es7版本或更新的版本min_gram和max_gram的粒度默认是1,分词是按照一个字符逐个分隔的,如果需要细粒度>1需要设置index_ngram_diff >=(max_gram - min_gram),否则会报异常。
方案二:
elasticsearch 7.9+版本,使用wildcard类型。同样也可以禁用wildcard模糊检索「elasticsearch版本必须在7.7+」,实现如下:
"search.allow_expensive_queries": false
fuzzy
模糊/纠错检索, 对输入参数进行一定程度的容错
elastic.NewFuzzyQuery("name", "李四")
对应DSL:
{
"query": {
"fuzzy": {
"name.keyword": "李四"
}
}
}
匹配的结果:
李四
李连杰
李晨
Bulk 批量处理
bulk是什么?能如何操作api,与其他crud操作有何区别?我们带着这些疑问透过实践看问题,俗话说”实践是检验真理的唯一标准“。
bulk操作类型:
- create:如果文档不存在,那么就创建它,存在的话,就会报错,发生异常报错不会影响其他操作
- index:创建一个新文档或者替换一个现有的文档
- update:部分更新一个文档
- delete:删除一个文档
create 新增文档 (文档不存在)
如果文档不存在,则创建该文档;如果存在,则报异常,但不影响其他操作
req := elastic.NewBulkIndexRequest().OpType("create").
Index("twitter").Id("1").Doc(tweet{Name: "zhangsan"})
req1 := elastic.NewBulkIndexRequest().OpType("create").
Index("twitter").Id("2").Doc(tweet{Name: "lisi"})
req2 := elastic.NewBulkIndexRequest().OpType("create").
Index("twitter").Id("3").Doc(tweet{Name: "wangwu"})
bulkRequest := client.Bulk()
bulkRequest = bulkRequest.Add(req)
bulkRequest = bulkRequest.Add(req1)
bulkRequest = bulkRequest.Add(req2)
bulkResponse, err := bulkRequest.Do(context.Background())
if err !=nil {
panic(err)
}
对应的DSL(在metadata中需要指定index以及type):
POST /_bulk
{"create":{"_index": "tweet","_type": "_doc","_id": "1"}}
{"id": "1","name":"zhangsan"}
{"create":{"_index": "tweet","_type": "_doc","_id": "2"}}
{"id": "2","name":"lisi"}
{"create":{"_index": "tweet","_type": "_doc","_id": "2023"}}
{"id": "3","name":"wangwu"}
index 创建新文档(替换已经有的文档)
req := elastic.NewBulkIndexRequest().
Index("twitter").Id("1").Doc(tweet{Name: "zhangsan"})
这里我们需要看看源码:
// NewBulkIndexRequest returns a new BulkIndexRequest.
// The operation type is "index" by default.
func NewBulkIndexRequest() *BulkIndexRequest {
return &BulkIndexRequest{
opType: "index",
}
}
其中opType类型默认为index,故我们在创建文档时,可以不用指定操作类型。
对应的DSL:
POST /tweet/_doc/_bulk
{"index":{"_id": "1"}}
{"id": "1","nickname":"zhangsan"}
update 更新文档
update1Req := elastic.NewBulkUpdateRequest().Index("twitter").
Type("tweet").Id("1").
Doc(struct {
Name int `json:"name"`
}{
Name: "zhangsan1",
})
update2Req := elastic.NewBulkUpdateRequest().Index("twitter").
Type("tweet").Id("2").
Doc(struct {
Name int `json:"name"`
}{
Name: "lisi1",
})
对应的DSL:
POST /tweet/_doc/_bulk
{"update":{"_id": "1"}}
{"doc": {"name": "zhangsan1"}}
{"update":{"_id": "2"}}
{"doc": {"name":"lisi1"}}
delete 删除文档
delete1Req := elastic.NewBulkDeleteRequest().
Index("twitter").Type("tweet").Id("1")
delete2Req := elastic.NewBulkDeleteRequest().
Index("twitter").Type("tweet").Id("2")
bulkRequest := client.Bulk()
bulkRequest = bulkRequest.Add(delete1Req)
bulkRequest = bulkRequest.Add(delete2Req)
bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
panic(err)
}
deleted := bulkResponse.Deleted()
对应的DSL:
POST /tweet/_doc/_bulk
{"delete":{"_id": "1"}}
{"delete":{"_id": "2"}}
以上均是基于BulkRequest的api,处理请求方式均为同步。要想提升处理请求性能,推荐使用BulkProcessor。BulkProcessor将BulkRequest对象创建和客户端执行请求封装在一起,整个执行请求过程和时机对于开发者是完全透明,只需将请求添加到BulkProcessor中,等待所有请求执行完成即可。
Aggregation(聚合)
es聚合类型:
- Bucketing aggregation, 类似mysql 的group by;
- Metric aggregation, 数学运算,对文档统计分析;
- Matrix aggregation,对多个字段进行操作返回一个矩阵结果;
- Pipeline aggregation,聚合结果进行二次聚合
elasticsearch聚合语法结构:
指标聚合
Value Count
值聚合,统计文档总数,等同于sql的count函数
client, err := elastic.NewClient()
if err != nil {
// Handle error
panic(err)
}
aggs := elastic.NewValueCountAggregation().Field("type")
searchResult, err := client.Search().
Index("good").
Query(elastic.NewMatchAllQuery()).
Aggregation("total", aggs).
Size(0).
Do(context.Background())
if err != nil {
// Handle error
panic(err)
}
agg, found := searchResult.Aggregations.ValueCount("total")
if !found {
//todo
}
对应的DSL:
GET /good/_search?size=0
{
"aggs": {
"total": {
"value_count": {
"field": "type"
}
}
}
}
Cardinality
基数聚合,等同sql的count(DISTINCT 字段)
aggs := elastic.NewCardinalityAggregation().
Field("type")
searchResult, err := client.Search().
Index("good").
Query(elastic.NewMatchAllQuery()).
Aggregation("total", aggs).
Size(0).
Do(context.Background())
...
agg, found := searchResult.Aggregations.Cardinality("total")
...
对应的DSL:
POST /good/_search?size=0
{
"aggs" : {
"total" : {
"cardinality" : {
"field" : "type"
}
}
}
}
Avg
aggs := elastic.NewAvgAggregation().Field("price")
searchResult, err := client.Search().
Index("good").
Query(elastic.NewMatchAllQuery()).
Aggregation("avg_price", aggs).
Size(0).
Do(context.Background())
...
agg, found := searchResult.Aggregations.Avg("avg_price")
...
对应的DSL:
POST /good/_search?size=0
{
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
}
}
}
Sum
aggs := elastic.NewSumAggregation().Field("price")
searchResult, err := client.Search().
Index("good").
Query(elastic.NewMatchAllQuery()).
Aggregation("total_price", aggs).
Size(0).
Do(context.Background())
...
agg, found := searchResult.Aggregations.Sum("total_price")
...
对应的DSL:
POST /good/_search?size=0
{
"aggs": {
"total_price": {
"sum": {
"field": "price"
}
}
}
}
Max
aggs := elastic.NewMaxAggregation().Field("price")
searchResult, err := client.Search().
Index("good").
Query(elastic.NewMatchAllQuery()).
Aggregation("max_price", aggs).
Size(0).
Do(context.Background())
...
agg, found := searchResult.Aggregations.Max("max_price")
...
对应的DSL:
POST /good/_search?size=0
{
"aggs": {
"max_price": {
"max": {
"field": "price"
}
}
}
}
Min
aggs := elastic.NewMinAggregation().Field("price")
searchResult, err := client.Search().
Index("good").
Query(elastic.NewMatchAllQuery()).
Aggregation("min_price", aggs).
Size(0).
Do(context.Background())
...
agg, found := searchResult.Aggregations.Min("min_price")
...
对应的DSL:
POST /good/_search?size=0
{
"aggs": {
"min_price": {
"min": {
"field": "price"
}
}
}
}
桶聚合
Terms Aggregation : group by
client, err := elastic.NewClient()
if err != nil {
// Handle error
panic(err)
}
agetop := elastic.NewtermsAggregation().Field("age")
result, err := client.Search().Index("twitter").
Query(elastic.NewMatchAllQuery()).
Aggregation("agetop", agetop).
Size(0).
Do(context.Backgroud())
agg, found := result.Aggregations.Terms("agetop")
if !found {
// todo
}
for _, ageBucket := range agg.Buckets {
age := ageBucket.Key
}
对应的DSL:
GET /twitter/_search?size=0
{
"size": 0,
"aggs": {
"agetop": {
"terms": {
"field": "age"
}
}
}
}
Histogram聚合
直方图聚合即根据数值间隔分组,histogram聚合分桶统计结果
client, err := elastic.NewClient()
if err != nil {
// Handle error
panic(err)
}
aggs := elastic.NewHistogramAggregation().
Field("age").Interval(10) // 年龄间隔10分组
result, err := client.Search().
Index("twitter").
Query(elastic.NewMatchAllQuery()).
Aggregation("age", aggs).
Size(0)
Do(context.Background())
if err!= nil {
// todo
}
agg, found := result.Aggregations.Histogram("age")
if !found {
// todo
}
for _,bucket := range agg.Buckets {
value := bucket.Key
}
对应的DSL:
POST /twitter/_search?size=0
{
"aggs": {
"age": {
"histogram": {
"field": "age",
"interval": 10
}
}
}
}
Date histogram聚合
功能类似于histogram聚合,主要作用为处理时间类型之短,根据时间、日期进行分桶统计
client, err := elastic.NewClient()
if err != nil {
// Handle error
panic(err)
}
aggs := elastic.NewDateHistogramAggregation().
Field("createTime").
// 分组间隔:month代表每月、支持minute(每分钟)、hour(每小时)、day(每天)、week(每周)、year(每年)
CalendarInterval("month").
// 设置返回结果中桶key的时间格式
Format("yyyy-MM-dd")
searchResult, err := client.Search().
Index("twitter").
Query(elastic.NewMatchAllQuery()).
Aggregation("create_time", aggs).
Size(0).
Do(ctx)
if err!= nil {
// todo
}
agg, found := result.Aggregations.Histogram("create_time")
if !found {
// todo
}
for _,bucket := range agg.Buckets {
value := bucket.Key
}
对应的DSL:
POST /twitter/_search?size=0
{
"aggs" : {
"create_time" : { // 聚合查询名字,随便取一个
"date_histogram" : { // 聚合类型为: date_histogram
"field" : "createTime", // 根据date字段分组
"calendar_interval" : "month", // 分组间隔:month代表每月、支持minute(每分钟)、hour(每小时)、day(每天)、week(每周)、year(每年)
"format" : "yyyy-MM-dd" // 设置返回结果中桶key的时间格式
}
}
}
}
Range聚合
按照数值范围进行分桶统计
client, err := elastic.NewClient()
if err != nil {
// Handle error
panic(err)
}
aggs := elastic.NewRangeAggregation().
Field("price").
Keyed(true).
LtWithKey("cheap", 70).
BetweenWithKey("average", 70, 200).
GtWithKey("expensive", 200)
searchResult, err := client.Search().
Index("twitter").
Query(elastic.NewMatchAllQuery()).
Aggregation("prices", aggs).
Size(0).
Do(ctx)
if err!= nil {
// todo
}
agg, found := result.Aggregations.Range("prices")
if !found {
// todo
}
for _,bucket := range agg.Buckets {
value := bucket.Key
}
对应的DSL:
GET /twitter/_search
{
"aggs" : {
"prices" : {
"range" : {
"field" : "price",
"ranges" : [
{ "key" : "cheap", "to" : 70}, // price < 70
{ "key" : "average", "from" : 70, "to" : 200 }, // price >= 70 && price < 200
{ "key" : "expensive", "from" : 200.0 } // price >= 200
]
}
}
}
}
嵌套聚合
client, err := elastic.NewClient()
if err != nil {
// Handle error
panic(err)
}
goods := elastic.NewTermsAggregation().Field("brand")
sumAggs := elastic.NewSumAggregation().Field("price")
goods = timeline.SubAggregation("total_price", sumAggs)
searchResult, err := client.Search().
Index("good").
Query(elastic.NewMatchAllQuery()).
Aggregation("goods", goods).
Pretty(true).
Do(context.Background())
if err != nil {
panic(err)
}
agg, found := searchResult.Aggregations.Terms("goods")
if !found {
//todo
}
for _, goodBucket := range agg.Buckets {
good := goodBucket.Key
agg, sumfound := goodBucket.Aggregations.Sum("total_price")
if !sumfound{
//todo
}
}
对应的DSL:
GET /good/_search
{
"aggs" : {
"goods" : {
"terms" : {
"field" : "brand"
},
"aggs": {
"total_price": {
"sum": {
"field": "price"
}
}
}
}
}
}
总结
通过上面的例子我们可以看出,操作elastic工具库api兼容了elasticsearch的写法提供了elasticsearch大部分功能。当然elasticsearch也推出自己官方的es工具包,感兴趣的童鞋们也可自行访问github查看。希望通过本文给小伙伴们在工作提高一点点帮助,大家好,我是懂的越多,知道的越少的灵魂艺术家,下期文章大家想知道关于golang的什么知识呢?
转载自:https://juejin.cn/post/7245919919224815671