深入理解 Node.js 的 Inspector
Node.js
提供的Inspector
非常强大,不仅可以用来调试Node.js
代码,还可以实时收集Node.js
进程的Heap Snapshot
、Cpu Profile
等数据,同时支持静态、动态开启,是一个非常强大的工具,也是我们调试和诊断Node.js
进程非常好的方式。本文从使用和原理详细讲解Node.js
的Inspector
。
Node.js
的文档中对 Inspector
的描述很少,但是如果深入探索,其实里面的内容还是挺多的。我们先看一下 Inspector
的使用。
1 Inspector 的使用
1.1 本地调试
我们先从一个例子开始,下面是一个简单的 HTTP 服务器。
const http = require('http');
http.createServer((req, res) => {
res.end('ok');
}).listen(80);
然后我们以 node --inspect httpServer.js
的方式启动。我们可以看到以下输出。
Debugger listening on ws://127.0.0.1:9229/fbbd9d8f-e088-48cc-b1e0-e16bfe58db44
For help, see: https://nodejs.org/en/docs/inspector
9229
端口是 Node.js
默认选择的端口,当然我们也可以自定义,具体可参考 Node.js
官方文档。这时候我们去浏览器打开开发者工具,菜单栏多了一个调试 Node.js 的按钮。
点击这个按钮。我们可以看到以下界面(点击切换到 Sources Tab)。
我们可以选择某一行代码打断点,比如我在第三行,这时候我们访问 80
端口,开发者工具就会停留在断点处。这时候我们可以看到一些执行上下文。
1.2 远程调试
但很多时候我们可能需要远程调试。比如我在一台云服务器上部署以上服务器代码。然后执行
node --inspect=0.0.0.0:8888 httpServer.js
我们打开开发者工具发现按钮置灰或者找不到我们远程服务器的信息。这时候我们需要用另一种方式,通过在浏览器url输入框输入:
devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws={host}:{port}/{path}
的方式(替换 {}
里面的内容为你执行 Node.js
时输出的信息),浏览器就会去连接指定的地址,比如执行上面的命令输出的是 ws://0.0.0.0:8888/f6e42278-d915-48dc-af4d-453a23d330ab
,假设公网IP是 1.1.1.1。那么最后浏览器url输入框里就填入 devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws=1.1.1.1:8888/f6e42278-d915-48dc-af4d-453a23d330ab
就可以开始调试了,这种方式比较适合于常用的场景。
1.3 自动探测
如果是我们自己调试的话,1.2 这种方式看起来就有点麻烦,我们可以使用浏览器提供的自动探测功能。
URL 输入框输入 chrome://inspect/#devices
我们会看到以下界面
点击 configure
按钮,在弹出的弹框里输入你远程服务器的地址
配置完毕后,我们会看到界面变成这样了(或者打开新的 Tab,我们看到开发者工具的调试按钮也变亮了)。
这时候我们点击 inspect
按钮、Open dedicated DevTools for Node
按钮或者打开新 Tab 的开发者工具,就可以开始调试,而且还可以调试Node.js
的原生 JS 模块。
1.4 收集数据
V8 Inspector
是一个非常强大的工具,调试只是它其中一个能力,他还可以获取 Heap Snapshot
、CPU Profile
等数据,具体能力请参考文章后面列出的指令文档和 Chrome Dev Tools
。
收集 Cpu Profile 信息
获取 Heap Snapshop
1.5 动态开启 Inspector
默认打开 Inspector
能力是不安全的,这意味着能连上服务器的客户端都能通过协议控制 Node.js 进程(虽然 URL 并不容易猜对),通常我们是在 Node.js 进程出现问题的时候,动态开启 Inspector,我们看一下下面的例子。
const inspector = require('inspector');
const http = require('http');
let isOpend = false;
function getHTML() {
return `<html>
<meta charset="utf-8" />
<body>
复制到新 Tab 打开该 URL 开始调试 devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws=${inspector.url().replace("ws://", '')}
</body>
</html>`;
}
http.createServer((req, res) => {
if (req.url == '/debug/open') {
// 还没开启则开启
if (!isOpend) {
isOpend = true;
// 打开调试器
inspector.open();
}
// 返回给前端的内容
const html = getHTML() ;
res.end(html);
} else if (req.url == '/debug/close') {
// 如果开启了则关闭
if (isOpend) {
inspector.close();
isOpend = false;
}
res.end('ok');
} else {
res.end('ok');
}
}).listen(80);
当我们需要调试的时候,通过访问 /debug/open 打开调试器。前端界面可以看到以下输出。
复制到新 Tab 打开该 URL 开始调试 devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws=127.0.0.1:9229/9efd4c80-956a-4422-b23c-4348e6613304
接着新开一个 Tab,然后复制上面的 URL,粘贴到浏览器 URL 地址栏访问,我们就可以看到调试页面。
然后打个断点,接着新开一个 Tab 访问 http://localhost
就可以进入调试,调试完成后访问 /debug/close 关闭调试器。浏览器界面就会显示断开连接了。
以上方式支持调试和收集数据,如果我们只是需要收集数据,还有另一种动态开启 Inspector
的方式
const http = require('http');
const inspector = require('inspector');
const fs = require('fs');
function getCpuprofile(req, res) {
// 打开一个和 V8 Inspector 的会话
const session = new inspector.Session();
session.connect();
// 向V8 Inspector 提交命令,开启 Cpu Profile 并收集数据
session.post('Profiler.enable', () => {
session.post('Profiler.start', () => {
// 收集一段时间后提交停止收集命令
setTimeout(() => {
session.post('Profiler.stop', (err, { profile }) => {
// 把数据写入文件
if (!err) {
fs.writeFileSync('./profile.cpuprofile', JSON.stringify(profile));
}
// 断开会话
session.disconnect();
// 回复客户端
res.end('ok');
});
}, 3000)
});
});
}
http.createServer((req, res) => {
if (req.url == '/debug/getCpuprofile') {
getCpuprofile(req, res);
} else {
res.end('ok');
}
}).listen(80);
我们可以通过 Inspector Session
的能力,实时和 V8 Inspector
交互而不需要启动一个 WebSocket
服务。本地调试时还可以在 VSCode
里点击 Profile
文件直接看到效果。
2 Inspector 调试的原理
下面以通过 URL 的方式调试(可以看到 Network ),来看看调试的时候都发生了什么,浏览器和远程服务器建立连接后,是通过 WebSocket 协议通信的,下面是一次通信的信息。
我们看一下这命令是什么意思(具体可以参考 Inspector 协议文档)。
Debugger.scriptParsed # Fired when virtual machine parses script. This event is also fired for all known and uncollected scripts upon enabling debugger.
从说明中我们看到,当 V8 解析脚本的时候就会触发这个事件,告诉浏览器相关的信息。
我们发现返回的都是一些元数据,没有脚本的具体代码内容,这时候浏览器会再次发起请求(点击对应脚本对应的 JS 文件时),
我们看到这个脚本的 scriptId 是 103。所以请求里带了这个 scriptId。对应的请求 id 是 11。接着看一下响应。
至此,我们了解了获取脚本内容的过程,然后我们看看调试的时候是怎样的过程。当我们在浏览器上点击某一行设置断点的时候,浏览器就会发送一个请求。
这个命令的意义顾名思义,我们看一下具体定义:
Debugger.setBreakpointByUrl # Sets JavaScript breakpoint at given location specified either by URL or URL regex. Once this command is issued, all existing parsed scripts will have breakpoints resolved and returned in locations property. Further matching script parsing will result in subsequent breakpointResolved events issued. This logical breakpoint will survive page reloads.
接着服务返回响应。
这时候我们从另外一个 Tab 访问 80 端口,服务器就会在我们设置的断点处停留,并且通知浏览器。
我们看一下这个命令的意思。
这个命令就是当服务器执行到断点时通知浏览器,并且返回执行的一些上下文,比如执行到哪个断点停留了。这时候浏览器侧也会停留在对应的地方,当我们 hover 某个变量时,就会看到对应的上下文。这些都是通过具体的命令获取的数据。就不一一分析了。
3 Node.js Inspector 的实现
大致了解了浏览器和服务器的交互过程和协议后,我们再来深入了解一下关于 Inspector 的一些实现。当然这里不是分析 V8 中 Inspector 的实现,而是分析如何使用 V8 的 Inspector 以及 Node.js 中关于 Inspector 的实现部分。
当我们以以下方式执行应用时
node --inspect app.js
3.1 初始化
Node.js 在启动的过程中,就会初始化 Inspector 相关的逻辑。
inspector_agent_ = std::make_unique<inspector::Agent>(this);
Agent 是负责和 V8 Inspector
通信的对象,创建完后接着执行 env->InitializeInspector({})
启动 Agent
。
inspector_agent_->Start(...);
Start 继续执行 Agent::StartIoThread
。
bool Agent::StartIoThread() {
io_ = InspectorIo::Start(client_->getThreadHandle(), ...);
return true;
}
StartIoThread
中的 client_->getThreadHandle()
是重要的逻辑,我们先来分析该函数。
std::shared_ptr<MainThreadHandle> getThreadHandle() {
if (!interface_) {
interface_ = std::make_shared<MainThreadInterface>(env_->inspector_agent(), ...);
}
return interface_->GetHandle();
}
getThreadHandle
首先创建来一个 MainThreadInterface
对象,接着又调用了他的 GetHandle 方法,我们看一下该方法的逻辑。
std::shared_ptr<MainThreadHandle> MainThreadInterface::GetHandle() {
if (handle_ == nullptr)
handle_ = std::make_shared<MainThreadHandle>(this);
return handle_;
}
GetHandle
了创建了一个 MainThreadHandle
对象,最终结构如下所示。
分析完后我们继续看 Agent::StartIoThread
中 InspectorIo::Start
的逻辑。
std::unique_ptr<InspectorIo> InspectorIo::Start(std::shared_ptr<MainThreadHandle> main_thread, ...) {
auto io = std::unique_ptr<InspectorIo>(new InspectorIo(main_thread, ...));
return io;
}
InspectorIo::Star
里新建了一个 InspectorIo
对象,我们看看 InspectorIo
构造函数的逻辑。
InspectorIo::InspectorIo(std::shared_ptr<MainThreadHandle> main_thread, ...)
:
// 初始化 main_thread_
main_thread_(main_thread)) {
// 新建一个子线程,子线程中执行 InspectorIo::ThreadMain
uv_thread_create(&thread_, InspectorIo::ThreadMain, this);
}
这时候结构如下:
InspectorIo
创建了一个子线程, Inspector
在子线程里启动的原因主要有两个。
如果在主线程里运行,那么当我们断点调试的时候, Node.js
主线程就会被停住,也就无法处理客户端发过来的调试指令。如果主线程陷入死循环,我们就无法实时抓取进程的 Profile
数据来分析原因。
接着继续看一下子线程里执行 InspectorIo::ThreadMain
的逻辑:
void InspectorIo::ThreadMain(void* io) {
static_cast<InspectorIo*>(io)->ThreadMain();
}
void InspectorIo::ThreadMain() {
uv_loop_t loop;
loop.data = nullptr;
// 在子线程开启一个新的事件循环
int err = uv_loop_init(&loop);
std::shared_ptr<RequestQueueData> queue(new RequestQueueData(&loop), ...);
// 新建一个 delegate,用于处理请求
std::unique_ptr<InspectorIoDelegate> delegate(
new InspectorIoDelegate(queue, main_thread_, ...)
);
InspectorSocketServer server(std::move(delegate), ...);
server.Start();
// 进入事件循环
uv_run(&loop, UV_RUN_DEFAULT);
}
ThreadMain
主要有三个逻辑:
创建一个 delegate 对象,该对象是核心的对象,后面我们会看到有什么作用。 创建一个服务器并启动。 开启事件循环。
接下来看一下服务器的逻辑,首先看一下创建服务器的逻辑:
InspectorSocketServer::InspectorSocketServer(std::unique_ptr<SocketServerDelegate> delegate, ...)
: // 保存 delegate
delegate_(std::move(delegate)),
// 初始化 sessionId
next_session_id_(0) {
// 设置 delegate 的 server 为当前服务器
delegate_->AssignServer(this);
}
执行完后形成以下结构:
接着我们看启动服务器的逻辑:
bool InspectorSocketServer::Start() {
// DNS 解析,比如输入的是localhost
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_flags = AI_NUMERICSERV;
hints.ai_socktype = SOCK_STREAM;
uv_getaddrinfo_t req;
const std::string port_string = std::to_string(port_);
uv_getaddrinfo(loop_, &req, nullptr, host_.c_str(),
port_string.c_str(), &hints);
// 监听解析到的 IP 列表
for (addrinfo* address = req.addrinfo;
address != nullptr;
address = address->ai_next) {
auto server_socket = ServerSocketPtr(new ServerSocket(this));
err = server_socket->Listen(address->ai_addr, loop_);
if (err == 0)
server_sockets_.push_back(std::move(server_socket));
}
return true;
}
首先根据参数做 DNS
解析,然后根据拿到的 IP
列表(通常是一个),创建对应个数的 ServerSocket
对象,并执行它的 Listen
方法。ServerSocket
表示一个监听 socket
,看一下 ServerSocket
的构造函数:
ServerSocket(InspectorSocketServer* server) :
tcp_socket_(uv_tcp_t()), server_(server) {}
执行完后结构如下:
接着看一下 ServerSocket
的 Listen
方法:
int ServerSocket::Listen(sockaddr* addr, uv_loop_t* loop) {
uv_tcp_t* server = &tcp_socket_;
uv_tcp_init(loop, server)
uv_tcp_bind(server, addr, 0);
uv_listen(reinterpret_cast<uv_stream_t*>(server),
511,
ServerSocket::SocketConnectedCallback);
}
Listen
调用 Libuv
的接口完成服务器的启动。至此,Inspector
提供的 Weboscket
服务器启动了。
3.2 处理连接
从刚才分析中可以看到,当有连接到来时执行回调 ServerSocket::SocketConnectedCallback
。
void ServerSocket::SocketConnectedCallback(uv_stream_t* tcp_socket,
int status) {
if (status == 0) {
// 根据 Libuv handle 找到对应的 ServerSocket 对象
ServerSocket* server_socket = ServerSocket::FromTcpSocket(tcp_socket);
// Socket 对象的 server_ 字段保存了所在的 InspectorSocketServer
server_socket->server_->Accept(server_socket->port_, tcp_socket);
}
}
接着看 InspectorSocketServer
的 Accept
是如何处理连接的:
void InspectorSocketServer::Accept(int server_port,
uv_stream_t* server_socket) {
std::unique_ptr<SocketSession> session(
new SocketSession(this, next_session_id_++, server_port)
);
InspectorSocket::DelegatePointer delegate =
InspectorSocket::DelegatePointer(
new SocketSession::Delegate(this, session->id())
);
InspectorSocket::Pointer inspector =
InspectorSocket::Accept(server_socket, std::move(delegate));
if (inspector) {
session->Own(std::move(inspector));
connected_sessions_[session->id()].second = std::move(session);
}
}
Accept
的首先创建里一个 SocketSession
和 SocketSession::Delegate
对象。然后调用 InspectorSocket::Accept
,从代码中可以看到 InspectorSocket::Accept
会返回一个 InspectorSocket
对象。InspectorSocket
是对通信 socket
的封装(和客户端通信的 socket
,区别于服务器的监听 socket
)。然后记录 session
对象对应的 InspectorSocket
对象,同时记录 sessionId
和 session
的映射关系。结构如下图所示:
接着看一下 InspectorSocket::Accept
返回 InspectorSocket
的逻辑:
InspectorSocket::Pointer InspectorSocket::Accept(uv_stream_t* server,
DelegatePointer delegate) {
auto tcp = TcpHolder::Accept(server, std::move(delegate));
InspectorSocket* inspector = new InspectorSocket();
inspector->SwitchProtocol(new HttpHandler(inspector, std::move(tcp)));
return InspectorSocket::Pointer(inspector);
}
InspectorSocket::Accept
的代码不多,但是逻辑还是挺多的:
InspectorSocket::Accept
再次调用TcpHolder::Accept
获取一个TcpHolder
对象。
TcpHolder::Pointer TcpHolder::Accept(
uv_stream_t* server,
InspectorSocket::DelegatePointer delegate) {
// 新建一个 TcpHolder 对象,TcpHolder 是对 uv_tcp_t 和 delegate 的封装
TcpHolder* result = new TcpHolder(std::move(delegate));
// 拿到 TcpHolder 对象的 uv_tcp_t 结构体
uv_stream_t* tcp = reinterpret_cast<uv_stream_t*>(&result->tcp_);
// 初始化
int err = uv_tcp_init(server->loop, &result->tcp_);
// 摘取一个 TCP 连接对应的 fd 保存到 TcpHolder 的 uv_tcp_t 结构体中(即第二个参数的 tcp 字段)
uv_accept(server, tcp);
// 注册等待可读事件,有数据时执行 OnDataReceivedCb 回调
uv_read_start(tcp, allocate_buffer, OnDataReceivedCb);
return TcpHolder::Pointer(result);
}
新建一个 HttpHandler
对象:
explicit HttpHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp)
: ProtocolHandler(inspector, std::move(tcp)){
llhttp_init(&parser_, HTTP_REQUEST, &parser_settings);
llhttp_settings_init(&parser_settings);
parser_settings.on_header_field = OnHeaderField;
// ...
}
ProtocolHandler::ProtocolHandler(InspectorSocket* inspector,
TcpHolder::Pointer tcp)
: inspector_(inspector), tcp_(std::move(tcp)) {
// 设置 TCP 数据的 handler,TCP 是只负责传输,数据的解析交给 handler 处理
tcp_->SetHandler(this);
}
HttpHandler
是对 TcpHolder
的封装,主要通过 HTTP 解析器 llhttp
对 HTTP 协议进行解析。
调用 inspector->SwitchProtocol() 切换当前协议处理器为 HTTP,建立 TCP 连接后,首先要经过一个 HTTP 请求从 HTTP 协议升级到 WebSocket 协议,升级成功后就使用 Websocket 协议进行通信.
我们看一下这时候的结构图:
至此,就完成了连接处理的分析!(撒花,你学废了么)
3.3 协议升级
完成了 TCP 连接的处理后,接下来要完成协议升级,因为 Inspector
是通过 WebSocket
协议和客户端通信的,所以需要通过一个 HTTP 请求来完成 HTTP 到 WebSocekt
协议的升级。从刚才的分析中看当有数据到来时会执行 OnDataReceivedCb
回调:
void TcpHolder::OnDataReceivedCb(uv_stream_t* tcp, ssize_t nread,
const uv_buf_t* buf) {
TcpHolder* holder = From(tcp);
holder->ReclaimUvBuf(buf, nread);
// 调用 handler 的 onData,目前 handler 是 HTTP 协议
holder->handler_->OnData(&holder->buffer);
}
TCP 层收到数据后交给应用层解析,直接调用上层的 OnData 回调。
void OnData(std::vector<char>* data) override {
// 解析 HTTP 协议
llhttp_execute(&parser_, data->data(), data->size());
// 解析完并且是升级协议的请求则调用 delegate 的回调 OnSocketUpgrade
delegate()->OnSocketUpgrade(event.host, event.path, event.ws_key);
}
OnData
可能会被多次回调,并通过 llhttp_execute
解析收到的 HTTP 报文,当发现是一个协议升级的请求后,就调用 OnSocketUpgrade
回调。delegate
是一个 SocketSession::Delegate
对象。来看一下该对象的 OnSocketUpgrade 方法:
void SocketSession::Delegate::OnSocketUpgrade(const std::string& host,
const std::string& path,
const std::string& ws_key) {
std::string id = path.empty() ? path : path.substr(1);
server_->SessionStarted(session_id_, id, ws_key);
}
OnSocketUpgrade
又调用了 server_
(InspectorSocketServer
对象)的 SessionStarted
:
void InspectorSocketServer::SessionStarted(int session_id,
const std::string& id,
const std::string& ws_key) {
// 找到对应的 session 对象
SocketSession* session = Session(session_id);
connected_sessions_[session_id].first = id;
session->Accept(ws_key);
delegate_->StartSession(session_id, id);
}
首先通过 session_id
找到建立 TCP 连接时分配的 SocketSession
对象:
执行 session->Accept(ws_key) 回复客户端同意协议升级:
void Accept(const std::string& ws_key) {
ws_socket_->AcceptUpgrade(ws_key);
}
从结构图我们可以看到 ws_socket_
是一个 InspectorSocket
对象:
void AcceptUpgrade(const std::string& accept_key) override {
char accept_string[ACCEPT_KEY_LENGTH];
generate_accept_string(accept_key, &accept_string);
const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: ";
// ...
// 回复 101 给客户端
WriteRaw(reply, WriteRequest::Cleanup);
// 切换 handler 为 WebSocket handler
inspector_->SwitchProtocol(new WsHandler(inspector_, std::move(tcp_)));
}
AcceptUpgradeh
首先回复客户端 101 表示同意升级到 WebSocket
协议,然后切换数据处理器为 WsHandler
,即后续的数据按照 WebSocket
协议处理。
执行 delegate_->StartSession(session_id, id)
建立和V8 Inspector
的会话。delegate_
是InspectorIoDelegate
对象:
void InspectorIoDelegate::StartSession(int session_id,
const std::string& target_id) {
auto session = main_thread_->Connect(
std::unique_ptr<InspectorSessionDelegate>(
new IoSessionDelegate(request_queue_->handle(), session_id)
),
true);
if (session) {
sessions_[session_id] = std::move(session);
fprintf(stderr, "Debugger attached.\n");
}
}
首先通过 main_thread_->Connect
拿到一个 session
,并在 InspectorIoDelegate
中记录映射关系。结构图如下:
接下来看一下 main_thread_->Connect
的逻辑(main_thread_
是 MainThreadHandle
对象):
std::unique_ptr<InspectorSession> MainThreadHandle::Connect(
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown) {
return std::unique_ptr<InspectorSession>(
new CrossThreadInspectorSession(++next_session_id_,
shared_from_this(),
std::move(delegate),
prevent_shutdown));
}
Connect
函数新建了一个 CrossThreadInspectorSession
对象。CrossThreadInspectorSession
构造函数如下:
CrossThreadInspectorSession(...) {
// 执行 MainThreadSessionState::Connect
state_.Call(&MainThreadSessionState::Connect, std::move(delegate));
}
继续看 MainThreadSessionState::Connect
:
void Connect(std::unique_ptr<InspectorSessionDelegate> delegate) {
Agent* agent = thread_->inspector_agent();
session_ = agent->Connect(std::move(delegate), prevent_shutdown_);
}
继续调 agent->Connect
:
std::unique_ptr<InspectorSession> Agent::Connect(
std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown) {
int session_id = client_->connectFrontend(std::move(delegate),
prevent_shutdown);
return std::unique_ptr<InspectorSession>(
new SameThreadInspectorSession(session_id, client_));
}
继续调 connectFrontend
:
int connectFrontend(std::unique_ptr<InspectorSessionDelegate> delegate,
bool prevent_shutdown) {
int session_id = next_session_id_++;
channels_[session_id] = std::make_unique<ChannelImpl>(env_,
client_,
getWorkerManager(),
std::move(delegate),
getThreadHandle(),
prevent_shutdown);
return session_id;
}
connectFrontend
创建了一个 ChannelImpl
并且在 channels_
中保存了映射关系。看看 ChannelImpl
的构造函数:
explicit ChannelImpl(Environment* env,
const std::unique_ptr<V8Inspector>& inspector,
std::unique_ptr<InspectorSessionDelegate> delegate, ...)
: delegate_(std::move(delegate)) {
session_ = inspector->connect(CONTEXT_GROUP_ID, this, StringView());
}
ChannelImpl
调用 inspector->connect
建立了一个和 V8 Inspector
的会话。结构图大致如下:
客户端到 Node.js
到 V8 Inspector
的整体架构如下:
3.4 客户端到 V8 Inspector 的数据处理
TCP 连接建立了,协议升级也完成了,接下来就可以开始处理业务数据。从前面的分析中我们已经知道数据到来时会执行 TcpHoldler
的 handler_->OnData
回调。因为已经完成了协议升级,所以这时候的 handler
变成了 WeSocket handler
:
void OnData(std::vector<char>* data) override
int processed = 0;
do {
processed = ParseWsFrames(*data);
// ...
} while (processed > 0 && !data->empty());
}
OnData
通过 ParseWsFrames
解析 WebSocket
协议:
int ParseWsFrames(const std::vector<char>& buffer) {
int bytes_consumed = 0;
std::vector<char> output;
bool compressed = false;
// 解析WebSocket协议
ws_decode_result r = decode_frame_hybi17(buffer,
true /* client_frame */,
&bytes_consumed, &output,
&compressed);
// 执行delegate的回调
delegate()->OnWsFrame(output);
return bytes_consumed;
}
前面已经分析过 delegate
是 TcpHoldler
的 delegate
,即 SocketSession::Delegate
对象:
void SocketSession::Delegate::OnWsFrame(const std::vector<char>& data) {
server_->MessageReceived(session_id_,
std::string(data.data(),
data.size()));
}
继续回调 server_->MessageReceived
。从结构图可以看到 server_
是 InspectorSocketServer
对象:
void MessageReceived(int session_id, const std::string& message) {
delegate_->MessageReceived(session_id, message);
}
继续回调 delegate_->MessageReceived
,InspectorSocketServer
的 delegate_
是 InspectorIoDelegate
对象:
void InspectorIoDelegate::MessageReceived(int session_id,
const std::string& message) {
auto session = sessions_.find(session_id);
if (session != sessions_.end())
session->second->Dispatch(Utf8ToStringView(message)->string());
}
首先通过 session_id
找到对应的 session
。session
是一个 CrossThreadInspectorSession
对象。看看他的 Dispatch
方法:
void Dispatch(const StringView& message) override {
state_.Call(&MainThreadSessionState::Dispatch,
StringBuffer::create(message));
}
执行 MainThreadSessionState::Dispatch
:
void Dispatch(std::unique_ptr<StringBuffer> message) {
session_->Dispatch(message->string());
}
session_
是 SameThreadInspectorSession
对象:
void SameThreadInspectorSession::Dispatch(
const v8_inspector::StringView& message) {
auto client = client_.lock();
if (client)
client->dispatchMessageFromFrontend(session_id_, message);
}
继续调 client->dispatchMessageFromFrontend
:
void dispatchMessageFromFrontend(int session_id, const StringView& message) {
channels_[session_id]->dispatchProtocolMessage(message);
}
通过 session_id
找到对应的 ChannelImpl
,继续调 ChannelImpl
的 dispatchProtocolMessage
:
voiddispatchProtocolMessage(const StringView& message) {
session_->dispatchProtocolMessage(message);
}
最终调用和 V8 Inspector
的会话对象把数据发送给 V8。至此客户端到 V8 Inspector
的通信过程就完成了。
3.5 V8 Inspector 到客户端的数据处理
接着看从 V8 inspector
到客户端的数据传递逻辑。V8 inspector
是通过 channel
的 sendResponse
函数把数据传递给客户端的:
void sendResponse(
int callId,
std::unique_ptr<v8_inspector::StringBuffer> message) override {
sendMessageToFrontend(message->string());
}
void sendMessageToFrontend(const StringView& message) {
delegate_->SendMessageToFrontend(message);
}
delegate_
是 IoSessionDelegate
对象:
void SendMessageToFrontend(const v8_inspector::StringView& message) override {
request_queue_->Post(id_, TransportAction::kSendMessage,
StringBuffer::create(message));
}
request_queue_ 是 RequestQueueData 对象。
void Post(int session_id,
TransportAction action,
std::unique_ptr<StringBuffer> message) {
Mutex::ScopedLock scoped_lock(state_lock_);
bool notify = messages_.empty();
// 消息入队
messages_.emplace_back(action, session_id, std::move(message));
if (notify) {
CHECK_EQ(0, uv_async_send(&async_));
incoming_message_cond_.Broadcast(scoped_lock);
}
}
Post
首先把消息入队,然后通过异步的方式通知 async_
,接着看 async_
的处理函数(在子线程的事件循环里执行):
uv_async_init(loop, &async_, [](uv_async_t* async) {
// 拿到async对应的上下文
RequestQueueData* wrapper = node::ContainerOf(&RequestQueueData::async_, async);
// 执行RequestQueueData的DoDispatch
wrapper->DoDispatch();
});
回调函数里调用了 wrapper->DoDispatch()
:
void DoDispatch() {
for (const auto& request : GetMessages()) {
request.Dispatch(server_);
}
}
request 是 RequestToServer 对象。
void Dispatch(InspectorSocketServer* server) const {
switch (action_) {
case TransportAction::kSendMessage:
server->Send(
session_id_,
protocol::StringUtil::StringViewToUtf8(message_->string()));
break;
}
}
接着看 InspectorSocketServer
的 Send
:
void InspectorSocketServer::Send(int session_id, const std::string& message) {
SocketSession* session = Session(session_id);
if (session != nullptr) {
session->Send(message);
}
}
session
代表可客户端的一个连接:
void SocketSession::Send(const std::string& message) {
ws_socket_->Write(message.data(), message.length());
}
接着调用 WebSocket handler
的 Write
:
void Write(const std::vector<char> data) override {
std::vector<char> output = encode_frame_hybi17(data);
WriteRaw(output, WriteRequest::Cleanup);
}
WriteRaw
是基类 ProtocolHandler
实现的:
int ProtocolHandler::WriteRaw(const std::vector<char>& buffer,
uv_write_cb write_cb) {
return tcp_->WriteRaw(buffer, write_cb);
}
最终是通过 TCP 连接返回给客户端:
int TcpHolder::WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb) {
// Freed in write_request_cleanup
WriteRequest* wr = new WriteRequest(handler_, buffer);
uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&tcp_);
int err = uv_write(&wr->req, stream, &wr->buf, 1, write_cb);
if (err < 0)
delete wr;
return err < 0;
}
新建一个写请求,socket
可写的时候发送数据给客户端。
4 总结
从以上介绍和分析中,我们了解了 Node.js Inspector
的工作原理和使用。它方便了我们对 Node.js
的调试和问题排查,提高开发效率。通过它可以收集 Node.js
进程的堆快照分析是否有内存泄漏,可以收集 CPU Profile
分析代码的性能瓶颈,从而帮助提高服务的可用性和性能。另外,它支持动态开启,降低了安全风险,同时支持对子线程进行调试,是一个非常强大的工具。
参考内容:1 Debugging Guide 2 inspector 3 开源的 inspector agent 实现 4 inspector 协议文档 5 Debugging Node.js with Chrome DevTools
- END -