likes
comments
collection
share

大道至简之- elastic

作者站长头像
站长
· 阅读数 11

我正在参加「掘金·启航计划」亲爱的家人们朋友们时隔7个月,打工人又开始上线了,我将持续更新golang灵魂系列。其中断更原因有很多,一方面由于开启公司加班的暴走模式, 另一方面由于小编逐渐感觉个人江郎才尽对golang失去其新鲜感(ps:小编不是喜新厌旧的人儿^▽^)。但是近期在工作编写小工具过程中,将golang作为主力开发语言,发现golang带来的轻量化、简洁的代码逻辑,极大提升个人工作效率,使我迷恋golang的“糖衣炮弹”,无法自拔。回到本文正题,今天带给童鞋们的是golang es客户端的扛把子 - olivere, 废话不多说,show me your code 。ớ₃ờ)ھ

大道至简之- elastic

halo elastic

依赖包版本选择需要和elasticsearch的版本匹配,如果使用Go mudules管理依赖,就要确保你使用的是7.0版本以上的elastic即可。

es 版本Elastic 版本依赖包地址
7.x7.0github.com/olivere/ela…
6.x6.0github.com/olivere/ela…
5.x5.0gopkg.in/olivere/ela…

进入正题,首先是我们要确保已经有个可以正常使用的elasticsearch 环境,可以直接通过客户端或者kibana访问,在确保这些前提下,我们开启cp模式(copy)动手写代码:

step 1:安装依赖包

go get gopkg.in/olivere/elastic/v7

step 2:引入依赖

import "github.com/olivere/elastic/v7"

step 3: 运行用例

package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic"
)
func main() {
    client, err := elastic.NewClient(elastic.SetErrorLog(errorlog))
    info, code, err := client.Ping("http://127.0.0.1:9200").Do(context.Background())
	if err != nil {
		// Handle error
		panic(err)
	}
	fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
}

这些只是初步建立与es的链接,其中elastic库还包含了document api、search api、aggregation、indices api、cat api、query dsl、sorting等api。

elastic 创建客户端,连接es

如何创建一个客户端?

client, err := elastic.NewClient(elastic.SetURL("http://192.168.2.10:9201"))
if err != nil {
  // Handle error
  panic(err)
}
defer client.Stop()

多个节点连接

client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200", 
                                                "http://127.0.0.1:9201"))

elastic 配置:

  1. 指定地址连接es
SetURL(...string) // 使用,逗号分隔
  1. 解决elastic连接es 时自动转换连接地址
SetSniff(bool) // 默认值:true
  1. 指定 Elastic 是否通过尝试定期连接到其节点来执行运行状况检查
SetHealthcheck(bool)  //默认值:true
  1. 运行状况检查的超时
SetHealthcheckTimeout(time.Duration)  // 默认值:1second
  1. 指定 HTTP 基本身份验证详细信息
SetBasicAuth(username, password string)
  1. 指定两次运行状况检查之间的间隔
SetHealthcheckInterval(time.Duration)  // 默认值:60second
  1. 设置用于错误级别消息的记录器
SetErrorLog(*log.Logger)
  1. 设置用于消息级别消息的记录器
SetInfoLog(*log.Logger)
  1. 设置用于打印 HTTP 请求和响应的记录器
SetTraceLog(*log.Logger)
  1. 在请求端启用或禁用压缩
SetGzip(bool)  // 默认值: false
  1. 指定每次调用 时要发送的 HTTP 标头列表。notice: 请求级 HTTP 标头优先于客户端级 HTTP 标头
SetHeaders(http.Header{...})  //适用于7.0.7或更高版本

下面为配置客户端的示例:

client, err := elastic.NewClient(
  elastic.SetURL("http://10.0.1.1:9200", "http://10.0.1.2:9200"),
  elastic.SetSniff(false),
  elastic.SetHealthcheckInterval(10*time.Second),
  elastic.SetRetrier(NewCustomRetrier()),
  elastic.SetGzip(true),
  elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
  elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
  elastic.SetHeaders(http.Header{
    "X-Caller-Id": []string{"..."},
  }),
)

至此我们就创建完成客户端连接es的相关配置操作。接下来就是和操作数据库流程一样,创建索引、删除索引、 添加文档、获取文档、更新文档、删除文档等。

索引crud

判断是否存在指定索引

func (client *elastic.Client) checkIsExisted(index string) bool {
    exists, err := client.IndexExists(index).Do(context.Background())
    if err != nil {
        // Handle error
    }
    return exists;
}

创建索引

// Create a new index.
mapping := `{
	"settings":{
		"number_of_shards":1,
		"number_of_replicas":0
	},
	"mappings":{
		"properties":{
			"tags":{
				"type":"keyword"
			},
			"location":{
				"type":"geo_point"
			},
			"suggest_field":{
				"type":"completion"
			}
		}
	}
}`

func (client *elastic.Client) CreateIndex(idx string, body interface{}) bool {
    createIndex, err := client.CreateIndex(idx).BodyString(body.(map[stirng]interface{})).Do(context.Background())
    if err != nil {
        // Handle error
        panic(err)
    }
    if createIndex.Acknowledged {
        // if the result was false,it is timeout that create a new index
    	// and update the status of clusters,but it may be finished the creating.
        return true
    }
    return false
}

existed := client.CreateIndex("tweet", mapping)

对应的DSL:

PUT /tweet

{
	"settings":{
		"number_of_shards":1,
		"number_of_replicas":0
	},
	"mappings":{
		"properties":{
			"tags":{
				"type":"keyword"
			},
			"location":{
				"type":"geo_point"
			},
			"suggest_field":{
				"type":"completion"
			}
		}
	}
}

删除索引

func (client *elastic.Client) deleteIndex(idx string) bool {
    resp, err := client.DeleteIndex(idx).Do(context.Background())
	if err != nil {
		log.Println(err)
		return false
	}
	log.Println(resp.Acknowledged) // true
	return true
}

对应DSL:

DELETE /tweet

更改mapping

func (es *elastic.Client) PutIndexMapping(index string, mapping interface{}) bool {
	var resp *elastic.PutMappingResponse
	var err error
	switch reflect.TypeOf(mapping).Kind() {
	case reflect.String:
		resp, err = client.PutMapping().Index(index).
			Type("_doc").IgnoreUnavailable(true).
			BodyString(mapping.(string)).Do(context.Background())
	case reflect.Map:
		resp, err = client.PutMapping().Index(index).
			Type("_doc").IgnoreUnavailable(true).
			BodyJson(mapping.(map[string]interface{})).Do(context.Background())
	}
	if err != nil {
		log.Println(err)
		return false
	}
	if resp.Acknowledged {
		return true
	}
	return false
}

查询索引名称列表

names, err := client.IndexNames()
if err != nil {
    // Handle error
    panic(err)
}
for _, name := range names {
    fmt.Printf("%s\n", name)
}

对应的DSL:

GET _cat/indices

设置索引别名

_, err = client.Alias().Action(
	elastic.NewAliasAddAction("tweet").Index("twitter"),
).Do(context.Background())

对应的DSL:

POST _aliases
{
  "actions" : [{"add" : {"index" : "tweet" , "alias" : "twitter"}}]
}

文档的crud

插入文档

func (client *elastic.Client) InsertDoc(index,type string, doc interface{}) bool {
	resp, err := client.Index().Index(index).Type(type).
		BodyJson(doc.(map[string]interface{})).Do(context.Background())
	if err != nil {
		log.Println(err)
	}
	if resp.Result == "created" || resp.Result == "updated" {
		log.Println(resp.Result)
	}
	return true
}

对应的DSL:

POST /tweet/_doc/1
{
    "id": 123456,
    "name": "id1",
    "createTime": "2023-06-14 00:42:00"
}

查询DSL

构建NewBoolQuery

query := elastic.NewBoolQuery()
query.Must(elastic.NewTermsQuery("name", "user1"))

同时,Must(queries ...Query)支持多个query作为参数传入。下面我们用代码来说明:

q := elastic.NewBoolQuery()
q = q.Must(elastic.NewTermQuery("tag", "wow"))
q = q.MustNot(elastic.NewRangeQuery("age").From(10).To(20))
q = q.Filter(elastic.NewTermQuery("account", "1"))
q = q.Should(elastic.NewTermQuery("tag", "sometag"), 
             elastic.NewTermQuery("tag", "sometagtag"))
q = q.Boost(10)

其中,Must 表示为必须匹配条件,相当于sql中的 and ;

q.Must(elastic.NewTermQuery("tag", "wow"))

MustNot 表示为必须不匹配条件, 相当于sql中的 not ;

q.MustNot(elastic.NewRangeQuery("age").From(10).To(20))

should 表示为当满足条件时,则增加score,相当于sql中的 or ;

q.Should(elastic.NewTermQuery("tag", "sometag"))

filter 表示根据过滤条件得到满足的文档;

q.Filter(elastic.NewTermQuery("account", "1"))

terms 表示为匹配指定字段的多个值 ,相当于sql中的 in ;

elastic.NewTermsQueryFromStrings("name", []string{"zhangsan", "lisi"})

range 表示获取指定字段值的范围,相当于sql中的 <> ,其中gt为 > , lt为 < ,gte为 >=, lte为 <=;

elastic.NewRangeQuery("msgTime").
		Gte("2023-01-01 00:00:00").Lt("2023-12-30 23:59:59").
		Format("yyyy-MM-dd HH:mm:ss").TimeZone("+08:00")

exists 表示判断是否存在, 既可以判断该字段是否在es中是否为空

elastic.NewExistsQuery("name")

构建条件的接口都是以链式编程来实现,以简洁的api调用实现业务需求。

上面的代码得到对应的DSL:

{
	"bool": {
        "must":     { "term": { "tag": "wow" }},
        "must_not": { "range": { "age":   {"gte": 10, "lte": 20 }}},
        "filter": {"term": { "account": "1"}},
        "should": [
            { "term": { "tag": "sometag" }},
            { "term": { "tag": "sometagtag"}}
        ]
    }
}

范围查询

elastic.NewRangeQuery("msgTime").
		Gte(startTime).Lt(endTime).
		Format("yyyy-MM-dd HH:mm:ss").TimeZone("+08:00")

对应的DSL:

{
    "query": {
        "range": {
            "msgTime": {
                "gte": "2023-06-01",
                "lt": "2023-06-15",
                "format": "yyyy-MM-dd HH:mm:ss",
                "timezone": "+08:00"
            }
        }
    }
}

模糊匹配 match、fuzzy、wildCard

match

分词匹配检索, 按照默认分词器将传入参数进行分词,如下面例子会分成老、板进行检索

elastic.NewMatchQuery("name", "老板")

得到的结果可能是:

老板
老板娘
老公
老人
模板

对应的DSL:

{
    "query": {
        "match": {
            "name": "老板"
        }
    }
}

wildcard

通配符检索,即相当于sql的like,在前后拼接*,匹配0到多个任意字符;也可使用?匹配单个字符。

elastic.NewWildcardQuery("name", "*li*")

对应的DSL:

{
    "query": {
        "wildcard": {
            "name.keyword": "*li*"
        }
    }
}

匹配到的结果:

lisi
lily
lisa
limon

注意:在业务场景中应该警惕使用wildcard检索。以*或?开头的检索,会增加查询匹配需要的迭代次数并降低搜索性能,增大带有通配符构造出来的DFA(Deterministic Finite Automaton)的复杂堵,导致CPU开销增大,从而导致集群宕机的风险。那有什么解决类似mysql like查询的方案吗?

方案一:

使用Ngram分词,写入时优化分词,以更小细粒度分词,有利于数据召回(什么是数据召回?即从海量数据中,快速找到一部分相关或近满足需求的数据)

Ngram:就是将文本内容以字节大小为N的长度字节片段进行滑动窗口操作,每个片段称之为gram,对gram进行评分统计,按照设置好的阈值进行过滤,得到一个gram列表,此时列表中的每个gram就是一个特征向量维度。

{
  "settings": {
    "index.max_ngram_diff": 10,
    "number_of_shards": 128,
    "number_of_replicas": 2,
    "refresh_interval": "5s",
    "analysis": {
      "analyzer": {
        "ngram_analyzer": {
          "tokenizer": "ngram_tokenizer"
        }
      },
      "tokenizer": {
        "ngram_tokenizer": {
          "type": "ngram",
          "min_gram": 1,
          "max_gram": 10,
          "token_chars": [
            "letter",
            "digit"
          ]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "name": {
        "type": "text",
        "analyzer": "ngram_analyzer",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "createTime": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
      }
    }
  }
}

使用上面数据结构来创建或更新索引,注意es7版本或更新的版本min_gram和max_gram的粒度默认是1,分词是按照一个字符逐个分隔的,如果需要细粒度>1需要设置index_ngram_diff >=(max_gram - min_gram),否则会报异常。

方案二:

elasticsearch 7.9+版本,使用wildcard类型。同样也可以禁用wildcard模糊检索「elasticsearch版本必须在7.7+」,实现如下:

"search.allow_expensive_queries": false

fuzzy

模糊/纠错检索, 对输入参数进行一定程度的容错

elastic.NewFuzzyQuery("name", "李四")

对应DSL:

{
    "query": {
        "fuzzy": {
            "name.keyword": "李四"
        }
    }
}

匹配的结果:

李四
李连杰
李晨

Bulk 批量处理

bulk是什么?能如何操作api,与其他crud操作有何区别?我们带着这些疑问透过实践看问题,俗话说”实践是检验真理的唯一标准“。

bulk操作类型:

  • create:如果文档不存在,那么就创建它,存在的话,就会报错,发生异常报错不会影响其他操作
  • index:创建一个新文档或者替换一个现有的文档
  • update:部分更新一个文档
  • delete:删除一个文档

create 新增文档 (文档不存在)

如果文档不存在,则创建该文档;如果存在,则报异常,但不影响其他操作

req := elastic.NewBulkIndexRequest().OpType("create").
		Index("twitter").Id("1").Doc(tweet{Name: "zhangsan"})
req1 := elastic.NewBulkIndexRequest().OpType("create").
		Index("twitter").Id("2").Doc(tweet{Name: "lisi"})
req2 := elastic.NewBulkIndexRequest().OpType("create").
		Index("twitter").Id("3").Doc(tweet{Name: "wangwu"})
bulkRequest := client.Bulk()
bulkRequest = bulkRequest.Add(req)
bulkRequest = bulkRequest.Add(req1)
bulkRequest = bulkRequest.Add(req2)

bulkResponse, err := bulkRequest.Do(context.Background())
if err !=nil {
    panic(err)
}

对应的DSL(在metadata中需要指定index以及type):

POST /_bulk

{"create":{"_index": "tweet","_type": "_doc","_id": "1"}}
{"id": "1","name":"zhangsan"}
{"create":{"_index": "tweet","_type": "_doc","_id": "2"}}
{"id": "2","name":"lisi"}
{"create":{"_index": "tweet","_type": "_doc","_id": "2023"}}
{"id": "3","name":"wangwu"}

index 创建新文档(替换已经有的文档)

req := elastic.NewBulkIndexRequest().
		Index("twitter").Id("1").Doc(tweet{Name: "zhangsan"})

这里我们需要看看源码:

// NewBulkIndexRequest returns a new BulkIndexRequest.
// The operation type is "index" by default.
func NewBulkIndexRequest() *BulkIndexRequest {
	return &BulkIndexRequest{
		opType: "index",
	}
}

其中opType类型默认为index,故我们在创建文档时,可以不用指定操作类型。

对应的DSL:

POST /tweet/_doc/_bulk

{"index":{"_id": "1"}}
{"id": "1","nickname":"zhangsan"}

update 更新文档

update1Req := elastic.NewBulkUpdateRequest().Index("twitter").
				Type("tweet").Id("1").
                Doc(struct {
                Name int `json:"name"`
        }{
                Name: "zhangsan1",
        })
update2Req := elastic.NewBulkUpdateRequest().Index("twitter").
				Type("tweet").Id("2").
                Doc(struct {
                Name int `json:"name"`
        }{
                Name: "lisi1",
        })

对应的DSL:

POST /tweet/_doc/_bulk

{"update":{"_id": "1"}}
{"doc": {"name": "zhangsan1"}}
{"update":{"_id": "2"}}
{"doc": {"name":"lisi1"}}

delete 删除文档

delete1Req := elastic.NewBulkDeleteRequest().
				Index("twitter").Type("tweet").Id("1")
delete2Req := elastic.NewBulkDeleteRequest().
				Index("twitter").Type("tweet").Id("2")
bulkRequest := client.Bulk()
bulkRequest = bulkRequest.Add(delete1Req)
bulkRequest = bulkRequest.Add(delete2Req)

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
  panic(err)
}
deleted := bulkResponse.Deleted()

对应的DSL:

POST /tweet/_doc/_bulk

{"delete":{"_id": "1"}}
{"delete":{"_id": "2"}}

以上均是基于BulkRequest的api,处理请求方式均为同步。要想提升处理请求性能,推荐使用BulkProcessor。BulkProcessor将BulkRequest对象创建和客户端执行请求封装在一起,整个执行请求过程和时机对于开发者是完全透明,只需将请求添加到BulkProcessor中,等待所有请求执行完成即可。

Aggregation(聚合)

es聚合类型:

  1. Bucketing aggregation, 类似mysql 的group by;
  2. Metric aggregation, 数学运算,对文档统计分析;
  3. Matrix aggregation,对多个字段进行操作返回一个矩阵结果;
  4. Pipeline aggregation,聚合结果进行二次聚合

elasticsearch聚合语法结构:

大道至简之- elastic

指标聚合

Value Count

值聚合,统计文档总数,等同于sql的count函数

client, err := elastic.NewClient()
if err != nil {
    // Handle error
    panic(err)
}

aggs := elastic.NewValueCountAggregation().Field("type")

searchResult, err := client.Search().
        Index("good"). 
        Query(elastic.NewMatchAllQuery()). 
        Aggregation("total", aggs). 
        Size(0). 
        Do(context.Background())
if err != nil {
    // Handle error
    panic(err)
}
agg, found := searchResult.Aggregations.ValueCount("total")
if !found {
	//todo
}
对应的DSL:
GET /good/_search?size=0
{
  "aggs": {
    "total": { 
      "value_count": { 
        "field": "type" 
      }
    }
  }
}

Cardinality

基数聚合,等同sql的count(DISTINCT 字段)

aggs := elastic.NewCardinalityAggregation().
        Field("type")

searchResult, err := client.Search().
        Index("good"). 
        Query(elastic.NewMatchAllQuery()). 
        Aggregation("total", aggs). 
        Size(0). 
        Do(context.Background())
...
agg, found := searchResult.Aggregations.Cardinality("total")
...
对应的DSL:
POST /good/_search?size=0
{
    "aggs" : {
        "total" : { 
            "cardinality" : { 
                "field" : "type" 
            }
        }
    }
}

Avg

aggs := elastic.NewAvgAggregation().Field("price") 

searchResult, err := client.Search().
        Index("good"). 
        Query(elastic.NewMatchAllQuery()). 
        Aggregation("avg_price", aggs). 
        Size(0). 
        Do(context.Background())
...
agg, found := searchResult.Aggregations.Avg("avg_price")
...
对应的DSL:
POST /good/_search?size=0
{
  "aggs": {
    "avg_price": { 
      "avg": { 
        "field": "price" 
      }
    }
  }
}

Sum

aggs := elastic.NewSumAggregation().Field("price")

searchResult, err := client.Search().
        Index("good"). 
        Query(elastic.NewMatchAllQuery()). 
        Aggregation("total_price", aggs). 
        Size(0). 
        Do(context.Background())
...
agg, found := searchResult.Aggregations.Sum("total_price")
...
对应的DSL:
POST /good/_search?size=0
{
  "aggs": {
    "total_price": { 
      "sum": { 
        "field": "price" 
      }
    }
  }
}

Max

aggs := elastic.NewMaxAggregation().Field("price")

searchResult, err := client.Search().
        Index("good"). 
        Query(elastic.NewMatchAllQuery()). 
        Aggregation("max_price", aggs). 
        Size(0). 
        Do(context.Background())
...
agg, found := searchResult.Aggregations.Max("max_price")
...
对应的DSL:
POST /good/_search?size=0
{
  "aggs": {
    "max_price": { 
      "max": { 
        "field": "price" 
      }
    }
  }
}

Min

aggs := elastic.NewMinAggregation().Field("price")

searchResult, err := client.Search().
        Index("good"). 
        Query(elastic.NewMatchAllQuery()). 
        Aggregation("min_price", aggs). 
        Size(0). 
        Do(context.Background())
...
agg, found := searchResult.Aggregations.Min("min_price")
...
对应的DSL:
POST /good/_search?size=0
{
  "aggs": {
    "min_price": { 
      "min": { 
        "field": "price" 
      }
    }
  }
}

桶聚合

Terms Aggregation : group by

client, err := elastic.NewClient()
if err != nil {
    // Handle error
    panic(err)
}
agetop := elastic.NewtermsAggregation().Field("age")
result, err := client.Search().Index("twitter").
				Query(elastic.NewMatchAllQuery()).
                Aggregation("agetop", agetop).
            	Size(0).
            	Do(context.Backgroud())
agg, found := result.Aggregations.Terms("agetop")
if !found {
    // todo
}
for _, ageBucket := range agg.Buckets {
    age := ageBucket.Key
}
对应的DSL:
GET /twitter/_search?size=0
{
    "size": 0,
    "aggs": {
        "agetop": {
            "terms": {
                "field": "age"
            }
        }
    }
}    

Histogram聚合

直方图聚合即根据数值间隔分组,histogram聚合分桶统计结果

client, err := elastic.NewClient()
if err != nil {
    // Handle error
    panic(err)
}
aggs := elastic.NewHistogramAggregation().
				Field("age").Interval(10) // 年龄间隔10分组

result, err := client.Search().
            	Index("twitter").
            	Query(elastic.NewMatchAllQuery()).
                Aggregation("age", aggs).
            	Size(0)
            	Do(context.Background())
if	err!= nil {
    // todo
}
agg, found := result.Aggregations.Histogram("age")
if !found {
    // todo
}
for _,bucket := range agg.Buckets {
    value := bucket.Key
}
对应的DSL:
POST /twitter/_search?size=0
{
    "aggs": {
        "age": {
            "histogram": {
                "field": "age",
                "interval": 10
            }
        }
    }
}    

Date histogram聚合

功能类似于histogram聚合,主要作用为处理时间类型之短,根据时间、日期进行分桶统计

client, err := elastic.NewClient()
if err != nil {
    // Handle error
    panic(err)
}
aggs := elastic.NewDateHistogramAggregation().
		Field("createTime"). 
		//  分组间隔:month代表每月、支持minute(每分钟)、hour(每小时)、day(每天)、week(每周)、year(每年)
		CalendarInterval("month").
		// 设置返回结果中桶key的时间格式
		Format("yyyy-MM-dd")

searchResult, err := client.Search().
		Index("twitter"). 
		Query(elastic.NewMatchAllQuery()).
		Aggregation("create_time", aggs). 
		Size(0). 
		Do(ctx) 
if	err!= nil {
    // todo
}
agg, found := result.Aggregations.Histogram("create_time")
if !found {
    // todo
}
for _,bucket := range agg.Buckets {
    value := bucket.Key
}
对应的DSL:
POST /twitter/_search?size=0
{
    "aggs" : {
        "create_time" : { // 聚合查询名字,随便取一个
            "date_histogram" : { // 聚合类型为: date_histogram
                "field" : "createTime", // 根据date字段分组
                "calendar_interval" : "month", // 分组间隔:month代表每月、支持minute(每分钟)、hour(每小时)、day(每天)、week(每周)、year(每年)
                "format" : "yyyy-MM-dd" // 设置返回结果中桶key的时间格式
            }
        }
    }
}

Range聚合

按照数值范围进行分桶统计

client, err := elastic.NewClient()
if err != nil {
    // Handle error
    panic(err)
}

aggs := elastic.NewRangeAggregation().
		Field("price"). 
		Keyed(true).
		LtWithKey("cheap", 70).
		BetweenWithKey("average", 70, 200).
		GtWithKey("expensive", 200)

searchResult, err := client.Search().
		Index("twitter"). 
		Query(elastic.NewMatchAllQuery()).
		Aggregation("prices", aggs). 
		Size(0). 
		Do(ctx) 
if	err!= nil {
    // todo
}
agg, found := result.Aggregations.Range("prices")
if !found {
    // todo
}
for _,bucket := range agg.Buckets {
    value := bucket.Key
}
对应的DSL:
GET /twitter/_search
{
    "aggs" : {
        "prices" : { 
            "range" : { 
                "field" : "price", 
                "ranges" : [ 
                    { "key" : "cheap", "to" : 70}, // price < 70
                    { "key" : "average", "from" : 70, "to" : 200 }, // price >= 70 && price < 200
                    { "key" : "expensive", "from" : 200.0 } // price >= 200
                ]
            }
        }
    }
}

嵌套聚合

client, err := elastic.NewClient()
if err != nil {
    // Handle error
    panic(err)
}
goods := elastic.NewTermsAggregation().Field("brand")

sumAggs := elastic.NewSumAggregation().Field("price")

goods = timeline.SubAggregation("total_price", sumAggs)

searchResult, err := client.Search().
    Index("good").                  
    Query(elastic.NewMatchAllQuery()). 
    Aggregation("goods", goods). 
    Pretty(true).                      
    Do(context.Background())
if err != nil {
    panic(err)
}

agg, found := searchResult.Aggregations.Terms("goods")
if !found {
	//todo
}
for _, goodBucket := range agg.Buckets {
	good := goodBucket.Key

    agg, sumfound := goodBucket.Aggregations.Sum("total_price")
	if !sumfound{
    	//todo
    }
    
}
对应的DSL:
GET /good/_search
{
    "aggs" : { 
        "goods" : { 
            "terms" : { 
              "field" : "brand"
            },
            "aggs": { 
              "total_price": { 
                "sum": { 
                  "field": "price"
                }
              }
            }
        }
    }
}

总结

通过上面的例子我们可以看出,操作elastic工具库api兼容了elasticsearch的写法提供了elasticsearch大部分功能。当然elasticsearch也推出自己官方的es工具包,感兴趣的童鞋们也可自行访问github查看。希望通过本文给小伙伴们在工作提高一点点帮助,大家好,我是懂的越多,知道的越少的灵魂艺术家,下期文章大家想知道关于golang的什么知识呢?