1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package elastic
import (
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"time"
)
//绑定结构体
type JdRegion struct {
Title string `json:"title"`
Id int `json:"id"`
Cover string `json:"cover"`
}
//通过文档Id获取文档数据
func testDocId() {
res,err:=GetDoc(GetDocReq{
DocId: "goods_3281484",//文档Id
IndexName: "goods_app_index_test",//索引名称
})
var jdRegion JdRegion
err =json.Unmarshal(res.Source, &jdRegion)//数据解析绑定结构体
fmt.Println(jdRegion,err)
}
//通过搜索条件获取数据总数
func testGetCount() {
//搜索条件,根据自身业务调整
q :=elastic.NewBoolQuery()
q=q.Filter(elastic.NewTermQuery("status", 1))
//通过条件获取条数
res,err:=GetCount(GetCountReq{
Condition:q, //搜索条件
IndexName: "goods_app_index_test",//索引名称
})
fmt.Println(res,err)
}
//通过搜索条件 分页 获取数据
func testGetListByFromSize() {
//搜索条件,根据自身业务调整
q :=elastic.NewBoolQuery()
q=q.Filter(elastic.NewTermQuery("status", 1))
//通过条件获取数据列表
res,err:=GetList(GetListReq{
Condition:q,//搜索条件
IndexName: "goods_app_index_test",//索引名称
Page: 1,//页码
Limit: 10,//条数
})
fmt.Println(err)
//循环列表数据
for _, hit := range res.Hits.Hits {
var jdRegion JdRegion
err := json.Unmarshal(hit.Source, &jdRegion) //数据解析绑定结构体
fmt.Println(err)
fmt.Println(jdRegion)
}
}
//通过搜索条件 滚动式查询 不可分页 获取数据
func testGetListSearchAfter() {
//搜索条件,根据自身业务调整
q :=elastic.NewBoolQuery()
q=q.Filter(elastic.NewTermQuery("status", 1))
//通过条件获取数据列表
res,err:=GetList(GetListReq{
Condition:q,//搜索条件
IndexName: "goods_app_index_test",//索引名称
SearchAfter:&SearchAfter{
Value: 0,//从商品Id大于0开始查询
},
SortField: struct { //滚动式查询需排序数据
Field string `json:"field"`//排序字段名称 需指定数值型字段 例如商品Id
Sort bool `json:"sort" `//正序 true 倒叙 false
}{
"id",//商品Id
true,//正序
},
})
//循环列表数据
for _, hit := range res.Hits.Hits {
var jdRegion JdRegion
err := json.Unmarshal(hit.Source, &jdRegion) ///数据解析绑定结构体
fmt.Println(err)
fmt.Println(jdRegion)
}
fmt.Println(err)
}
type AddChooseGoodESParam struct {
SellerId int `json:"seller_id"`
MyJoinField MyJoinField `json:"my_join_field"`
AddTimeChooseGood int64 `json:"add_time_choose_good"`
ChannelId int `json:"channel_id"`
GroupIds string `json:"groups_ids"`
}
type MyJoinField struct {
Name string `json:"name"`
Parent string `json:"parent"`
}
//restapi http方式批量处理数据 1.增加 add 2.更新 update 3,删除 delete
func testBulk() {
//初始化数据切片
esRequest := make([]elastic.BulkableRequest, 0)
//定义文档Id,路由Id,如有父子文档定义父文档Id
var i []int
i = []int{2885234, 2885240, 2885235}
for _, v := range i {
docId := fmt.Sprintf("%s_%d_%d", "seller", 1, v)
routingId := fmt.Sprintf("%s_%d", "goods", v)
parent := fmt.Sprintf("%s_%d", "goods", v)
//自定义结构体 赋值需要处理的数据
esParam := AddChooseGoodESParam{
SellerId: 1,
MyJoinField: MyJoinField{Name: "seller", Parent: parent},
AddTimeChooseGood: time.Now().UnixNano(),
ChannelId: 0,
GroupIds: "0",
}
//准备批量数据
res, err := Bulk(BulkReq{
Type: "add", //处理方式1.add 2.update 3.delete
IndexName: "goods_app_index_test", //索引名称
DocId: docId, //文档Id
RoutingId: routingId, //路由Id,指定路由更快速定位到分片,提高处理速度,
Doc: esParam, //数据,interface类型
})
fmt.Println(err, res)
//添加到切片中
esRequest = append(esRequest, res.BulkIndexRequest) //BulkIndexRequest,BulkUpdateRequest,BulkDeleteRequest
}
//操作处理添加到索引中
bulkResponse,err:=BulkDo(esRequest)
fmt.Println(err)
//结果状态 新增201为成功 修改200为成功 删除200为成功 具体其他状态请查阅文档
for _, val := range bulkResponse.Indexed() {
fmt.Println("新增成功",val.Status)
fmt.Println("修改成功",val.Status)
fmt.Println("删除成功",val.Status)
}
}
//根据script修改文档中部分字段
func testScript() {
//script为修改语句
script := fmt.Sprintf("%s'%s'", "ctx._source.group_ids = ", "0-1")
updateResponse,err :=UpdateByScript(UpdateByScriptReq{
DocId: "goods_3281484",//文档Id
RoutingId: "goods_3281484",//路由Id
ScriptCtx: script,//script语句
IndexName: "goods_app_index_test",//索引名称
})
fmt.Println(err,updateResponse)
return
}
//tcp方式批量处理数据 更安全快速 1.增加 add 2.更新 update 3,删除 delete
func testBulkProcessor() {
//初始化数据切片
esRequest := make([]elastic.BulkableRequest, 0)
var i []int
i = []int{2885234, 2885240, 2885235}
for _, v := range i {
//定义文档Id,路由Id,如有父子文档定义父文档Id
docId := fmt.Sprintf("%s_%d_%d", "seller", 2, v)
routingId := fmt.Sprintf("%s_%d", "goods", v)
parent := fmt.Sprintf("%s_%d", "goods", v)
//自定义结构体 赋值需要处理的数据
esParam := AddChooseGoodESParam{
SellerId: 1,
MyJoinField: MyJoinField{Name: "seller", Parent: parent},
AddTimeChooseGood: time.Now().UnixNano(),
ChannelId: 0,
GroupIds: "0",
}
//准备批量数据
res, err := Bulk(BulkReq{
Type: "add", //处理方式1.add 2.update 3.delete
IndexName: "goods_app_index_test", //索引名称
DocId: docId, //文档Id
RoutingId: routingId, //路由Id,指定路由更快速定位到分片,提高处理速度,
Doc: esParam, //数据,interface类型
})
fmt.Println("------", err, res)
//添加到切片中
esRequest = append(esRequest, res.BulkIndexRequest) //BulkIndexRequest,BulkUpdateRequest,BulkDeleteRequest
}
//操作处理添加到索引中 numDocs 一次批量处理的数据最多30个
err := BulkProcessor(esRequest, 3)
fmt.Println(err)
return
}