查看原文
其他

入门 Node.js Net 模块构建 TCP 网络服务

五月君 Nodejs技术栈 2022-06-19

愿每次回忆,对生活都不感到负疚。——郭小川

想做一个简单的 Web API,这个时候就需要搭建一个 Web 服务器,在 ASP.NET 中需要 IIS 来搭建服务器,PHP 中需要借助 Apache/Nginx 来实现,对于新手在还没开始之前看到这么多步骤,也许就要放弃了,但是在 Node.js 中开启一个 Web 服务器是 So Easy 的,我们利用 Net、Dgram、HTTP、HTTPS 等模块通过几行简单的代码就可实现。

作者简介:五月君,Nodejs Developer,慕课网认证作者,热爱技术、喜欢分享的 90 后青年,欢迎关注公众号 Nodejs技术栈 和 Github 开源项目 https://www.nodejs.red

面试指南

  • 什么是 TCP 协议?什么情况下又会选择 TCP 协议呢?参考正文 Interview1

  • TCP 粘包是什么?该怎么解决?参考正文 Interview2

网络模型

大多数同学对于 HTTP、HTTPS 会很熟悉,通常用于浏览器与服务端交互,或者服务端与服务端的交互,另外两个 Net 与 Dgram 也许会相对陌生,这两个是基于网络模型的传输层来实现的,分别对应于 TCP、UDP 协议,下面一图看明白 OSI 七层模型 与 TCP/IP 五层模型之间的关系,中间使用虚线标注了传输层,对于上层应用层(HTTP/HTTPS等)也都是基于这一层的 TCP 协议来实现的,所以想使用 Node.js 做服务端开发,Net 模块也是你必须要掌握的,这也是我们本篇要讲解的重点。

初识 TCP 协议

Interview1: 有些概念还是要弄清楚的,什么是 TCP 协议?什么情况下又会选择 TCP 协议呢?

TCP 是传输控制协议,大多数情况下我们都会使用这个协议,因为它是一个更可靠的数据传输协议,具有如下三个特点:

  • 面向链接: 需要对方主机在线,并建立链接。

  • 面向字节流: 你给我一堆字节流的数据,我给你发送出去,但是每次发送多少是我说了算,每次选出一段字节发送的时候,都会带上一个序号,这个序号就是发送的这段字节中编号最小的字节的编号。

  • 可靠: 保证数据有序的到达对方主机,每发送一个数据就会期待收到对方的回复,如果在指定时间内收到了对方的回复,就确认为数据到达,如果超过一定时间没收到对方回复,就认为对方没收到,在重新发送一遍。

上面三个特点说到 TCP 是面向链接和可靠的,其一个显著特征是在传输之前会有一个 3 次握手,实现过程如下所示:

在一次 TCP 三次握手的过程中,客户端与服务端会分别提供一个套接字来形成一个链接。之后客户端与服务端通过这个链接来互相发送数据。

Net 模块构建一个 TCP 服务

以上了解了 TCP 的一些概念之后,我们开始创建一个 TCP 服务端与客户端实例,这里我们需要使用 Node.js 的 Net 模块,它提供了一些用于底层通信的接口,该模块可以用于创建基于流的 TCP 或 IPC 的服务器(net.createServer())与客户端(net.createConnection())。

创建 TCP 服务

可以使用 new net.Server 创建一个 TCP 服务端链接,也可以通过工厂函数 net.createServer() 的方式,createServer() 的内部实现也是内部调用了 Server 构造函数来创建一个 TCP 对象,和 new net.Server 是一样的,代码如下所示:

https://github.com/nodejs/node/blob/v12.x/lib/net.js#L145

  1. function createServer(options, connectionListener) {

  2. return new Server(options, connectionListener);

  3. }

https://github.com/nodejs/node/blob/v12.x/lib/net.js#L1142

  1. function Server(options, connectionListener) {

  2. if (!(this instanceof Server))

  3. return new Server(options, connectionListener);


  4. // Server 类内部还是继承了 EventEmitter,这个不在本节范围

  5. EventEmitter.call(this);


  6. ...

TCP 服务事件

在开始代码之前,先了解下其相关事件,参考官网 http://nodejs.cn/api/net.html,这里也不会把所有的都介绍,下面介绍一些常用的,并且通过代码示例,进行讲解,可以在这个基础之上在去参考官网,实践一些其它的事件或方法。

TCP 服务器事件

  • listening: ,也就是 server.listen();

  • connection: 新链接建立时触发,也就是每次收到客户端回调,参数 socket 为 net.createServer 实例,也可以写在 net.createServer(function(socket) {}) 方法里

  • close:当 server 关闭的时候触发(server.close())。如果有连接存在,直到所有的连接结束才会触发这个事件

  • error:捕获错误,例如监听一个已经存在的端口就会报 Error: listen EADDRINUSE 错误

TCP 链接事件方法

  • data: 一端调用 write() 方法发送数据时,另一端会通过 socket.on('data') 事件接收到,可以理解为读取数据

  • end: 每次 socket 链接会出现一次,例如客户端发送消息之后执行 Ctrl + C 终端,就会收到

  • error: 监听 socket 的错误信息

  • write:write 是一个方法(socket.write())上面的 data 事件是读数据,write 方法在这里就为写数据到另一端,

TCP 服务端代码实现

  1. const net = require('net');

  2. const HOST = '127.0.0.1';

  3. const PORT = 3000;


  4. // 创建一个 TCP 服务实例

  5. const server = net.createServer();


  6. // 监听端口

  7. server.listen(PORT, HOST);


  8. server.on('listening', () => {

  9. console.log(`服务已开启在 ${HOST}:${PORT}`);

  10. });


  11. server.on('connection', socket => {

  12. // data 事件就是读取数据

  13. socket.on('data', buffer => {

  14. const msg = buffer.toString();

  15. console.log(msg);


  16. // write 方法写入数据,发回给客户端

  17. socket.write(Buffer.from('你好 ' + msg));

  18. });

  19. })


  20. server.on('close', () => {

  21. console.log('Server Close!');

  22. });


  23. server.on('error', err => {

  24. if (err.code === 'EADDRINUSE') {

  25. console.log('地址正被使用,重试中...');


  26. setTimeout(() => {

  27. server.close();

  28. server.listen(PORT, HOST);

  29. }, 1000);

  30. } else {

  31. console.error('服务器异常:', err);

  32. }

  33. });

TCP 客户端代码实现

  1. const net = require('net');

  2. const client = net.createConnection({

  3. host: '127.0.0.1',

  4. port: 3000

  5. });


  6. client.on('connect', () => {

  7. // 向服务器发送数据

  8. client.write('Nodejs 技术栈');


  9. setTimeout(() => {

  10. client.write('JavaScript ');

  11. client.write('TypeScript ');

  12. client.write('Python ');

  13. client.write('Java ');

  14. client.write('C ');

  15. client.write('PHP ');

  16. client.write('ASP.NET ');

  17. }, 1000);

  18. })


  19. client.on('data', buffer => {

  20. console.log(buffer.toString());

  21. });


  22. // 例如监听一个未开启的端口就会报 ECONNREFUSED 错误

  23. client.on('error', err => {

  24. console.error('服务器异常:', err);

  25. });


  26. client.on('close', err => {

  27. console.log('客户端链接断开!', err);

  28. });

源码实现地址

  1. https://github.com/Q-Angelo/project-training/tree/master/nodejs/net/chapter-1-client-server

客户端与服务端 Demo 测试

首先启动服务端,之后在启动客户端,客户端调用三次,打印结果如下所示:

服务端

  1. $ node server.js

  2. 服务已开启在 127.0.0.1:3000

  3. # 第一次

  4. Nodejs 技术栈

  5. JavaScript

  6. TypeScript Python Java C PHP ASP.NET

  7. # 第二次

  8. Nodejs 技术栈

  9. JavaScript TypeScript Python Java C PHP ASP.NET

客户端

  1. $ node client.js

  2. # 第一次

  3. 你好 Nodejs 技术栈

  4. 你好 JavaScript

  5. 你好 TypeScript Python Java C PHP ASP.NET


  6. # 第二次

  7. 你好 Nodejs 技术栈

  8. 你好 JavaScript TypeScript Python Java C PHP ASP.NET

在客户端我使用 client.write() 发送了多次数据,但是只有 setTimeout 之外的是正常的,setTimeout 里面连续发送的似乎并不是每一次一返回,而是会随机合并返回了,为什么呢?且看下面 TCP 的粘包问题介绍。

TCP 粘包问题

Interview2: TCP 粘包是什么?该怎么解决?

上面的例子最后抛出了一个问题,为什么客户端连续向服务端发送数据,会收到合并返回呢?这也是在 TCP 中常见的粘包问题,客户端(发送的一端)在发送之前会将短时间有多个发送的数据块缓冲到一起(发送端缓冲区),形成一个大的数据块一并发送,同样接收端也有一个接收端缓冲区收到的数据先存放接收端缓冲区,然后程序从这里读取部分数据进行消费,这样做也是为了减少 I/O 消耗达到性能优化。

问题思考:数据到达缓冲区什么时间开始发送?

这个取决于 TCP 拥塞控制,是任何时刻内确定能被发送出去的字节数的控制因素之一,是阻止发送方至接收方之间的链路变得拥塞的手段,参考维基百科:https://zh.wikipedia.org/wiki/TCP拥塞控制

TCP 粘包解决方案?

  • 方案一:延迟发送

  • 方案二:关闭 Nagle 算法

  • 方案三:封包/拆包

方案一:延迟发送

一种最简单的方案是设置延迟发送,sleep 休眠一段时间的方式,但是这个方案虽然简单,同时缺点也显而易见,传输效率大大降低,对于交互频繁的场景显然是不适用的,第一次改造如下:

  1. client.on('connect', () => {

  2. client.setNoDelay(true);

  3. // 向服务器发送数据

  4. client.write('Nodejs 技术栈');


  5. const arr = [

  6. 'JavaScript ',

  7. 'TypeScript ',

  8. 'Python ',

  9. 'Java ',

  10. 'C ',

  11. 'PHP ',

  12. 'ASP.NET '

  13. ]


  14. for (let i=0; i<arr.length; i++) {

  15. (function(val, k){

  16. setTimeout(() => {

  17. client.write(val);

  18. }, 1000 * (k+1))

  19. }(arr[i], i));

  20. }

  21. })

控制台执行 node client.js 命令,似乎一切 ok 了没有在出现粘包的情况,但是这种情况仅使用于交互频率很低的场景。

  1. $ node client.js

  2. 你好 Nodejs 技术栈

  3. 你好 JavaScript

  4. 你好 TypeScript

  5. 你好 Python

  6. 你好 Java

  7. 你好 C

  8. 你好 PHP

  9. 你好 ASP.NET

源码实现地址

  1. https://github.com/Q-Angelo/project-training/tree/master/nodejs/net/chapter-2-delay

方案二:Nagle 算法

Nagle 算法是一种改善网络传输效率的算法,避免网络中充斥着大量小的数据块,它所期望的是尽可能发送大的数据块,因此在每次请求一个数据块给 TCP 发送时,TCP 并不会立即执行发送,而是等待一小段时间进行发送。

当网络中充斥着大量小数据块时,Nagle 算法能将小的数据块集合起来一起发送减少了网络拥堵,这个还是很有帮助的,但也并不是所有场景都需要这样,例如,REPL 终端交互,当用户输入单个字符以获取响应,所以在 Node.js 中可以设置 socket.setNoDelay() 方法来关闭 Nagle 算法。

  1. const server = net.createServer();


  2. server.on('connection', socket => {

  3. socket.setNoDelay(true);

  4. })

关闭 Nagle 算法并不总是有效的,因为其是在服务端完成合并,TCP 接收到数据会先存放于自己的缓冲区中,然后通知应用接收,应用层因为网络或其它的原因若不能及时从 TCP 缓冲区中取出数据,也会造成 TCP 缓冲区中存放多段数据块,就又会形成粘包。

方案三:封包/拆包

前面两种方案都不是特别理想的,这里介绍第三种封包/拆包,也是目前业界用的比较多的,这里使用长度编码的方式,通信双方约定好格式,将消息分为定长的消息头(Header)和不定长的消息体(Body),在解析时读取消息头获取到内容占用的长度,之后读取到的消息体内容字节数等于字节头的字节数时,我们认为它是一个完整的包。

消息头序号 (Header)消息体长度 (Header)消息体 (Body)
SerialNumberbodyLengthbody
2(字节)2(字节)N(字节)

预先知识 Buffer

下面会通过编码实现,但是在开始之前希望你能了解一下 Buffer,可参考我之前写的 Buffer 文章 Node.js 中的缓冲区(Buffer)究竟是什么?,下面我列出本次需要用到的 Buffer 做下说明,对于不了解 Buffer 的同学是有帮助的。

  • Buffer.alloc(size[, fill[, encoding]]):初始化一个 size 大小的 Buffer 空间,默认填充 0,也可以指定 fill 进行自定义填充

  • buf.writeInt16BE(value[, offset]):value 为要写入的 Buffer 值,offset 为偏移量从哪个位置开始写入

  • buf.writeInt32BE(value[, offset]):参数同 writeInt16BE,不同的是 writeInt16BE 表示高位优先写入一个 16 位整型,而 writeInt32BE 表示高位优先写入一个 32 位整型

  • buf.readInt16BE([offset]):高位优先读取 16 位整型,offset 为读取之前要跳过的字节数

  • buf.readInt32BE([offset]):高位优先读取 32 位整型,offset 为读取之前要跳过的字节数

编码/解码实现

TCP 底层传输是基于二进制数据,但是我们应用层通常是易于表达的字符串、数字等,这里第一步在编码的实现中,就需要先将我们的数据通过 Buffer 转为二进制数据,取出的时候同样也需要解码操作,一切尽在代码里,实现如下:

  1. // transcoder.js


  2. class Transcoder {

  3. constructor () {

  4. this.packageHeaderLen = 4; // 包头长度

  5. this.serialNumber = 0; // 定义包序号

  6. this.packageSerialNumberLen = 2; // 包序列号所占用的字节

  7. }


  8. /**

  9. * 编码

  10. * @param { Object } data Buffer 对象数据

  11. * @param { Int } serialNumber 包序号,客户端编码时自动生成,服务端解码之后在编码时需要传入解码的包序列号

  12. */

  13. encode(data, serialNumber) {

  14. const body = Buffer.from(data);


  15. const header = Buffer.alloc(this.packageHeaderLen);

  16. header.writeInt16BE(serialNumber || this.serialNumber);

  17. header.writeInt16BE(body.length, this.packageSerialNumberLen); // 跳过包序列号的前两位


  18. if (serialNumber === undefined) {

  19. this.serialNumber++;

  20. }


  21. return Buffer.concat([header, body]);

  22. }


  23. /**

  24. * 解码

  25. * @param { Object } buffer

  26. */

  27. decode(buffer) {

  28. const header = buffer.slice(0, this.packageHeaderLen); // 获取包头

  29. const body = buffer.slice(this.packageHeaderLen); // 获取包尾部


  30. return {

  31. serialNumber: header.readInt16BE(),

  32. bodyLength: header.readInt16BE(this.packageSerialNumberLen), // 因为编码阶段写入时跳过了前两位,解码同样也要跳过

  33. body: body.toString(),

  34. }

  35. }


  36. /**

  37. * 获取包长度两种情况:

  38. * 1. 如果当前 buffer 长度数据小于包头,肯定不是一个完整的数据包,因此直接返回 0 不做处理(可能数据还未接收完等等)

  39. * 2. 否则返回这个完整的数据包长度

  40. * @param {*} buffer

  41. */

  42. getPackageLength(buffer) {

  43. if (buffer.length < this.packageHeaderLen) {

  44. return 0;

  45. }


  46. return this.packageHeaderLen + buffer.readInt16BE(this.packageSerialNumberLen);

  47. }

  48. }


  49. module.exports = Transcoder;

客户端改造

  1. const net = require('net');

  2. const Transcoder = require('./transcoder');

  3. const transcoder = new Transcoder();

  4. const client = net.createConnection({

  5. host: '127.0.0.1',

  6. port: 3000

  7. });


  8. let overageBuffer=null; // 上一次 Buffer 剩余数据


  9. client.on('data', buffer => {

  10. if (overageBuffer) {

  11. buffer = Buffer.concat([overageBuffer, buffer]);

  12. }


  13. let packageLength = 0;


  14. while (packageLength = transcoder.getPackageLength(buffer)) {

  15. const package = buffer.slice(0, packageLength); // 取出整个数据包

  16. buffer = buffer.slice(packageLength); // 删除已经取出的数据包,这里采用的方法是把缓冲区(buffer)已取出的包给截取掉


  17. const result = transcoder.decode(package); // 解码

  18. console.log(result);

  19. }


  20. overageBuffer=buffer; // 记录剩余不完整的包

  21. }).on('error', err => { // 例如监听一个未开启的端口就会报 ECONNREFUSED 错误

  22. console.error('服务器异常:', err);

  23. }).on('close', err => {

  24. console.log('客户端链接断开!', err);

  25. });


  26. client.write(transcoder.encode('0 Nodejs 技术栈'));


  27. const arr = [

  28. '1 JavaScript ',

  29. '2 TypeScript ',

  30. '3 Python ',

  31. '4 Java ',

  32. '5 C ',

  33. '6 PHP ',

  34. '7 ASP.NET '

  35. ]


  36. setTimeout(function() {

  37. for (let i=0; i<arr.length; i++) {

  38. console.log(arr[i]);


  39. client.write(transcoder.encode(arr[i]));

  40. }

  41. }, 1000);

服务端改造

  1. const net = require('net');

  2. const Transcoder = require('./transcoder');

  3. const transcoder = new Transcoder();

  4. const HOST = '127.0.0.1';

  5. const PORT = 3000;

  6. let overageBuffer=null; // 上一次 Buffer 剩余数据


  7. const server = net.createServer();


  8. server.listen(PORT, HOST);


  9. server.on('listening', () => {

  10. console.log(`服务已开启在 ${HOST}:${PORT}`);

  11. }).on('connection', socket => {

  12. // data 事件就是读取数据

  13. socket

  14. .on('data', buffer => {

  15. if (overageBuffer) {

  16. buffer = Buffer.concat([overageBuffer, buffer]);

  17. }


  18. let packageLength = 0;


  19. while (packageLength = transcoder.getPackageLength(buffer)) {

  20. const package = buffer.slice(0, packageLength); // 取出整个数据包

  21. buffer = buffer.slice(packageLength); // 删除已经取出的数据包,这里采用的方法是把缓冲区(buffer)已取出的包给截取掉


  22. const result = transcoder.decode(package); // 解码

  23. console.log(result);

  24. socket.write(transcoder.encode(result.body, result.serialNumber));

  25. }


  26. overageBuffer=buffer; // 记录剩余不完整的包

  27. })

  28. .on('end', function(){

  29. console.log('socket end')

  30. })

  31. .on('error',function(error){

  32. console.log('socket error', error);

  33. });

  34. }).on('close', () => {

  35. console.log('Server Close!');

  36. }).on('error', err => {

  37. if (err.code === 'EADDRINUSE') {

  38. console.log('地址正被使用,重试中...');


  39. setTimeout(() => {

  40. server.close();

  41. server.listen(PORT, HOST);

  42. }, 1000);

  43. } else {

  44. console.error('服务器异常:', err);

  45. }

  46. });

运行测试

控制台执行 node server.js 开启服务端,之后执行 node client.js 开启客户端测试,输出结果如下所示:

  1. $ node client.js

  2. { serialNumber: 0, bodyLength: 18, body: '0 Nodejs 技术栈' }

  3. 1 JavaScript

  4. 2 TypeScript

  5. 3 Python

  6. 4 Java

  7. 5 C

  8. 6 PHP

  9. 7 ASP.NET

  10. { serialNumber: 1, bodyLength: 13, body: '1 JavaScript ' }

  11. { serialNumber: 2, bodyLength: 13, body: '2 TypeScript ' }

  12. { serialNumber: 3, bodyLength: 9, body: '3 Python ' }

  13. { serialNumber: 4, bodyLength: 7, body: '4 Java ' }

  14. { serialNumber: 5, bodyLength: 4, body: '5 C ' }

  15. { serialNumber: 6, bodyLength: 6, body: '6 PHP ' }

  16. { serialNumber: 7, bodyLength: 10, body: '7 ASP.NET ' }

以上结果中,setTimeout 函数里我们同一时间先发送多条数据,之后一一返回,同时打印了包消息头定义的包序列号、消息体长度和包消息体,且是一一对应的,上面提的粘包问题也得到了解决。封包/拆包这块是有点复杂的,以上代码也已经尽可能简单的介绍了实现思路,下面给出实现代码地址,可以做为参照自己也可以使用不同的方式去实现

  1. https://github.com/Q-Angelo/project-training/tree/master/nodejs/net/chapter-3-package


往期精彩回顾
消息中间件 RabbitMQ 入门篇
重磅 | OpenJS 基金会推出 Node.js 专业认证考试
分享 10 道 Nodejs EventLoop 和事件相关面试题
Docker 容器环境下 Node.js 应用程序的优雅退出
Node.js 服务 Docker 容器化应用实践
Node.js 是什么?我为什么选择它?
分享 10 道 Nodejs 进程相关面试题
Node.js进阶之进程与线程
Node.js 中的缓冲区(Buffer)究竟是什么?
Node.js 内存管理和 V8 垃圾回收机制
浅谈 Node.js 模块机制及常见面试问题解答
蚂蚁金服 Node.js 开荒史 - 摸爬滚打才不负功名尘土


在看点这里

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存