欢迎光临
我们一直在努力

NSQLookupd 中Main方法的作用是什么

NSQLookupd 中Main方法的作用是什么,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

type NSQLookupd struct {

     options      *nsqlookupdOptions     // 命令行参数

     tcpAddr      *net.TCPAddr     // tcp 端口

     httpAddr     *net.TCPAddr     // http 端口

     tcpListener  net.Listener

     httpListener net.Listener

     waitGroup    util.WaitGroupWrapper

     DB           *RegistrationDB

}

这篇文章分析下 daemon.Main() 执行过程

1、创建 context,内部只有一个指向NSQLookupd的指针

context := &Context{l}

2、创建 listener;Listen 方法支持面向流的监听,包括 unix domain stream socket。

tcpListener, err := net.Listen("tcp", l.tcpAddr.String())

3、创建 tcpServer struct,内部只含有一个 context 指针,为什么需要对 NSQLookupd

tcpServer 实现了 Handle(net.Conn) 方法,表明实现了 TCPHandler 接口

tcpServer := &tcpServer{context: context}

4、分析 l.waitGroup.Wrap(func() { util.TCPServer(tcpListener, tcpServer) })

waitGroup 是结构体 util.WaitGroupWrapper 实例,此结构体继承自 sync.WaitGroup;WaitGroup 等待一组协程结束。主协程调用 Add 方法来设置等待协程的数目;func() { util.TCPServer(tcpListener, tcpServer) } 这是一个匿名函数。

总结:主协程调用 Wrap 方法,调用 Add,设置等待的协程数目为1,启动新的协程去调用 匿名函数,主协程最后会在 exitChan 通道上等待消息,如果收到中断信号,关闭 tcpListener, httpListner,最后在 waitGroup 上等待协程结束。需要 waitGroup 的目的,就是为了 主协程 和 监听协程之间的状态同步。

5、监听协程执行的逻辑如下,简化版本,忽略日志和错误处理。监听协程循环 Accept,然后分配新的协程调用 handler 处理接收的 clientConn。处理客户端连接的协程,我把它叫做 工作协程。

func TCPServer(listener net.Listener, handler TCPHandler) {

     for {

          clientConn, err := listener.Accept()

          go handler.Handle(clientConn)

     }

}

6、工作协程的处理逻辑,最终调用的是 tcpServer 的 Handle 方法,这是最重要的一个方法,重点分析

func (p *tcpServer) Handle(clientConn net.Conn)

buf := make([]byte, 4)

_, err := io.ReadFull(clientConn, buf)

读取客户端发送过来的前4个字节,里面含有协议的版本号。看到了没有,完全阻塞式编程,没有屎一样的事件驱动!

判断版本号后,最终调用 LookupProtocolV1 的 IOLoop 方法

7、方法分析 func (p *LookupProtocolV1) IOLoop(conn net.Conn) error

client := NewClientV1(conn)     // ClientV1 struct 继承 net.Conn,封装一个 PeerInfo

reader := bufio.NewReader(client) // 创建一个带缓冲的 reader

for {     // 此部分代码有省略

    line, err = reader.ReadString('\n’)     // 按行来处理

    line = strings.TrimSpace(line)

    params := strings.Split(line, " ")

    response, err := p.Exec(client, reader, params)  // 执行命令

    if response != nil {

        _, err = util.SendResponse(client, response)     // 发送 response

    }

}

IO 出错,或者执行命令出错,会退出循环。

8、命令分派

func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error)

switch params[0] {

case "PING":

    return p.PING(client, params)

// ………

}

根据第一个参数做命令区分,调用响应的指令

9、命令执行,以 ping 命令为例,看看命令如何执行

func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error)

atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) // 更新客户端的状态

return []byte("OK"), nil      // 返回 “OK”,经由第 7 步的 util.SendResponse 发送给 client

至此 tcpServer 的处理流程就介绍完了。

10、和 2,3 步对应的,还会创建 httpListener, httpServer, 主要负责 topic,channel 相关的操作

httpListener, err := net.Listen("tcp", l.httpAddr.String())

httpServer := &httpServer{context: context}

处理 httpServer 的协程, 会调用 func() { util.HTTPServer(httpListener, httpServer, "HTTP") }

11、分析 util.HTTPServer 函数

关键是如下两句

server := &http.Server{

    Handler: handler,

}

err := server.Serve(listener)

默认的 http handler 就是 httpServer 本身

12、分析 httpServer 的 ServeHTTP 方法

err := s.v1Router(w, req)     // 路由请求

13、分析 v1Router

switch req.URL.Path {

case "/ping”:

//……

}

按照请求路径分派

14、以 "/topics” 为例,看如何处理请求

执行函数 util.NegotiateAPIResponseWrapper(w, req,

               func() (interface{}, error) { return s.doTopics(req) })

关于NSQLookupd 中Main方法的作用是什么问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注云搜网行业资讯频道了解更多相关知识。

赞(0)
【声明】:本博客不参与任何交易,也非中介,仅记录个人感兴趣的主机测评结果和优惠活动,内容均不作直接、间接、法定、约定的保证。访问本博客请务必遵守有关互联网的相关法律、规定与规则。一旦您访问本博客,即表示您已经知晓并接受了此声明通告。