package elastic import ( "encoding/json" "fmt" "github.com/olivere/elastic/v7" "time" ) //绑定结构体 type JdRegion struct { Title string `json:"title"` Id int `json:"id"` Cover string `json:"cover"` } //通过文档Id获取文档数据 func testDocId() { res,err:=GetDoc(GetDocReq{ DocId: "goods_3281484",//文档Id IndexName: "goods_app_index_test",//索引名称 }) var jdRegion JdRegion err =json.Unmarshal(res.Source, &jdRegion)//数据解析绑定结构体 fmt.Println(jdRegion,err) } //通过搜索条件获取数据总数 func testGetCount() { //搜索条件,根据自身业务调整 q :=elastic.NewBoolQuery() q=q.Filter(elastic.NewTermQuery("status", 1)) //通过条件获取条数 res,err:=GetCount(GetCountReq{ Condition:q, //搜索条件 IndexName: "goods_app_index_test",//索引名称 }) fmt.Println(res,err) } //通过搜索条件 分页 获取数据 func testGetListByFromSize() { //搜索条件,根据自身业务调整 q :=elastic.NewBoolQuery() q=q.Filter(elastic.NewTermQuery("status", 1)) //通过条件获取数据列表 res,err:=GetList(GetListReq{ Condition:q,//搜索条件 IndexName: "goods_app_index_test",//索引名称 Page: 1,//页码 Limit: 10,//条数 }) fmt.Println(err) //循环列表数据 for _, hit := range res.Hits.Hits { var jdRegion JdRegion err := json.Unmarshal(hit.Source, &jdRegion) //数据解析绑定结构体 fmt.Println(err) fmt.Println(jdRegion) } } //通过搜索条件 滚动式查询 不可分页 获取数据 func testGetListSearchAfter() { //搜索条件,根据自身业务调整 q :=elastic.NewBoolQuery() q=q.Filter(elastic.NewTermQuery("status", 1)) //通过条件获取数据列表 res,err:=GetList(GetListReq{ Condition:q,//搜索条件 IndexName: "goods_app_index_test",//索引名称 SearchAfter:&SearchAfter{ Value: 0,//从商品Id大于0开始查询 }, SortField: struct { //滚动式查询需排序数据 Field string `json:"field"`//排序字段名称 需指定数值型字段 例如商品Id Sort bool `json:"sort" `//正序 true 倒叙 false }{ "id",//商品Id true,//正序 }, }) //循环列表数据 for _, hit := range res.Hits.Hits { var jdRegion JdRegion err := json.Unmarshal(hit.Source, &jdRegion) ///数据解析绑定结构体 fmt.Println(err) fmt.Println(jdRegion) } fmt.Println(err) } type AddChooseGoodESParam struct { SellerId int `json:"seller_id"` MyJoinField MyJoinField `json:"my_join_field"` AddTimeChooseGood int64 `json:"add_time_choose_good"` ChannelId int `json:"channel_id"` GroupIds string `json:"groups_ids"` } type MyJoinField struct { Name string `json:"name"` Parent string `json:"parent"` } //restapi http方式批量处理数据 1.增加 add 2.更新 update 3,删除 delete func testBulk() { //初始化数据切片 esRequest := make([]elastic.BulkableRequest, 0) //定义文档Id,路由Id,如有父子文档定义父文档Id var i []int i = []int{2885234, 2885240, 2885235} for _, v := range i { docId := fmt.Sprintf("%s_%d_%d", "seller", 1, v) routingId := fmt.Sprintf("%s_%d", "goods", v) parent := fmt.Sprintf("%s_%d", "goods", v) //自定义结构体 赋值需要处理的数据 esParam := AddChooseGoodESParam{ SellerId: 1, MyJoinField: MyJoinField{Name: "seller", Parent: parent}, AddTimeChooseGood: time.Now().UnixNano(), ChannelId: 0, GroupIds: "0", } //准备批量数据 res, err := Bulk(BulkReq{ Type: "add", //处理方式1.add 2.update 3.delete IndexName: "goods_app_index_test", //索引名称 DocId: docId, //文档Id RoutingId: routingId, //路由Id,指定路由更快速定位到分片,提高处理速度, Doc: esParam, //数据,interface类型 }) fmt.Println(err, res) //添加到切片中 esRequest = append(esRequest, res.BulkIndexRequest) //BulkIndexRequest,BulkUpdateRequest,BulkDeleteRequest } //操作处理添加到索引中 bulkResponse,err:=BulkDo(esRequest) fmt.Println(err) //结果状态 新增201为成功 修改200为成功 删除200为成功 具体其他状态请查阅文档 for _, val := range bulkResponse.Indexed() { fmt.Println("新增成功",val.Status) fmt.Println("修改成功",val.Status) fmt.Println("删除成功",val.Status) } } //根据script修改文档中部分字段 func testScript() { //script为修改语句 script := fmt.Sprintf("%s'%s'", "ctx._source.group_ids = ", "0-1") updateResponse,err :=UpdateByScript(UpdateByScriptReq{ DocId: "goods_3281484",//文档Id RoutingId: "goods_3281484",//路由Id ScriptCtx: script,//script语句 IndexName: "goods_app_index_test",//索引名称 }) fmt.Println(err,updateResponse) return } //tcp方式批量处理数据 更安全快速 1.增加 add 2.更新 update 3,删除 delete func testBulkProcessor() { //初始化数据切片 esRequest := make([]elastic.BulkableRequest, 0) var i []int i = []int{2885234, 2885240, 2885235} for _, v := range i { //定义文档Id,路由Id,如有父子文档定义父文档Id docId := fmt.Sprintf("%s_%d_%d", "seller", 2, v) routingId := fmt.Sprintf("%s_%d", "goods", v) parent := fmt.Sprintf("%s_%d", "goods", v) //自定义结构体 赋值需要处理的数据 esParam := AddChooseGoodESParam{ SellerId: 1, MyJoinField: MyJoinField{Name: "seller", Parent: parent}, AddTimeChooseGood: time.Now().UnixNano(), ChannelId: 0, GroupIds: "0", } //准备批量数据 res, err := Bulk(BulkReq{ Type: "add", //处理方式1.add 2.update 3.delete IndexName: "goods_app_index_test", //索引名称 DocId: docId, //文档Id RoutingId: routingId, //路由Id,指定路由更快速定位到分片,提高处理速度, Doc: esParam, //数据,interface类型 }) fmt.Println("------", err, res) //添加到切片中 esRequest = append(esRequest, res.BulkIndexRequest) //BulkIndexRequest,BulkUpdateRequest,BulkDeleteRequest } //操作处理添加到索引中 numDocs 一次批量处理的数据最多30个 err := BulkProcessor(esRequest, 3) fmt.Println(err) return }