message.go 5.3 KB
package notify

import (
	"encoding/json"
	"errors"
	"github.com/gogf/gf/frame/g"
	"github.com/gogf/gf/util/gconv"
	"gitlab.jxhh.com/stbz/library.git/logs"
	"math/rand"
	"strconv"
	"time"
)

const (
	// Topics for the notification services.
	MsgOrderTopic = "messageOrder"
	MsgGoodsTopic = "messageGoods"
	// Topic for notifying downstream consumers.
	MsgSenderTopic = "messageSenders"
	// Topic for cloud-warehouse messages.
	MsgCloudTopic = "newShopApi"
	// Topic for notifying the message system.
	NotifyTopic = "notifyMessage"
	// Topic for API request logs.
	ApiRequestTopic = "apiRequest"


	// Message type codes. The hundreds digit identifies the message family
	// (1 goods, 2 order, 3 after-sale/refund, 4 tags, 5 goods selection).

	// Goods messages.
	ProductExpire = 101 // goods taken off sale
	ProductModify = 102 // goods modified
	ProductPrice  = 103 // price changed
	ProductRepost = 104 // goods put on sale (mapped to "goods.on.sale" in SendMsgType)

	// Order messages.
	OrderSendgoods           = 201 // order shipped
	OrderComfirmReceiveGoods = 202 // receipt confirmed
	OrderSuccess             = 203 // transaction succeeded
	OrderCancel              = 204 // order cancelled
	RefundApply              = 300 // after-sale requested
	RefundAgree              = 301 // merchant agreed to after-sale
	RefundRefuse             = 302 // merchant refused after-sale
	RefundSuccess            = 303 // after-sale succeeded

	// Tag messages.
	TagsImport = 401 // tags imported

	// Goods-selection messages.
	GoodsStorageAdd    = 501 // added to the selection library
	GoodsStorageRemove = 502 // removed from the selection library

	// Validation error texts. These are runtime strings and are kept as-is.
	ErrMsgParamEmpty = "该类型消息对应字段不能为空" // "the field required by this message type must not be empty"
	ErrMsgAppIDEmpty = "该类型消息appid字段不能为空" // "the appid field must not be empty for this message type"
	ErrMsgTpye 		 = "错误的消息类型" // "invalid message type"; the identifier misspells "Type" but is exported, so it is kept
)

// SendMsgType maps a message type code to the identifier used for it when
// pushing downstream (original comment: "fields corresponding to the
// downstream push").
// NOTE(review): RefundApply (300) has no entry here — confirm this is intentional.
var (
	SendMsgType = g.MapIntStr{
		// Goods messages.
		ProductExpire: "goods.undercarriage",
		ProductModify: "goods.alter",
		ProductPrice:  "goods.price.alter",
		ProductRepost: "goods.on.sale",

		// Order messages.
		OrderSendgoods:           "order.delivery",
		OrderComfirmReceiveGoods: "order.delivered",
		OrderSuccess:             "order.success",
		OrderCancel:              "order.cancel",

		// Goods-selection and tag messages.
		GoodsStorageAdd:    "goods.storage.add",
		GoodsStorageRemove: "goods.storage.remove",
		TagsImport:         "tags.import",

		// After-sale messages.
		RefundAgree:   "afterSale.agree",
		RefundRefuse:  "afterSale.refuse",
		RefundSuccess: "afterSale.success",
	}
)

// msgValidator checks that notifyMessage carries the fields required by its
// message family, which is selected by the hundreds digit of
// notifyMessage.Type:
//
//	1 (goods)           - Data.GoodsIDs must be non-empty
//	2 (order)           - Data.OrderSn and Data.Sku must be set
//	3 (after-sale)      - Data.OrderSn, Data.Sku and Data.AfterSaleID must be set
//	4 (tags)            - Data.Tags must be set
//	5 (goods selection) - Data.GoodsIDs must be non-empty
//
// Every family except 1 additionally requires a non-zero AppID; the data
// fields are checked before AppID.
//
// It returns an error with text ErrMsgParamEmpty or ErrMsgAppIDEmpty when a
// required value is missing, ErrMsgTpye for an unknown family, or the
// conversion error from gconv.Struct. A nil notifyMessage is reported as
// ErrMsgParamEmpty instead of panicking.
func (p *NsqProducer) msgValidator(notifyMessage *NotifyMessage) (err error) {
	if notifyMessage == nil {
		return errors.New(ErrMsgParamEmpty)
	}

	// The decode targets below are struct values rather than nil pointers:
	// gconv.Struct does nothing when its input is nil, which would leave a
	// pointer target nil and make the field checks panic.
	family := gconv.Int(notifyMessage.Type) / 100 % 10
	switch family {
	case 1, 5:
		var goods GoodsMsgData
		if err = gconv.Struct(notifyMessage.Data, &goods); err != nil {
			return err
		}
		if len(goods.GoodsIDs) == 0 {
			return errors.New(ErrMsgParamEmpty)
		}
	case 2:
		var order OrderMsgData
		if err = gconv.Struct(notifyMessage.Data, &order); err != nil {
			return err
		}
		if order.OrderSn == "" || order.Sku == 0 {
			return errors.New(ErrMsgParamEmpty)
		}
	case 3:
		var refund RefundMsgData
		if err = gconv.Struct(notifyMessage.Data, &refund); err != nil {
			return err
		}
		if refund.OrderSn == "" || refund.Sku == 0 || refund.AfterSaleID == 0 {
			return errors.New(ErrMsgParamEmpty)
		}
	case 4:
		var tags TagsMsgData
		if err = gconv.Struct(notifyMessage.Data, &tags); err != nil {
			return err
		}
		if tags.Tags == "" {
			return errors.New(ErrMsgParamEmpty)
		}
	default:
		return errors.New(ErrMsgTpye)
	}

	// Goods messages (family 1) are the only ones that do not require an AppID.
	if family != 1 && notifyMessage.AppID == 0 {
		return errors.New(ErrMsgAppIDEmpty)
	}
	return nil
}

// NotifyMessage is used by services to push a notification to the message
// system (original note: 20220114 gk).
//
// It validates notifyMessage with msgValidator, overwrites notifyMessage.ID
// with a generated value (the current Unix time in milliseconds followed by
// three random decimal digits), encodes the message as JSON and publishes it
// to NotifyTopic.
//
// It returns the validation, JSON encoding or publish error, if any. Each
// error is also passed to logs.CheckErr; on success the payload is logged.
func (p *NsqProducer) NotifyMessage(notifyMessage *NotifyMessage) (err error) {
	err = p.msgValidator(notifyMessage)
	if logs.CheckErr(err, "NotifyMessage") {
		return err
	}

	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	var randNums string
	for i := 0; i < 3; i++ {
		// Intn(10) covers every decimal digit 0-9; the previous Intn(9)
		// could never produce the digit 9.
		randNums += strconv.Itoa(r.Intn(10))
	}
	notifyMessage.ID = gconv.Int(gconv.String(time.Now().UnixNano()/1e6) + randNums)

	var jsonBytes []byte
	jsonBytes, err = json.Marshal(notifyMessage)
	if logs.CheckErr(err, "NotifyMessage") {
		// Do not publish an empty payload when encoding fails.
		return err
	}

	err = NsqProducers.Publish(NotifyTopic, string(jsonBytes))
	if !logs.CheckErr(err, "NotifyMessage") {
		logs.Info("NotifyMessage", "消息内容:【%v】", string(jsonBytes))
	}
	return err
}

// NotifyServer pushes a notification to the service systems (original note:
// 20220114 gk).
//
// NOTE: according to the original comment, this method is currently used
// only by message-server.
//
// The target topic is chosen from the hundreds digit of notifyServer.MsgType:
// 1 selects MsgGoodsTopic, 2 and 3 select MsgOrderTopic. Any other type has
// no topic and is silently skipped (nil is returned without publishing).
//
// It returns an error for a nil notifyServer, or the JSON encoding or publish
// error, if any. Errors are also passed to logs.CheckErr.
func (p *NsqProducer) NotifyServer(notifyServer *NotifyServer) (err error) {
	if notifyServer == nil {
		err = errors.New(ErrMsgParamEmpty)
		logs.CheckErr(err, "NotifyServer")
		return err
	}

	var pushTopic string
	switch notifyServer.MsgType / 100 % 10 {
	case 1:
		pushTopic = MsgGoodsTopic
	case 2, 3:
		pushTopic = MsgOrderTopic
	}
	if pushTopic == "" {
		return nil
	}

	var jsonBytes []byte
	jsonBytes, err = json.Marshal(notifyServer)
	if logs.CheckErr(err, "NotifyServer") {
		return err
	}

	// Publish to the topic selected above. The previous code always published
	// to NotifyTopic, which left pushTopic unused apart from the empty check.
	err = NsqProducers.Publish(pushTopic, string(jsonBytes))
	if !logs.CheckErr(err, "NotifyServer") {
		logs.Info("NotifyServer", "消息内容:【%v】", string(jsonBytes))
	}
	return err
}


// NotifyApiLog publishes an API request log entry (original note: 20220310 gk).
//
// notifyApiLog is encoded as JSON and published to ApiRequestTopic. It
// returns the JSON encoding or publish error, if any; errors are also passed
// to logs.CheckErr, and on success the payload is logged.
func (p *NsqProducer) NotifyApiLog(notifyApiLog *NotifyApiLog) (err error) {
	var jsonBytes []byte
	jsonBytes, err = json.Marshal(notifyApiLog)
	if logs.CheckErr(err, "NotifyApiLog") {
		// Do not publish an empty payload when encoding fails.
		return err
	}

	err = NsqProducers.Publish(ApiRequestTopic, string(jsonBytes))
	if !logs.CheckErr(err, "NotifyApiLog") {
		logs.Info("NotifyApiLog", "消息内容:【%v】", string(jsonBytes))
	}
	return err
}