带你从零看清 Node 源码 createServer 和负载均衡整个过程
(给前端大全加星标,提升前端技能)
作者: 前端巅峰 公号 / Peter 谭金杰
写在开头:
作为一名曾经重度使用Node.js作为即时通讯客户端接入层的开发人员,无法避免调试V8,配合开发addon。于是对Node.js源码产生了很大的兴趣~
顺便吐槽一句,Node的内存控制,由于是自动回收,我之前做的产品是20万人超级群的IM产品,像一秒钟1000条消息,持续时间长了内存和CPU占用还是会有一些问题
之前写过cluster模块源码分析、PM2原理等,感兴趣的可以去公众号翻一翻
Node.js的js部分源码基本看得差不多了,今天写一个createServer过程给大家,对于不怎么熟悉Node.js源码的朋友来说,可能是一个不错的开始,源码在gitHub上有,直接克隆即可,最近一直比较忙,公司和业余的工作也是,所以原创比较少。
原生Node.js创建一个基本的服务:
// Minimal HTTP server: every request is answered with a plain-text greeting.
const http = require('http');

const server = http.createServer((request, response) => {
  // Status 200 (OK) with a text/plain content type.
  response.writeHead(200, { 'Content-Type': 'text/plain' });
  // Write the body and finish the response.
  response.end('Hello World\n');
});
server.listen(8888);

// Printed to the terminal when the script runs.
console.log('Server running at http://127.0.0.1:8888/');
我们目前只分析Node.js源码的js部分的
首先找到Node.js源码的lib文件夹
然后找到http.js文件
发现createServer真正返回的是new Server,而 Server来自_http_server
于是找到同目录下的_http_server.js文件,发现整个文件有800行的样子,全局搜索Server找到函数
function Server(options, requestListener) {
if (!(this instanceof Server)) return new Server(options, requestListener);
if (typeof options === 'function') {
requestListener = options;
options = {};
} else if (options == null || typeof options === 'object') {
options = { ...options };
} else {
throw new ERR_INVALID_ARG_TYPE('options', 'object', options);
}
this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;
this[kServerResponse] = options.ServerResponse || ServerResponse;
net.Server.call(this, { allowHalfOpen: true });
if (requestListener) {
this.on('request', requestListener);
}
createServer函数解析:
参数控制有点像redux源码里的initState和reducer,根据传入类型不同,做相应的处理
this.on('request', requestListener);}
每次有请求,就会调用requestListener这个回调函数
至于IncomingMessage和ServerResponse,请求是流,响应也是流,请求是可读流,响应是可写流,当时写那个静态资源服务器时候有提到过
那么怎么可以链式调用?有人可能会有疑惑。Node.js源码遵循CommonJS规范,方法大都挂载在prototype上。函数开头有下面这个判断,保证即使调用时没有写new,拿到的也是一个Server实例,这样后面才可以继续链式调用
if (!(this instanceof Server))
return new Server(options, requestListener);
上面已经将onrequest事件触发回调函数讲清楚了,那么链式调用listen方法,监听端口是怎么回事呢?
传统的链式调用,像JQ源码是return this , 手动实现A+规范的Promise则是返回一个全新的Promise,然后Promise原型上有then方法,于是可以链式调用
怎么实现.listen链式调用,重点在这行代码:
net.Server.call(this, { allowHalfOpen: true });
allowHalfOpen实验结论: 这里TCP的知识不再做过度的讲解
(1)allowHalfOpen为true,一端发送FIN报文:
进程结束了,那么肯定会发送FIN报文;
进程未结束,不会发送FIN报文
(2)allowHalfOpen为false,一端发送FIN报文:
进程结束了,肯定发送FIN报文;
进程未结束,也会发送FIN报文;
于是找到net.js文件模块中的Server函数
/**
 * net.Server constructor. May be called with or without `new`.
 *
 * @param {Object|Function|null} [options] Options object (`allowHalfOpen`,
 *   `pauseOnConnect` are read here), or the connection listener itself.
 * @param {Function} [connectionListener] Registered for 'connection' events
 *   when it is a function.
 * @throws {ERR_INVALID_ARG_TYPE} If `options` is neither a function, an
 *   object, null nor undefined.
 */
function Server(options, connectionListener) {
  // Calling Server(...) without `new` still yields an instance.
  if (!(this instanceof Server)) {
    return new Server(options, connectionListener);
  }

  EventEmitter.call(this);

  const listenerGivenFirst = typeof options === 'function';
  const optionsUsable = options == null || typeof options === 'object';
  if (!listenerGivenFirst && !optionsUsable) {
    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
  }

  if (listenerGivenFirst) {
    // Server(listener): the first argument is the connection listener.
    connectionListener = options;
    options = {};
  } else {
    // Work on a shallow copy; null/undefined spread to an empty object.
    options = { ...options };
  }
  if (typeof connectionListener === 'function') {
    this.on('connection', connectionListener);
  }

  this._connections = 0;

  // Deprecated `connections` accessor (DEP0020) backed by `_connections`;
  // it reads as null once `_usingWorkers` is set.
  Object.defineProperty(this, 'connections', {
    get: deprecate(
      () => (this._usingWorkers ? null : this._connections),
      'Server.connections property is deprecated. ' +
        'Use Server.getConnections method instead.',
      'DEP0020'
    ),
    set: deprecate(
      (val) => (this._connections = val),
      'Server.connections property is deprecated.',
      'DEP0020'
    ),
    configurable: true,
    enumerable: false
  });

  // Initial state; no handle exists until listen() provides one.
  this[async_id_symbol] = -1;
  this._handle = null;
  this._usingWorkers = false;
  this._workers = [];
  this._unref = false;

  this.allowHalfOpen = options.allowHalfOpen || false;
  this.pauseOnConnect = Boolean(options.pauseOnConnect);
}
这里巧妙的通过.call调用net模块Server函数,保证了this指向一致
this._handle = null 这里是因为Node.js考虑到多进程问题,所以会hack掉这个属性,因为.listen方法最终会调用_handle中的方法,多个进程只会启动一个真正进程监听端口,然后负责分发给不同进程,这个后面会讲
Node.js源码的几个特色:
遵循CommonJS规范,很多方法挂载到prototype上了
很多Object.defineProperty数据劫持
this指向的修改,配合第一个进行链式调用
自带自定义事件模块,很多内置的函数都继承或通过Object.setPrototypeOf去封装了一些自定义事件
代码模块互相依赖比较多,一个.listen过程就很麻烦,初学代码者很容易睡着
源码学习,本就枯燥。没什么好说的了
我在net.js文件模块中发现了一个原型上.listen的方法:
/**
 * Start listening. Dispatches on the argument shape, as described by the
 * inline comments below: an existing handle, an fd, a port (with optional
 * host/backlog), a pipe path, or an options object; each may be followed by
 * a callback.
 *
 * @param {...*} args Raw arguments; normalizeArgs() reduces them to
 *   [options, callback].
 * @returns {Server} `this`, from every branch that does not throw.
 * @throws {ERR_SERVER_ALREADY_LISTEN} If this._handle is already set.
 * @throws {ERR_SOCKET_BAD_PORT} If options.port fails isLegalPort().
 * @throws {Error} errnoException(err, 'uv_pipe_chmod') if fchmod on the pipe
 *   handle reports an error.
 * @throws {ERR_INVALID_ARG_VALUE} If options has neither "port" nor "path".
 * @throws {ERR_INVALID_OPT_VALUE} If no branch above handled the options.
 */
Server.prototype.listen = function(...args) {
const normalized = normalizeArgs(args);
var options = normalized[0];
const cb = normalized[1];
// A server that already owns a handle cannot listen again.
if (this._handle) {
throw new ERR_SERVER_ALREADY_LISTEN();
}
// The callback, when present, is invoked once on the 'listening' event.
if (cb !== null) {
this.once('listening', cb);
}
const backlogFromArgs =
// (handle, backlog) or (path, backlog) or (port, backlog)
toNumber(args.length > 1 && args[1]) ||
toNumber(args.length > 2 && args[2]); // (port, host, backlog)
// Unwrap objects that carry a handle under `_handle` or `handle`.
options = options._handle || options.handle || options;
const flags = getFlags(options.ipv6Only);
// (handle[, backlog][, cb]) where handle is an object with a handle
if (options instanceof TCP) {
this._handle = options;
this[async_id_symbol] = this._handle.getAsyncId();
listenInCluster(this, null, -1, -1, backlogFromArgs);
return this;
}
// (handle[, backlog][, cb]) where handle is an object with a fd
if (typeof options.fd === 'number' && options.fd >= 0) {
listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
return this;
}
// ([port][, host][, backlog][, cb]) where port is omitted,
// that is, listen(), listen(null), listen(cb), or listen(null, cb)
// or (options[, cb]) where options.port is explicitly set as undefined or
// null, bind to an arbitrary unused port
if (args.length === 0 || typeof args[0] === 'function' ||
(typeof options.port === 'undefined' && 'port' in options) ||
options.port === null) {
options.port = 0;
}
// ([port][, host][, backlog][, cb]) where port is specified
// or (options[, cb]) where options.port is specified
// or if options.port is normalized as 0 before
var backlog;
if (typeof options.port === 'number' || typeof options.port === 'string') {
if (!isLegalPort(options.port)) {
throw new ERR_SOCKET_BAD_PORT(options.port);
}
// An explicit options.backlog wins over a positional backlog argument.
backlog = options.backlog || backlogFromArgs;
// start TCP server listening on host:port
if (options.host) {
// `| 0` coerces the port (possibly a string) to a 32-bit integer.
lookupAndListen(this, options.port | 0, options.host, backlog,
options.exclusive, flags);
} else { // Undefined host, listens on unspecified address
// Default addressType 4 will be used to search for master server
listenInCluster(this, null, options.port | 0, 4,
backlog, undefined, options.exclusive);
}
return this;
}
// (path[, backlog][, cb]) or (options[, cb])
// where path or options.path is a UNIX domain socket or Windows pipe
if (options.path && isPipeName(options.path)) {
var pipeName = this._pipeName = options.path;
backlog = options.backlog || backlogFromArgs;
// port and addressType of -1 mark a pipe (see createServerHandle).
listenInCluster(this, pipeName, -1, -1,
backlog, undefined, options.exclusive);
if (!this._handle) {
// Failed and an error shall be emitted in the next tick.
// Therefore, we directly return.
return this;
}
// Build the permission mask requested via readableAll / writableAll.
let mode = 0;
if (options.readableAll === true)
mode |= PipeConstants.UV_READABLE;
if (options.writableAll === true)
mode |= PipeConstants.UV_WRITABLE;
if (mode !== 0) {
const err = this._handle.fchmod(mode);
if (err) {
// Release the handle before reporting the failure.
this._handle.close();
this._handle = null;
throw errnoException(err, 'uv_pipe_chmod');
}
}
return this;
}
// Nothing matched: report a missing port/path first, otherwise a generic
// invalid-options error.
if (!(('port' in options) || ('path' in options))) {
throw new ERR_INVALID_ARG_VALUE('options', options,
'must have the property "port" or "path"');
}
throw new ERR_INVALID_OPT_VALUE('options', inspect(options));
};
这个就是我们要找的listen方法,可是里面很多ipv4和ipv6的处理,最重要的方法是listenInCluster
这个函数需要好好看一下,只有几十行
/**
 * Start listening either on a handle created by this process or on one
 * obtained from the cluster master.
 *
 * When this process is the cluster master, or `exclusive` is truthy, the
 * server sets up its own handle via `_listen2`. Otherwise the handle is
 * requested through `cluster._getServer` and `_listen2` runs once it arrives.
 *
 * @param {Object} server The server to start.
 * @param {string|null} address
 * @param {number|null} port
 * @param {number|null} addressType
 * @param {number} [backlog]
 * @param {number} [fd]
 * @param {boolean} [exclusive]
 * @param {number} [flags]
 */
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  const wantsOwnHandle = Boolean(exclusive);

  // The cluster module is loaded lazily, on first use.
  if (cluster === undefined) {
    cluster = require('cluster');
  }

  // `_listen2` is the legacy name of the handle-setup routine; it is kept so
  // that code wrapping this method keeps working.
  const startListening = () => {
    server._listen2(address, port, addressType, backlog, fd, flags);
  };

  if (cluster.isMaster || wantsOwnHandle) {
    // A new handle will be created by the server itself.
    startListening();
    return;
  }

  // Called with the master's answer to the query below.
  const onMasterHandle = (err, handle) => {
    const bindError = checkBindError(err, port, handle);
    if (bindError) {
      const failure = exceptionWithHostPort(bindError, 'bind', address, port);
      return server.emit('error', failure);
    }

    // Reuse the master's server handle.
    server._handle = handle;
    startListening();
  };

  // Ask the master for its server handle and listen on that.
  const serverQuery = { address, port, addressType, fd, flags };
  cluster._getServer(server, serverQuery, onMasterHandle);
}
如果是主进程,那么就直接调用_listen2方法了
// `_listen2` is the prototype-level name under which listenInCluster invokes
// setupListenHandle.
Server.prototype._listen2 = setupListenHandle;
找到setupListenHandle函数
/**
 * Ensure the server has a handle, start listening on it, and schedule the
 * 'listening' notification. Installed as Server.prototype._listen2, so it
 * reads and writes the server through `this`.
 *
 * Failures are not thrown: creation and listen errors are handed to
 * emitErrorNT via process.nextTick and the function returns early.
 *
 * @param {string|null} address Address to bind; when falsy and no numeric
 *   `fd` is given, the default IPv6 address is tried first, then IPv4.
 * @param {number} port
 * @param {number} addressType 4 or 6 for IP; other values are passed through
 *   to createServerHandle.
 * @param {number} [backlog] Listen backlog; 511 is used when falsy.
 * @param {number} [fd]
 * @param {number} [flags]
 */
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
debug('setupListenHandle', address, port, addressType, backlog, fd);
// If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this.
if (this._handle) {
debug('setupListenHandle: have a handle already');
} else {
debug('setupListenHandle: create a handle');
var rval = null;
// Try to bind to the unspecified IPv6 address, see if IPv6 is available
if (!address && typeof fd !== 'number') {
rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);
// A numeric result is an error code: discard it and fall back to IPv4.
if (typeof rval === 'number') {
rval = null;
address = DEFAULT_IPV4_ADDR;
addressType = 4;
} else {
address = DEFAULT_IPV6_ADDR;
addressType = 6;
}
}
// No handle yet (address was given, fd was given, or IPv6 failed).
if (rval === null)
rval = createServerHandle(address, port, addressType, fd, flags);
// Still a numeric error code: report it asynchronously and give up.
if (typeof rval === 'number') {
var error = uvExceptionWithHostPort(rval, 'listen', address, port);
process.nextTick(emitErrorNT, this, error);
return;
}
this._handle = rval;
}
this[async_id_symbol] = getNewAsyncId(this._handle);
// Wire the handle back to this server and to the connection callback.
this._handle.onconnection = onconnection;
this._handle[owner_symbol] = this;
// Use a backlog of 512 entries. We pass 511 to the listen() call because
// the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
// which will thus give us a backlog of 512 entries.
const err = this._handle.listen(backlog || 511);
if (err) {
var ex = uvExceptionWithHostPort(err, 'listen', address, port);
// Release the handle before reporting the failure on the next tick.
this._handle.close();
this._handle = null;
defaultTriggerAsyncIdScope(this[async_id_symbol],
process.nextTick,
emitErrorNT,
this,
ex);
return;
}
// Generate connection key, this should be unique to the connection
this._connectionKey = addressType + ':' + address + ':' + port;
// Unref the handle if the server was unref'ed prior to listening
if (this._unref)
this.unref();
// Announce 'listening' on the next tick, within this server's async scope.
defaultTriggerAsyncIdScope(this[async_id_symbol],
process.nextTick,
emitListeningNT,
this);
}
里面的createServerHandle是重点
/**
 * Create a server handle and, where applicable, bind it.
 *
 * Handle selection:
 *   - numeric `fd` >= 0: wrap the descriptor via createHandle() and open it;
 *   - `port === -1 && addressType === -1`: a Pipe server handle;
 *   - otherwise: a TCP server handle.
 *
 * @param {string|null} address
 * @param {number} port
 * @param {number} addressType
 * @param {number} [fd]
 * @param {number} [flags] Passed to bind6() only.
 * @returns {Object|number} The handle on success, or a numeric error code
 *   (UV_EINVAL, or the code returned by open/bind).
 */
function createServerHandle(address, port, addressType, fd, flags) {
var err = 0;
// Assign handle in listen, and clean up if bind or listen fails
var handle;
var isTCP = false;
if (typeof fd === 'number' && fd >= 0) {
try {
handle = createHandle(fd, true);
} catch (e) {
// Not a fd we can listen on. This will trigger an error.
debug('listen invalid fd=%d:', fd, e.message);
return UV_EINVAL;
}
err = handle.open(fd);
if (err)
return err;
// An fd-based handle must not also be given an address or port.
assert(!address && !port);
} else if (port === -1 && addressType === -1) {
handle = new Pipe(PipeConstants.SERVER);
if (process.platform === 'win32') {
// On Windows, NODE_PENDING_PIPE_INSTANCES may set the pending instance
// count; values that do not parse to a number are ignored.
var instances = parseInt(process.env.NODE_PENDING_PIPE_INSTANCES);
if (!Number.isNaN(instances)) {
handle.setPendingInstances(instances);
}
}
} else {
handle = new TCP(TCPConstants.SERVER);
isTCP = true;
}
// Bind when there is something to bind to, and always for TCP handles.
if (address || port || isTCP) {
debug('bind to', address || 'any');
if (!address) {
// Try binding to ipv6 first
err = handle.bind6(DEFAULT_IPV6_ADDR, port, flags);
if (err) {
handle.close();
// Fallback to ipv4
// NOTE(review): the fallback call passes only address and port;
// addressType, fd and flags are left undefined — confirm intended.
return createServerHandle(DEFAULT_IPV4_ADDR, port);
}
} else if (addressType === 6) {
err = handle.bind6(address, port, flags);
} else {
err = handle.bind(address, port);
}
}
// Any bind error: release the handle and return the error code instead.
if (err) {
handle.close();
return err;
}
return handle;
}
已经可以看到TCP了,离真正的绑定监听端口,更近了一步
最终通过下面的方法绑定监听端口
handle.bind6(address, port, flags);
或者
handle.bind(address, port);
首选ipv6绑定,是因为ipv6可以接受到ipv4的套接字,而ipv4不可以接受ipv6的套接字,当然也有方法可以接收,就是麻烦了一点
上面的内容,请你认真看,因为下面会更复杂,涉及到Node.js的多进程负载均衡原理
如果不是主进程,就调用cluster._getServer,找到cluster源码
'use strict';

// The presence of NODE_UNIQUE_ID in the environment selects the child
// implementation; without it the master implementation is loaded.
const isChild = 'NODE_UNIQUE_ID' in process.env;
const childOrMaster = isChild ? 'child' : 'master';

module.exports = require(`internal/cluster/${childOrMaster}`);
找到_getServer函数源码
/**
 * Ask the master for a server handle on behalf of `obj`.
 *
 * Sends a 'queryServer' message built from `options`; the reply is routed to
 * shared() when a handle accompanies it and to rr() otherwise. Once `obj`
 * emits 'listening', a follow-up 'listening' message is sent.
 *
 * @param {Object} obj A net#Server or a dgram#Socket object.
 * @param {Object} options Query fields: address, port, addressType, fd, ...
 * @param {Function} cb Passed through to shared() / rr().
 */
cluster._getServer = function(obj, options, cb) {
  // Resolve unix socket paths to absolute paths.
  const isUnixSocketPath = options.port < 0 &&
                           typeof options.address === 'string' &&
                           process.platform !== 'win32';
  const address = isUnixSocketPath ?
    path.resolve(options.address) :
    options.address;

  const indexesKey = [
    address,
    options.port,
    options.addressType,
    options.fd
  ].join(':');

  // Number the queries issued for this key: 0 for the first, then 1, 2, ...
  const previousIndex = indexes.get(indexesKey);
  const index = previousIndex === undefined ? 0 : previousIndex + 1;
  indexes.set(indexesKey, index);

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options,
    address
  };

  // Set custom data on handle (i.e. tls tickets key).
  if (obj._getServerData) {
    message.data = obj._getServerData();
  }

  const onReply = (reply, handle) => {
    if (typeof obj._setServerData === 'function') {
      obj._setServerData(reply.data);
    }

    if (handle) {
      shared(reply, handle, indexesKey, cb); // Shared listen socket.
      return;
    }
    rr(reply, indexesKey, cb); // Round-robin.
  };
  send(message, onReply);

  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const bound = obj.address();
    message.act = 'listening';
    message.port = (bound && bound.port) || options.port;
    send(message);
  });
};
我们之前传入了三个参数给它,分别是
server,serverQuery,listenOnMasterHandle
这里是比较复杂的,曾经我也在这里迷茫过一段时间,但是想着还是看下去吧。坚持下,大家如果看到这里看不下去了,先休息下,保存着。后面等心情平复了再静下来接下去看
首先我们传入了Server、serverQuery和cb(回调函数listenOnMasterHandle),整个cluster模块的_getServer中最重要的就是:
if (obj._getServerData)
message.data = obj._getServerData();
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
首先我们会先获取server上的data数据,然后调用send函数
/**
 * Forward `message` to sendHelper, targeting the current `process` and
 * attaching no handle.
 *
 * @param {Object} message
 * @param {Function} [cb] Passed through to sendHelper.
 * @returns {*} Whatever sendHelper returns.
 */
function send(message, cb) {
  // These messages never carry a handle, hence the explicit null.
  const handle = null;
  return sendHelper(process, message, handle, cb);
}
send函数调用的是cluster模块的utils文件内的函数,传入了一个默认值process
/**
 * Send an internal cluster message through `proc`, optionally registering a
 * callback under the message's sequence number.
 *
 * @param {Object} proc Object exposing `connected` and `send(message, handle)`.
 * @param {Object} message Payload; `cmd` may be overridden by the payload,
 *   `seq` is always set from the module counter.
 * @param {*} handle Passed through to `proc.send`.
 * @param {Function} [cb] Stored in `callbacks` under the current `seq`.
 * @returns {boolean|*} false when `proc` is not connected, otherwise the
 *   result of `proc.send`.
 */
function sendHelper(proc, message, handle, cb) {
  if (proc.connected) {
    // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
    const envelope = { cmd: 'NODE_CLUSTER', ...message, seq };

    if (typeof cb === 'function') {
      callbacks.set(seq, cb);
    }

    // Each message sent consumes one sequence number.
    seq += 1;
    return proc.send(envelope, handle);
  }

  return false;
}
这里要看清楚,我们调用sendHelper传入的第三个参数是null !!!
那么主进程返回也是null
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
所以我们会进入rr函数调用的这个判断,这里调用rr传入的cb就是在net.js模块定义的listenOnMasterHandle函数
Node.js的负载均衡算法是轮询,官方给出的解释是简单粗暴效率高
需要注意:上面sendHelper函数里每次+1的seq只是消息序号,用来把主进程的回复和callbacks里登记的回调一一对应起来;真正的轮询分发发生在主进程一侧(lib/internal/cluster/round_robin_handle.js)
if (typeof cb === 'function')
callbacks.set(seq, cb);
seq += 1;
/**
 * Handle a reply that carried no handle: build a stand-in handle, register
 * it under the reply's key, and pass it to `cb`.
 *
 * @param {Object} message Reply; `errno`, `key` and `sockname` are read.
 * @param {string} indexesKey Key removed from `indexes` when the stand-in
 *   handle is closed.
 * @param {Function} cb Called as cb(errno, null) on error, otherwise as
 *   cb(0, handle).
 * @returns {*} The result of `cb` on the error path, otherwise undefined.
 */
function rr(message, indexesKey, cb) {
  if (message.errno) {
    return cb(message.errno, null);
  }

  // Cleared by close(), so a second close() does nothing.
  let activeKey = message.key;

  const listen = (backlog) => {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  };

  const close = () => {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (activeKey !== undefined) {
      send({ act: 'close', key: activeKey });
      handles.delete(activeKey);
      indexes.delete(indexesKey);
      activeKey = undefined;
    }
  };

  const getsockname = (out) => {
    if (activeKey) {
      Object.assign(out, message.sockname);
    }
    return 0;
  };

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname; // TCP handles only.
  }

  assert(handles.has(activeKey) === false);
  handles.set(activeKey, handle);
  cb(0, handle);
}
此时的handle已经被重写,listen方法调用会返回0,不会再占用端口了。所以这样Node.js多个进程也只是一个进程监听端口而已
此时的cb是net.js模块里的listenOnMasterHandle函数:它先把这个仿句柄赋给server._handle,然后再调用_listen2(即setupListenHandle)方法。
官方的注释:
Faux handle. Mimics a TCPWrap with just enough fidelity to get away
仿句柄。以足够的保真度来模拟TCPWrap
觉得本文对你有帮助?请分享给更多人
关注「前端大全」加星标,提升前端技能
好文章,我在看❤️