NSQ 消息队列使用实战
网上看了好多,都是抄个官网 README,很多重要的东西不说清楚。只好自己研究了一下。
NSQ 的全家桶介绍
-
nsqd:守护进程,客户端通信。默认端口
4150
(TCP)4151
(HTTP) -
nsqlookupd:相当于一个路由器。客户端可以经由它发现生产者、nsqd 广播的话题。**一个 nsqlookupd 能够管理一群 nsqd。**默认端口:
:4160
(TCP),:4161
(HTTP) -
nsqadmin:在线面板,能够通过浏览器直接访问。默认端口
:4171
从命令行启动
可以直接下载二进制文件。开三个终端,分别执行:
nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
nsqadmin --lookupd-http-address=127.0.0.1:4161
go-nsq 的使用
我封装了一个包:
1package mq
2
3import (
4 "encoding/json"
5 "fmt"
6 "time"
7
8 "github.com/nsqio/go-nsq"
9 "go.uber.org/zap"
10)
11
12type MessageQueueConfig struct {
13 NsqAddr string
14 NsqLookupdAddr string
15 SupportedTopics []string
16}
17
18type MessageQueue struct {
19 config MessageQueueConfig
20 producer *nsq.Producer
21 consumers map[string]*nsq.Consumer
22}
23
24func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {
25 zap.L().Debug("New message queue")
26 producer, err := initProducer(config.NsqAddr)
27 if err != nil {
28 return nil, err
29 }
30 consumers := make(map[string]*nsq.Consumer)
31 for _, topic := range config.SupportedTopics {
32 nsq.Register(topic,"default")
33 consumers[topic], err = initConsumer(topic, "default", config.NsqAddr)
34 if err != nil {
35 return
36 }
37 }
38 return &MessageQueue{
39 config: config,
40 producer: producer,
41 consumers: consumers,
42 }, nil
43}
44
45func (mq *MessageQueue) Run() {
46 for name, c := range mq.consumers {
47 zap.L().Info("Run consumer for " + name)
48 // c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
49 c.ConnectToNSQD(mq.config.NsqAddr)
50 }
51}
52
53func initProducer(addr string) (producer *nsq.Producer, err error) {
54 zap.L().Debug("initProducer to " + addr)
55 config := nsq.NewConfig()
56 producer, err = nsq.NewProducer(addr, config)
57 return
58}
59
60func initConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
61 zap.L().Debug("initConsumer to " + topic + "/" + channel)
62 config := nsq.NewConfig()
63 config.LookupdPollInterval = 15 * time.Second
64 c, err = nsq.NewConsumer(topic, channel, config)
65 return
66}
67
68func (mq *MessageQueue) Pub(name string, data interface{}) (err error) {
69 body, err := json.Marshal(data)
70 if err != nil {
71 return
72 }
73 zap.L().Info("Pub " + name + " to mq. data = " + string(body))
74 return mq.producer.Publish(name, body)
75}
76
77type Messagehandler func(v []byte)
78
79func (mq *MessageQueue) Sub(name string, handler Messagehandler) (err error) {
80 zap.L().Info("Subscribe " + name)
81 v, ok := mq.consumers[name]
82 if !ok {
83 err = fmt.Errorf("No such topic: " + name)
84 return
85 }
86 v.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
87 handler(message.Body)
88 return nil
89 }))
90 return
91}
使用示例:
1 m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
2 NsqAddr: "127.0.0.1:4150",
3 NsqLookupdAddr: "127.0.0.1:4161",
4 SupportedTopics: []string{"hello"},
5 })
6
7 if err != nil {
8 zap.L().Fatal("Message queue error: " + err.Error())
9 }
10
11 m.Sub("hello", func(resp []byte) {
12 zap.L().Info("S1 Got: " + string(resp))
13 })
14 m.Sub("hello", func(resp []byte) {
15 zap.L().Info("S2 Got: " + string(resp))
16 })
17 m.Run()
18 err = m.Pub("hello", "world")
19 if err != nil {
20 zap.L().Fatal("Message queue error: " + err.Error())
21 }
22 err = m.Pub("hello", "tom")
23 if err != nil {
24 zap.L().Fatal("Message queue error: " + err.Error())
25 }
26
27 sigChan := make(chan os.Signal, 1)
28 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
29 <-sigChan
30 os.Exit(0);
主要是进行解耦合,这样万一我们换成 Kalfa 之类的队列,就可以不用动业务代码。
输出结果:
12021-11-07T19:13:41.886+0800 DEBUG mq/mq.go:29 New message queue
22021-11-07T19:13:41.886+0800 DEBUG mq/mq.go:58 initProducer to 127.0.0.1:4150
32021-11-07T19:13:41.887+0800 DEBUG mq/mq.go:65 initConsumer to hello/default
42021-11-07T19:13:41.887+0800 INFO mq/mq.go:84 Subscribe hello
52021-11-07T19:13:41.887+0800 INFO mq/mq.go:84 Subscribe hello
62021-11-07T19:13:41.887+0800 INFO mq/mq.go:51 Run consumer for hello
72021/11/07 19:13:41 INF 2 [hello/default] (127.0.0.1:4150) connecting to nsqd
82021-11-07T19:13:41.887+0800 INFO mq/mq.go:77 Pub hello to mq. data = "world"
92021/11/07 19:13:41 INF 1 (127.0.0.1:4150) connecting to nsqd
102021-11-07T19:13:41.888+0800 INFO mq/mq.go:77 Pub hello to mq. data = "tom"
112021-11-07T19:13:41.888+0800 INFO buqi-admin-backend/main.go:60 S1 Got: "world"
122021-11-07T19:13:41.888+0800 INFO buqi-admin-backend/main.go:63 S2 Got: "tom"
从输出结果我们可以确认一个事实,就是对于订阅了同一个 topic,同一个 channel 的不同消费者,当消息涌入时,将会负载均衡——每个 Handler 只会收到一个消息。
遇到的问题
TOPIC_NOT_FOUND
遇到两个原因。
其一是大小写,Topic 名是大小写敏感的,因此 Hello
和 hello
是两个不同的 topic,写代码时应该规范操作:抽取常量,并维护一个所有 Topic 的列表。
其二是 Topic 未创建。第一次 pub 之后,对应的 topic/channel 才能创建。建议写个脚本调用 /topic/create
接口一次性创建好,不然后面第二次重试订阅的时候才能收到消息,造成不可预料的延迟。
发现客户端轮询 HTTP
这是因为 NsqLookupd 本身是一个中介,可以管理一堆不同 IP 的 nsqd,那么我们就不可能永远只连接一个 nsq,所以就要轮询来确认有哪些客户端。
对于小项目,可以绕过 NsqLookupd:
1 // c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
2 c.ConnectToNSQD(mq.config.NsqAddr)
如何让多个消费者消费同一个 topic?
显然,根据 nsq 的机制,我们需要让同一个 topic 的消费者使用不同的通道。一种方法是随机化 channel,比如使用一个递增量作为 channel 名。
第二种方法是根据用途定义 channel 名。
第三种方法:据说可以使用 AddConcurrentHandlers,尚未研究。
第四种方法:我们把 Handler 中介化,使用一个消费者去消费,但是手动将消息送入应用层的一个自定义的流水线,让流水线的 filter 去处理消息。我猜这样还能避免一些临界区问题。
我们试一下第四种方法。(代码已发布到 GIST,Github 用户名 Pluveto)
实现流水线 Handler
1package mq
2
3import (
4 "encoding/json"
5 "fmt"
6 "time"
7
8 "github.com/nsqio/go-nsq"
9 "go.uber.org/zap"
10)
11
12type MessageQueueConfig struct {
13 NsqAddr string
14 NsqLookupdAddr string
15 EnableLookupd bool
16 SupportedTopics []string
17}
18
19type MessageQueue struct {
20 subscribers map[string]Subscriber
21 config MessageQueueConfig
22 producer *nsq.Producer
23}
24
25type Messagehandler func(v []byte) bool
26
27// LinkedHandlerNode 第一个节点为头节点,Handler 必须为 nil
28type LinkedHandlerNode struct {
29 Handler *Messagehandler
30 Index int
31 NextNode *LinkedHandlerNode
32}
33
34type Subscriber struct {
35 HandlerHeadNode *LinkedHandlerNode
36 Consumer *nsq.Consumer
37 Handler nsq.HandlerFunc
38}
39
40func createProducer(addr string) (producer *nsq.Producer, err error) {
41 zap.L().Debug("initProducer to " + addr)
42 config := nsq.NewConfig()
43 producer, err = nsq.NewProducer(addr, config)
44 return
45}
46
47func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
48 zap.L().Debug("initConsumer to " + topic + "/" + channel)
49 config := nsq.NewConfig()
50 config.LookupdPollInterval = 15 * time.Second
51 c, err = nsq.NewConsumer(topic, channel, config)
52 return
53}
54
55func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {
56 zap.L().Debug("New message queue")
57 producer, err := createProducer(config.NsqAddr)
58 if err != nil {
59 return nil, err
60 }
61 subscribers := make(map[string]Subscriber)
62 for _, topic := range config.SupportedTopics {
63 nsq.Register(topic, "default")
64 consumer, err := createConsumer(topic, "default", config.NsqAddr)
65 if err != nil {
66 return nil, err
67 }
68 // 头节点不参与实际使用,所以 Index = -1
69 headNode := &LinkedHandlerNode{Index: -1}
70 hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error {
71 // 循环链式调用各个 Handler
72 curNode := headNode.NextNode
73 // 当不存在任何用户定义的 Handler 时抛出警告
74 if(nil == curNode){
75 return fmt.Errorf("No handler provided!")
76 }
77 for nil != curNode {
78 msg := message.Body
79 zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic)
80 stop := (*curNode.Handler)(msg)
81 if stop {
82 zap.S().Debugf("the message has stopped spreading ")
83 break
84 }
85 curNode = curNode.NextNode
86 }
87 return nil
88 })
89 consumer.AddHandler(hubHandler)
90 subscribers[topic] = Subscriber{
91 Consumer: consumer,
92 HandlerHeadNode: headNode,
93 }
94 }
95 return &MessageQueue{
96 config: config,
97 producer: producer,
98 subscribers: subscribers,
99 }, nil
100}
101
102func (mq *MessageQueue) Run() {
103 for name, s := range mq.subscribers {
104 zap.L().Info("Run consumer for " + name)
105 if mq.config.EnableLookupd {
106 s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
107 } else {
108 s.Consumer.ConnectToNSQD(mq.config.NsqAddr)
109 }
110 }
111}
112
113func (mq *MessageQueue) IsTopicSupported(topic string) bool {
114
115 for _, v := range mq.config.SupportedTopics {
116 if v == topic {
117 return true
118 }
119 }
120 return false
121}
122
123// Pub 向消息队列发送一个消息
124func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) {
125 if !mq.IsTopicSupported(topic) {
126 err = fmt.Errorf("unsupported topic name: " + topic)
127 return
128 }
129 body, err := json.Marshal(data)
130 if err != nil {
131 return
132 }
133 zap.L().Info("Pub " + topic + " to mq. data = " + string(body))
134 return mq.producer.Publish(topic, body)
135}
136
137// Sub 从消息队列订阅一个消息
138func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) {
139 if !mq.IsTopicSupported(topic) {
140 err = fmt.Errorf("unsupported topic name: " + topic)
141 return
142 }
143 zap.L().Info("Subscribe " + topic)
144 subscriber, ok := mq.subscribers[topic]
145 if !ok {
146 err = fmt.Errorf("No such topic: " + topic)
147 return
148 }
149 // 抵达最后一个有效链表节点
150 curNode := subscriber.HandlerHeadNode
151 for nil != curNode.NextNode {
152 curNode = curNode.NextNode
153 }
154 // 创建节点
155 curNode.NextNode = &LinkedHandlerNode{
156 Handler: &handler,
157 Index: 1 + curNode.Index,
158 NextNode: nil,
159 }
160 return
161}
这里的思想是给每个消费者预先创建唯一的 Handler,这个 Handler 会依次调用链表中的各个具体的 Handler。当用户订阅 Topic 时,将用户提供的 Handler 添加到链表末尾。
使用示例:
1
2 m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
3 NsqAddr: "127.0.0.1:4150",
4 NsqLookupdAddr: "127.0.0.1:4161",
5 SupportedTopics: []string{"hello"},
6 EnableLookupd: false,
7 })
8
9 if err != nil {
10 zap.L().Fatal("Message queue error: " + err.Error())
11 }
12
13 m.Sub("hello", func(resp []byte) bool {
14 zap.L().Info("S1 Got: " + string(resp))
15 return false
16 })
17 m.Sub("hello", func(resp []byte) bool {
18 zap.L().Info("S2 Got: " + string(resp))
19 return true
20 })
21 m.Sub("hello", func(resp []byte) bool {
22 zap.L().Info("S3 Got: " + string(resp))
23 return false
24 })
25 m.Run()
26 err = m.Pub("hello", "world")
27 if err != nil {
28 zap.L().Fatal("Message queue error: " + err.Error())
29 }
30 err = m.Pub("hello", "tom")
31 if err != nil {
32 zap.L().Fatal("Message queue error: " + err.Error())
33 }
34
35 sigChan := make(chan os.Signal, 1)
36 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
37 <-sigChan
38 os.Exit(0)
输出:
2021-11-07T20:30:38.448+0800 DEBUG mq/mq.go:40 New message queue
2021-11-07T20:30:38.448+0800 DEBUG mq/mq.go:89 initProducer to 127.0.0.1:4150
2021-11-07T20:30:38.448+0800 DEBUG mq/mq.go:96 initConsumer to hello/default
2021-11-07T20:30:38.448+0800 INFO mq/mq.go:113 Subscribe hello
2021-11-07T20:30:38.448+0800 INFO mq/mq.go:113 Subscribe hello
2021-11-07T20:30:38.448+0800 INFO mq/mq.go:113 Subscribe hello
2021-11-07T20:30:38.448+0800 INFO mq/mq.go:82 Run consumer for hello
2021/11/07 20:30:38 INF 2 [hello/default] (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.454+0800 INFO mq/mq.go:108 Pub hello to mq. data = "world"
2021/11/07 20:30:38 INF 1 (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.455+0800 INFO mq/mq.go:108 Pub hello to mq. data = "tom"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:60 S1 Got: "world"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:64 S2 Got: "world"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:60 the message has stopped spreading
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:60 S1 Got: "tom"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:64 S2 Got: "tom"
2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:60 the message has stopped spreading
^C
可以看到,Handler 返回 true 时,就可以阻断消息的传播。
遇到的问题
反复测试之后,我发现这样一种情况下,即创建一个消息但是不消费,然后重启程序,会导致程序立即收到消息,而相关的处理函数还未准备好,于是程序崩溃。
当然解决方法也很简单,调整一下消息队列和其它服务的启动顺序即可。
另外要注意的一点就是数据迁移——如果有未消费完的消息,格式可能和新的代码不一致,则需要进行兼容性处理,或者暴力删除旧的消息。