本章涉及db的读写操作,请参考 nsqlookup之RegistrationDB数据库
本章内容涉及tcp协议的封包解包内容,请参考 nsq tcp协议规范
[ ][ ][V][1]client连接后,客户端必须发送一个4 字节的 “magic” 标识码来选择通讯协议的版本。
nsqlookupd/nsqlookupd.go
// 启动tcp服务 tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) if err != nil { l.logf("FATAL: listen (%s) failed - %s", l.opts.TCPAddress, err) os.Exit(1) } l.Lock() l.tcpListener = tcpListener l.Unlock() tcpServer := &tcpServer{ctx: ctx} // 封装的waitGroup,内部使用goroutine启动该服务,使用waitGroup守护改协程直到退出 l.waitGroup.Wrap(func() { protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger) })internal/protocol/tcp_server.go
type TCPHandler interface { Handle(net.Conn) } func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) { l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr())) // 接收client连接并回调handle方法 for { clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err)) // runtime.Gosched让出cpu时间片,让另一时间片处理 runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err)) } break } // 这里的handle方法虽然是TCPHandler的接口类型,实际回调的是nsqlookupd/tcp.go中Handle方法 go handler.Handle(clientConn) } l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr())) }解析协议版本 nsqlookupd/tcp.go
// 回调handle:解析协议版本 func (p *tcpServer) Handle(clientConn net.Conn) { p.ctx.nsqlookupd.logf("TCP: new client(%s)", clientConn.RemoteAddr()) // The client should initialize itself by sending a 4 byte sequence indicating // the version of the protocol that it intends to communicate, this will allow us // to gracefully upgrade the protocol away from text/line oriented to whatever... // 读取tcp流协议的前四个字节,用于选择使用哪种协议 buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { p.ctx.nsqlookupd.logf("ERROR: failed to read protocol version - %s", err) return } protocolMagic := string(buf) p.ctx.nsqlookupd.logf("CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) // 作者为了nsq的可扩展性,可支持多种协议,目前只支持V1版本 var prot protocol.Protocol switch protocolMagic { case " V1": prot = &LookupProtocolV1{ctx: p.ctx} default: // 其他协议则发送E_BAD_PROTOCOL并关闭连接并 protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.ctx.nsqlookupd.logf("ERROR: client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } // 使用对应的协议处理client请求 err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqlookupd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err) return } }解析命令 nsqlookupd/lookup_protocol_v1.go
// IOLoop中不断的for循环等待用户的消息 func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { var err error var line string // 实例化V1协议 client := NewClientV1(conn) reader := bufio.NewReader(client) for { // 每次读取一行 line, err = reader.ReadString('\n') if err != nil { break } // 解析出数据中的params line = strings.TrimSpace(line) params := strings.Split(line, " ") var response []byte response, err = p.Exec(client, reader, params) if err != nil { // Exec过程中出现err则将err发送给client ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, err, ctx) _, sendErr := protocol.SendResponse(client, []byte(err.Error())) if sendErr != nil { p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection // 如果err为Fatal类型的错误则break后强制关闭该连接 if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } // 将Exec结果返回给client if response != nil { _, err = protocol.SendResponse(client, response) if err != nil { break } } } // for循环退出后则关闭client连接,并从db中删除client信息 conn.Close() p.ctx.nsqlookupd.logf("CLIENT(%s): closing", client) if client.peerInfo != nil { registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) for _, r := range registrations { if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, r.Category, r.Key, r.SubKey) } } } return err }nsqlookupd/lookup_protocol_v1.go v1协议支持如下命令
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { switch params[0] { case "PING": return p.PING(client, params) case "IDENTIFY": return p.IDENTIFY(client, reader, params[1:]) case "REGISTER": return p.REGISTER(client, reader, params[1:]) case "UNREGISTER": return p.UNREGISTER(client, reader, params[1:]) } // 非以上命令则返回E_INVALID错误 return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }