1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
| func (t *Topic) messagePump() { var msg *Message var buf []byte var err error var chans []*Channel var memoryMsgChan chan *Message var backendChan <-chan []byte
// do not pass messages before Start(), but avoid blocking Pause() or GetChannel() //这里就是要等到startChan完成后才能往下走, for { select { case <-t.channelUpdateChan: continue case <-t.pauseChan: continue case <-t.exitChan: goto exit case <-t.startChan: //也就是要等到topic执行完GetChannel()之后才会接着往下走 } break } t.RLock() //将所有channel通道放在chans中 for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) > 0 && !t.IsPaused() { memoryMsgChan = t.memoryMsgChan //backendChan就是backend暴露给外部的readChan //参考: https://www.cnblogs.com/werben/p/14517781.html backendChan = t.backend.ReadChan() }
// main message loop //这里是守护协程的主体了,也就是这个for会一直跑 for { select { case msg = <-memoryMsgChan: //如果topic有收到新消息 case buf = <-backendChan: //如果消息是从diskqueue里来的,还要解码反序列化成msg msg, err = decodeMessage(buf) if err != nil { t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } case <-t.channelUpdateChan: //如果有新的channel通道 chans = chans[:0] t.RLock() for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue case <-t.pauseChan: //如果channel通道暂停 if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue case <-t.exitChan: goto exit }
//遍历每一个channel通道,将消息投递过去 for i, channel := range chans { chanMsg := msg // copy the message because each channel // needs a unique instance but... // fastpath to avoid copy if its the first channel // (the topic already created the first copy) if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } if chanMsg.deferred != 0 { //如果是延时消息则将延时消息丢给channel channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } //将消息则将延时消息丢给channel err := channel.PutMessage(chanMsg) if err != nil { t.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err) } } }
exit: t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name) }
|