service_test.go 6.2 KB
package elastic

import (
	"encoding/json"
	"fmt"
	"github.com/olivere/elastic/v7"
	"time"
)
//绑定结构体
type JdRegion struct {
	Title string `json:"title"`
	Id int `json:"id"`
	Cover string `json:"cover"`
}
//通过文档Id获取文档数据
func testDocId()  {
	res,err:=GetDoc(GetDocReq{
		DocId: "goods_3281484",//文档Id
		IndexName: "goods_app_index_test",//索引名称
	})
	var jdRegion JdRegion
	err =json.Unmarshal(res.Source, &jdRegion)//数据解析绑定结构体
	fmt.Println(jdRegion,err)
}
//通过搜索条件获取数据总数
func testGetCount()  {
	//搜索条件,根据自身业务调整
	q :=elastic.NewBoolQuery()
	q=q.Filter(elastic.NewTermQuery("status", 1))

	//通过条件获取条数
	res,err:=GetCount(GetCountReq{
		Condition:q, //搜索条件
		IndexName: "goods_app_index_test",//索引名称
	})

	fmt.Println(res,err)
}
//通过搜索条件 分页 获取数据
func testGetListByFromSize()  {
	//搜索条件,根据自身业务调整
	q :=elastic.NewBoolQuery()
	q=q.Filter(elastic.NewTermQuery("status", 1))

	//通过条件获取数据列表
	res,err:=GetList(GetListReq{
		Condition:q,//搜索条件
		IndexName: "goods_app_index_test",//索引名称
		Page: 1,//页码
		Limit: 10,//条数
	})
	fmt.Println(err)
	//循环列表数据
	for _, hit := range res.Hits.Hits {
		var jdRegion JdRegion
		err := json.Unmarshal(hit.Source, &jdRegion) //数据解析绑定结构体
		fmt.Println(err)
		fmt.Println(jdRegion)
	}

}
//通过搜索条件 滚动式查询 不可分页 获取数据
func testGetListSearchAfter()  {
	//搜索条件,根据自身业务调整
	q :=elastic.NewBoolQuery()
	q=q.Filter(elastic.NewTermQuery("status", 1))
	//通过条件获取数据列表
	res,err:=GetList(GetListReq{
		Condition:q,//搜索条件
		IndexName: "goods_app_index_test",//索引名称
		SearchAfter:&SearchAfter{
			Value: 0,//从商品Id大于0开始查询
		},
		SortField: struct { //滚动式查询需排序数据
			Field string `json:"field"`//排序字段名称 需指定数值型字段 例如商品Id
			Sort  bool   `json:"sort" `//正序 true  倒叙 false
		}{
			"id",//商品Id
			true,//正序
		},
	})
	//循环列表数据
	for _, hit := range res.Hits.Hits {
		var jdRegion JdRegion
		err := json.Unmarshal(hit.Source, &jdRegion) ///数据解析绑定结构体
		fmt.Println(err)
		fmt.Println(jdRegion)
	}
	fmt.Println(err)
}

type AddChooseGoodESParam struct {
	SellerId          int         `json:"seller_id"`
	MyJoinField       MyJoinField `json:"my_join_field"`
	AddTimeChooseGood int64       `json:"add_time_choose_good"`
	ChannelId         int         `json:"channel_id"`
	GroupIds          string      `json:"groups_ids"`
}
type MyJoinField struct {
	Name   string `json:"name"`
	Parent string `json:"parent"`
}
//restapi http方式批量处理数据 1.增加 add 2.更新 update 3,删除 delete
func testBulk()  {
	//初始化数据切片
	esRequest := make([]elastic.BulkableRequest, 0)
	//定义文档Id,路由Id,如有父子文档定义父文档Id
	var i []int
	i = []int{2885234, 2885240, 2885235}
	for _, v := range i {
		docId := fmt.Sprintf("%s_%d_%d", "seller", 1, v)
		routingId := fmt.Sprintf("%s_%d", "goods", v)
		parent := fmt.Sprintf("%s_%d", "goods", v)
		//自定义结构体 赋值需要处理的数据
		esParam := AddChooseGoodESParam{
			SellerId:          1,
			MyJoinField:       MyJoinField{Name: "seller", Parent: parent},
			AddTimeChooseGood: time.Now().UnixNano(),
			ChannelId:         0,
			GroupIds:          "0",
		}
		//准备批量数据
		res, err := Bulk(BulkReq{
			Type:      "add",                  //处理方式1.add 2.update 3.delete
			IndexName: "goods_app_index_test", //索引名称
			DocId:     docId,                  //文档Id
			RoutingId: routingId,              //路由Id,指定路由更快速定位到分片,提高处理速度,
			Doc:       esParam,                //数据,interface类型
		})
		fmt.Println(err, res)
		//添加到切片中
		esRequest = append(esRequest, res.BulkIndexRequest) //BulkIndexRequest,BulkUpdateRequest,BulkDeleteRequest
	}
	//操作处理添加到索引中
	bulkResponse,err:=BulkDo(esRequest)
	fmt.Println(err)
	//结果状态 新增201为成功 修改200为成功 删除200为成功 具体其他状态请查阅文档
	for _, val := range bulkResponse.Indexed() {
		fmt.Println("新增成功",val.Status)
		fmt.Println("修改成功",val.Status)
		fmt.Println("删除成功",val.Status)
	}

}
//根据script修改文档中部分字段
func testScript()  {
	//script为修改语句
	script := fmt.Sprintf("%s'%s'", "ctx._source.group_ids = ", "0-1")
	updateResponse,err :=UpdateByScript(UpdateByScriptReq{
		DocId:     "goods_3281484",//文档Id
		RoutingId: "goods_3281484",//路由Id
		ScriptCtx: script,//script语句
		IndexName: "goods_app_index_test",//索引名称
	})
	fmt.Println(err,updateResponse)
	return
}
//tcp方式批量处理数据 更安全快速 1.增加 add 2.更新 update 3,删除 delete
func testBulkProcessor() {
	//初始化数据切片
	esRequest := make([]elastic.BulkableRequest, 0)
	var i []int
	i = []int{2885234, 2885240, 2885235}
	for _, v := range i {
		//定义文档Id,路由Id,如有父子文档定义父文档Id
		docId := fmt.Sprintf("%s_%d_%d", "seller", 2, v)
		routingId := fmt.Sprintf("%s_%d", "goods", v)
		parent := fmt.Sprintf("%s_%d", "goods", v)
		//自定义结构体 赋值需要处理的数据
		esParam := AddChooseGoodESParam{
			SellerId:          1,
			MyJoinField:       MyJoinField{Name: "seller", Parent: parent},
			AddTimeChooseGood: time.Now().UnixNano(),
			ChannelId:         0,
			GroupIds:          "0",
		}
		//准备批量数据
		res, err := Bulk(BulkReq{
			Type:      "add",                  //处理方式1.add 2.update 3.delete
			IndexName: "goods_app_index_test", //索引名称
			DocId:     docId,                  //文档Id
			RoutingId: routingId,              //路由Id,指定路由更快速定位到分片,提高处理速度,
			Doc:       esParam,                //数据,interface类型
		})
		fmt.Println("------", err, res)
		//添加到切片中
		esRequest = append(esRequest, res.BulkIndexRequest) //BulkIndexRequest,BulkUpdateRequest,BulkDeleteRequest
	}
	//操作处理添加到索引中 numDocs 一次批量处理的数据最多30个
	err := BulkProcessor(esRequest,  3)
	fmt.Println(err)
	return
}