// producer.go 1.4 KB

package notify

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"gitlab.jxhh.com/stbz/library.git/logs"
	"time"
)

// NsqProducer wraps a go-nsq producer so the rest of the package publishes
// through a single type.
type NsqProducer struct {
	producer *nsq.Producer // underlying go-nsq producer; checked for nil before publishing
}

// NsqProducers is the package-level producer instance. It is nil until New
// assigns it.
var NsqProducers *NsqProducer

// New builds a producer from config via InitProducer and stores it in the
// package-level NsqProducers variable, replacing any previous value.
func New(config *NsqConfig) {
	created := InitProducer(config)
	NsqProducers = created
}


// InitProducer creates a go-nsq producer for config.Addr using the default
// nsq configuration and wraps it in an NsqProducer.
//
// It always returns a non-nil *NsqProducer:
//   - if config is nil or the producer cannot be created, the wrapper holds a
//     nil producer, so Publish/DeferredPublish report an error instead of the
//     process panicking on a nil pointer here;
//   - if the initial Ping fails, the producer is stopped and the stopped
//     producer is still returned (unchanged from the previous behavior), so
//     later publishes are expected to fail inside go-nsq.
//
// Failures are reported through logs.CheckErr, which is assumed to log the
// error and return true when it is non-nil.
func InitProducer(config *NsqConfig) *NsqProducer {
	if config == nil {
		logs.CheckErr(fmt.Errorf("nsq config is nil"), "InitProducer")
		return &NsqProducer{}
	}

	producer, err := nsq.NewProducer(config.Addr, nsq.NewConfig())
	// On failure NewProducer returns a nil producer; calling Stop or Ping on
	// it would dereference nil, so bail out with an empty wrapper.
	if logs.CheckErr(err, "InitProducer") || producer == nil {
		return &NsqProducer{}
	}

	// Verify connectivity up front; an unreachable nsqd shuts the producer down.
	err = producer.Ping()
	if logs.CheckErr(err, "InitProducer") {
		producer.Stop()
	}

	return &NsqProducer{
		producer: producer,
	}
}

// Publish logs the topic and message and then publishes message to topic
// through the wrapped producer.
//
// It returns an error with the text "producer is nil" when the receiver or its
// underlying producer is nil. An empty message is not sent and is reported as
// success (nil). Otherwise the error from the underlying Publish call is
// passed to logs.CheckErr and returned to the caller.
func (p *NsqProducer) Publish(topic string, message string) error {
	logs.Info("topic", "topic:%v,message:%v", topic, message)

	// Guard the receiver as well: the package-level instance is nil until New runs.
	if p == nil || p.producer == nil {
		return fmt.Errorf("producer is nil")
	}

	// An empty string must not be published; doing so would cause an error.
	if message == "" {
		return nil
	}

	err := p.producer.Publish(topic, []byte(message))
	logs.CheckErr(err, "Nsq Publish")
	return err
}

// DeferredPublish logs the topic, delay and message and then publishes message
// to topic through the wrapped producer, asking for delivery to be deferred by
// delay.
//
// It returns an error with the text "producer is nil" when the receiver or its
// underlying producer is nil. An empty message is not sent and is reported as
// success (nil). Otherwise the error from the underlying DeferredPublish call
// is passed to logs.CheckErr and returned to the caller.
func (p *NsqProducer) DeferredPublish(topic string, delay time.Duration, message string) error {
	// Use the same tag/format/args shape as Publish so the payload is never
	// placed in the format-string position.
	logs.Info("topic", "topic:%v,delay:%v,message:%v", topic, delay, message)

	// Guard the receiver as well: the package-level instance is nil until New runs.
	if p == nil || p.producer == nil {
		return fmt.Errorf("producer is nil")
	}

	// An empty string must not be published; doing so would cause an error.
	if message == "" {
		return nil
	}

	err := p.producer.DeferredPublish(topic, delay, []byte(message))
	logs.CheckErr(err, "Nsq Publish")
	return err
}