提交 8e52b1f6 authored 作者: 张立波's avatar 张立波

消息

上级 f256b7f9
...@@ -19,13 +19,11 @@ var ( ...@@ -19,13 +19,11 @@ var (
func New(config *NsqConfig) { func New(config *NsqConfig) {
nsqConfig = config nsqConfig = config
NsqProducers = InitProducer(config)
} }
func InitProducer(config *NsqConfig) *NsqProducer { func InitProducer(config *NsqConfig) {
producer, err := nsq.NewProducer(config.Addr, nsq.NewConfig()) producer, err := nsq.NewProducer(config.Addr, nsq.NewConfig())
if logs.CheckErr(err, "InitProducer") { if logs.CheckErr(err, "InitProducer") {
//panic(err)
producer.Stop() producer.Stop()
} }
...@@ -34,21 +32,20 @@ func InitProducer(config *NsqConfig) *NsqProducer { ...@@ -34,21 +32,20 @@ func InitProducer(config *NsqConfig) *NsqProducer {
producer.Stop() producer.Stop()
} }
return &NsqProducer{ NsqProducers = &NsqProducer{
producer: producer, producer: producer,
} }
} }
//发布消息 //Publish 发布消息
func (p *NsqProducer) Publish(topic string, message string) (err error) { func (p *NsqProducer) Publish(topic string, message string) (err error) {
//重试连接 InitProducer(nsqConfig)
NsqProducers = InitProducer(nsqConfig)
if p.producer != nil { if NsqProducers.producer != nil {
if message == "" { //不能发布空串,否则会导致error if message == "" { //不能发布空串,否则会导致error
return nil return nil
} }
err = p.producer.Publish(topic, []byte(message)) // 发布消息 err = NsqProducers.producer.Publish(topic, []byte(message)) // 发布消息
if err != nil { if err != nil {
g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error()) g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error())
} else { } else {
...@@ -60,16 +57,14 @@ func (p *NsqProducer) Publish(topic string, message string) (err error) { ...@@ -60,16 +57,14 @@ func (p *NsqProducer) Publish(topic string, message string) (err error) {
return fmt.Errorf("producer is nil", err) return fmt.Errorf("producer is nil", err)
} }
//发布延迟消息 //DeferredPublish 发布延迟消息
func (p *NsqProducer) DeferredPublish(topic string, delay time.Duration, message string) error { func (p *NsqProducer) DeferredPublish(topic string, delay time.Duration, message string) (err error) {
InitProducer(nsqConfig)
NsqProducers = InitProducer(nsqConfig) if NsqProducers.producer != nil {
var err error
if p.producer != nil {
if message == "" { //不能发布空串,否则会导致error if message == "" { //不能发布空串,否则会导致error
return nil return nil
} }
err = p.producer.DeferredPublish(topic, delay, []byte(message)) // 发布消息 err = NsqProducers.producer.DeferredPublish(topic, delay, []byte(message)) // 发布消息
if err != nil { if err != nil {
g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error()) g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error())
} else { } else {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论