package notify import ( "fmt" "github.com/nsqio/go-nsq" "gitlab.jxhh.com/stbz/library.git/logs" "time" ) type NsqProducer struct { producer *nsq.Producer } var ( NsqProducers *NsqProducer nsqConfig *NsqConfig //重试 ) func New(config *NsqConfig) { nsqConfig = config NsqProducers = InitProducer(config) } func InitProducer(config *NsqConfig) *NsqProducer { producer, err := nsq.NewProducer(config.Addr, nsq.NewConfig()) if logs.CheckErr(err,"InitProducer") { //panic(err) producer.Stop() } err = producer.Ping() if logs.CheckErr(err,"InitProducer") { producer.Stop() } return &NsqProducer{ producer: producer, } } //发布消息 func (p *NsqProducer) Publish(topic string, message string) error { //logs.Info("topic","topic:%v,message:%v",topic,message) var err error defer func() { if err != nil { //重试连接 InitProducer(nsqConfig) } }() if p.producer != nil { if message == "" { //不能发布空串,否则会导致error return nil } err = p.producer.Publish(topic, []byte(message)) // 发布消息 logs.CheckErr(err, "Nsq Publish") return err } return fmt.Errorf("producer is nil", err) } //发布延迟消息 func (p *NsqProducer) DeferredPublish(topic string,delay time.Duration, message string) error { //logs.Infof(context.Background(),"topic",message) var err error if p.producer != nil { if message == "" { //不能发布空串,否则会导致error return nil } err = p.producer.DeferredPublish(topic,delay,[]byte(message)) // 发布消息 logs.CheckErr(err, "Nsq Publish") return err } return fmt.Errorf("producer is nil", err) }