package notify import ( "github.com/nsqio/go-nsq" "gitlab.jxhh.com/stbz/library.git/logs" "time" ) type NsqConfig struct { Topic string Channel string Addr string Handler nsq.Handler } func InitConsumer(consumer *NsqConfig) { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second * 30 //设置重连时间 c, err := nsq.NewConsumer(consumer.Topic, consumer.Channel, cfg) // 新建一个消费者 if logs.CheckErr(err, "InitConsumer") { return } c.AddHandler(consumer.Handler) // 添加消费者接口 //建立NSQLookupd连接 err = c.ConnectToNSQD(consumer.Addr) if logs.CheckErr(err, "ConnectToNSQD") { return } } func InitBatchConsumer(consumer *NsqConfig, clientNum int) { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second * 30 //设置重连时间 c, err := nsq.NewConsumer(consumer.Topic, consumer.Channel, cfg) // 新建一个消费者 if logs.CheckErr(err, "InitConsumer") { return } if clientNum == 0 { clientNum = 1 } c.ChangeMaxInFlight(clientNum) //可以根据nsqds数量来配置 c.AddHandler(consumer.Handler) // 批量添加消费者接口 //建立NSQLookupd连接 err = c.ConnectToNSQD(consumer.Addr) if logs.CheckErr(err, "ConnectToNSQD") { return } }