1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
// Exec dispatches a single client command to its handler.
//
// params[0] is the command name; the whole params slice is forwarded to the
// handler. IDENTIFY is dispatched first, before the TLS policy check. Every
// other command must first pass enforceTLSPolicy, whose error is returned
// unchanged. If no case matches, a fatal E_INVALID client error naming the
// command is returned.
//
// NOTE: this is an excerpt; each "..." marks code omitted by the original
// author (further switch cases and other statements).
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	// IDENTIFY is handled ahead of the TLS policy check below.
	if bytes.Equal(params[0], []byte("IDENTIFY")) {
		return p.IDENTIFY(client, params)
	}
	err := enforceTLSPolicy(client, p, params[0])
	if err != nil {
		return nil, err
	}
	switch {
	...
	// This is where the client's publish (produce message) command is handled.
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
	...
	case bytes.Equal(params[0], []byte("SUB")):
		return p.SUB(client, params)
	}
	...
	// No handler matched: report the unknown command as a fatal client error.
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
// PUB handles the PUB command: it reads one message body from the client
// connection and publishes it to a topic.
//
// The visible steps are: read the body length with readLen, read exactly that
// many bytes from client.Reader, wrap them in a new message with an ID
// generated by the topic, and hand the message to topic.PutMessage. On
// success it returns okBytes. A failed body read yields a fatal E_BAD_MESSAGE
// client error; a failed PutMessage yields a fatal E_PUB_FAILED client error.
//
// NOTE: this is an excerpt; each "..." marks code omitted by the original
// author. The definition of topicName and the handling of the readLen error
// are among the omitted parts.
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
	var err error
	...
	// Read the declared length of the message body.
	bodyLen, err := readLen(client.Reader, client.lenSlice)
	...
	// Read the message body.
	messageBody := make([]byte, bodyLen)
	_, err = io.ReadFull(client.Reader, messageBody)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
	}
	...
	// Hand the message to topic.PutMessage().
	// Per the original author's note, PutMessage puts the message directly
	// onto msgChan or the diskqueue.
	// For the topic side, see https://www.cnblogs.com/werben/p/14518283.html
	topic := p.nsqd.GetTopic(topicName)
	msg := NewMessage(topic.GenerateID(), messageBody)
	err = topic.PutMessage(msg)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
	}
	// Report one published message for this topic to the client object
	// (what PublishedMessage records is not shown in this excerpt).
	client.PublishedMessage(topicName, 1)
	return okBytes, nil
}
|