NSQ 消息队列使用实战

网上看了好多,都是抄个官网 README,很多重要的东西不说清楚。只好自己研究了一下。

NSQ 的全家桶介绍

  • nsqd:守护进程,客户端通信。默认端口 4150(TCP) 4151(HTTP)

  • nsqlookupd:相当于一个路由器。客户端可以经由它发现生产者、nsqd 广播的话题。**一个 nsqlookupd 能够管理一群 nsqd。**默认端口::4160(TCP),:4161(HTTP)

  • nsqadmin:在线面板,能够通过浏览器直接访问。默认端口 :4171

从命令行启动

可以直接下载二进制文件。开三个终端,分别执行:

nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
nsqadmin --lookupd-http-address=127.0.0.1:4161

go-nsq 的使用

我封装了一个包:

 1package mq
 2
 3import (
 4	"encoding/json"
 5	"fmt"
 6	"time"
 7
 8	"github.com/nsqio/go-nsq"
 9	"go.uber.org/zap"
10)
11
12type MessageQueueConfig struct {
13	NsqAddr         string
14	NsqLookupdAddr  string
15	SupportedTopics []string
16}
17
18type MessageQueue struct {
19	config    MessageQueueConfig
20	producer  *nsq.Producer
21	consumers map[string]*nsq.Consumer
22}
23
24func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {
25	zap.L().Debug("New message queue")
26	producer, err := initProducer(config.NsqAddr)
27	if err != nil {
28		return nil, err
29	}
30	consumers := make(map[string]*nsq.Consumer)
31	for _, topic := range config.SupportedTopics {
32		nsq.Register(topic,"default")
33		consumers[topic], err = initConsumer(topic, "default", config.NsqAddr)
34		if err != nil {
35			return
36		}
37	}
38	return &MessageQueue{
39		config:    config,
40		producer:  producer,
41		consumers: consumers,
42	}, nil
43}
44
45func (mq *MessageQueue) Run() {
46	for name, c := range mq.consumers {
47		zap.L().Info("Run consumer for " + name)
48		// c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
49		c.ConnectToNSQD(mq.config.NsqAddr)
50	}
51}
52
53func initProducer(addr string) (producer *nsq.Producer, err error) {
54	zap.L().Debug("initProducer to " + addr)
55	config := nsq.NewConfig()
56	producer, err = nsq.NewProducer(addr, config)	
57	return
58}
59
60func initConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
61	zap.L().Debug("initConsumer to " + topic + "/" + channel)
62	config := nsq.NewConfig()
63	config.LookupdPollInterval = 15 * time.Second
64	c, err = nsq.NewConsumer(topic, channel, config)
65	return
66}
67
68func (mq *MessageQueue) Pub(name string, data interface{}) (err error) {
69	body, err := json.Marshal(data)
70	if err != nil {
71		return
72	}
73	zap.L().Info("Pub " + name + " to mq. data = " + string(body))
74	return mq.producer.Publish(name, body)
75}
76
77type Messagehandler func(v []byte)
78
79func (mq *MessageQueue) Sub(name string, handler Messagehandler) (err error) {
80	zap.L().Info("Subscribe " + name)
81	v, ok := mq.consumers[name]
82	if !ok {
83		err = fmt.Errorf("No such topic: " + name)
84		return
85	}
86	v.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
87		handler(message.Body)
88		return nil
89	}))
90	return
91}

使用示例:

 1	m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
 2		NsqAddr:         "127.0.0.1:4150",
 3		NsqLookupdAddr:  "127.0.0.1:4161",
 4		SupportedTopics: []string{"hello"},
 5	})
 6
 7	if err != nil {
 8		zap.L().Fatal("Message queue error: " + err.Error())
 9	}
10
11	m.Sub("hello", func(resp []byte) {
12		zap.L().Info("S1 Got: " + string(resp))
13	})
14	m.Sub("hello", func(resp []byte) {
15		zap.L().Info("S2 Got: " + string(resp))
16	})
17	m.Run()
18	err = m.Pub("hello", "world")
19	if err != nil {
20		zap.L().Fatal("Message queue error: " + err.Error())
21	}
22	err = m.Pub("hello", "tom")
23	if err != nil {
24		zap.L().Fatal("Message queue error: " + err.Error())
25	}
26
27	sigChan := make(chan os.Signal, 1)
28	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
29	<-sigChan
30	os.Exit(0);

主要是进行解耦合,这样万一我们换成 Kalfa 之类的队列,就可以不用动业务代码。

输出结果:

 12021-11-07T19:13:41.886+0800    DEBUG   mq/mq.go:29     New message queue
 22021-11-07T19:13:41.886+0800    DEBUG   mq/mq.go:58     initProducer to 127.0.0.1:4150
 32021-11-07T19:13:41.887+0800    DEBUG   mq/mq.go:65     initConsumer to hello/default
 42021-11-07T19:13:41.887+0800    INFO    mq/mq.go:84     Subscribe hello
 52021-11-07T19:13:41.887+0800    INFO    mq/mq.go:84     Subscribe hello
 62021-11-07T19:13:41.887+0800    INFO    mq/mq.go:51     Run consumer for hello
 72021/11/07 19:13:41 INF    2 [hello/default] (127.0.0.1:4150) connecting to nsqd
 82021-11-07T19:13:41.887+0800    INFO    mq/mq.go:77     Pub hello to mq. data = "world"
 92021/11/07 19:13:41 INF    1 (127.0.0.1:4150) connecting to nsqd
102021-11-07T19:13:41.888+0800    INFO    mq/mq.go:77     Pub hello to mq. data = "tom"
112021-11-07T19:13:41.888+0800    INFO    buqi-admin-backend/main.go:60   S1 Got: "world"
122021-11-07T19:13:41.888+0800    INFO    buqi-admin-backend/main.go:63   S2 Got: "tom"

从输出结果我们可以确认一个事实,就是对于订阅了同一个 topic,同一个 channel 的不同消费者,当消息涌入时,将会负载均衡——每个 Handler 只会收到一个消息

遇到的问题

TOPIC_NOT_FOUND

遇到两个原因。

其一是大小写,Topic 名是大小写敏感的,因此 Hellohello 是两个不同的 topic,写代码时应该规范操作:抽取常量,并维护一个所有 Topic 的列表。

其二是 Topic 未创建。第一次 pub 之后,对应的 topic/channel 才能创建。建议写个脚本调用 /topic/create 接口一次性创建好,不然后面第二次重试订阅的时候才能收到消息,造成不可预料的延迟。

发现客户端轮询 HTTP

这是因为 NsqLookupd 本身是一个中介,可以管理一堆不同 IP 的 nsqd,那么我们就不可能永远只连接一个 nsq,所以就要轮询来确认有哪些客户端。

对于小项目,可以绕过 NsqLookupd:

1		// c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
2		c.ConnectToNSQD(mq.config.NsqAddr)

如何让多个消费者消费同一个 topic?

显然,根据 nsq 的机制,我们需要让同一个 topic 的消费者使用不同的通道。一种方法是随机化 channel,比如使用一个递增量作为 channel 名。

第二种方法是根据用途定义 channel 名。

第三种方法:据说可以使用 AddConcurrentHandlers,尚未研究。

第四种方法:我们把 Handler 中介化,使用一个消费者去消费,但是手动将消息送入应用层的一个自定义的流水线,让流水线的 filter 去处理消息。我猜这样还能避免一些临界区问题。

我们试一下第四种方法。(代码已发布到 GIST,Github 用户名 Pluveto)

实现流水线 Handler

  1package mq
  2
  3import (
  4	"encoding/json"
  5	"fmt"
  6	"time"
  7
  8	"github.com/nsqio/go-nsq"
  9	"go.uber.org/zap"
 10)
 11
 12type MessageQueueConfig struct {
 13	NsqAddr         string
 14	NsqLookupdAddr  string
 15	EnableLookupd   bool
 16	SupportedTopics []string
 17}
 18
 19type MessageQueue struct {
 20	subscribers map[string]Subscriber
 21	config      MessageQueueConfig
 22	producer    *nsq.Producer
 23}
 24
 25type Messagehandler func(v []byte) bool
 26
 27// LinkedHandlerNode 第一个节点为头节点,Handler 必须为 nil
 28type LinkedHandlerNode struct {
 29	Handler  *Messagehandler
 30	Index    int
 31	NextNode *LinkedHandlerNode
 32}
 33
 34type Subscriber struct {
 35	HandlerHeadNode *LinkedHandlerNode
 36	Consumer        *nsq.Consumer
 37	Handler         nsq.HandlerFunc
 38}
 39
 40func createProducer(addr string) (producer *nsq.Producer, err error) {
 41	zap.L().Debug("initProducer to " + addr)
 42	config := nsq.NewConfig()
 43	producer, err = nsq.NewProducer(addr, config)
 44	return
 45}
 46
 47func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
 48	zap.L().Debug("initConsumer to " + topic + "/" + channel)
 49	config := nsq.NewConfig()
 50	config.LookupdPollInterval = 15 * time.Second
 51	c, err = nsq.NewConsumer(topic, channel, config)
 52	return
 53}
 54
 55func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {	
 56	zap.L().Debug("New message queue")
 57	producer, err := createProducer(config.NsqAddr)
 58	if err != nil {
 59		return nil, err
 60	}
 61	subscribers := make(map[string]Subscriber)
 62	for _, topic := range config.SupportedTopics {
 63		nsq.Register(topic, "default")
 64		consumer, err := createConsumer(topic, "default", config.NsqAddr)
 65		if err != nil {
 66			return nil, err
 67		}
 68		// 头节点不参与实际使用,所以 Index = -1
 69		headNode := &LinkedHandlerNode{Index: -1}
 70		hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error {
 71			// 循环链式调用各个 Handler
 72			curNode := headNode.NextNode
 73			// 当不存在任何用户定义的 Handler 时抛出警告
 74			if(nil == curNode){
 75				return fmt.Errorf("No handler provided!")
 76			}
 77			for nil != curNode {
 78				msg := message.Body
 79				zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic)
 80				stop := (*curNode.Handler)(msg)
 81				if stop {
 82					zap.S().Debugf("the message has stopped spreading ")
 83					break
 84				}
 85				curNode = curNode.NextNode
 86			}
 87			return nil
 88		})
 89		consumer.AddHandler(hubHandler)
 90		subscribers[topic] = Subscriber{
 91			Consumer:        consumer,
 92			HandlerHeadNode: headNode,
 93		}
 94	}
 95	return &MessageQueue{
 96		config:      config,
 97		producer:    producer,
 98		subscribers: subscribers,
 99	}, nil
100}
101
102func (mq *MessageQueue) Run() {
103	for name, s := range mq.subscribers {
104		zap.L().Info("Run consumer for " + name)
105		if mq.config.EnableLookupd {
106			s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
107		} else {
108			s.Consumer.ConnectToNSQD(mq.config.NsqAddr)
109		}
110	}
111}
112
113func (mq *MessageQueue) IsTopicSupported(topic string) bool {
114
115	for _, v := range mq.config.SupportedTopics {
116		if v == topic {
117			return true
118		}
119	}
120	return false
121}
122
123// Pub 向消息队列发送一个消息
124func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) {
125	if !mq.IsTopicSupported(topic) {
126		err = fmt.Errorf("unsupported topic name: " + topic)
127		return
128	}
129	body, err := json.Marshal(data)
130	if err != nil {
131		return
132	}
133	zap.L().Info("Pub " + topic + " to mq. data = " + string(body))
134	return mq.producer.Publish(topic, body)
135}
136
137// Sub 从消息队列订阅一个消息
138func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) {
139	if !mq.IsTopicSupported(topic) {
140		err = fmt.Errorf("unsupported topic name: " + topic)
141		return
142	}
143	zap.L().Info("Subscribe " + topic)
144	subscriber, ok := mq.subscribers[topic]
145	if !ok {
146		err = fmt.Errorf("No such topic: " + topic)
147		return
148	}
149	// 抵达最后一个有效链表节点
150	curNode := subscriber.HandlerHeadNode
151	for nil != curNode.NextNode {
152		curNode = curNode.NextNode
153	}
154	// 创建节点
155	curNode.NextNode = &LinkedHandlerNode{
156		Handler:  &handler,
157		Index:    1 + curNode.Index,
158		NextNode: nil,
159	}
160	return
161}

这里的思想是给每个消费者预先创建唯一的 Handler,这个 Handler 会依次调用链表中的各个具体的 Handler。当用户订阅 Topic 时,将用户提供的 Handler 添加到链表末尾。

使用示例:

 1
 2	m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
 3		NsqAddr:         "127.0.0.1:4150",
 4		NsqLookupdAddr:  "127.0.0.1:4161",
 5		SupportedTopics: []string{"hello"},
 6		EnableLookupd:   false,
 7	})
 8
 9	if err != nil {
10		zap.L().Fatal("Message queue error: " + err.Error())
11	}
12
13	m.Sub("hello", func(resp []byte) bool {
14		zap.L().Info("S1 Got: " + string(resp))
15		return false
16	})
17	m.Sub("hello", func(resp []byte) bool {
18		zap.L().Info("S2 Got: " + string(resp))
19		return true
20	})
21	m.Sub("hello", func(resp []byte) bool {
22		zap.L().Info("S3 Got: " + string(resp))
23		return false
24	})
25	m.Run()
26	err = m.Pub("hello", "world")
27	if err != nil {
28		zap.L().Fatal("Message queue error: " + err.Error())
29	}
30	err = m.Pub("hello", "tom")
31	if err != nil {
32		zap.L().Fatal("Message queue error: " + err.Error())
33	}
34
35	sigChan := make(chan os.Signal, 1)
36	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
37	<-sigChan
38	os.Exit(0)

输出:

2021-11-07T20:30:38.448+0800    DEBUG   mq/mq.go:40     New message queue
2021-11-07T20:30:38.448+0800    DEBUG   mq/mq.go:89     initProducer to 127.0.0.1:4150
2021-11-07T20:30:38.448+0800    DEBUG   mq/mq.go:96     initConsumer to hello/default
2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:113    Subscribe hello
2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:113    Subscribe hello
2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:113    Subscribe hello
2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:82     Run consumer for hello
2021/11/07 20:30:38 INF    2 [hello/default] (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.454+0800    INFO    mq/mq.go:108    Pub hello to mq. data = "world"
2021/11/07 20:30:38 INF    1 (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.455+0800    INFO    mq/mq.go:108    Pub hello to mq. data = "tom"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:60   S1 Got: "world"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:64   S2 Got: "world"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:60     the message has stopped spreading 
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:60   S1 Got: "tom"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:64   S2 Got: "tom"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:60     the message has stopped spreading 
^C

可以看到,Handler 返回 true 时,就可以阻断消息的传播。

遇到的问题

反复测试之后,我发现这样一种情况下,即创建一个消息但是不消费,然后重启程序,会导致程序立即收到消息,而相关的处理函数还未准备好,于是程序崩溃。

当然解决方法也很简单,调整一下消息队列和其它服务的启动顺序即可。

另外要注意的一点就是数据迁移——如果有未消费完的消息,格式可能和新的代码不一致,则需要进行兼容性处理,或者暴力删除旧的消息。