// producer.go 1.9 KB
package notify

import (
	"fmt"
	"github.com/gogf/gf/frame/g"
	"github.com/nsqio/go-nsq"
	"time"
)

// NsqProducer wraps a go-nsq producer and exposes the publish helpers
// defined in this file.
type NsqProducer struct {
	// producer is the underlying go-nsq producer; it stays nil until
	// InitProducer assigns one.
	producer *nsq.Producer
}

// Package-level state shared by New, InitProducer and the publish methods.
var (
	// NsqProducers is the shared producer wrapper allocated by New.
	NsqProducers *NsqProducer
	// nsqConfig is the configuration remembered by New.
	nsqConfig    *NsqConfig // retry: the publish methods pass it to InitProducer before every send
)

// New resets the package-level state: it allocates a fresh, empty
// NsqProducers wrapper and remembers config for later InitProducer calls.
// It does not create the underlying producer itself.
func New(config *NsqConfig) {
	NsqProducers = &NsqProducer{}
	nsqConfig = config
}

// InitProducer makes sure NsqProducers.producer is a producer that answers
// Ping, creating a new one for config.Addr (with a default nsq config) when
// there is none or the current one no longer responds.
//
// The function has no return value, so failures are reported only through
// state: after a failed attempt NsqProducers.producer is nil, which callers
// in this file treat as "producer is nil".
//
// It is a no-op when New has not been called yet (NsqProducers is nil).
func InitProducer(config *NsqConfig) {
	if NsqProducers == nil {
		return
	}

	if current := NsqProducers.producer; current != nil {
		if err := current.Ping(); err == nil {
			// The existing producer is still usable; keep it.
			return
		}
		// The existing producer is unusable: shut it down before dropping
		// the reference so it is not left running in the background.
		current.Stop()
		NsqProducers.producer = nil
	}

	if config == nil {
		// Nothing to connect to; leave the producer nil.
		return
	}

	fresh, err := nsq.NewProducer(config.Addr, nsq.NewConfig())
	if err != nil {
		return
	}
	if err = fresh.Ping(); err != nil {
		// Do not publish a producer that failed its first Ping: stop it and
		// leave NsqProducers.producer nil so callers see the failure.
		fresh.Stop()
		return
	}
	NsqProducers.producer = fresh
}

// Publish publishes message to topic through the package-level producer.
//
// InitProducer(nsqConfig) is called first so a missing or unresponsive
// producer is re-created before sending. An empty message is not sent and
// yields a nil error, because publishing an empty body results in an error.
// The outcome of every send is logged under the "Producer" category.
//
// It returns an error when no producer is available (including when New has
// not been called) or when the underlying publish fails.
func (p *NsqProducer) Publish(topic string, message string) (err error) {
	if NsqProducers == nil {
		// New has not been called; report it instead of dereferencing nil.
		return fmt.Errorf("producer is nil")
	}
	InitProducer(nsqConfig)

	if NsqProducers.producer == nil {
		return fmt.Errorf("producer is nil")
	}
	if message == "" { // an empty string must not be published, it would cause an error
		return nil
	}

	err = NsqProducers.producer.Publish(topic, []byte(message)) // publish the message
	if err != nil {
		g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error())
		return err
	}
	g.Log().Cat("Producer").Infof(`消息内容【%v】`, message)
	return nil
}

// DeferredPublish publishes message to topic through the package-level
// producer, passing delay to the underlying deferred publish.
//
// InitProducer(nsqConfig) is called first so a missing or unresponsive
// producer is re-created before sending. An empty message is not sent and
// yields a nil error, because publishing an empty body results in an error.
// The outcome of every send is logged under the "Producer" category.
//
// It returns an error when no producer is available (including when New has
// not been called) or when the underlying deferred publish fails.
func (p *NsqProducer) DeferredPublish(topic string, delay time.Duration, message string) (err error) {
	if NsqProducers == nil {
		// New has not been called; report it instead of dereferencing nil.
		return fmt.Errorf("producer is nil")
	}
	InitProducer(nsqConfig)

	if NsqProducers.producer == nil {
		return fmt.Errorf("producer is nil")
	}
	if message == "" { // an empty string must not be published, it would cause an error
		return nil
	}

	err = NsqProducers.producer.DeferredPublish(topic, delay, []byte(message)) // publish the delayed message
	if err != nil {
		g.Log().Cat("Producer").Cat("error").Infof(`消息内容【%v】错误【%v】`, message, err.Error())
		return err
	}
	g.Log().Cat("Producer").Infof(`消息内容【%v】`, message)
	return nil
}