// message.go 5.2 KB
package notify

import (
	"encoding/json"
	"errors"
	"github.com/gogf/gf/encoding/gjson"
	"github.com/gogf/gf/frame/g"
	"github.com/gogf/gf/util/gconv"
	"gitlab.jxhh.com/stbz/library.git/logs"
	"math/rand"
	"strconv"
	"time"
)

// NSQ topic names.
const (
	// Topics of the notification service.
	MsgOrderTopic = "messageOrder"
	MsgGoodsTopic = "messageGoods"
	// Topic for notifying downstream consumers.
	MsgSenderTopic = "messageSender"
	// Topic for cloud-warehouse messages.
	MsgCloudTopic = "newShopApi"
	// Topic for notifying the message system; NotifyMessage publishes here
	// when the message's AppID is not greater than zero.
	NotifyTopic = "notifyMessage"
	// Topic for API request logs.
	ApiRequestTopic = "apiRequest"
	// Topic for notifying the message system; NotifyMessage publishes here
	// when the message's AppID is greater than zero.
	SingleTopic = "singleMessage"
)

// Product message types (1xx).
const (
	ProductExpire = 101 // product taken off sale
	ProductModify = 102 // product modified
	ProductPrice  = 103 // price changed
	ProductRepost = 104 // NOTE(review): presumably "product put on sale" (maps to "goods.on.sale"); the original note looked like a typo, confirm
)

// Order message types (2xx).
const (
	OrderSendgoods           = 201 // order shipped
	OrderComfirmReceiveGoods = 202 // receipt of goods confirmed
	OrderSuccess             = 203 // transaction completed
	OrderCancel              = 204 // order cancelled
	OrderPay                 = 205 // deferred payment
	OrderRejection           = 206 // delivery rejected
	OrderReturn              = 207 // rejected goods returned to the warehouse
	OrderCreateSuccess       = 211 // order creation succeeded
	OrderCreateFailed        = 212 // order creation failed
	OrderCancelFailed        = 214 // cancellation failed
)

// After-sale (refund) message types (3xx).
const (
	RefundApply           = 300 // after-sale requested
	RefundAgree           = 301 // merchant agreed to the after-sale request
	RefundRefuse          = 302 // merchant refused the after-sale request
	RefundSuccess         = 303 // after-sale completed
	RefundApplySuccess    = 311 // NOTE(review): original note duplicated OrderReturn's; presumably "application succeeded", confirm
	RefundApplyFail       = 312 // NOTE(review): original note duplicated OrderReturn's; presumably "application failed", confirm
	RefundCancel          = 313 // after-sale cancelled
	RefundApproved        = 314 // review approved
	RefundReviewRejection = 315 // review rejected
)

// Tag message types (4xx).
const (
	TagsImport = 401 // tags imported
)

// Product-selection message types (5xx).
const (
	GoodsStorageAdd    = 501 // added to the selection library
	GoodsStorageRemove = 502 // removed from the selection library
)

// Validation error texts; these are returned to callers at runtime and must
// stay byte-identical.
const (
	ErrMsgParamEmpty = "该类型消息对应字段不能为空"
	ErrMsgAppIDEmpty = "该类型消息appid字段不能为空"
	ErrMsgTpye       = "错误的消息类型"
)

// SendMsgType maps a message type code to the corresponding field name used
// when pushing to downstream consumers.
var SendMsgType = g.MapIntStr{
	// Product messages.
	ProductExpire: "goods.undercarriage",
	ProductModify: "goods.alter",
	ProductPrice:  "goods.price.alter",
	ProductRepost: "goods.on.sale",

	// Order messages.
	OrderSendgoods:           "order.delivery",
	OrderComfirmReceiveGoods: "order.delivered",
	OrderSuccess:             "order.success",
	OrderCancel:              "order.cancel",

	// After-sale messages.
	RefundAgree:   "afterSale.agree",
	RefundRefuse:  "afterSale.refuse",
	RefundSuccess: "afterSale.success",

	// Tag messages.
	TagsImport: "tags.import",

	// Product-selection messages.
	GoodsStorageAdd:    "goods.storage.add",
	GoodsStorageRemove: "goods.storage.remove",
}

// msgValidator checks that notifyMessage carries the payload fields required
// by its message category. The category is the hundreds digit of the message
// type (Type / 100 % 10):
//
//	1 - Data must decode to GoodsMsgData with a non-empty GoodsIDs
//	2 - Data must decode to OrderMsgData with OrderSn and Sku set
//	3 - Data must decode to RefundMsgData with OrderSn, Sku and AfterSaleID set
//	4 - Data must decode to TagsMsgData with a non-empty Tags
//	5 - same payload rule as category 1
//
// Every category except 1 additionally requires a non-zero AppID; the payload
// check takes precedence over the AppID check.
//
// It returns the gconv decoding error, or an error whose text is
// ErrMsgParamEmpty, ErrMsgAppIDEmpty or ErrMsgTpye. A nil result means the
// message is valid.
func (p *NsqProducer) msgValidator(notifyMessage *NotifyMessage) (err error) {
	// A nil message has nothing to validate; report it instead of panicking.
	if notifyMessage == nil {
		return errors.New(ErrMsgParamEmpty)
	}

	category := gconv.Int(notifyMessage.Type) / 100 % 10
	switch category {
	case 1, 5:
		var msgData *GoodsMsgData
		if err = gconv.Struct(notifyMessage.Data, &msgData); err != nil {
			return err
		}
		// gconv.Struct may return nil while leaving msgData unallocated
		// (for example when Data is nil), so guard the dereference.
		if msgData == nil || len(msgData.GoodsIDs) == 0 {
			return errors.New(ErrMsgParamEmpty)
		}
	case 2:
		var msgData *OrderMsgData
		if err = gconv.Struct(notifyMessage.Data, &msgData); err != nil {
			return err
		}
		if msgData == nil || msgData.OrderSn == "" || msgData.Sku == 0 {
			return errors.New(ErrMsgParamEmpty)
		}
	case 3:
		var msgData *RefundMsgData
		if err = gconv.Struct(notifyMessage.Data, &msgData); err != nil {
			return err
		}
		if msgData == nil || msgData.OrderSn == "" || msgData.Sku == 0 || msgData.AfterSaleID == 0 {
			return errors.New(ErrMsgParamEmpty)
		}
	case 4:
		var msgData *TagsMsgData
		if err = gconv.Struct(notifyMessage.Data, &msgData); err != nil {
			return err
		}
		if msgData == nil || msgData.Tags == "" {
			return errors.New(ErrMsgParamEmpty)
		}
	default:
		return errors.New(ErrMsgTpye)
	}

	// Only category 1 messages may omit the app ID.
	if category != 1 && notifyMessage.AppID == 0 {
		return errors.New(ErrMsgAppIDEmpty)
	}
	return nil
}

// NotifyMessage validates notifyMessage, assigns it a generated ID and
// publishes it as JSON through NsqProducers. It is the entry point services
// use to push notifications to the message system (added 2022-01-14 by gk).
//
// The ID is the current Unix time in milliseconds followed by three random
// decimal digits, and is written to notifyMessage.ID in place. Messages with
// AppID > 0 are published to SingleTopic; all others go to NotifyTopic.
//
// It returns the validation error, the JSON encoding error, or the publish
// error.
func (p *NsqProducer) NotifyMessage(notifyMessage *NotifyMessage) (err error) {
	err = p.msgValidator(notifyMessage)
	if logs.CheckErr(err, "NotifyMessage") {
		return
	}

	// Millisecond timestamp plus three random digits. Intn(10) covers 0-9;
	// the previous Intn(9) could never produce the digit 9.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	idDigits := strconv.AppendInt(nil, time.Now().UnixNano()/1e6, 10)
	for i := 0; i < 3; i++ {
		idDigits = strconv.AppendInt(idDigits, int64(r.Intn(10)), 10)
	}
	notifyMessage.ID = gconv.Int(string(idDigits))

	topic := NotifyTopic
	if notifyMessage.AppID > 0 {
		topic = SingleTopic
	}

	// Encode once and reuse the payload for publishing and logging.
	// ToJsonString reports an encoding failure as an error rather than
	// panicking like MustToJsonString.
	payload, err := gjson.New(notifyMessage).ToJsonString()
	if err != nil {
		logs.CheckErr(err, "NotifyMessage")
		return err
	}

	err = NsqProducers.Publish(topic, payload)
	if !logs.CheckErr(err, "NotifyMessage") {
		logs.Info("NotifyMessage", "消息内容:【%v】", payload)
	}
	return err
}


// NotifyApiLog publishes notifyApiLog, encoded as JSON, to ApiRequestTopic
// through NsqProducers (API request log; added 2022-03-10 by gk).
//
// It returns the JSON encoding error or the publish error. The published
// payload is written to the info log when logs.CheckErr reports no error.
func (p *NsqProducer) NotifyApiLog(notifyApiLog *NotifyApiLog) (err error) {
	jsonBytes, err := json.Marshal(notifyApiLog)
	if err != nil {
		// Do not publish an empty payload when encoding fails.
		logs.CheckErr(err, "NotifyApiLog")
		return err
	}

	payload := string(jsonBytes)
	err = NsqProducers.Publish(ApiRequestTopic, payload)
	if !logs.CheckErr(err, "NotifyApiLog") {
		logs.Info("NotifyApiLog", "消息内容:【%v】", payload)
	}
	return err
}