// Package elastic provides helpers for reading, searching and bulk-writing
// Elasticsearch documents through the olivere/elastic v7 client.
package elastic

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/olivere/elastic/v7"
	"gitlab.jxhh.com/stbz/library.git/logs"
)

const (
	// refresh is the refresh policy sent with the bulk request in BulkDo.
	refresh = "true"

	// esRequestTimeout bounds every Elasticsearch call issued from this file.
	esRequestTimeout = 5 * time.Second

	// searchAfterBatchSize is the fixed page size used for search_after queries.
	searchAfterBatchSize = 50

	// maxBulkProcessorDocs is the largest numDocs accepted by BulkProcessor.
	// The rejection message in BulkProcessor spells this value out literally.
	maxBulkProcessorDocs = 30
)

// errNilClient is reported when connection() yields no client and no error.
var errNilClient = errors.New("connection returned a nil client")

// wrapConnError builds the "connection failed" error returned by the public
// helpers. prefix is the caller's message prefix and err is the error from
// connection(). err may be nil when connection() returned a nil client
// without an error; errNilClient is substituted in that case so the message
// can be built without dereferencing a nil error.
func wrapConnError(prefix string, err error) error {
	if err == nil {
		err = errNilClient
	}
	return fmt.Errorf("%s%w", prefix, err)
}

// GetDoc fetches a single document by its ID.
//
// It reads req.IndexName and req.DocId. On failure the returned document is
// nil and the error wraps the underlying cause; a failed Get is also logged.
func GetDoc(req GetDocReq) (docContent *elastic.GetResult, err error) {
	client, err := connection()
	if err != nil || client == nil {
		return nil, wrapConnError("elastic连接失败:", err)
	}
	defer client.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), esRequestTimeout)
	defer cancel()

	docContent, err = client.Get().Index(req.IndexName).Id(req.DocId).Do(ctx)
	if err != nil {
		err = fmt.Errorf("获取文档失败,错误原因:%w", err)
		logs.Error("获取文档失败 错误原因:【%s】", err.Error())
		return nil, err
	}
	return docContent, nil
}

// GetCount returns the number of documents in req.IndexName that match
// req.Condition.
//
// On failure the count is 0 and the error wraps the underlying cause; a
// failed count request is also logged.
func GetCount(req GetCountReq) (count int64, err error) {
	client, err := connection()
	if err != nil || client == nil {
		return 0, wrapConnError("elastic连接失败", err)
	}
	defer client.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), esRequestTimeout)
	defer cancel()

	count, err = client.Count(req.IndexName).Pretty(true).Query(req.Condition).Do(ctx)
	if err != nil {
		err = fmt.Errorf("获取总数失败,错误原因:%w", err)
		logs.Error("GetCount 获取总数失败:【%s】", err.Error())
		return 0, err
	}
	return count, nil
}

// GetList runs a search against req.IndexName with req.Condition, sorted by
// req.SortField.
//
// When req.SearchAfter is set the query is a search_after query with a fixed
// page size; otherwise it is a from/size query derived from req.Page and
// req.Limit (see checkParam). The error from the search itself is returned
// unwrapped.
func GetList(req GetListReq) (searchResult *elastic.SearchResult, err error) {
	// Decide between from/size paging and search_after scrolling.
	isSearchAfter, from, size := checkParam(req)

	client, err := connection()
	if err != nil || client == nil {
		return nil, wrapConnError("elastic连接失败", err)
	}
	defer client.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), esRequestTimeout)
	defer cancel()

	// The sort is attached exactly once here. Adding it again in the
	// search_after branch would append a second, identical sort entry while
	// SearchAfter is still given a single argument.
	searchService := client.Search().
		Index(req.IndexName).
		Pretty(true).
		Query(req.Condition).
		Sort(req.SortField.Field, req.SortField.Sort).
		From(from).
		Size(size)
	if isSearchAfter {
		searchService = searchService.SearchAfter(req.SearchAfter.Value)
	}
	return searchService.Do(ctx)
}

// GetSearchService prepares a search service for req.IndexName with
// req.Condition applied and returns it without executing it. Callers can add
// further options and run it later, e.g. through DoSearchService.
//
// NOTE(review): the client that created the service is stopped when this
// function returns — confirm the service is still meant to be executed
// afterwards.
func GetSearchService(req GetListReq) (searchService *elastic.SearchService, err error) {
	client, err := connection()
	if err != nil || client == nil {
		return nil, wrapConnError("elastic连接失败", err)
	}
	defer client.Stop()

	searchService = client.Search().Index(req.IndexName).Pretty(true).Query(req.Condition)
	return searchService, nil
}

// DoSearchService executes the prepared req.SearchService.
//
// When req.SearchAfter is set the service is run as a search_after query
// sorted by req.SortField with a fixed page size; otherwise it is run as a
// from/size query derived from req.Page and req.Limit (see checkDoParam).
// The error from the search itself is returned unwrapped.
func DoSearchService(req DoSearchServiceReq) (searchResult *elastic.SearchResult, err error) {
	// Decide between from/size paging and search_after scrolling.
	isSearchAfter, from, size := checkDoParam(req)

	// The connection is only opened as an availability check; the request
	// itself goes through req.SearchService.
	client, err := connection()
	if err != nil || client == nil {
		return nil, wrapConnError("elastic连接失败", err)
	}
	defer client.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), esRequestTimeout)
	defer cancel()

	service := req.SearchService.From(from).Size(size)
	if isSearchAfter {
		service = service.
			SearchAfter(req.SearchAfter.Value).
			Sort(req.SortField.Field, req.SortField.Sort)
	}
	return service.Do(ctx)
}

// pagingParams converts page/limit input into from/size values.
//
// With hasSearchAfter set, it returns from 0 and size searchAfterBatchSize
// regardless of page and limit. Otherwise from/size are computed from page
// and limit when both are non-zero, and are left at 0 when either is zero.
func pagingParams(page, limit int, hasSearchAfter bool) (isSearchAfter bool, from int, size int) {
	if hasSearchAfter {
		return true, 0, searchAfterBatchSize
	}
	if page != 0 && limit != 0 {
		from = (page - 1) * limit
		size = limit
	}
	return false, from, size
}

// checkDoParam inspects a DoSearchServiceReq and reports whether it is a
// search_after query, together with the from/size values to use.
func checkDoParam(req DoSearchServiceReq) (isSearchAfter bool, from int, size int) {
	return pagingParams(req.Page, req.Limit, req.SearchAfter != nil)
}

// checkParam inspects a GetListReq and reports whether it is a search_after
// query, together with the from/size values to use.
func checkParam(req GetListReq) (isSearchAfter bool, from int, size int) {
	return pagingParams(req.Page, req.Limit, req.SearchAfter != nil)
}

// Bulk prepares one bulk request according to req.Type: "add" builds an index
// request, "update" an update request and "del" a delete request.
//
// All three fields of the returned BulkIndex are always non-nil; the ones not
// selected by req.Type (all of them for an unknown type) hold empty requests.
// The returned error is always nil.
func Bulk(req BulkReq) (bulkIndex BulkIndex, err error) {
	bulkIndex.BulkIndexRequest = new(elastic.BulkIndexRequest)
	bulkIndex.BulkUpdateRequest = new(elastic.BulkUpdateRequest)
	bulkIndex.BulkDeleteRequest = new(elastic.BulkDeleteRequest)

	switch req.Type {
	case "add":
		bulkIndex.BulkIndexRequest = BulkAdd(req)
	case "update":
		bulkIndex.BulkUpdateRequest = BulkUpdate(req)
	case "del":
		bulkIndex.BulkDeleteRequest = BulkDel(req)
	}
	return bulkIndex, nil
}

// BulkAdd builds a bulk index request from req.IndexName, req.DocId,
// req.RoutingId and req.Doc.
func BulkAdd(req BulkReq) (indexReq *elastic.BulkIndexRequest) {
	return elastic.NewBulkIndexRequest().
		Index(req.IndexName).
		Id(req.DocId).
		Routing(req.RoutingId).
		Doc(req.Doc)
}

// BulkDel builds a bulk delete request from req.IndexName, req.DocId and
// req.RoutingId.
func BulkDel(req BulkReq) (indexReq *elastic.BulkDeleteRequest) {
	return elastic.NewBulkDeleteRequest().
		Index(req.IndexName).
		Id(req.DocId).
		Routing(req.RoutingId)
}

// BulkUpdate builds a bulk update request from req.IndexName, req.DocId,
// req.RoutingId and req.Doc.
func BulkUpdate(req BulkReq) (indexReq *elastic.BulkUpdateRequest) {
	return elastic.NewBulkUpdateRequest().
		Index(req.IndexName).
		Id(req.DocId).
		Routing(req.RoutingId).
		Doc(req.Doc)
}

// BulkDo sends all esRequest entries in a single bulk call using the refresh
// policy from the refresh constant.
//
// On failure the response is nil and the error wraps the underlying cause; a
// failed bulk call is also logged.
func BulkDo(esRequest []elastic.BulkableRequest) (bulkResponse *elastic.BulkResponse, err error) {
	client, err := connection()
	if err != nil || client == nil {
		return nil, wrapConnError("elastic连接失败", err)
	}
	defer client.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), esRequestTimeout)
	defer cancel()

	bulkResponse, err = client.Bulk().Add(esRequest...).Refresh(refresh).Do(ctx)
	if err != nil {
		err = fmt.Errorf("BulkRestApi Bulk操作错误%w", err)
		logs.Error("BulkRestApi Bulk操作错误:【%s】", err.Error())
		return nil, err
	}
	return bulkResponse, nil
}

// UpdateByScript updates the document req.DocId in req.IndexName (routed by
// req.RoutingId) by running the script source in req.ScriptCtx.
//
// On failure the response is nil and the error wraps the underlying cause; a
// failed update is also logged.
func UpdateByScript(req UpdateByScriptReq) (updateResponse *elastic.UpdateResponse, err error) {
	client, err := connection()
	if err != nil || client == nil {
		return nil, wrapConnError("elastic连接失败", err)
	}
	defer client.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), esRequestTimeout)
	defer cancel()

	updateResponse, err = client.Update().
		Index(req.IndexName).
		Id(req.DocId).
		Routing(req.RoutingId).
		Script(elastic.NewScript(req.ScriptCtx)).
		Do(ctx)
	if err != nil {
		err = fmt.Errorf("script更新文档失败%w", err)
		logs.Error("script更新文档失败:【%s】", err.Error())
		return nil, err
	}
	return updateResponse, nil
}

// BulkProcessor submits esRequest through an olivere bulk processor that
// commits after every numDocs queued actions, then flushes and closes it.
//
// numDocs must not exceed maxBulkProcessorDocs. The returned error is, in
// order of precedence: the parameter/connection/start-up error, the Flush
// error, the Close error, or an error reporting how many commits failed.
// Item-level failures inside an otherwise successful commit are logged only.
func BulkProcessor(esRequest []elastic.BulkableRequest, numDocs int) (err error) {
	if numDocs > maxBulkProcessorDocs {
		return errors.New("请合理输入参数,批量处理最大为30")
	}

	client, err := connection()
	if err != nil || client == nil {
		return wrapConnError("elastic连接失败", err)
	}
	defer client.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), esRequestTimeout)
	defer cancel()

	// failures counts commits whose After callback reported an error. It is
	// updated atomically because the callback is not invoked from this
	// goroutine's call stack.
	var failures int64

	beforeFn := func(executionId int64, requests []elastic.BulkableRequest) {
		logs.Info("elastic", "序号:{%d} 开始执行 {%d} 条数据批量操作。", executionId, len(requests))
	}

	// afterFn runs after each bulk commit and reports its outcome.
	afterFn := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
		if err != nil {
			atomic.AddInt64(&failures, 1)
			logs.Error("BulkProcessor afterFn错误 %s", err.Error())
			return
		}
		// Guard against a missing response before reading its fields.
		if response == nil {
			return
		}
		if response.Errors {
			logs.Error("BulkProcessor afterFn错误 %s", response.Failed())
			return
		}
		logs.Info("log", "序号:{%d} ,执行 {%d} 条数据批量操作成功,共耗费{%d}毫秒", executionId, len(requests), response.Took)
	}

	// Commit a bulk request every numDocs queued actions.
	p, err := client.BulkProcessor().
		Name("Worker-1").
		Before(beforeFn).
		After(afterFn).
		Stats(true).
		BulkActions(numDocs).
		Do(ctx)
	if err != nil {
		logs.Error("BulkProcessor Bulk操作错误:【%s】", err.Error())
		// Without a processor there is nothing to add to, flush or close.
		return err
	}

	for _, r := range esRequest {
		p.Add(r)
	}

	flushErr := p.Flush()
	if flushErr != nil {
		logs.Error("BulkProcessor Flush错误:【%s】", flushErr.Error())
	}
	// Close is attempted even when Flush failed so the processor is released.
	closeErr := p.Close()
	if closeErr != nil {
		logs.Error("BulkProcessor Close错误:【%s】", closeErr.Error())
	}

	switch {
	case flushErr != nil:
		return flushErr
	case closeErr != nil:
		return closeErr
	}
	if n := atomic.LoadInt64(&failures); n > 0 {
		return fmt.Errorf("BulkProcessor: %d bulk commit(s) failed", n)
	}
	return nil
}