
带你从零看清 Node 源码 createServer 和负载均衡整个过程

// Minimal HTTP server example: every request is answered with a plain-text
// "Hello World" body, and the server listens on port 8888.
const http = require('http');

http.createServer(function (request, response) {
  // Send the HTTP header.
  // HTTP status: 200 (OK)
  // Content type: text/plain
  response.writeHead(200, { 'Content-Type': 'text/plain' });

  // Send the response body "Hello World".
  response.end('Hello World\n');
}).listen(8888);

// Print the following message to the terminal.
console.log('Server running at');




发现createServer真正返回的是new Server,而 Server来自_http_server


// Excerpt of the HTTP `Server` constructor; the excerpt stops before the
// function's closing brace.
// - Callable with or without `new`: a plain call re-enters as `new Server(...)`.
// - `options` is optional: a function in first position is treated as
//   `requestListener`; null/undefined or an object is shallow-copied; any other
//   type throws ERR_INVALID_ARG_TYPE.
// - Stores `options.IncomingMessage` / `options.ServerResponse` (falling back to
//   the module's IncomingMessage / ServerResponse) under symbol keys.
// - Initialises the net.Server part with `allowHalfOpen: true`, then registers
//   `requestListener`, when given, for the 'request' event.
function Server(options, requestListener) {if (!(this instanceof Server)) return new Server(options, requestListener);if (typeof options === 'function') { requestListener = options; options = {}; } else if (options == null || typeof options === 'object') { options = { ...options }; } else {throw new ERR_INVALID_ARG_TYPE('options', 'object', options); }this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;this[kServerResponse] = options.ServerResponse || ServerResponse; net.Server.call(this, { allowHalfOpen: true });if (requestListener) {this.on('request', requestListener); }


  • 参数控制有点像redux源码里的initState和reducer,根据传入类型不同,做相应的处理

    this.on('request', requestListener);}
  •  每次有请求,就会调用requestListener这个回调函数

  • 至于IncomingMessage和ServerResponse,请求是流,响应也是流,请求是可读流,响应是可写流,当时写那个静态资源服务器时候有提到过

  • 那么怎么可以链式调用?有人可能会有疑惑。Node.js源码遵循CommonJS规范,方法大都挂载在prototype上,所以函数开头有下面这行判断:即使不用new直接调用,也会返回一个新的实例,从而确保可以链式调用

if (!(this instanceof Server)) return new Server(options, requestListener);


传统的链式调用,像JQ源码是return this , 手动实现A+规范的Promise则是返回一个全新的Promise,然后Promise原型上有then方法,于是可以链式调用


net.Server.call(this, { allowHalfOpen: true });

 allowHalfOpen实验结论: 这里TCP的知识不再做过度的讲解



/**
 * net `Server` constructor.
 *
 * Accepts `(connectionListener)`, `(options)`, or `(options, connectionListener)`
 * and may be called with or without `new`.
 *
 * @param {object|Function|null|undefined} options Settings object
 *   (`allowHalfOpen`, `pauseOnConnect` are read here), or the connection
 *   listener itself when passed as a function.
 * @param {Function} [connectionListener] Registered for the 'connection' event.
 * @throws {ERR_INVALID_ARG_TYPE} When `options` is neither a function, an
 *   object, null nor undefined.
 */
function Server(options, connectionListener) {
  // Support calling `Server(...)` without `new`.
  if (!(this instanceof Server)) {
    return new Server(options, connectionListener);
  }

  let settings;
  let listener = connectionListener;
  const optionsKind = typeof options;

  if (optionsKind === 'function') {
    // Single-argument form: the first argument is the listener.
    listener = options;
    settings = {};
    this.on('connection', listener);
  } else if (options == null || optionsKind === 'object') {
    // Work on a shallow copy so the caller's object is never touched.
    settings = { ...options };
    if (typeof listener === 'function') {
      this.on('connection', listener);
    }
  } else {
    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
  }

  this._connections = 0;

  // `connections` is a deprecated accessor in front of `_connections`.
  const readConnections = deprecate(
    () => (this._usingWorkers ? null : this._connections),
    'Server.connections property is deprecated. ' +
      'Use Server.getConnections method instead.',
    'DEP0020'
  );
  const writeConnections = deprecate(
    (val) => (this._connections = val),
    'Server.connections property is deprecated.',
    'DEP0020'
  );
  Object.defineProperty(this, 'connections', {
    get: readConnections,
    set: writeConnections,
    configurable: true,
    enumerable: false
  });

  // Initial listening state: no handle yet, no workers, still ref'ed.
  Object.assign(this, {
    [async_id_symbol]: -1,
    _handle: null,
    _usingWorkers: false,
    _workers: [],
    _unref: false
  });

  this.allowHalfOpen = settings.allowHalfOpen || false;
  this.pauseOnConnect = !!settings.pauseOnConnect;
}


this._handle = null 这里是因为Node.js考虑到多进程问题,所以会hack掉这个属性,因为.listen方法最终会调用_handle中的方法,多个进程只会启动一个真正进程监听端口,然后负责分发给不同进程,这个后面会讲


  1. 遵循CommonJS规范,很多方法挂载到prototype上了

  2. 很多Object.defineProperty数据劫持

  3. this指向的修改,配合第一个进行链式调用

  4. 自带自定义事件模块,很多内置的函数都继承或通过Object.setPrototypeOf去封装了一些自定义事件

  5. 代码模块互相依赖比较多,一个.listen过程就很麻烦,初学代码者很容易睡着

  6. 源码学习,本就枯燥。没什么好说的了


/**
 * Start listening. Supported call shapes (see the inline comments):
 *   (handle[, backlog][, cb]), ([port][, host][, backlog][, cb]),
 *   (path[, backlog][, cb]) and (options[, cb]).
 *
 * Every successful path ends in `listenInCluster(...)` (directly, or through
 * `lookupAndListen(...)` when a host is given) and returns `this`.
 *
 * @returns {Server} `this`.
 * @throws {ERR_SERVER_ALREADY_LISTEN} When `this._handle` is already set.
 * @throws {ERR_SOCKET_BAD_PORT} When the port fails `isLegalPort`.
 * @throws {ERR_INVALID_ARG_VALUE} When options has neither "port" nor "path".
 * @throws {ERR_INVALID_OPT_VALUE} For any other unusable options value.
 */
Server.prototype.listen = function(...args) {
  const normalized = normalizeArgs(args);
  let options = normalized[0];
  const cb = normalized[1];

  if (this._handle) {
    throw new ERR_SERVER_ALREADY_LISTEN();
  }

  if (cb !== null) {
    this.once('listening', cb);
  }

  const backlogFromArgs =
    // (handle, backlog) or (path, backlog) or (port, backlog)
    toNumber(args.length > 1 && args[1]) ||
    toNumber(args.length > 2 && args[2]); // (port, host, backlog)

  options = options._handle || options.handle || options;
  const flags = getFlags(options.ipv6Only);

  // (handle[, backlog][, cb]) where handle is an object with a handle
  if (options instanceof TCP) {
    this._handle = options;
    this[async_id_symbol] = this._handle.getAsyncId();
    listenInCluster(this, null, -1, -1, backlogFromArgs);
    return this;
  }

  // (handle[, backlog][, cb]) where handle is an object with a fd
  if (typeof options.fd === 'number' && options.fd >= 0) {
    listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
    return this;
  }

  // ([port][, host][, backlog][, cb]) where port is omitted,
  // that is, listen(), listen(null), listen(cb), or listen(null, cb)
  // or (options[, cb]) where options.port is explicitly set as undefined or
  // null, bind to an arbitrary unused port
  if (args.length === 0 || typeof args[0] === 'function' ||
      (typeof options.port === 'undefined' && 'port' in options) ||
      options.port === null) {
    options.port = 0;
  }

  // ([port][, host][, backlog][, cb]) where port is specified
  // or (options[, cb]) where options.port is specified
  // or if options.port is normalized as 0 before
  let backlog;
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    if (!isLegalPort(options.port)) {
      throw new ERR_SOCKET_BAD_PORT(options.port);
    }
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else {
      // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }

  // (path[, backlog][, cb]) or (options[, cb])
  // where path or options.path is a UNIX domain socket or Windows pipe
  if (options.path && isPipeName(options.path)) {
    const pipeName = this._pipeName = options.path;
    backlog = options.backlog || backlogFromArgs;
    listenInCluster(this, pipeName, -1, -1,
                    backlog, undefined, options.exclusive);

    if (!this._handle) {
      // Failed and an error shall be emitted in the next tick.
      // Therefore, we directly return.
      return this;
    }

    let mode = 0;
    if (options.readableAll === true)
      mode |= PipeConstants.UV_READABLE;
    if (options.writableAll === true)
      mode |= PipeConstants.UV_WRITABLE;
    if (mode !== 0) {
      const err = this._handle.fchmod(mode);
      if (err) {
        this._handle.close();
        this._handle = null;
        throw errnoException(err, 'uv_pipe_chmod');
      }
    }
    return this;
  }

  if (!(('port' in options) || ('path' in options))) {
    throw new ERR_INVALID_ARG_VALUE('options', options,
                                    'must have the property "port" or "path"');
  }

  throw new ERR_INVALID_OPT_VALUE('options', inspect(options));
};



/**
 * Start listening either with a handle of our own or with one obtained from
 * the cluster master.
 *
 * When `cluster.isMaster` is true or `exclusive` is truthy, `server._listen2`
 * is called right away. Otherwise the master is queried through
 * `cluster._getServer`, and `server._listen2` runs once a handle came back.
 * A bind error reported by the master is emitted as an 'error' event on
 * `server`.
 *
 * @param {object} server Object providing `_listen2`, `emit` and `_handle`.
 * @param {*} address
 * @param {*} port
 * @param {*} addressType
 * @param {*} backlog
 * @param {*} fd
 * @param {*} exclusive Coerced to boolean.
 * @param {*} flags
 */
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  // `cluster` is loaded on first use.
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}


// `_listen2` is the name under which `listenInCluster` invokes
// `setupListenHandle`; the old name is kept so that code wrapping this method
// does not break.
Server.prototype._listen2 = setupListenHandle;


/**
 * Create (when needed) and start the listening handle of `this` server.
 *
 * Without an existing `this._handle`, a handle is created through
 * `createServerHandle`; with no address and no numeric fd the unspecified
 * IPv6 address is tried first and IPv4 is the fallback. Failures (numeric
 * return values / non-zero `listen()` result) are reported asynchronously via
 * `emitErrorNT`; on success `emitListeningNT` is scheduled.
 *
 * @param {*} address
 * @param {*} port
 * @param {*} addressType
 * @param {number} [backlog] Falsy values fall back to 511.
 * @param {*} fd
 * @param {*} flags
 */
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');

    let rval = null;

    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== 'number') {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);

      if (typeof rval === 'number') {
        rval = null;
        address = DEFAULT_IPV4_ADDR;
        addressType = 4;
      } else {
        address = DEFAULT_IPV6_ADDR;
        addressType = 6;
      }
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags);

    // A numeric result is an error code, not a handle.
    if (typeof rval === 'number') {
      const error = uvExceptionWithHostPort(rval, 'listen', address, port);
      process.nextTick(emitErrorNT, this, error);
      return;
    }
    this._handle = rval;
  }

  this[async_id_symbol] = getNewAsyncId(this._handle);
  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  const err = this._handle.listen(backlog || 511);

  if (err) {
    const ex = uvExceptionWithHostPort(err, 'listen', address, port);
    this._handle.close();
    this._handle = null;
    defaultTriggerAsyncIdScope(this[async_id_symbol],
                               process.nextTick,
                               emitErrorNT,
                               this,
                               ex);
    return;
  }

  // Generate connection key, this should be unique to the connection
  this._connectionKey = addressType + ':' + address + ':' + port;

  // Unref the handle if the server was unref'ed prior to listening
  if (this._unref)
    this.unref();

  defaultTriggerAsyncIdScope(this[async_id_symbol],
                             process.nextTick,
                             emitListeningNT,
                             this);
}


/**
 * Create and bind a server handle.
 *
 * - numeric `fd` >= 0: the handle comes from `createHandle(fd, true)` and is
 *   opened on that fd;
 * - `port === -1 && addressType === -1`: a `Pipe` server handle;
 * - otherwise: a `TCP` server handle.
 *
 * @param {*} address Falsy means "unspecified": IPv6 is tried first, then the
 *   function retries itself with the default IPv4 address.
 * @param {*} port
 * @param {*} addressType `6` selects `bind6`, anything else `bind`.
 * @param {*} fd
 * @param {*} flags Passed to `bind6`.
 * @returns {object|number} The bound handle, or a numeric error code.
 */
function createServerHandle(address, port, addressType, fd, flags) {
  let err = 0;
  // Assign handle in listen, and clean up if bind or listen fails
  let handle;

  let isTCP = false;
  if (typeof fd === 'number' && fd >= 0) {
    try {
      handle = createHandle(fd, true);
    } catch (e) {
      // Not a fd we can listen on. This will trigger an error.
      debug('listen invalid fd=%d:', fd, e.message);
      return UV_EINVAL;
    }

    err = handle.open(fd);
    if (err)
      return err;

    assert(!address && !port);
  } else if (port === -1 && addressType === -1) {
    handle = new Pipe(PipeConstants.SERVER);
    if (process.platform === 'win32') {
      const instances = parseInt(process.env.NODE_PENDING_PIPE_INSTANCES);
      if (!Number.isNaN(instances)) {
        handle.setPendingInstances(instances);
      }
    }
  } else {
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }

  if (address || port || isTCP) {
    debug('bind to', address || 'any');
    if (!address) {
      // Try binding to ipv6 first
      err = handle.bind6(DEFAULT_IPV6_ADDR, port, flags);
      if (err) {
        handle.close();
        // Fallback to ipv4
        return createServerHandle(DEFAULT_IPV4_ADDR, port);
      }
    } else if (addressType === 6) {
      err = handle.bind6(address, port, flags);
    } else {
      err = handle.bind(address, port);
    }
  }

  if (err) {
    handle.close();
    return err;
  }

  return handle;
}



handle.bind6(address, port, flags); 或者 handle.bind(address, port);




'use strict';

// Select the cluster implementation for this process: when the environment
// carries NODE_UNIQUE_ID the child module is loaded, otherwise the master one.
const hasUniqueId = 'NODE_UNIQUE_ID' in process.env;
const childOrMaster = hasUniqueId ? 'child' : 'master';

module.exports = require(`internal/cluster/${childOrMaster}`);


/**
 * Ask the master for a server handle on behalf of `obj`.
 *
 * Sends a 'queryServer' message built from `options` (plus a per-key running
 * index and optional custom data from `obj._getServerData()`). The reply is
 * routed to `shared(...)` when a handle accompanies it and to `rr(...)`
 * otherwise. Once `obj` emits 'listening', a 'listening' message is sent too.
 *
 * @param {object} obj A net#Server or a dgram#Socket object.
 * @param {object} options Reads `address`, `port`, `addressType`, `fd`; all
 *   own properties are spread into the message.
 * @param {Function} cb Handed on to `shared` / `rr`.
 */
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);

  const indexesKey = [address,
                      options.port,
                      options.addressType,
                      options.fd ].join(':');

  // First query for a key gets index 0, each further one increments it.
  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  indexes.set(indexesKey, index);

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb); // Shared listen socket.
    else
      rr(reply, indexesKey, cb); // Round-robin.
  });

  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};





// Excerpt: attach custom data when `obj` provides it, then send the message.
if (obj._getServerData)
  message.data = obj._getServerData();

send(message, (reply, handle) => {
  if (typeof obj._setServerData === 'function')
    obj._setServerData(reply.data);

  if (handle)
    shared(reply, handle, indexesKey, cb); // Shared listen socket.
  else
    rr(reply, indexesKey, cb); // Round-robin.
});


/**
 * Send `message` through the current process, without a handle.
 *
 * @param {object} message
 * @param {Function} [cb]
 * @returns {*} Whatever `sendHelper` returns.
 */
function send(message, cb) {
  const noHandle = null;
  const outcome = sendHelper(process, message, noHandle, cb);
  return outcome;
}


/**
 * Send an internal cluster message over `proc`.
 *
 * The message is wrapped as `{ cmd: 'NODE_CLUSTER', ...message, seq }`; when
 * `cb` is a function it is stored in `callbacks` under the current `seq`.
 * `seq` is incremented on every send.
 *
 * @param {object} proc Object with `connected` and `send(message, handle)`.
 * @param {object} message
 * @param {*} handle Passed through to `proc.send`.
 * @param {Function} [cb]
 * @returns {*} `false` when `proc` is not connected, otherwise the result of
 *   `proc.send`.
 */
function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;
  return proc.send(message, handle);
}

这里要看清楚,我们调用sendHelper传入的第三个参数是null  !!!


// Excerpt: the reply callback given to send(); a reply that carries a handle
// goes to shared(), one without goes to rr().
send(message, (reply, handle) => {
  if (typeof obj._setServerData === 'function')
    obj._setServerData(reply.data);

  if (handle)
    shared(reply, handle, indexesKey, cb); // Shared listen socket.
  else
    rr(reply, indexesKey, cb); // Round-robin.
});




if (typeof cb === 'function') callbacks.set(seq, cb);
seq += 1;

/**
 * Round-robin reply handler: builds a faux handle for the worker and passes
 * it to `cb`.
 *
 * @param {object} message Reply from the master; reads `errno`, `key` and
 *   `sockname`.
 * @param {string} indexesKey Key removed from `indexes` when the faux handle
 *   is closed.
 * @param {Function} cb Called as `cb(message.errno, null)` on error, else as
 *   `cb(0, handle)`.
 * @returns {*} The result of `cb` on the error path, otherwise undefined.
 */
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      Object.assign(out, message.sockname);

    return 0;
  }

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname; // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle);
}


此时的cb是net.js模块listenInCluster里的listenOnMasterHandle,它拿到handle后会调用setupListenHandle,即_listen2方法


Faux handle. Mimics a TCPWrap with just enough fidelity to get away with it.

