NSQ系列之nsqlookupd代码分析一(初探nsqlookup)

jopen 9年前

nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。 nsqlookupd 有两个接口:TCP 接口,nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。

NSQ系列之nsqlookupd代码分析一(初探nsqlookup)

nsqlookupd是守护进程负责管理拓扑信息。客户端通过查询nsqlookupd来发现指定话题(topic)的生产者,并且nsqd节点广播话题(topic)和通道(channel)信息。

nsqlookupd有两个接口:TCP接口,nsqd用它来广播。HTTP接口,客户端用它来发现和管理。

nsqlookup struct分析

代码文件路径为nsq/nsqlookupd/nsqlookupd.go

type NSQLookupd struct {      sync.RWMutex               //读写锁      opts         *Options  //nsqlookupd 配置信息 定义文件路径为nsq/nsqlookupd/options.go      tcpListener  net.Listener       httpListener net.Listener       waitGroup    util.WaitGroupWrapper //WaitGroup 典型应用 用于开启两个goroutine,一个监听HTTP 一个监听TCP      DB           *RegistrationDB //product 注册数据库 具体分析后面章节再讲  }    //初始化NSQLookupd实例  func New(opts *Options) *NSQLookupd {      n := &NSQLookupd{          opts: opts,          DB:   NewRegistrationDB(), //初始化DB实例      }      n.logf(version.String("nsqlookupd"))      return n  }      func (l *NSQLookupd) Main() {      ctx := &Context{l} //初始化Context实例将NSQLookupd指针放入Context实例中 Context结构请参考文件nsq/nsqlookupd/context.go Context用于nsqlookupd中的tcpServer 和 httpServer中        tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) //开启TCP监听      if err != nil {          l.logf("FATAL: listen (%s) failed - %s", l.opts.TCPAddress, err)          os.Exit(1)      }      l.Lock()      l.tcpListener = tcpListener      l.Unlock()      tcpServer := &tcpServer{ctx: ctx} //创建一个tcpServer tcpServer 实现了nsq/internal/protocol包中的TCPHandler接口      l.waitGroup.Wrap(func() {              //protocol.TCPServer方法的过程就是tcpListener accept tcp的连接              //然后通过tcpServer中的Handle分析报文,然后处理相关的协议          protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger)      }) //把tcpServer加入到waitGroup        httpListener, err := net.Listen("tcp", l.opts.HTTPAddress) //开启HTTP监听      if err != nil {          l.logf("FATAL: listen (%s) failed - %s", l.opts.HTTPAddress, err)          os.Exit(1)      }      l.Lock()      l.httpListener = httpListener      l.Unlock()      httpServer := newHTTPServer(ctx) //创建一个httpServer      l.waitGroup.Wrap(func() {          http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger)      }) //把httpServer加入到waitGroup  }      //NSQLookupd退出  func (l *NSQLookupd) Exit() {      if l.tcpListener != nil {          l.tcpListener.Close() //关闭tcpListener      }        if l.httpListener != nil {          l.httpListener.Close() //关闭httpListener      }      l.waitGroup.Wait()  }

这一章节的代码就先分析到这里了,下一章节要分析的是nsqlookup中的tcpServer.