提交 918118ee authored 作者: 屈传平's avatar 屈传平

es

上级 522c9791
...@@ -13,11 +13,11 @@ const ( ...@@ -13,11 +13,11 @@ const (
refresh = "true" refresh = "true"
) )
//根据文档Id 快速获取内容 // 根据文档Id 快速获取内容
func GetDoc(req GetDocReq) (docContent *elastic.GetResult,err error){ func GetDoc(req GetDocReq) (docContent *elastic.GetResult, err error) {
client, err := connection() client, err := connection()
if err != nil || client == nil { if err != nil || client == nil {
err = errors.New("elastic连接失败:"+err.Error()) err = errors.New("elastic连接失败:" + err.Error())
return return
} }
defer client.Stop() defer client.Stop()
...@@ -25,18 +25,18 @@ func GetDoc(req GetDocReq) (docContent *elastic.GetResult,err error){ ...@@ -25,18 +25,18 @@ func GetDoc(req GetDocReq) (docContent *elastic.GetResult,err error){
defer cancel() defer cancel()
docContent, err = client.Get().Index(req.IndexName).Id(req.DocId).Do(ctx) docContent, err = client.Get().Index(req.IndexName).Id(req.DocId).Do(ctx)
if err != nil { if err != nil {
err = errors.New("获取文档失败,错误原因:"+err.Error()) err = errors.New("获取文档失败,错误原因:" + err.Error())
logs.Error("获取文档失败 错误原因:【%s】",err.Error()) logs.Error("获取文档失败 错误原因:【%s】", err.Error())
return return
} }
return return
} }
//根据搜索条件 获取总数 // 根据搜索条件 获取总数
func GetCount(req GetCountReq) (count int64,err error) { func GetCount(req GetCountReq) (count int64, err error) {
client, err := connection() client, err := connection()
if err != nil || client == nil { if err != nil || client == nil {
err = errors.New("elastic连接失败"+err.Error()) err = errors.New("elastic连接失败" + err.Error())
return return
} }
defer client.Stop() defer client.Stop()
...@@ -45,21 +45,21 @@ func GetCount(req GetCountReq) (count int64,err error) { ...@@ -45,21 +45,21 @@ func GetCount(req GetCountReq) (count int64,err error) {
countService := client.Count(req.IndexName).Pretty(true) countService := client.Count(req.IndexName).Pretty(true)
count, err = countService.Query(req.Condition).Do(ctx) count, err = countService.Query(req.Condition).Do(ctx)
if err != nil { if err != nil {
err = errors.New("获取总数失败,错误原因:"+err.Error()) err = errors.New("获取总数失败,错误原因:" + err.Error())
logs.Error("GetCount 获取总数失败:【%s】",err.Error()) logs.Error("GetCount 获取总数失败:【%s】", err.Error())
return return
} }
return return
} }
//根据搜索条件 获取数据 // 根据搜索条件 获取数据
func GetList(req GetListReq) (searchResult *elastic.SearchResult,err error) { func GetList(req GetListReq) (searchResult *elastic.SearchResult, err error) {
//检测是from_size查询还是search_after滚动式查询 //检测是from_size查询还是search_after滚动式查询
isSearchAfter, from, size := checkParam(req) isSearchAfter, from, size := checkParam(req)
//连接Es //连接Es
client, err := connection() client, err := connection()
if err != nil || client == nil { if err != nil || client == nil {
err = errors.New("elastic连接失败"+err.Error()) err = errors.New("elastic连接失败" + err.Error())
return return
} }
defer client.Stop() defer client.Stop()
...@@ -84,12 +84,13 @@ func GetList(req GetListReq) (searchResult *elastic.SearchResult,err error) { ...@@ -84,12 +84,13 @@ func GetList(req GetListReq) (searchResult *elastic.SearchResult,err error) {
return return
} }
//获取搜索service 准备数据
func GetSearchService(req GetListReq) (searchService *elastic.SearchService,err error) { // 获取搜索service 准备数据
func GetSearchService(req GetListReq) (searchService *elastic.SearchService, err error) {
//连接Es //连接Es
client, err := connection() client, err := connection()
if err != nil || client == nil { if err != nil || client == nil {
err = errors.New("elastic连接失败"+err.Error()) err = errors.New("elastic连接失败" + err.Error())
return return
} }
defer client.Stop() defer client.Stop()
...@@ -99,13 +100,13 @@ func GetSearchService(req GetListReq) (searchService *elastic.SearchService,err ...@@ -99,13 +100,13 @@ func GetSearchService(req GetListReq) (searchService *elastic.SearchService,err
searchService = client.Search().Index(req.IndexName).Pretty(true).Query(req.Condition) searchService = client.Search().Index(req.IndexName).Pretty(true).Query(req.Condition)
return return
} }
func DoSearchService(req DoSearchServiceReq) (searchResult *elastic.SearchResult,err error) { func DoSearchService(req DoSearchServiceReq) (searchResult *elastic.SearchResult, err error) {
//检测是from_size查询还是search_after滚动式查询 //检测是from_size查询还是search_after滚动式查询
isSearchAfter, from, size := checkDoParam(req) isSearchAfter, from, size := checkDoParam(req)
//连接Es //连接Es
client, err := connection() client, err := connection()
if err != nil || client == nil { if err != nil || client == nil {
err = errors.New("elastic连接失败"+err.Error()) err = errors.New("elastic连接失败" + err.Error())
return return
} }
defer client.Stop() defer client.Stop()
...@@ -129,7 +130,8 @@ func DoSearchService(req DoSearchServiceReq) (searchResult *elastic.SearchResult ...@@ -129,7 +130,8 @@ func DoSearchService(req DoSearchServiceReq) (searchResult *elastic.SearchResult
return return
} }
//检验参数 区分 分页查询 滚动式查询
// 检验参数 区分 分页查询 滚动式查询
func checkDoParam(req DoSearchServiceReq) (isSearchAfter bool, from int, size int) { func checkDoParam(req DoSearchServiceReq) (isSearchAfter bool, from int, size int) {
if req.Page != 0 && req.Limit != 0 { if req.Page != 0 && req.Limit != 0 {
from = (req.Page - 1) * req.Limit from = (req.Page - 1) * req.Limit
...@@ -142,7 +144,8 @@ func checkDoParam(req DoSearchServiceReq) (isSearchAfter bool, from int, size in ...@@ -142,7 +144,8 @@ func checkDoParam(req DoSearchServiceReq) (isSearchAfter bool, from int, size in
} }
return return
} }
//检验参数 区分 分页查询 滚动式查询
// 检验参数 区分 分页查询 滚动式查询
func checkParam(req GetListReq) (isSearchAfter bool, from int, size int) { func checkParam(req GetListReq) (isSearchAfter bool, from int, size int) {
if req.Page != 0 && req.Limit != 0 { if req.Page != 0 && req.Limit != 0 {
from = (req.Page - 1) * req.Limit from = (req.Page - 1) * req.Limit
...@@ -156,7 +159,7 @@ func checkParam(req GetListReq) (isSearchAfter bool, from int, size int) { ...@@ -156,7 +159,7 @@ func checkParam(req GetListReq) (isSearchAfter bool, from int, size int) {
return return
} }
//批量处理 准备数据 1.添加 2.更新 3.删除 // 批量处理 准备数据 1.添加 2.更新 3.删除
func Bulk(req BulkReq) (bulkIndex BulkIndex, err error) { func Bulk(req BulkReq) (bulkIndex BulkIndex, err error) {
bulkIndexAdd := new(elastic.BulkIndexRequest) bulkIndexAdd := new(elastic.BulkIndexRequest)
...@@ -187,47 +190,47 @@ func Bulk(req BulkReq) (bulkIndex BulkIndex, err error) { ...@@ -187,47 +190,47 @@ func Bulk(req BulkReq) (bulkIndex BulkIndex, err error) {
return return
} }
//批量处理 准备数据 1.添加 // 批量处理 准备数据 1.添加
func BulkAdd(req BulkReq) (indexReq *elastic.BulkIndexRequest) { func BulkAdd(req BulkReq) (indexReq *elastic.BulkIndexRequest) {
indexReq = elastic.NewBulkIndexRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc) indexReq = elastic.NewBulkIndexRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
return return
} }
//批量处理 准备数据 1.删除 // 批量处理 准备数据 1.删除
func BulkDel(req BulkReq) (indexReq *elastic.BulkDeleteRequest) { func BulkDel(req BulkReq) (indexReq *elastic.BulkDeleteRequest) {
indexReq = elastic.NewBulkDeleteRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId) indexReq = elastic.NewBulkDeleteRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId)
return return
} }
//批量处理 准备数据 1.更新 // 批量处理 准备数据 1.更新
func BulkUpdate(req BulkReq) (indexReq *elastic.BulkUpdateRequest) { func BulkUpdate(req BulkReq) (indexReq *elastic.BulkUpdateRequest) {
indexReq = elastic.NewBulkUpdateRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc) indexReq = elastic.NewBulkUpdateRequest().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Doc(req.Doc)
return return
} }
//批量处理数据 // 批量处理数据
func BulkDo(esRequest []elastic.BulkableRequest) (bulkResponse *elastic.BulkResponse, err error) { func BulkDo(esRequest []elastic.BulkableRequest) (bulkResponse *elastic.BulkResponse, err error) {
client, err := connection() client, err := connection()
if err != nil || client == nil { if err != nil || client == nil {
err = errors.New("elastic连接失败"+err.Error()) err = errors.New("elastic连接失败" + err.Error())
return return
} }
defer client.Stop() defer client.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() defer cancel()
bulkResponse, err = client.Bulk().Add(esRequest...).Refresh(refresh).Do(ctx) bulkResponse, err = client.Bulk().Add(esRequest...).Refresh(refresh).Do(ctx)
if err!=nil{ if err != nil {
err = errors.New("BulkRestApi Bulk操作错误"+err.Error()) err = errors.New("BulkRestApi Bulk操作错误" + err.Error())
logs.Error("BulkRestApi Bulk操作错误:【%s】",err.Error()) logs.Error("BulkRestApi Bulk操作错误:【%s】", err.Error())
} }
return return
} }
//通过script更新文档 更新文档中某一字段 // 通过script更新文档 更新文档中某一字段
func UpdateByScript(req UpdateByScriptReq) (updateResponse *elastic.UpdateResponse,err error) { func UpdateByScript(req UpdateByScriptReq) (updateResponse *elastic.UpdateResponse, err error) {
client, err := connection() client, err := connection()
if err != nil || client == nil { if err != nil || client == nil {
err = errors.New("elastic连接失败"+err.Error()) err = errors.New("elastic连接失败" + err.Error())
return return
} }
defer client.Stop() defer client.Stop()
...@@ -235,22 +238,22 @@ func UpdateByScript(req UpdateByScriptReq) (updateResponse *elastic.UpdateRespon ...@@ -235,22 +238,22 @@ func UpdateByScript(req UpdateByScriptReq) (updateResponse *elastic.UpdateRespon
defer cancel() defer cancel()
updateResponse, err = client.Update().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Script(elastic.NewScript(req.ScriptCtx)).Do(ctx) updateResponse, err = client.Update().Index(req.IndexName).Id(req.DocId).Routing(req.RoutingId).Script(elastic.NewScript(req.ScriptCtx)).Do(ctx)
if err != nil { if err != nil {
err = errors.New("script更新文档失败"+err.Error()) err = errors.New("script更新文档失败" + err.Error())
logs.Error("script更新文档失败:【%s】",err.Error()) logs.Error("script更新文档失败:【%s】", err.Error())
return return
} }
return return
} }
//同BulkDo BulkDo http处理 BulkProcessor tcp处理 相对更安全些 // 同BulkDo BulkDo http处理 BulkProcessor tcp处理 相对更安全些
func BulkProcessor(esRequest []elastic.BulkableRequest,numDocs int) (err error){ func BulkProcessor(esRequest []elastic.BulkableRequest, numDocs int) (err error) {
if numDocs>30{ if numDocs > 30 {
err = errors.New("请合理输入参数,批量处理最大为30") err = errors.New("请合理输入参数,批量处理最大为30")
return return
} }
client, err := connection() client, err := connection()
if err != nil || client == nil { if err != nil || client == nil {
err = errors.New("elastic连接失败"+err.Error()) err = errors.New("elastic连接失败" + err.Error())
return return
} }
defer client.Stop() defer client.Stop()
...@@ -264,7 +267,7 @@ func BulkProcessor(esRequest []elastic.BulkableRequest,numDocs int) (err error){ ...@@ -264,7 +267,7 @@ func BulkProcessor(esRequest []elastic.BulkableRequest,numDocs int) (err error){
beforeFn := func(executionId int64, requests []elastic.BulkableRequest) { beforeFn := func(executionId int64, requests []elastic.BulkableRequest) {
atomic.AddInt64(&beforeRequests, int64(len(requests))) atomic.AddInt64(&beforeRequests, int64(len(requests)))
atomic.AddInt64(&before, 1) atomic.AddInt64(&before, 1)
logs.Info("elastic","序号:{%d} 开始执行 {%d} 条数据批量操作。", executionId,len(requests)) logs.Info("elastic", "序号:{%d} 开始执行 {%d} 条数据批量操作。", executionId, len(requests))
} }
afterFn := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { afterFn := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
// 在每次执行BulkRequest后调用,通过此方法可以获取BulkResponse是否包含错误 // 在每次执行BulkRequest后调用,通过此方法可以获取BulkResponse是否包含错误
...@@ -274,10 +277,10 @@ func BulkProcessor(esRequest []elastic.BulkableRequest,numDocs int) (err error){ ...@@ -274,10 +277,10 @@ func BulkProcessor(esRequest []elastic.BulkableRequest,numDocs int) (err error){
} }
atomic.AddInt64(&afterRequests, int64(len(requests))) atomic.AddInt64(&afterRequests, int64(len(requests)))
if response.Errors{ if response.Errors {
logs.Error("BulkProcessor afterFn错误 %s",response.Failed()) logs.Error("BulkProcessor afterFn错误 %s", response.Failed())
}else{ } else {
logs.Info("log","序号:{%d} ,执行 {%d} 条数据批量操作成功,共耗费{%d}毫秒",executionId,len(requests),response.Took) logs.Info("log", "序号:{%d} ,执行 {%d} 条数据批量操作成功,共耗费{%d}毫秒", executionId, len(requests), response.Took)
} }
} }
...@@ -285,18 +288,18 @@ func BulkProcessor(esRequest []elastic.BulkableRequest,numDocs int) (err error){ ...@@ -285,18 +288,18 @@ func BulkProcessor(esRequest []elastic.BulkableRequest,numDocs int) (err error){
//每添加30个request,执行一次bulk操作 //每添加30个request,执行一次bulk操作
p, err := client.BulkProcessor().Name("Worker-1").Before(beforeFn).After(afterFn).Stats(true).BulkActions(numDocs).Do(ctx) p, err := client.BulkProcessor().Name("Worker-1").Before(beforeFn).After(afterFn).Stats(true).BulkActions(numDocs).Do(ctx)
if err != nil { if err != nil {
logs.Error("BulkProcessor Bulk操作错误:【%s】",err.Error()) logs.Error("BulkProcessor Bulk操作错误:【%s】", err.Error())
} }
for _,v:=range esRequest{ for _, v := range esRequest {
p.Add(v) p.Add(v)
} }
err =p.Flush() err = p.Flush()
if err != nil { if err != nil {
logs.Error("BulkProcessor Flush错误:【%s】",err.Error()) logs.Error("BulkProcessor Flush错误:【%s】", err.Error())
} }
err = p.Close() err = p.Close()
if err != nil { if err != nil {
logs.Error("BulkProcessor Close错误:【%s】",err.Error()) logs.Error("BulkProcessor Close错误:【%s】", err.Error())
} }
return return
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论