package notify import ( "encoding/json" "errors" "github.com/gogf/gf/frame/g" "github.com/gogf/gf/util/gconv" "gitlab.jxhh.com/stbz/library.git/logs" "math/rand" "strconv" "time" ) const ( //通知服务topic MsgOrderTopic = "messageOrder" MsgGoodsTopic = "messageGoods" //通知下游topic MsgSenderTopic = "messageSender" //云仓消息topic MsgCloudTopic = "newShopApi" //通知消息系统topic NotifyTopic = "notifyMessage" //API请求日志 ApiRequestTopic = "apiRequest" //消息类型 //商品消息 ProductExpire = 101 //商品下架 ProductModify = 102 //商品修改 ProductPrice = 103 //价格变更 ProductRepost = 104 //商品商家 //订单消息 OrderSendgoods = 201 //订单发货 OrderComfirmReceiveGoods = 202 //确认收货 OrderSuccess = 203 //交易成功 OrderCancel = 204 //订单取消 RefundApply = 300 //申请售后 RefundAgree = 301 //商家同意售后 RefundRefuse = 302 //商家拒绝售后 RefundSuccess = 303 //售后成功 //标签消息 TagsImport = 401 //导入标签 //选品消息 GoodsStorageAdd = 501 //添加选品库 GoodsStorageRemove = 502 //移除选品库 ErrMsgParamEmpty = "该类型消息对应字段不能为空" ErrMsgAppIDEmpty = "该类型消息appid字段不能为空" ErrMsgTpye = "错误的消息类型" ) //推送下游对应字段 var ( SendMsgType = g.MapIntStr{ ProductExpire: "goods.undercarriage", ProductModify: "goods.alter", ProductPrice: "goods.price.alter", ProductRepost: "goods.on.sale", OrderSendgoods: "order.delivery", OrderComfirmReceiveGoods: "order.delivered", OrderSuccess: "order.success", OrderCancel: "order.cancel", GoodsStorageAdd: "goods.storage.add", GoodsStorageRemove: "goods.storage.remove", TagsImport: "tags.import", RefundAgree: "afterSale.agree", RefundRefuse: "afterSale.refuse", RefundSuccess: "afterSale.success", } ) func (p *NsqProducer) msgValidator(notifyMessage *NotifyMessage) (err error) { switch gconv.Int(notifyMessage.Type) / 100 % 10 { case 1: var msgData *GoodsMsgData if err = gconv.Struct(notifyMessage.Data, &msgData); err != nil { return } if len(msgData.GoodsIDs) == 0 { err = errors.New(ErrMsgParamEmpty) return } case 2: var msgData *OrderMsgData if err = gconv.Struct(notifyMessage.Data, &msgData); err != nil { return } if msgData.OrderSn == "" || msgData.Sku == 0 { err = errors.New(ErrMsgParamEmpty) return } if notifyMessage.AppID == 0 { err = errors.New(ErrMsgAppIDEmpty) return } case 3: var msgData *RefundMsgData if err = gconv.Struct(notifyMessage.Data, &msgData); err != nil { return } if msgData.OrderSn == "" || msgData.Sku == 0 || msgData.AfterSaleID == 0 { err = errors.New(ErrMsgParamEmpty) return } if notifyMessage.AppID == 0 { err = errors.New(ErrMsgAppIDEmpty) return } case 4: var msgData *TagsMsgData if err = gconv.Struct(notifyMessage.Data, &msgData); err != nil { return } if msgData.Tags == "" { err = errors.New(ErrMsgParamEmpty) return } if notifyMessage.AppID == 0 { err = errors.New(ErrMsgAppIDEmpty) return } case 5: var msgData *GoodsMsgData if err = gconv.Struct(notifyMessage.Data, &msgData); err != nil { return } if len(msgData.GoodsIDs) == 0 { err = errors.New(ErrMsgParamEmpty) return } if notifyMessage.AppID == 0 { err = errors.New(ErrMsgAppIDEmpty) return } default: err = errors.New(ErrMsgTpye) } return } /* 各服务推送通知到消息系统 20220114 gk */ func (p *NsqProducer) NotifyMessage(notifyMessage *NotifyMessage) (err error) { err = p.msgValidator(notifyMessage) if logs.CheckErr(err, "NotifyMessage") { return } r := rand.New(rand.NewSource(time.Now().UnixNano())) var randNums string for i := 0; i <= 2; i++ { randNums += strconv.Itoa(r.Intn(9)) } notifyMessage.ID = gconv.Int(gconv.String(time.Now().UnixNano()/1e6) + randNums) jsonBytes, _ := json.Marshal(notifyMessage) err = NsqProducers.Publish(NotifyTopic, string(jsonBytes)) if !logs.CheckErr(err, "NotifyMessage") { logs.Info("NotifyMessage", "消息内容:【%v】", string(jsonBytes)) } return } /* 推送通知到服务系统系统 20220114 gk ***************************************************** *******注意:该方法目前只有message-server使用 *********** ***************************************************** */ func (p *NsqProducer) NotifyServer(notifyServer *NotifyServer) (err error) { jsonBytes, _ := json.Marshal(notifyServer) var pushTopic string switch notifyServer.MsgType / 100 % 10 { case 1: pushTopic = MsgGoodsTopic case 2: pushTopic = MsgOrderTopic case 3: pushTopic = MsgOrderTopic } if pushTopic == "" { return } err = NsqProducers.Publish(NotifyTopic, string(jsonBytes)) if !logs.CheckErr(err, "NotifyServer") { logs.Info("NotifyServer", "消息内容:【%v】", string(jsonBytes)) } return } /* api请求日志 20220310 gk */ func (p *NsqProducer) NotifyApiLog(notifyApiLog *NotifyApiLog) (err error) { jsonBytes, _ := json.Marshal(notifyApiLog) err = NsqProducers.Publish(ApiRequestTopic, string(jsonBytes)) if !logs.CheckErr(err, "NotifyApiLog") { logs.Info("NotifyApiLog", "消息内容:【%v】", string(jsonBytes)) } return }