producer.go 1.9 KB
package notify

import (
	"fmt"
	"github.com/gogf/gf/frame/g"
	"github.com/nsqio/go-nsq"
	"gitlab.jxhh.com/stbz/library.git/logs"
	"time"
)

// NsqProducer wraps a go-nsq producer and carries the publish helpers
// (Publish, DeferredPublish) defined in this file.
type NsqProducer struct {
	// producer is the underlying go-nsq producer; the publish helpers check
	// it against nil before using it.
	producer *nsq.Producer
}

var (
	// NsqProducers is the package-level producer instance assigned by New.
	NsqProducers *NsqProducer
	// nsqConfig is the configuration most recently passed to New.
	nsqConfig    *NsqConfig // kept for retrying the connection
)

// New records config in the package-level nsqConfig and sets NsqProducers to
// the producer returned by InitProducer(config).
func New(config *NsqConfig) {
	nsqConfig = config
	NsqProducers = InitProducer(config)
}

// InitProducer builds a go-nsq producer for config.Addr using the default
// go-nsq configuration and pings nsqd once to verify the connection.
//
// The returned *NsqProducer is never nil. When config is nil or the producer
// cannot be constructed, the wrapper holds a nil producer, so the publish
// helpers report "producer is nil" instead of dereferencing a nil pointer.
// When only the ping fails, the producer is stopped and still returned, as
// before.
//
// Errors are passed to logs.CheckErr, which is used here as the error gate
// (presumably it logs the error and reports whether it was non-nil — confirm
// against the logs package).
func InitProducer(config *NsqConfig) *NsqProducer {
	if config == nil {
		logs.CheckErr(fmt.Errorf("nsq config is nil"), "InitProducer")
		return &NsqProducer{}
	}

	producer, err := nsq.NewProducer(config.Addr, nsq.NewConfig())
	if logs.CheckErr(err, "InitProducer") {
		// NewProducer returns a nil producer together with the error, so
		// there is nothing to stop or ping.
		return &NsqProducer{}
	}

	if logs.CheckErr(producer.Ping(), "InitProducer") {
		// NOTE(review): a stopped go-nsq producer rejects every later
		// publish; confirm this is the intended reaction to a failed ping.
		producer.Stop()
	}

	return &NsqProducer{
		producer: producer,
	}
}

// Publish sends message to topic through the wrapped go-nsq producer.
//
// A nil receiver or an uninitialised producer yields a "producer is nil"
// error. An empty message is skipped and reported as success, because
// publishing an empty string would otherwise result in an error. Every
// publish attempt is logged under the "Producer" category; failures
// additionally use the "error" category and include the error text.
//
// It returns the error from the underlying producer's Publish call, if any.
func (p *NsqProducer) Publish(topic string, message string) (err error) {
	if p == nil || p.producer == nil {
		return fmt.Errorf("producer is nil")
	}
	if message == "" {
		return nil
	}

	// Deliberately no re-initialisation on failure: a go-nsq Producer
	// re-connects lazily on the next publish, whereas building a throw-away
	// producer here would neither repair p nor release its own connection.
	err = p.producer.Publish(topic, []byte(message))
	if err != nil {
		g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error())
		return err
	}

	g.Log().Cat("Producer").Infof(`消息内容【%v】`, message)
	return nil
}

// DeferredPublish sends message to topic through the wrapped go-nsq producer,
// passing delay on to the producer's DeferredPublish call.
//
// A nil receiver or an uninitialised producer yields a "producer is nil"
// error. An empty message is skipped and reported as success, because
// publishing an empty string would otherwise result in an error. Every
// publish attempt is logged under the "Producer" category; failures
// additionally use the "error" category and include the error text.
//
// It returns the error from the underlying producer's DeferredPublish call,
// if any.
func (p *NsqProducer) DeferredPublish(topic string, delay time.Duration, message string) error {
	if p == nil || p.producer == nil {
		return fmt.Errorf("producer is nil")
	}
	if message == "" {
		return nil
	}

	if err := p.producer.DeferredPublish(topic, delay, []byte(message)); err != nil {
		g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error())
		return err
	}

	g.Log().Cat("Producer").Infof(`消息内容【%v】`, message)
	return nil
}