func(n *NSQD)GetTopic(topicName string) *Topic { // most likely, we already have this topic, so try read lock first. n.RLock() t, ok := n.topicMap[topicName] n.RUnlock() if ok { return t } n.Lock() t, ok = n.topicMap[topicName] if ok { n.Unlock() return t } deleteCallback := func(t *Topic) { n.DeleteExistingTopic(t.name) } t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t n.logf("TOPIC(%s): created", t.name) // release our global nsqd lock, and switch to a more granular topic lock while we init our // channels from lookupd. This blocks concurrent PutMessages to this topic. t.Lock() n.Unlock() /* 下面省略从nsqlookupd获取topic信息代码,因为这个nsqd实例可能是新加的机器,所以需要执行nsqlookupd查询 */ return t }
func(t *Topic)messagePump() { /* ....省略.... */ for { select { case msg = <-memoryMsgChan: case buf = <-backendChan: msg, err = decodeMessage(buf) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } } for i, channel := range chans { chanMsg := msg // copy the message because each channel // needs a unique instance but... // fastpath to avoid copy if its the first channel // (the topic already created the first copy) if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } if chanMsg.deferred != 0 { channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } err := channel.PutMessage(chanMsg) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err) } } } /* ....省略.... */ }
// queueScanLoop runs in a single goroutine to process // It copies Redis's probabilistic expiration algorithm: it wakes up every // QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount // (default: 20) channels from a locally cached list (refreshed every // QueueScanRefreshInterval (default: 5s)).in-flight and deferred func(n *NSQD)queueScanLoop() { /* ....省略.... */ workTicker := time.NewTicker(n.getOpts().QueueScanInterval) //default 100ms refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) //default 5s
channels := n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) for { select { case <-workTicker.C: iflen(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit }
num := n.getOpts().QueueScanSelectionCount //向workCh投递channel loop: for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] }
numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } //存在过期消息比例高于标定值,继续一轮扫描 iffloat64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } /* ....省略.... */
func(p *protocolV2)FIN(client *clientV2, params [][]byte)([]byte, error) { state := atomic.LoadInt32(&client.State) if state != stateSubscribed && state != stateClosing { returnnil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state") }
iflen(params) < 2 { returnnil, protocol.NewFatalClientErr(nil, "E_INVALID", "FIN insufficient number of params") }
// queueScanWorker receives work (in the form of a channel) from queueScanLoop // and processes the deferred and in-flight queues func(n *NSQD)queueScanWorker(workCh chan *Channel, responseCh chanbool, closeCh chanint) { for { select { case c := <-workCh: //判断两个队列中是否存在到期消息 now := time.Now().UnixNano() dirty := false if c.processInFlightQueue(now) { dirty = true } if c.processDeferredQueue(now) { dirty = true } //将判断结果发送到responseCh responseCh <- dirty case <-closeCh: return } } }
从优先队列移除消息
// FinishMessage successfully discards an in-flight message func(c *Channel)FinishMessage(clientID int64, id MessageID)error { msg, err := c.popInFlightMessage(clientID, id) if err != nil { return err } c.removeFromInFlightPQ(msg) if c.e2eProcessingLatencyStream != nil { c.e2eProcessingLatencyStream.Insert(msg.Timestamp) } returnnil }