nsq源码-消息接收和发送完整流程
2023-05-03 10:47:41 # Go # 源码阅读

nsq源码-消息接收和发送完整流程

一、TCP Handler

nsqd里面的Main函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//nsqd.go
// Main (excerpt; surrounding code is elided by the "//..." markers).
// The part shown passes a closure to n.waitGroup.Wrap; the closure runs
// protocol.TCPServer with nsqd's TCP listener, its TCP handler and its log
// function, and feeds the returned error to exitFunc.
// NOTE(review): Wrap presumably runs the closure in its own goroutine tracked
// by the wait group - confirm against the waitGroup implementation.
func (n *NSQD) Main() error {
//...
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
})
//...
}

//tcp_server.go
// TCPServer (excerpt; the accept loop around this fragment is elided).
// The part shown starts one goroutine per clientConn, calls
// handler.Handle(clientConn) in it, and signals wg.Done() once Handle returns.
// NOTE(review): clientConn is presumably the connection returned by
// listener.Accept() in the elided code - confirm.
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
//...
go func() {
handler.Handle(clientConn)
wg.Done()
}()
//...
}

n.tcpServer实现了TCPHandler接口,
它是在nsqd.New()里面创建的:

1
2
3
4
5
6
//nsqd.go
// New (excerpt; the rest of the constructor is elided).
// The part shown assigns a fresh, zero-valued *tcpServer to n.tcpServer; this
// is the handler that Main later passes to protocol.TCPServer.
func New(opts *Options) (*NSQD, error) {
//...
n.tcpServer = &tcpServer{}
//...
}

看下tcpServer实现的Handle方法里做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Handle serves a single client connection.
//
// It reads the 4-byte protocol magic from the connection, selects the matching
// protocol implementation, records the connection in p.conns, and then blocks
// in the protocol's IOLoop until that loop returns. The connection is removed
// from p.conns afterwards.
//
// On a short read or an unknown magic the connection is closed and Handle
// returns without entering any IOLoop.
func (p *tcpServer) Handle(clientConn net.Conn) {

	//...
	// Read the protocol magic from the socket. io.ReadFull only succeeds when
	// all len(buf) == 4 bytes were read, so protocolMagic is always 4 bytes.
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
		clientConn.Close()
		return
	}
	protocolMagic := string(buf)

	p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)

	var prot protocol.Protocol
	switch protocolMagic {
	// The magic is two spaces followed by "V2" (4 bytes). A 3-byte literal
	// could never equal the 4-byte string read above, which would send every
	// client down the E_BAD_PROTOCOL path.
	case "  V2":
		// Key step: create the protocolV2 object that drives this client.
		prot = &protocolV2{nsqd: p.nsqd}
	default:
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	p.conns.Store(clientConn.RemoteAddr(), clientConn)

	// Run protocolV2's IOLoop: the per-connection "guardian" loop. Both
	// receiving commands from the client and pushing messages to it are
	// handled from inside it.
	err = prot.IOLoop(clientConn)
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
	}

	p.conns.Delete(clientConn.RemoteAddr())
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//protocol_v2.go
// IOLoop (excerpt; "..." marks elided code).
// It is called from tcpServer.Handle, which runs once per client connection,
// so in effect every client gets its own loop.
// The part shown allocates a client ID, registers a new clientV2 with nsqd,
// starts messagePump in a goroutine and waits for its start signal, then loops
// handing parsed client commands to p.Exec.
func (p *protocolV2) IOLoop(conn net.Conn) error {
...
clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.nsqd)
p.nsqd.AddClient(client.ID, client)

...
// messagePump reads messages from the channel's memoryMsgChan and
// backend.ReadChan() and pushes them to the client.

// Block until messagePump signals on messagePumpStartedChan, so the pump has
// finished its setup before commands are processed.
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan

// The for loop below receives commands from the client, such as subscribing
// as a consumer or publishing messages.
// The main logic lives in p.Exec().
for {
...
// The main logic lives in this Exec call.
response, err = p.Exec(client, params)
...
}

//...
}

二、接收消息

上面已经看到处理客户端消息主要在protocolV2.Exec()里
这段代码我觉得很好理解了,直接去protocolV2.PUB()
看客户端生产消息的逻辑,其他的指令先不看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Exec (excerpt; "..." marks elided cases) dispatches one client command.
// params[0] is the command name. IDENTIFY is handled before the TLS policy
// check; every other command must pass enforceTLSPolicy first. A command that
// matches no case yields a fatal E_INVALID client error.
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
if bytes.Equal(params[0], []byte("IDENTIFY")) {
return p.IDENTIFY(client, params)
}
err := enforceTLSPolicy(client, p, params[0])
if err != nil {
return nil, err
}
switch {
...
// This is where a client's publish command is handled.
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params)
...
case bytes.Equal(params[0], []byte("SUB")):
return p.SUB(client, params)
}
...
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

// PUB (excerpt; "..." marks elided validation code) handles a publish command.
// The part shown reads the body length and then the body itself from
// client.Reader, wraps the body in a new Message with an ID generated by the
// topic, and passes it to topic.PutMessage. On success it records one
// published message for the client and returns okBytes; a failed body read or
// a failed PutMessage is returned as a fatal client error.
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
var err error
...
bodyLen, err := readLen(client.Reader, client.lenSlice)
...
// Read the message body.
messageBody := make([]byte, bodyLen)
_, err = io.ReadFull(client.Reader, messageBody)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
}

...
// Hand the message to topic.PutMessage().
// NOTE(review): per the article, PutMessage puts the message straight onto
// the topic's in-memory msgChan or its disk queue - not visible in this excerpt.
// For the topic side, see https://www.cnblogs.com/werben/p/14518283.html
topic := p.nsqd.GetTopic(topicName)
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}

client.PublishedMessage(topicName, 1)

return okBytes, nil
}

三、发送消息

现在来搞清楚,服务器端又是在哪里把消息发送给consumer的?
答案就在protocolV2.messagePump()里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// messagePump (excerpt; "..." marks elided code) is the per-client goroutine
// started by IOLoop that pushes data to the client.
//
// Each loop iteration first decides which channels are active, based on
// whether the client has subscribed and is ready for messages, and then
// selects over: the flush ticker, ready-state changes, the one-shot SUB and
// IDENTIFY events, the heartbeat ticker, the subscribed channel's disk and
// memory message queues, and the client's exit signal.
//
// startedChan is closed once setup is complete so that IOLoop can proceed.
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	var err error
	var memoryMsgChan chan *Message
	var backendMsgChan <-chan []byte
	var subChannel *Channel
	var flusherChan <-chan time.Time
	var sampleRate int32

	// When the client executes SUB, the Channel is sent on SubEventChan
	// (see protocolV2.SUB()).
	subEventChan := client.SubEventChan

	// Channel carrying the IDENTIFY data; a client can only identify once.
	identifyEventChan := client.IdentifyEventChan

	// outputBufferTicker is the source for flusherChan; per the article the
	// default interval between flushes is 250ms.
	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)

	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
	heartbeatChan := heartbeatTicker.C
	msgTimeout := client.MsgTimeout
	flushed := true
	close(startedChan)

	for {
		// Check the subscription state and whether the client can take messages.
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			// Receiving from a nil channel blocks forever, so these select
			// cases are disabled for this iteration.
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			// memoryMsgChan is where the channel keeps messages in memory.
			memoryMsgChan = subChannel.memoryMsgChan
			// backendMsgChan is where the channel keeps messages on disk.
			backendMsgChan = subChannel.backend.ReadChan()

			flusherChan = outputBufferTicker.C
		}

		select {
		case <-flusherChan:
			// flusherChan is outputBufferTicker's channel: flush buffered
			// output on every tick.
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		case <-client.ReadyStateChan:
			// Nothing to do here: waking up re-runs the readiness check at
			// the top of the loop.
		case subChannel = <-subEventChan:
			// On SUB the channel is delivered through subEventChan
			// (see protocolV2.SUB()).
			// Setting it to nil disables this case from now on.
			subEventChan = nil
		case identifyData := <-identifyEventChan:
			// Triggered when the client submits IDENTIFY, which can only
			// happen once (see protocolV2.IDENTIFY()).

			// The heartbeat and flush tickers are restarted here with the
			// values carried by the IDENTIFY data.
			identifyEventChan = nil

			outputBufferTicker.Stop()
			if identifyData.OutputBufferTimeout > 0 {
				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
			}

			heartbeatTicker.Stop()
			heartbeatChan = nil
			if identifyData.HeartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
				heartbeatChan = heartbeatTicker.C
			}

			if identifyData.SampleRate > 0 {
				sampleRate = identifyData.SampleRate
			}

			msgTimeout = identifyData.MsgTimeout
		case <-heartbeatChan:
			// Heartbeat handling.
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
		case b := <-backendMsgChan:
			...
			// Handle a message read from disk.
			client.SendingMessage()
			...
			err = p.SendMessage(client, msg)
			...
			flushed = false
		case msg := <-memoryMsgChan:
			// Send an in-memory message to the client.
			client.SendingMessage()
			...
			err = p.SendMessage(client, msg)
			...
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}
exit:
	...
	// On exit, stop the heartbeat and flush tickers.
	heartbeatTicker.Stop()
	outputBufferTicker.Stop()
	...
}