package notify import ( "fmt" "github.com/gogf/gf/frame/g" "github.com/nsqio/go-nsq" "time" ) type NsqProducer struct { producer *nsq.Producer } var ( NsqProducers *NsqProducer nsqConfig *NsqConfig //重试 ) func New(config *NsqConfig) { nsqConfig = config NsqProducers = new(NsqProducer) } func InitProducer(config *NsqConfig) { var err error if NsqProducers.producer != nil { err = NsqProducers.producer.Ping() if err == nil { return } } NsqProducers.producer, err = nsq.NewProducer(config.Addr, nsq.NewConfig()) if err != nil { return } err = NsqProducers.producer.Ping() if err != nil { NsqProducers.producer.Stop() return } } //Publish 发布消息 func (p *NsqProducer) Publish(topic string, message string) (err error) { InitProducer(nsqConfig) if NsqProducers.producer != nil { if message == "" { //不能发布空串,否则会导致error return nil } err = NsqProducers.producer.Publish(topic, []byte(message)) // 发布消息 if err != nil { g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error()) } else { g.Log().Cat("Producer").Infof(`消息内容【%v】`, message) } return } return fmt.Errorf("producer is nil", err) } //DeferredPublish 发布延迟消息 func (p *NsqProducer) DeferredPublish(topic string, delay time.Duration, message string) (err error) { InitProducer(nsqConfig) if NsqProducers.producer != nil { if message == "" { //不能发布空串,否则会导致error return nil } err = NsqProducers.producer.DeferredPublish(topic, delay, []byte(message)) // 发布消息 if err != nil { g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error()) } else { g.Log().Cat("Producer").Infof(`消息内容【%v】`, message) } return err } return fmt.Errorf("producer is nil", err) }