service.go 8.9 KB
package elastic

import (
	"context"
	"errors"
	"github.com/olivere/elastic/v7"
	"gitlab.jxhh.com/stbz/library.git/logs"
	"sync/atomic"
	"time"
)

const (
	refresh = "true"
)

//根据文档Id 快速获取内容
func GetDoc(req GetDocReq) (docContent *elastic.GetResult,err error){
	client, err := connection()
	if err != nil || client == nil {
		err = errors.New("elastic连接失败:"+err.Error())
		return
	}
	defer client.Stop()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	docContent, err = client.Get().Index(req.IndexName).Id(req.DocId).Do(ctx)
	if err != nil  {
		err = errors.New("获取文档失败,错误原因:"+err.Error())
		logs.Error("获取文档失败 错误原因:【%s】",err.Error())
		return
	}
	return
}

//根据搜索条件 获取总数
func GetCount(req GetCountReq) (count int64,err error) {
	client, err := connection()
	if err != nil || client == nil {
		err = errors.New("elastic连接失败"+err.Error())
		return
	}
	defer client.Stop()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	countService := client.Count(req.IndexName).Pretty(true)
	count, err = countService.Query(req.Condition).Do(ctx)
	if err != nil {
		err = errors.New("获取总数失败,错误原因:"+err.Error())
		logs.Error("GetCount 获取总数失败:【%s】",err.Error())
		return
	}
	return
}

//根据搜索条件 获取数据
func GetList(req GetListReq) (searchResult *elastic.SearchResult,err error) {
	//检测是from_size查询还是search_after滚动式查询
	isSearchAfter, from, size := checkParam(req)
	//连接Es
	client, err := connection()
	if err != nil || client == nil {
		err = errors.New("elastic连接失败"+err.Error())
		return
	}
	defer client.Stop()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	//准备查询条件
	searchService := client.Search().Index(req.IndexName).Pretty(true).Query(req.Condition).Sort(req.SortField.Field, req.SortField.Sort)

	if isSearchAfter == false {
		//分页查询Es
		searchResult, err = searchService.From(from).Size(size).Do(ctx)
	}

	if isSearchAfter == true {
		//滚动式查询Es
		searchResult, err = searchService.
			SearchAfter(req.SearchAfter.Value).
			From(from).Size(size).
			Sort(req.SortField.Field, req.SortField.Sort).
			Do(ctx)
	}

	return
}
//获取搜索service 准备数据
func GetSearchService(req GetListReq) (searchService *elastic.SearchService,err error) {
	//连接Es
	client, err := connection()
	if err != nil || client == nil {
		err = errors.New("elastic连接失败"+err.Error())
		return
	}
	defer client.Stop()
	_, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	//准备查询条件
	searchService = client.Search().Index(req.IndexName).Pretty(true).Query(req.Condition)
	return
}
func DoSearchService(req DoSearchServiceReq) (searchResult *elastic.SearchResult,err error) {
	//检测是from_size查询还是search_after滚动式查询
	isSearchAfter, from, size := checkDoParam(req)
	//连接Es
	client, err := connection()
	if err != nil || client == nil {
		err = errors.New("elastic连接失败"+err.Error())
		return
	}
	defer client.Stop()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	//准备查询条件

	if isSearchAfter == false {
		//分页查询Es
		searchResult, err = req.SearchService.From(from).Size(size).Do(ctx)
	}

	if isSearchAfter == true {
		//滚动式查询Es
		searchResult, err = req.SearchService.
			SearchAfter(req.SearchAfter.Value).
			From(from).Size(size).
			Sort(req.SortField.Field, req.SortField.Sort).
			Do(ctx)
	}

	return
}
//检验参数 区分 分页查询 滚动式查询
func checkDoParam(req DoSearchServiceReq) (isSearchAfter bool, from int, size int) {
	if req.Page != 0 && req.Limit != 0 {
		from = (req.Page - 1) * req.Limit
		size = req.Limit
	}
	if req.SearchAfter != nil {
		from = 0
		size = 50
		isSearchAfter = true
	}
	return
}
//检验参数 区分 分页查询 滚动式查询
func checkParam(req GetListReq) (isSearchAfter bool, from int, size int) {
	if req.Page != 0 && req.Limit != 0 {
		from = (req.Page - 1) * req.Limit
		size = req.Limit
	}
	if req.SearchAfter != nil {
		from = 0
		size = 50
		isSearchAfter = true
	}
	return
}

//批量处理 准备数据 1.添加 2.更新 3.删除
func Bulk(req BulkReq) (bulkIndex BulkIndex, err error) {

	bulkIndexAdd := new(elastic.BulkIndexRequest)
	bulkIndexUpdate := new(elastic.BulkUpdateRequest)
	bulkIndexDel := new(elastic.BulkDeleteRequest)

	switch req.Type {
	case "add":
		bulkIndexAdd = BulkAdd(req)
	case "update":
		bulkIndexUpdate = BulkUpdate(req)
	case "del":
		bulkIndexDel = BulkDel(req)

	}
	bulkIndex.BulkIndexRequest = new(elastic.BulkIndexRequest)
	bulkIndex.BulkUpdateRequest = new(elastic.BulkUpdateRequest)
	bulkIndex.BulkDeleteRequest = new(elastic.BulkDeleteRequest)
	if bulkIndexAdd != nil {
		bulkIndex.BulkIndexRequest = bulkIndexAdd
	}
	if bulkIndexUpdate != nil {
		bulkIndex.BulkUpdateRequest = bulkIndexUpdate
	}
	if bulkIndexDel != nil {
		bulkIndex.BulkDeleteRequest = bulkIndexDel
	}
	return
}

//批量处理 准备数据 1.添加
func BulkAdd(req BulkReq) (indexReq *elastic.BulkIndexRequest) {
	indexReq = elastic.NewBulkIndexRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
	return
}

//批量处理 准备数据 1.删除
func BulkDel(req BulkReq) (indexReq *elastic.BulkDeleteRequest) {
	indexReq = elastic.NewBulkDeleteRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId)
	return
}

//批量处理 准备数据 1.更新
func BulkUpdate(req BulkReq) (indexReq *elastic.BulkUpdateRequest) {
	indexReq = elastic.NewBulkUpdateRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
	return
}

//批量处理数据
func BulkDo(esRequest []elastic.BulkableRequest) (bulkResponse *elastic.BulkResponse, err error) {
	client, err := connection()
	if err != nil || client == nil {
		err = errors.New("elastic连接失败"+err.Error())
		return
	}
	defer client.Stop()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	bulkResponse, err = client.Bulk().Add(esRequest...).Refresh(refresh).Do(ctx)
	if err!=nil{
		err = errors.New("BulkRestApi Bulk操作错误"+err.Error())
		logs.Error("BulkRestApi Bulk操作错误:【%s】",err.Error())
	}
	return
}

//通过script更新文档 更新文档中某一字段
func UpdateByScript(req UpdateByScriptReq) (updateResponse *elastic.UpdateResponse,err error) {
	client, err := connection()
	if err != nil || client == nil {
		err = errors.New("elastic连接失败"+err.Error())
		return
	}
	defer client.Stop()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	updateResponse, err = client.Update().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Script(elastic.NewScript(req.ScriptCtx)).Do(ctx)
	if err != nil {
		err = errors.New("script更新文档失败"+err.Error())
		logs.Error("script更新文档失败:【%s】",err.Error())
		return
	}
	return
}

//同BulkDo  BulkDo http处理 BulkProcessor tcp处理 相对更安全些
func BulkProcessor(esRequest []elastic.BulkableRequest,numDocs int) (err error){
	if numDocs>30{
		err = errors.New("请合理输入参数,批量处理最大为30")
		return
	}
	client, err := connection()
	if err != nil || client == nil {
		err = errors.New("elastic连接失败"+err.Error())
		return
	}
	defer client.Stop()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	var beforeRequests int64
	var before int64
	var afters int64
	var failures int64
	var afterRequests int64
	beforeFn := func(executionId int64, requests []elastic.BulkableRequest) {
		atomic.AddInt64(&beforeRequests, int64(len(requests)))
		atomic.AddInt64(&before, 1)
		logs.Info("elastic","序号:{%d} 开始执行 {%d} 条数据批量操作。", executionId,len(requests))
	}
	afterFn := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
		// 在每次执行BulkRequest后调用,通过此方法可以获取BulkResponse是否包含错误
		atomic.AddInt64(&afters, 1)
		if err != nil {
			atomic.AddInt64(&failures, 1)
		}
		atomic.AddInt64(&afterRequests, int64(len(requests)))

		if response.Errors{
			logs.Error("BulkProcessor afterFn错误 %s",response.Failed())
		}else{
			logs.Info("log","序号:{%d} ,执行 {%d} 条数据批量操作成功,共耗费{%d}毫秒",executionId,len(requests),response.Took)
		}

	}

	//每添加30个request,执行一次bulk操作
	p, err := client.BulkProcessor().Name("Worker-1").Before(beforeFn).After(afterFn).Stats(true).BulkActions(numDocs).Do(ctx)
	if err != nil {
		logs.Error("BulkProcessor Bulk操作错误:【%s】",err.Error())
	}
	for _,v:=range esRequest{
		p.Add(v)
	}
	err =p.Flush()
	if err != nil {
		logs.Error("BulkProcessor Flush错误:【%s】",err.Error())
	}
	err = p.Close()
	if err != nil {
		logs.Error("BulkProcessor Close错误:【%s】",err.Error())
	}

	return
}