nsq源码分析(2):nsqlookup之启动和停止

xiaoxiao2021-02-28  96

nsq源码分析(2):nsqlookup之启动和停止

nsqlookup使用 go-svc 包控制进程的启动和停止

program结构体有三个方法:

Init:守护进程启动之前被执行 Start:守护进程的启动操作 Stop:守护进程的停止操作

nsqlookup options配置

* nsqlookupd/options.go *

type Options struct { Verbose bool `flag:"verbose"` // 日志输出格式的前缀 LogPrefix string `flag:"log-prefix"` // 监听的tcp服务地址及端口 TCPAddress string `flag:"tcp-address"` // 监听的http服务地址及端口 HTTPAddress string `flag:"http-address"` // 广播地址,默认主机名 BroadcastAddress string `flag:"broadcast-address"` // 生产者超时时间,默认300秒 InactiveProducerTimeout time.Duration `flag:"inactive-producer-timeout"` // TombstoneLifetime time.Duration `flag:"tombstone-lifetime"` Logger Logger }

nsqlookup 结构体

* nsqlookupd/nsqlookupd.go *

type NSQLookupd struct { // 读写锁 sync.RWMutex // nsqlookup配置 opts *Options // 监听的tcp服务 tcpListener net.Listener // 监听的http服务 httpListener net.Listener // 用于等待goroutine结束 waitGroup util.WaitGroupWrapper // 注册数据库 DB *RegistrationDB }

nsqlookup启动

* apps/nsqlookupd/nsqlookupd.go *

func main() { prg := &program{} // 守护进程监听SIGINT(程序终止信号ctrl+c)和SIGTERM(程序结束信号 kill -15),当接收到这两个信号则守护进程退出 if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { log.Fatal(err) } } func (p *program) Start() error { // 初始化nsqlookup配置 opts := nsqlookupd.NewOptions() // 解析用户flag传参 flagSet := nsqlookupdFlagSet(opts) flagSet.Parse(os.Args[1:]) // 输出版本并退出 if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { fmt.Println(version.String("nsqlookupd")) os.Exit(0) } // 解析配置文件参数 var cfg map[string]interface{} configFile := flagSet.Lookup("config").Value.String() if configFile != "" { _, err := toml.DecodeFile(configFile, &cfg) if err != nil { log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) } } // 将用户传入的参数和配置文件参数合并,并实例化NSQLookupd对象 options.Resolve(opts, flagSet, cfg) daemon := nsqlookupd.New(opts) // 运行nsqlookup守护进程 daemon.Main() p.nsqlookupd = daemon return nil }

启动tcp服务及http服务

func (l *NSQLookupd) Main() { // ctx中包含nsqlookup的所有配置信息 ctx := &Context{l} // 启动tcp服务 tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) if err != nil { l.logf("FATAL: listen (%s) failed - %s", l.opts.TCPAddress, err) os.Exit(1) } l.Lock() l.tcpListener = tcpListener l.Unlock() tcpServer := &tcpServer{ctx: ctx} // 封装的waitGroup,内部使用goroutine启动该服务,使用waitGroup守护改协程直到退出 l.waitGroup.Wrap(func() { protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger) }) // 启动http服务 httpListener, err := net.Listen("tcp", l.opts.HTTPAddress) if err != nil { l.logf("FATAL: listen (%s) failed - %s", l.opts.HTTPAddress, err) os.Exit(1) } l.Lock() l.httpListener = httpListener l.Unlock() httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger) }) }

nsqlookup停止

// 当守护进程收到SIGINT和SIGTERM信号则执行Stop清理工作 func (p *program) Stop() error { if p.nsqlookupd != nil { p.nsqlookupd.Exit() } return nil } func (l *NSQLookupd) Exit() { // 关闭监听的tcp服务 if l.tcpListener != nil { l.tcpListener.Close() } // 关闭监听的http服务 if l.httpListener != nil { l.httpListener.Close() } // 等待所有的goroutine执行完成并退出主程序 l.waitGroup.Wait() }
转载请注明原文地址: https://www.6miu.com/read-41951.html

最新回复(0)