0%

NSQ源码剖析——简单高性能的MQ实现

最近在公司内部做了一次以 NSQ 为主题的技术分享,稍作整理后,把 PPT 的内容转成博文呈现。

一、NSQ组件

nsqd
一个负责接收、排队、转发消息到客户端的守护进程
nsqlookupd
管理拓扑信息并提供最终一致性的发现服务的守护进程
nsqadmin
一套 Web 用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
utilities
常见基础功能、数据流处理工具,如 nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq


NSQ集群组件关系图

二、NSQD构成

Topic:一个 topic 就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建 topic。

Channel: 从属于Topic,是生产者与消费者之间的消息通道,相当于消息队列


NSQ Topic-Channel关系示意图

三、源码分析

抛出疑问 答案
消费模式 ?
消息发送可靠性保证 ?
消息消费可靠性保证 ?
消息堆积 + 持久化 ?
消息丢失 ?
时序 ?
分布式实现 ?
延迟队列、优先级队列等高级特性 ?

NSQD入口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Main starts the nsqd background services shown in this excerpt: the TCP
// server, the HTTP server, an optional HTTPS server, the queue scan loop and
// the lookup loop. Each of them is launched through n.waitGroup.Wrap. Setup
// before and after these steps is omitted from the excerpt.
func (n *NSQD) Main() error {
/* ... omitted ... */
// Start the TCP server.
n.tcpServer.ctx = ctx
n.waitGroup.Wrap(func() {
// Reference: nsqd/tcp.go:16
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
})

// Start the HTTP server.
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})

// The HTTPS server is started only when a TLS config exists and an HTTPS
// address is configured.
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
})
}
// queueScanLoop runs in a single goroutine to process the in-flight and
// deferred queues.
n.waitGroup.Wrap(n.queueScanLoop)
// Lookup handling loop (presumably the nsqlookupd interaction; its body is
// not part of this excerpt).
n.waitGroup.Wrap(n.lookupLoop)
/* ... omitted ... */
}

TCP Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// TCPServer accepts connections on listener in a loop and runs
// handler.Handle for each accepted connection in its own goroutine, tracked
// by a sync.WaitGroup. The error handling after Accept (including how the
// loop ends) is omitted from this excerpt.
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
var wg sync.WaitGroup

for {
clientConn, err := listener.Accept()
/* ... omitted ... */
wg.Add(1)
go func() {
// Reference: nsqd/tcp.go:16
handler.Handle(clientConn)
wg.Done()
}()
}

// wait to return until all handler goroutines complete
wg.Wait()
/* ... omitted ... */
return nil
}

TCP Connection Handle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (p *tcpServer) Handle(clientConn net.Conn) {
clientConn.RemoteAddr())
/* ....省略.... */
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
clientConn.Close()
return
}
//连接建立,客户端发送4个字节的协议版本
protocolMagic := string(buf)
//现在只支持V2协议,后续协议扩展只需要实现Protocol接口即可
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
return
}
//nsqd/protocol_v2.go:36
err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
}
p.conns.Delete(clientConn.RemoteAddr())
}

IO Loop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// IOLoop creates the per-connection client state, starts messagePump in its
// own goroutine, and then reads newline-terminated commands from the client,
// dispatching each one through p.Exec. Error handling, response writing and
// shutdown are omitted from this excerpt.
func (p *protocolV2) IOLoop(conn net.Conn) error {
/* ... omitted ... */
client := newClientV2(clientID, conn, p.ctx)
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
// Wait for messagePump to signal on messagePumpStartedChan before reading
// any commands.
<-messagePumpStartedChan

// Handle the commands sent by the client.
for {
/* ... omitted ... */
// Commands are delimited by '\n'.
line, err = client.Reader.ReadSlice('\n')
/* ... omitted ... */
// trim the '\n'
line = line[:len(line)-1]
// optionally trim the '\r'
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
// Split the line into parameters on separatorBytes (a space, according to
// the original note).
params := bytes.Split(line, separatorBytes)
var response []byte
// Execute the command selected by the parameters.
response, err = p.Exec(client, params)
/* ... omitted ... */
}
/* ... omitted ... */
}

PUB协议格式
1
2
PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]

Producer Pub指令处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// PUB handles the PUB command: it reads the size-prefixed message body from
// the client, looks up the topic named in params[1], and enqueues a new
// message on it.
//
// The body size is supplied by the client, so it is validated before any
// buffer is allocated: a failed size read, a non-positive size, or a size
// above the MaxMsgSize option is rejected with a fatal E_BAD_MESSAGE error.
// Topic-name validation and the success response are omitted from this
// excerpt.
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
	topicName := string(params[1])
	/* ... omitted ... */

	// Read the size prefix of the body.
	bodyLen, err := readLen(client.Reader, client.lenSlice)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE",
			"PUB failed to read message body size")
	}
	// A negative size would make the allocation below panic, and an unbounded
	// one would let a client force an arbitrarily large allocation.
	if bodyLen <= 0 {
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
			fmt.Sprintf("PUB invalid message body size %d", bodyLen))
	}
	if maxSize := p.ctx.nsqd.getOpts().MaxMsgSize; int64(bodyLen) > int64(maxSize) {
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
			fmt.Sprintf("PUB message too big %d > %d", bodyLen, maxSize))
	}

	// Read exactly bodyLen bytes of message data.
	messageBody := make([]byte, bodyLen)
	_, err = io.ReadFull(client.Reader, messageBody)
	/* ... omitted ... */

	// Look up the destination topic.
	topic := p.ctx.nsqd.GetTopic(topicName)
	msg := NewMessage(topic.GenerateID(), messageBody)
	// Enqueue the message on the topic.
	err = topic.PutMessage(msg)
	/* ... omitted ... */
}

Topic消息投递
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// PutMessage writes m to the topic by delegating to t.put and returning its
// error, if any. The steps that precede the put are omitted from this
// excerpt.
func (t *Topic) PutMessage(m *Message) error {
/* ... omitted ... */
err := t.put(m)
if err != nil {
return err
}
return nil
}

// put enqueues m on the topic without blocking on the memory channel.
//
// The message goes to memoryMsgChan when that send can proceed immediately;
// otherwise it is written to the topic's backend queue (disk-backed for
// non-ephemeral topics). The result of the backend write is reported through
// SetHealth, and a failed write is logged at LOG_ERROR and returned.
func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m: // in-memory queue
	default:
		// The memory channel cannot take the message right now: fall back to
		// the backend queue, using a pooled buffer for the encoding.
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
		t.ctx.nsqd.SetHealth(err)
		if err != nil {
			t.ctx.nsqd.logf(LOG_ERROR,
				"TOPIC(%s) ERROR: failed to write message to backend - %s",
				t.name, err)
			return err
		}
	}
	return nil
}

获取Topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// GetTopic returns the Topic registered under topicName, creating and
// registering it first when it does not exist yet.
//
// A newly created topic is returned after t.Lock() has been taken and the
// nsqd-wide lock released; the code that follows (omitted in this excerpt)
// is presumably where the topic lock is released again.
func (n *NSQD) GetTopic(topicName string) *Topic {
	// most likely, we already have this topic, so try read lock first.
	n.RLock()
	t, ok := n.topicMap[topicName]
	n.RUnlock()
	if ok {
		return t
	}

	// Look again under the write lock: the topic may have been created by
	// another caller between RUnlock and Lock.
	n.Lock()
	t, ok = n.topicMap[topicName]
	if ok {
		n.Unlock()
		return t
	}
	deleteCallback := func(t *Topic) {
		n.DeleteExistingTopic(t.name)
	}
	t = NewTopic(topicName, &context{n}, deleteCallback)
	n.topicMap[topicName] = t
	n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
	// release our global nsqd lock, and switch to a more granular topic lock while we init our
	// channels from lookupd. This blocks concurrent PutMessages to this topic.
	t.Lock()
	n.Unlock()
	/* The code that fetches this topic's information from nsqlookupd is omitted here. The query is needed because this nsqd instance may be a newly added machine. */
	return t
}

新建Topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// NewTopic builds the Topic named topicName, attaches its backend queue,
// starts its messagePump goroutine and notifies nsqd about the new topic.
//
// ctx gives access to the owning nsqd and its options; deleteCallback is
// stored on the topic as-is. A name ending in "#ephemeral" marks the topic
// as ephemeral and gives it a dummy backend instead of a disk queue.
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	topic := &Topic{
		name:       topicName,
		channelMap: make(map[string]*Channel),
		// In-memory queue, with capacity taken from the MemQueueSize option.
		memoryMsgChan:     make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
		exitChan:          make(chan int),
		channelUpdateChan: make(chan int),
		ctx:               ctx,
		pauseChan:         make(chan bool),
		deleteCallback:    deleteCallback,
		idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
	}

	// Backend queue: dummy for ephemeral topics, disk queue otherwise.
	switch {
	case strings.HasSuffix(topicName, "#ephemeral"):
		topic.ephemeral = true
		topic.backend = newDummyBackendQueue()
	default:
		topic.backend = diskqueue.New(
			topicName,
			ctx.nsqd.getOpts().DataPath,
			ctx.nsqd.getOpts().MaxBytesPerFile,
			int32(minValidMsgLength),
			int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
			ctx.nsqd.getOpts().SyncEvery,
			ctx.nsqd.getOpts().SyncTimeout,
			ctx.nsqd.getOpts().Logger,
		)
	}

	// The topic's most important goroutine: its message pump.
	topic.waitGroup.Wrap(topic.messagePump)
	topic.ctx.nsqd.Notify(topic)
	return topic
}

Topic事件循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// messagePump is the topic's event loop. In the portion shown, each
// iteration takes one message from either the memory channel or the backend
// channel and hands it to every channel in chans. Setup, the remaining
// select cases and shutdown are omitted from this excerpt.
func (t *Topic) messagePump() {
/* ... omitted ... */
for {
select {
case msg = <-memoryMsgChan:
case buf = <-backendChan:
// Data read from the backend channel is decoded into a message first; a
// message that fails to decode is logged and skipped.
msg, err = decodeMessage(buf)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
}
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
// A non-zero deferred value sends the message through the channel's
// deferred path instead of the normal put.
if chanMsg.deferred != 0 {
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
}
/* ... omitted ... */
}

Channel消息投递
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// PutMessage enqueues m on the channel.
//
// The channel's read lock is held for the whole call. The message is
// refused with an "exiting" error when c.Exiting() reports true, and
// messageCount is incremented only after the put succeeded.
func (c *Channel) PutMessage(m *Message) error {
	c.RLock()
	defer c.RUnlock()

	if c.Exiting() {
		return errors.New("exiting")
	}

	if putErr := c.put(m); putErr != nil {
		return putErr
	}

	atomic.AddUint64(&c.messageCount, 1)
	return nil
}

// put enqueues m without blocking on the memory channel: the message goes to
// memoryMsgChan when that send can proceed immediately and is written to the
// channel's backend queue otherwise.
//
// The outcome of a backend write (nil or not) is passed to SetHealth and
// returned to the caller.
func (c *Channel) put(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
		return nil
	default:
	}

	// Memory channel not ready: encode through a pooled buffer and write to
	// the backend instead.
	scratch := bufferPoolGet()
	writeErr := writeMessageToBackend(scratch, m, c.backend)
	bufferPoolPut(scratch)
	c.ctx.nsqd.SetHealth(writeErr)
	return writeErr
}

PUB流程结束,向Consumer投递流程分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// messagePump delivers messages from the client's subscribed channel to the
// client. Only part of the loop is shown: setup and the remaining select
// cases are omitted, and the excerpt ends before the loop and the function
// are closed.
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
/* ... omitted ... */
for {
/* ... omitted ... */
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
select {
// Subscription event: the client subscribed to a channel. Once it has been
// received, no further subscription events are accepted.
case subChannel = <-subEventChan:
// you can't SUB anymore
subEventChan = nil
// A message read from the subscribed channel's backend (file) queue.
case b := <-backendMsgChan:
// Apply the sample rate: when sampling is enabled, skip a share of the
// messages.
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
// Decode the message; one that fails to decode is skipped.
msg, err := decodeMessage(b)
if err != nil {
continue
}
msg.Attempts++
// Register the message as in flight with a timeout. If the client does not
// acknowledge it before the timeout expires it is delivered again, which
// gives at-least-once delivery but not exactly-once delivery.
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
// Send the message to the client.
err = p.SendMessage(client, msg)

// A message read from the subscribed channel's in-memory queue.
case msg := <-memoryMsgChan:
// Apply the sample rate, exactly as for backend messages.
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++

subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
// Send the message to the client.
err = p.SendMessage(client, msg)
}
/* ... omitted ... */

消息存入优先队列,保证消息至少消费一次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// StartInFlightTimeout marks msg as in flight for clientID.
//
// It stamps the message with the client ID, the delivery time, and a
// priority equal to the delivery time plus timeout (in Unix nanoseconds),
// then records it via pushInFlightMessage and adds it to the in-flight
// priority queue. If pushInFlightMessage fails, that error is returned and
// the message is not added to the priority queue.
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
	deliveredAt := time.Now()

	msg.clientID = clientID
	msg.deliveryTS = deliveredAt
	msg.pri = deliveredAt.Add(timeout).UnixNano()

	// Record the message in the in-flight message map first.
	if pushErr := c.pushInFlightMessage(msg); pushErr != nil {
		return pushErr
	}

	// Then schedule it in the in-flight priority queue.
	c.addToInFlightPQ(msg)
	return nil
}

// addToInFlightPQ pushes msg onto the channel's in-flight priority queue
// while holding inFlightMutex.
func (c *Channel) addToInFlightPQ(msg *Message) {
c.inFlightMutex.Lock()
c.inFlightPQ.Push(msg)
c.inFlightMutex.Unlock()
}

优先队列图示


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues.
//
// It copies Redis's probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
//
// A selected channel counts as "dirty" when its worker reports true on
// responseCh. When the dirty fraction of the selection exceeds
// QueueScanDirtyPercent, another selection is scanned immediately instead of
// waiting for the next tick.
func (n *NSQD) queueScanLoop() {
	/* ... omitted ... */
	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)           // default 100ms
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) // default 5s

	channels := n.channels()
	n.resizePool(len(channels), workCh, responseCh, closeCh)
	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			// Refresh the cached channel list and size the worker pool to match.
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		// Never select more channels than exist. The response loop below waits
		// for exactly num results, so with num above the channel count fewer
		// jobs than that could be dispatched and the wait would never finish.
		if num > len(channels) {
			num = len(channels)
		}

		// Hand the randomly selected channels to the workers through workCh.
	loop:
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}

		// Collect one result per dispatched channel.
		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}
		// Too large a share of the selection had expired messages: scan
		// another round right away.
		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}
	/* ... omitted ... */

客户端确认消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// FIN handles the FIN command, by which a client acknowledges a message.
//
// The command is accepted only while the client is in the subscribed or
// closing state and needs the message ID in params[1]. A wrong state, a
// missing parameter or an unparsable ID yields a fatal E_INVALID client
// error; a failure of Channel.FinishMessage yields a non-fatal E_FIN_FAILED
// error. On success the client's FinishedMessage hook is called and no
// response body is returned.
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
	current := atomic.LoadInt32(&client.State)
	if !(current == stateSubscribed || current == stateClosing) {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state")
	}

	if len(params) < 2 {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "FIN insufficient number of params")
	}

	msgID, err := getMessageID(params[1])
	if err != nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error())
	}

	if finErr := client.Channel.FinishMessage(client.ID, *msgID); finErr != nil {
		detail := fmt.Sprintf("FIN %s failed %s", *msgID, finErr.Error())
		return nil, protocol.NewClientErr(finErr, "E_FIN_FAILED", detail)
	}

	client.FinishedMessage()
	return nil, nil
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// resizePool grows or shrinks the pool of queueScanWorker goroutines until
// n.poolSize matches the target size derived from num:
//
//	1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
// Shrinking sends one value on closeCh per surplus worker; growing starts
// one queueScanWorker per missing worker through n.waitGroup.Wrap, handing
// it workCh, responseCh and closeCh.
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	target := int(float64(num) * 0.25)
	switch {
	case target < 1:
		target = 1
	case target > n.getOpts().QueueScanWorkerPoolMax:
		target = n.getOpts().QueueScanWorkerPoolMax
	}

	for n.poolSize != target {
		if target < n.poolSize {
			// contract: ask one worker to stop
			closeCh <- 1
			n.poolSize--
			continue
		}

		// expand: start one more queueScanWorker
		n.waitGroup.Wrap(func() {
			n.queueScanWorker(workCh, responseCh, closeCh)
		})
		n.poolSize++
	}
}

扫描优先队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// queueScanWorker takes channels from workCh (fed by queueScanLoop), runs
// both the in-flight and the deferred queue processing on each of them, and
// reports on responseCh whether either call returned true. It returns when a
// value arrives on closeCh.
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case <-closeCh:
			return
		case ch := <-workCh:
			// Both queues are always processed, against the same timestamp;
			// the two calls are kept separate so neither is short-circuited.
			ts := time.Now().UnixNano()
			inFlightDirty := ch.processInFlightQueue(ts)
			deferredDirty := ch.processDeferredQueue(ts)
			// Report the combined result to queueScanLoop.
			responseCh <- inFlightDirty || deferredDirty
		}
	}
}

从优先队列移除消息
1
2
3
4
5
6
7
8
9
10
11
12
// FinishMessage successfully discards an in-flight message.
//
// The message identified by id is popped for clientID via
// popInFlightMessage and removed from the in-flight priority queue; an error
// from the pop is returned unchanged. When e2eProcessingLatencyStream is
// set, the message's Timestamp is inserted into it.
func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
	finished, popErr := c.popInFlightMessage(clientID, id)
	if popErr != nil {
		return popErr
	}

	c.removeFromInFlightPQ(finished)

	if c.e2eProcessingLatencyStream != nil {
		c.e2eProcessingLatencyStream.Insert(finished.Timestamp)
	}
	return nil
}

四、NSQ总结

疑问 答案
消费模式 push
消息发送可靠性保证 未保证,TCP 连接异常会导致正在投递的消息丢失
消息消费可靠性保证 至少投递一次
消息堆积 + 持久化 内存队列+磁盘队列
消息丢失 nsqd异常宕机会导致消息丢失
时序 不保证消息时序(消息可能乱序投递)
分布式实现 去中心化
延迟队列、优先级队列等高级特性 支持延迟队列

欢迎关注我的其它发布渠道