查看原文
其他

深入理解 Node.js 的 Inspector

陈跃标 ByteDance Web Infra 2024-03-28

Node.js 提供的 Inspector 非常强大,不仅可以用来调试 Node.js 代码,还可以实时收集 Node.js 进程的 Heap SnapshotCpu Profile 等数据,同时支持静态、动态开启,是一个非常强大的工具,也是我们调试和诊断 Node.js 进程非常好的方式。本文从使用和原理详细讲解 Node.jsInspector

Node.js 的文档中对 Inspector 的描述很少,但是如果深入探索,其实里面的内容还是挺多的。我们先看一下 Inspector 的使用。

1 Inspector 的使用

1.1  本地调试

我们先从一个例子开始,下面是一个简单的 HTTP 服务器。

const http = require('http');
http.createServer((req, res) => {
    res.end('ok');
}).listen(80);

然后我们以 node --inspect httpServer.js 的方式启动。我们可以看到以下输出。

Debugger listening on ws://127.0.0.1:9229/fbbd9d8f-e088-48cc-b1e0-e16bfe58db44
For help, see: https://nodejs.org/en/docs/inspector

9229 端口是 Node.js 默认选择的端口,当然我们也可以自定义,具体可参考 Node.js 官方文档。这时候我们去浏览器打开开发者工具,菜单栏多了一个调试 Node.js 的按钮。

点击这个按钮。我们可以看到以下界面(点击切换到 Sources Tab)。

我们可以选择某一行代码打断点,比如我在第三行,这时候我们访问 80 端口,开发者工具就会停留在断点处。这时候我们可以看到一些执行上下文。

1.2 远程调试

但很多时候我们可能需要远程调试。比如我在一台云服务器上部署以上服务器代码。然后执行

node --inspect=0.0.0.0:8888 httpServer.js

我们打开开发者工具发现按钮置灰或者找不到我们远程服务器的信息。这时候我们需要用另一种方式,通过在浏览器url输入框输入:

devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws={host}:{port}/{path} 

的方式(替换 {} 里面的内容为你执行 Node.js 时输出的信息),浏览器就会去连接指定的地址,比如执行上面的命令输出的是 ws://0.0.0.0:8888/f6e42278-d915-48dc-af4d-453a23d330ab,假设公网IP是 1.1.1.1。那么最后浏览器url输入框里就填入 devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws=1.1.1.1:8888/f6e42278-d915-48dc-af4d-453a23d330ab 就可以开始调试了,这种方式比较适合于常用的场景。

1.3 自动探测

如果是我们自己调试的话,1.2 这种方式看起来就有点麻烦,我们可以使用浏览器提供的自动探测功能。

  1. URL 输入框输入 chrome://inspect/#devices 我们会看到以下界面
  1. 点击 configure 按钮,在弹出的弹框里输入你远程服务器的地址
  1. 配置完毕后,我们会看到界面变成这样了(或者打开新的 Tab,我们看到开发者工具的调试按钮也变亮了)。
  1. 这时候我们点击 inspect 按钮、Open dedicated DevTools for Node 按钮或者打开新 Tab 的开发者工具,就可以开始调试,而且还可以调试 Node.js 的原生 JS 模块。

1.4 收集数据

V8 Inspector 是一个非常强大的工具,调试只是它其中一个能力,他还可以获取 Heap SnapshotCPU Profile 等数据,具体能力请参考文章后面列出的指令文档和 Chrome Dev Tools

  1. 收集 Cpu Profile 信息
  1. 获取 Heap Snapshop

1.5 动态开启 Inspector

默认打开 Inspector 能力是不安全的,这意味着能连上服务器的客户端都能通过协议控制 Node.js 进程(虽然 URL 并不容易猜对),通常我们是在 Node.js 进程出现问题的时候,动态开启 Inspector,我们看一下下面的例子。

const inspector = require('inspector');
const http = require('http');

let isOpend = false;

function getHTML() {
    return `<html>
      <meta charset="utf-8" />
      <body>
        复制到新 Tab 打开该 URL 开始调试 devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws=${inspector.url().replace("ws://", '')}
      </body>
    </html>`
;
}

http.createServer((req, res) => {
  if (req.url == '/debug/open') {
        // 还没开启则开启
        if (!isOpend) {
          isOpend = true;
          // 打开调试器
          inspector.open();
        }
        // 返回给前端的内容
        const html = getHTML() ;
        res.end(html);
  } else if (req.url == '/debug/close') {
        // 如果开启了则关闭
        if (isOpend) {
          inspector.close();
          isOpend = false;
        } 
        res.end('ok');
  } else {
    res.end('ok');
  }
}).listen(80);

当我们需要调试的时候,通过访问 /debug/open 打开调试器。前端界面可以看到以下输出。

复制到新 Tab 打开该 URL 开始调试 devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws=127.0.0.1:9229/9efd4c80-956a-4422-b23c-4348e6613304

接着新开一个 Tab,然后复制上面的 URL,粘贴到浏览器 URL 地址栏访问,我们就可以看到调试页面。

然后打个断点,接着新开一个 Tab 访问 http://localhost 就可以进入调试,调试完成后访问 /debug/close 关闭调试器。浏览器界面就会显示断开连接了。

以上方式支持调试和收集数据,如果我们只是需要收集数据,还有另一种动态开启 Inspector 的方式

const http = require('http');
const inspector = require('inspector');
const fs = require('fs');

function getCpuprofile(req, res) {
    // 打开一个和 V8 Inspector 的会话
    const session = new inspector.Session();
    session.connect();
    // 向V8 Inspector 提交命令,开启 Cpu Profile 并收集数据
    session.post('Profiler.enable', () => {
    session.post('Profiler.start', () => {
      // 收集一段时间后提交停止收集命令
      setTimeout(() => {
        session.post('Profiler.stop', (err, { profile }) => {
          // 把数据写入文件
          if (!err) {
            fs.writeFileSync('./profile.cpuprofile'JSON.stringify(profile));
          }
          // 断开会话
          session.disconnect();
          // 回复客户端
          res.end('ok');
        });
      }, 3000)
    });
  });
}

http.createServer((req, res) => {
  if (req.url == '/debug/getCpuprofile') {
        getCpuprofile(req, res);
  } else {
        res.end('ok');
  }
}).listen(80);

我们可以通过 Inspector Session 的能力,实时和 V8 Inspector 交互而不需要启动一个 WebSocket 服务。本地调试时还可以在 VSCode 里点击 Profile 文件直接看到效果。

2 Inspector 调试的原理

下面以通过 URL 的方式调试(可以看到 Network ),来看看调试的时候都发生了什么,浏览器和远程服务器建立连接后,是通过 WebSocket 协议通信的,下面是一次通信的信息。

我们看一下这命令是什么意思(具体可以参考 Inspector 协议文档)。

Debugger.scriptParsed # Fired when virtual machine parses script. This event is also fired for all known and uncollected scripts upon enabling debugger.

从说明中我们看到,当 V8 解析脚本的时候就会触发这个事件,告诉浏览器相关的信息。

我们发现返回的都是一些元数据,没有脚本的具体代码内容,这时候浏览器会再次发起请求(点击对应脚本对应的 JS 文件时),

我们看到这个脚本的 scriptId 是 103。所以请求里带了这个 scriptId。对应的请求 id 是 11。接着看一下响应。

至此,我们了解了获取脚本内容的过程,然后我们看看调试的时候是怎样的过程。当我们在浏览器上点击某一行设置断点的时候,浏览器就会发送一个请求。

这个命令的意义顾名思义,我们看一下具体定义:

Debugger.setBreakpointByUrl # Sets JavaScript breakpoint at given location specified either by URL or URL regex. Once this command is issued, all existing parsed scripts will have breakpoints resolved and returned in locations property. Further matching script parsing will result in subsequent breakpointResolved events issued. This logical breakpoint will survive page reloads.

接着服务返回响应。

这时候我们从另外一个 Tab 访问 80 端口,服务器就会在我们设置的断点处停留,并且通知浏览器。

我们看一下这个命令的意思。

这个命令就是当服务器执行到断点时通知浏览器,并且返回执行的一些上下文,比如执行到哪个断点停留了。这时候浏览器侧也会停留在对应的地方,当我们 hover 某个变量时,就会看到对应的上下文。这些都是通过具体的命令获取的数据。就不一一分析了。

3 Node.js Inspector 的实现

大致了解了浏览器和服务器的交互过程和协议后,我们再来深入了解一下关于 Inspector 的一些实现。当然这里不是分析 V8 中 Inspector 的实现,而是分析如何使用 V8 的 Inspector 以及 Node.js 中关于 Inspector 的实现部分。

当我们以以下方式执行应用时

node --inspect app.js

3.1 初始化

Node.js 在启动的过程中,就会初始化 Inspector 相关的逻辑。

inspector_agent_ = std::make_unique<inspector::Agent>(this);

Agent 是负责和 V8 Inspector 通信的对象,创建完后接着执行 env->InitializeInspector({}) 启动 Agent

inspector_agent_->Start(...);

Start 继续执行 Agent::StartIoThread

bool Agent::StartIoThread() {
  io_ = InspectorIo::Start(client_->getThreadHandle(), ...);
  return true;
}

StartIoThread 中的 client_->getThreadHandle() 是重要的逻辑,我们先来分析该函数。

std::shared_ptr<MainThreadHandle> getThreadHandle() {
    if (!interface_) {
      interface_ = std::make_shared<MainThreadInterface>(env_->inspector_agent(), ...);
    }
    return interface_->GetHandle();
}

getThreadHandle 首先创建来一个 MainThreadInterface 对象,接着又调用了他的 GetHandle 方法,我们看一下该方法的逻辑。

std::shared_ptr<MainThreadHandle> MainThreadInterface::GetHandle() {
  if (handle_ == nullptr)
    handle_ = std::make_shared<MainThreadHandle>(this);
  return handle_;
}

GetHandle 了创建了一个 MainThreadHandle 对象,最终结构如下所示。

分析完后我们继续看 Agent::StartIoThreadInspectorIo::Start 的逻辑。

std::unique_ptr<InspectorIo> InspectorIo::Start(std::shared_ptr<MainThreadHandle> main_thread, ...) {
  auto io = std::unique_ptr<InspectorIo>(new InspectorIo(main_thread, ...));
  return io;
}

InspectorIo::Star 里新建了一个 InspectorIo 对象,我们看看 InspectorIo 构造函数的逻辑。

InspectorIo::InspectorIo(std::shared_ptr<MainThreadHandle> main_thread, ...)
    : 
    // 初始化 main_thread_
    main_thread_(main_thread)) {
  // 新建一个子线程,子线程中执行 InspectorIo::ThreadMain
  uv_thread_create(&thread_, InspectorIo::ThreadMain, this);
}

这时候结构如下:

InspectorIo 创建了一个子线程, Inspector 在子线程里启动的原因主要有两个。

  1. 如果在主线程里运行,那么当我们断点调试的时候,Node.js 主线程就会被停住,也就无法处理客户端发过来的调试指令。
  2. 如果主线程陷入死循环,我们就无法实时抓取进程的 Profile 数据来分析原因。

接着继续看一下子线程里执行 InspectorIo::ThreadMain 的逻辑:

void InspectorIo::ThreadMain(void* io) {
  static_cast<InspectorIo*>(io)->ThreadMain();
}

void InspectorIo::ThreadMain() {
  uv_loop_t loop;
  loop.data = nullptr;
  // 在子线程开启一个新的事件循环
  int err = uv_loop_init(&loop);
  std::shared_ptr<RequestQueueData> queue(new RequestQueueData(&loop), ...);
  // 新建一个 delegate,用于处理请求
  std::unique_ptr<InspectorIoDelegate> delegate(
      new InspectorIoDelegate(queue, main_thread_, ...)
  )
;
  InspectorSocketServer server(std::move(delegate), ...);
  server.Start();
  // 进入事件循环
  uv_run(&loop, UV_RUN_DEFAULT);
}

ThreadMain 主要有三个逻辑:

  1. 创建一个 delegate 对象,该对象是核心的对象,后面我们会看到有什么作用。
  2. 创建一个服务器并启动。
  3. 开启事件循环。

接下来看一下服务器的逻辑,首先看一下创建服务器的逻辑:

InspectorSocketServer::InspectorSocketServer(std::unique_ptr<SocketServerDelegate> delegate, ...)
    : // 保存 delegate
      delegate_(std::move(delegate)),
      // 初始化 sessionId
      next_session_id_(0) {
  // 设置 delegate 的 server 为当前服务器
  delegate_->AssignServer(this);
}

执行完后形成以下结构:

接着我们看启动服务器的逻辑:

bool InspectorSocketServer::Start() {
  // DNS 解析,比如输入的是localhost
  struct addrinfo hints;
  memset(&hints, 0sizeof(hints));
  hints.ai_flags = AI_NUMERICSERV;
  hints.ai_socktype = SOCK_STREAM;
  uv_getaddrinfo_t req;
  const std::string port_string = std::to_string(port_);
  uv_getaddrinfo(loop_, &req, nullptr, host_.c_str(),
                           port_string.c_str(), &hints);
  // 监听解析到的 IP 列表                 
  for (addrinfo* address = req.addrinfo; 
         address != nullptr;
       address = address->ai_next) {

    auto server_socket = ServerSocketPtr(new ServerSocket(this));
    err = server_socket->Listen(address->ai_addr, loop_);
    if (err == 0)
      server_sockets_.push_back(std::move(server_socket));

  }

  return true;
}

首先根据参数做 DNS 解析,然后根据拿到的 IP 列表(通常是一个),创建对应个数的 ServerSocket 对象,并执行它的 Listen 方法。ServerSocket 表示一个监听 socket,看一下 ServerSocket 的构造函数:

ServerSocket(InspectorSocketServer* server) : 
    tcp_socket_(uv_tcp_t()), server_(server) {}

执行完后结构如下:

接着看一下 ServerSocketListen 方法:

int ServerSocket::Listen(sockaddr* addr, uv_loop_t* loop) {
  uv_tcp_t* server = &tcp_socket_;
  uv_tcp_init(loop, server)
  uv_tcp_bind(server, addr, 0);
  uv_listen(reinterpret_cast<uv_stream_t*>(server), 
            511,
            ServerSocket::SocketConnectedCallback);
}

Listen 调用 Libuv 的接口完成服务器的启动。至此,Inspector 提供的 Weboscket 服务器启动了。

3.2 处理连接

从刚才分析中可以看到,当有连接到来时执行回调 ServerSocket::SocketConnectedCallback

void ServerSocket::SocketConnectedCallback(uv_stream_t* tcp_socket,
                                           int status) 
{
  if (status == 0) {
    // 根据 Libuv handle 找到对应的 ServerSocket 对象
    ServerSocket* server_socket = ServerSocket::FromTcpSocket(tcp_socket);
    // Socket 对象的 server_ 字段保存了所在的 InspectorSocketServer
    server_socket->server_->Accept(server_socket->port_, tcp_socket);
  }
}

接着看 InspectorSocketServerAccept 是如何处理连接的:

void InspectorSocketServer::Accept(int server_port,
                                   uv_stream_t* server_socket) 
{

  std::unique_ptr<SocketSession> session(
      new SocketSession(this, next_session_id_++, server_port)
  )
;

  InspectorSocket::DelegatePointer delegate =
      InspectorSocket::DelegatePointer(
          new SocketSession::Delegate(this, session->id())
      );

  InspectorSocket::Pointer inspector =
      InspectorSocket::Accept(server_socket, std::move(delegate));

  if (inspector) {
    session->Own(std::move(inspector));
    connected_sessions_[session->id()].second = std::move(session);
  }
}

Accept 的首先创建里一个 SocketSessionSocketSession::Delegate 对象。然后调用 InspectorSocket::Accept,从代码中可以看到 InspectorSocket::Accept 会返回一个 InspectorSocket 对象。InspectorSocket 是对通信 socket 的封装(和客户端通信的 socket,区别于服务器的监听 socket)。然后记录 session 对象对应的 InspectorSocket 对象,同时记录 sessionIdsession 的映射关系。结构如下图所示:

接着看一下 InspectorSocket::Accept 返回 InspectorSocket 的逻辑:

InspectorSocket::Pointer InspectorSocket::Accept(uv_stream_t* server,
                                                 DelegatePointer delegate) 
{
  auto tcp = TcpHolder::Accept(server, std::move(delegate));
  InspectorSocket* inspector = new InspectorSocket();
  inspector->SwitchProtocol(new HttpHandler(inspector, std::move(tcp)));
  return InspectorSocket::Pointer(inspector);
}

InspectorSocket::Accept 的代码不多,但是逻辑还是挺多的:

  1. InspectorSocket::Accept 再次调用 TcpHolder::Accept 获取一个 TcpHolder 对象。
TcpHolder::Pointer TcpHolder::Accept(
    uv_stream_t* server,
    InspectorSocket::DelegatePointer delegate) 
{
    
  // 新建一个 TcpHolder 对象,TcpHolder 是对 uv_tcp_t 和 delegate 的封装
  TcpHolder* result = new TcpHolder(std::move(delegate));
  // 拿到 TcpHolder 对象的 uv_tcp_t 结构体
  uv_stream_t* tcp = reinterpret_cast<uv_stream_t*>(&result->tcp_);
  // 初始化
  int err = uv_tcp_init(server->loop, &result->tcp_);
  // 摘取一个 TCP 连接对应的 fd 保存到 TcpHolder 的 uv_tcp_t 结构体中(即第二个参数的 tcp 字段)
  uv_accept(server, tcp);
  // 注册等待可读事件,有数据时执行 OnDataReceivedCb 回调
  uv_read_start(tcp, allocate_buffer, OnDataReceivedCb);
  return TcpHolder::Pointer(result);
}
  1. 新建一个 HttpHandler 对象:
explicit HttpHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp)
                     : ProtocolHandler(inspector, std::move(tcp))
{

  llhttp_init(&parser_, HTTP_REQUEST, &parser_settings);
  llhttp_settings_init(&parser_settings);
  parser_settings.on_header_field = OnHeaderField;
  // ...
}

ProtocolHandler::ProtocolHandler(InspectorSocket* inspector,
                                 TcpHolder::Pointer tcp)
                                 : inspector_(inspector), tcp_(std::move(tcp)) {
  // 设置 TCP 数据的 handler,TCP 是只负责传输,数据的解析交给 handler 处理                               
  tcp_->SetHandler(this);
}

HttpHandler 是对 TcpHolder 的封装,主要通过 HTTP 解析器 llhttp 对 HTTP 协议进行解析。

  1. 调用 inspector->SwitchProtocol() 切换当前协议处理器为 HTTP,建立 TCP 连接后,首先要经过一个 HTTP 请求从 HTTP 协议升级到 WebSocket 协议,升级成功后就使用 Websocket 协议进行通信.

我们看一下这时候的结构图:

至此,就完成了连接处理的分析!(撒花,你学废了么)

3.3 协议升级

完成了 TCP 连接的处理后,接下来要完成协议升级,因为 Inspector 是通过 WebSocket 协议和客户端通信的,所以需要通过一个 HTTP 请求来完成 HTTP 到 WebSocekt 协议的升级。从刚才的分析中看当有数据到来时会执行 OnDataReceivedCb 回调:

void TcpHolder::OnDataReceivedCb(uv_stream_t* tcp, ssize_t nread,
                                 const uv_buf_t* buf) 
{
  TcpHolder* holder = From(tcp);
  holder->ReclaimUvBuf(buf, nread);
  // 调用 handler 的 onData,目前 handler 是 HTTP 协议
  holder->handler_->OnData(&holder->buffer);
}

TCP 层收到数据后交给应用层解析,直接调用上层的 OnData 回调。

void OnData(std::vector<char>* data) override {
    // 解析 HTTP 协议
    llhttp_execute(&parser_, data->data(), data->size());
    // 解析完并且是升级协议的请求则调用 delegate 的回调 OnSocketUpgrade
    delegate()->OnSocketUpgrade(event.host, event.path, event.ws_key);
}

OnData 可能会被多次回调,并通过 llhttp_execute 解析收到的 HTTP 报文,当发现是一个协议升级的请求后,就调用 OnSocketUpgrade 回调。delegate 是一个 SocketSession::Delegate 对象。来看一下该对象的 OnSocketUpgrade 方法:

void SocketSession::Delegate::OnSocketUpgrade(const std::string& host,
                                              const std::string& path,
                                              const std::string& ws_key) {
  std::string id = path.empty() ? path : path.substr(1);
  server_->SessionStarted(session_id_, id, ws_key);
}

OnSocketUpgrade 又调用了 server_InspectorSocketServer 对象)的 SessionStarted

void InspectorSocketServer::SessionStarted(int session_id,
                                           const std::string& id,
                                           const std::string& ws_key) 
{
  // 找到对应的 session 对象                                           
  SocketSession* session = Session(session_id);
  connected_sessions_[session_id].first = id;
  session->Accept(ws_key);
  delegate_->StartSession(session_id, id);
}

首先通过 session_id 找到建立 TCP 连接时分配的 SocketSession 对象:

  1. 执行 session->Accept(ws_key) 回复客户端同意协议升级:
void Accept(const std::string& ws_key) {
  ws_socket_->AcceptUpgrade(ws_key);
}

从结构图我们可以看到 ws_socket_ 是一个 InspectorSocket 对象:

void AcceptUpgrade(const std::string& accept_key) override {
    char accept_string[ACCEPT_KEY_LENGTH];
    generate_accept_string(accept_key, &accept_string);
    const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n"
                                    "Upgrade: websocket\r\n"
                                    "Connection: Upgrade\r\n"
                                    "Sec-WebSocket-Accept: ";
    // ...
    // 回复 101 给客户端             
    WriteRaw(reply, WriteRequest::Cleanup);
    // 切换 handler 为 WebSocket handler
    inspector_->SwitchProtocol(new WsHandler(inspector_, std::move(tcp_)));
}

AcceptUpgradeh 首先回复客户端 101 表示同意升级到 WebSocket 协议,然后切换数据处理器为 WsHandler,即后续的数据按照 WebSocket 协议处理。

  1. 执行 delegate_->StartSession(session_id, id) 建立和 V8 Inspector 的会话。delegate_  是 InspectorIoDelegate 对象:
void InspectorIoDelegate::StartSession(int session_id,
                                       const std::string& target_id) 
{
  auto session = main_thread_->Connect(
      std::unique_ptr<InspectorSessionDelegate>(
          new IoSessionDelegate(request_queue_->handle(), session_id)
      ), 
      true);
  if (session) {
    sessions_[session_id] = std::move(session);
    fprintf(stderr"Debugger attached.\n");
  }
}

首先通过 main_thread_->Connect 拿到一个 session,并在 InspectorIoDelegate 中记录映射关系。结构图如下:

接下来看一下 main_thread_->Connect 的逻辑(main_thread_MainThreadHandle 对象):

std::unique_ptr<InspectorSession> MainThreadHandle::Connect(
    std::unique_ptr<InspectorSessionDelegate> delegate,
    bool prevent_shutdown) 
{

  return std::unique_ptr<InspectorSession>(
      new CrossThreadInspectorSession(++next_session_id_,
                                      shared_from_this(),
                                      std::move(delegate),
                                      prevent_shutdown));
}

Connect 函数新建了一个 CrossThreadInspectorSession 对象。CrossThreadInspectorSession 构造函数如下:

 CrossThreadInspectorSession(...) {
    // 执行 MainThreadSessionState::Connect                             
    state_.Call(&MainThreadSessionState::Connect, std::move(delegate));
 }

继续看 MainThreadSessionState::Connect

void Connect(std::unique_ptr<InspectorSessionDelegate> delegate) {
    Agent* agent = thread_->inspector_agent();
    session_ = agent->Connect(std::move(delegate), prevent_shutdown_);
}

继续调 agent->Connect

std::unique_ptr<InspectorSession> Agent::Connect(
    std::unique_ptr<InspectorSessionDelegate> delegate,
    bool prevent_shutdown) 
{

  int session_id = client_->connectFrontend(std::move(delegate),
                                            prevent_shutdown);
  return std::unique_ptr<InspectorSession>(
      new SameThreadInspectorSession(session_id, client_));
}

继续调 connectFrontend

  int connectFrontend(std::unique_ptr<InspectorSessionDelegate> delegate,
                      bool prevent_shutdown) 
{
    int session_id = next_session_id_++;
    channels_[session_id] = std::make_unique<ChannelImpl>(env_,
                                                          client_,
                                                          getWorkerManager(),
                                                          std::move(delegate),
                                                          getThreadHandle(),
                                                          prevent_shutdown);
    return session_id;
  }

connectFrontend 创建了一个 ChannelImpl 并且在 channels_ 中保存了映射关系。看看 ChannelImpl 的构造函数:

explicit ChannelImpl(Environment* env,
                     const std::unique_ptr<V8Inspector>& inspector,
                     std::unique_ptr<InspectorSessionDelegate> delegate, ...)
      : delegate_(std::move(delegate)) 
{

    session_ = inspector->connect(CONTEXT_GROUP_ID, this, StringView());
}

ChannelImpl 调用 inspector->connect 建立了一个和 V8 Inspector 的会话。结构图大致如下:

客户端到 Node.jsV8 Inspector 的整体架构如下:

3.4 客户端到 V8 Inspector 的数据处理

TCP 连接建立了,协议升级也完成了,接下来就可以开始处理业务数据。从前面的分析中我们已经知道数据到来时会执行 TcpHoldlerhandler_->OnData 回调。因为已经完成了协议升级,所以这时候的 handler 变成了 WeSocket handler

  void OnData(std::vector<char>* data) override 
    int processed 
0;
    do {
      processed = ParseWsFrames(*data);
      // ...
    } while (processed > 0 && !data->empty());
  }

OnData 通过 ParseWsFrames 解析 WebSocket 协议:

int ParseWsFrames(const std::vector<char>& buffer) {
    int bytes_consumed = 0;
    std::vector<char> output;
    bool compressed = false;
    // 解析WebSocket协议
    ws_decode_result r =  decode_frame_hybi17(buffer,
                                              true /* client_frame */,
                                              &bytes_consumed, &output,
                                              &compressed);
    // 执行delegate的回调                                        
    delegate()->OnWsFrame(output);
    return bytes_consumed;
  }

前面已经分析过 delegateTcpHoldlerdelegate,即 SocketSession::Delegate 对象:

void SocketSession::Delegate::OnWsFrame(const std::vector<char>& data) {
  server_->MessageReceived(session_id_,
                           std::string(data.data(), 
                           data.size()));
}

继续回调 server_->MessageReceived。从结构图可以看到 server_InspectorSocketServer 对象:

void MessageReceived(int session_id, const std::string& message) {
  delegate_->MessageReceived(session_id, message);
}

继续回调 delegate_->MessageReceivedInspectorSocketServerdelegate_InspectorIoDelegate 对象:

void InspectorIoDelegate::MessageReceived(int session_id,
                                          const std::string& message) 
{
  auto session = sessions_.find(session_id);
  if (session != sessions_.end())
    session->second->Dispatch(Utf8ToStringView(message)->string());
}

首先通过 session_id 找到对应的 sessionsession 是一个 CrossThreadInspectorSession 对象。看看他的 Dispatch 方法:

 void Dispatch(const StringView& message) override {
    state_.Call(&MainThreadSessionState::Dispatch,
                StringBuffer::create(message));
  }

执行 MainThreadSessionState::Dispatch

void Dispatch(std::unique_ptr<StringBuffer> message) {
  session_->Dispatch(message->string());
}

session_SameThreadInspectorSession 对象:

void SameThreadInspectorSession::Dispatch(
    const v8_inspector::StringView& message) 
{
  auto client = client_.lock();
  if (client)
    client->dispatchMessageFromFrontend(session_id_, message);
}

继续调 client->dispatchMessageFromFrontend

 void dispatchMessageFromFrontend(int session_id, const StringView& message) {
   channels_[session_id]->dispatchProtocolMessage(message);
 }

通过 session_id 找到对应的 ChannelImpl,继续调 ChannelImpldispatchProtocolMessage

 voiddispatchProtocolMessage(const StringView& message) {
   session_->dispatchProtocolMessage(message);
 }

最终调用和 V8 Inspector 的会话对象把数据发送给 V8。至此客户端到 V8 Inspector 的通信过程就完成了。

3.5 V8 Inspector 到客户端的数据处理

接着看从 V8 inspector 到客户端的数据传递逻辑。V8 inspector 是通过 channelsendResponse 函数把数据传递给客户端的:

 void sendResponse(
      int callId,
      std::unique_ptr<v8_inspector::StringBuffer> message) override 
{

    sendMessageToFrontend(message->string());
  }

 void sendMessageToFrontend(const StringView& message) {
    delegate_->SendMessageToFrontend(message);
 }

delegate_IoSessionDelegate 对象:

void SendMessageToFrontend(const v8_inspector::StringView& message) override {
    request_queue_->Post(id_, TransportAction::kSendMessage,
                         StringBuffer::create(message));
  }

request_queue_ 是 RequestQueueData 对象。
 void Post(int session_id,
            TransportAction action,
            std::unique_ptr<StringBuffer> message) 
{

    Mutex::ScopedLock scoped_lock(state_lock_);
    bool notify = messages_.empty();
    // 消息入队
    messages_.emplace_back(action, session_id, std::move(message));
    if (notify) {
      CHECK_EQ(0, uv_async_send(&async_));
      incoming_message_cond_.Broadcast(scoped_lock);
    }
  }

Post 首先把消息入队,然后通过异步的方式通知 async_,接着看 async_ 的处理函数(在子线程的事件循环里执行):

uv_async_init(loop, &async_, [](uv_async_t* async) {
   // 拿到async对应的上下文
   RequestQueueData* wrapper = node::ContainerOf(&RequestQueueData::async_, async);
   // 执行RequestQueueData的DoDispatch
   wrapper->DoDispatch();
});

回调函数里调用了 wrapper->DoDispatch()

void DoDispatch() {
    for (const auto& request : GetMessages()) {
      request.Dispatch(server_);
    }
}

request 是 RequestToServer 对象。
  void Dispatch(InspectorSocketServer* server) const {
    switch (action_) {
      case TransportAction::kSendMessage:
        server->Send(
            session_id_,
            protocol::StringUtil::StringViewToUtf8(message_->string()));
        break;
    }
  }

接着看 InspectorSocketServerSend

void InspectorSocketServer::Send(int session_id, const std::string& message) {
  SocketSession* session = Session(session_id);
  if (session != nullptr) {
    session->Send(message);
  }
}

session 代表可客户端的一个连接:

void SocketSession::Send(const std::string& message) {
  ws_socket_->Write(message.data(), message.length());
}

接着调用 WebSocket handlerWrite

  void Write(const std::vector<char> data) override {
    std::vector<char> output = encode_frame_hybi17(data);
    WriteRaw(output, WriteRequest::Cleanup);
  }

WriteRaw 是基类 ProtocolHandler 实现的:

int ProtocolHandler::WriteRaw(const std::vector<char>& buffer,
                              uv_write_cb write_cb) 
{
  return tcp_->WriteRaw(buffer, write_cb);
}

最终是通过 TCP 连接返回给客户端:

int TcpHolder::WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb) {
  // Freed in write_request_cleanup
  WriteRequest* wr = new WriteRequest(handler_, buffer);
  uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&tcp_);
  int err = uv_write(&wr->req, stream, &wr->buf, 1, write_cb);
  if (err < 0)
    delete wr;
  return err < 0;
}

新建一个写请求,socket 可写的时候发送数据给客户端。

4 总结

从以上介绍和分析中,我们了解了 Node.js Inspector 的工作原理和使用。它方便了我们对 Node.js 的调试和问题排查,提高开发效率。通过它可以收集 Node.js 进程的堆快照分析是否有内存泄漏,可以收集 CPU Profile 分析代码的性能瓶颈,从而帮助提高服务的可用性和性能。另外,它支持动态开启,降低了安全风险,同时支持对子线程进行调试,是一个非常强大的工具。

参考内容:1 Debugging Guide 2 inspector 3 开源的 inspector agent 实现 4 inspector 协议文档 5 Debugging Node.js with Chrome DevTools

- END -


继续滑动看下一个
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存