package notify import ( "fmt" "github.com/gogf/gf/frame/g" "github.com/nsqio/go-nsq" "gitlab.jxhh.com/stbz/library.git/logs" "time" ) type NsqProducer struct { producer *nsq.Producer } var ( NsqProducers *NsqProducer nsqConfig *NsqConfig //重试 ) func New(config *NsqConfig) { nsqConfig = config NsqProducers = InitProducer(config) } func InitProducer(config *NsqConfig) *NsqProducer { producer, err := nsq.NewProducer(config.Addr, nsq.NewConfig()) if logs.CheckErr(err, "InitProducer") { //panic(err) producer.Stop() } err = producer.Ping() if logs.CheckErr(err, "InitProducer") { producer.Stop() } return &NsqProducer{ producer: producer, } } //发布消息 func (p *NsqProducer) Publish(topic string, message string) (err error) { defer func() { if err != nil { //重试连接 InitProducer(nsqConfig) } }() if p.producer != nil { if message == "" { //不能发布空串,否则会导致error return nil } err = p.producer.Publish(topic, []byte(message)) // 发布消息 if err != nil { g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error()) } else { g.Log().Cat("Producer").Infof(`消息内容【%v】`, message) } return } return fmt.Errorf("producer is nil", err) } //发布延迟消息 func (p *NsqProducer) DeferredPublish(topic string, delay time.Duration, message string) error { //logs.Infof(context.Background(),"topic",message) var err error if p.producer != nil { if message == "" { //不能发布空串,否则会导致error return nil } err = p.producer.DeferredPublish(topic, delay, []byte(message)) // 发布消息 if err != nil { g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error()) } else { g.Log().Cat("Producer").Infof(`消息内容【%v】`, message) } return err } return fmt.Errorf("producer is nil", err) }