支撑百万并发的“零拷贝”技术,你了解吗?
零拷贝(Zero-copy)技术指在计算机执行操作时,CPU 不需要先将数据从一个内存区域复制到另一个内存区域,从而可以减少上下文切换以及 CPU 的拷贝时间。
图片来自 Pexels
它的作用是在数据报从网络设备到用户程序空间传递的过程中,减少数据拷贝次数,减少系统调用,实现 CPU 的零参与,彻底消除 CPU 在这方面的负载。
实现零拷贝用到的最主要技术是 DMA 数据传输技术和内存区域映射技术:
零拷贝机制可以减少数据在内核缓冲区和用户进程缓冲区之间反复的 I/O 拷贝操作。
零拷贝机制可以减少用户进程地址空间和内核地址空间之间因为上下文切换而带来的 CPU 开销。
物理内存和虚拟内存
物理内存
虚拟内存
下面给出两个进程 A、B 各自的虚拟内存空间以及对应的物理内存之间的地址映射示意图:
用户进程向操作系统发出内存申请请求。
系统会检查进程的虚拟地址空间是否被用完,如果有剩余,给进程分配虚拟地址。
系统为这块虚拟地址创建内存映射(Memory Mapping),并将它放进该进程的页表(Page Table)。
系统返回虚拟地址给用户进程,用户进程开始访问该虚拟地址。
CPU 根据虚拟地址在此进程的页表(Page Table)中找到了相应的内存映射(Memory Mapping),但是这个内存映射(Memory Mapping)没有和物理内存关联,于是产生缺页中断。
操作系统收到缺页中断后,分配真正的物理内存并将它关联到页表相应的内存映射(Memory Mapping)。中断处理完成后,CPU 就可以访问内存了
当然缺页中断不是每次都会发生,只有系统觉得有必要延迟分配内存的时候才用的着,也即很多时候在上面的第 3 步系统会分配真正的物理内存并和内存映射(Memory Mapping)进行关联。
地址空间:提供更大的地址空间,并且地址空间是连续的,使得程序编写、链接更加简单。
进程隔离:不同进程的虚拟地址之间没有关系,所以一个进程的操作不会对其他进程造成影响。
数据保护:每块虚拟内存都有相应的读写属性,这样就能保护程序的代码段不被修改,数据块不能被执行等,增加了系统的安全性。
内存映射:有了虚拟内存之后,可以直接映射磁盘上的文件(可执行文件或动态库)到虚拟地址空间。
这样可以做到物理内存延时分配,只有在需要读相应的文件的时候,才将它真正的从磁盘上加载到内存中来,而在内存吃紧的时候又可以将这部分内存清空掉,提高物理内存利用效率,并且所有这些对应用程序都是透明的。
共享内存:比如动态库只需要在内存中存储一份,然后将它映射到不同进程的虚拟地址空间中,让进程觉得自己独占了这个文件。
进程间的内存共享也可以通过映射同一块物理内存到进程的不同虚拟地址空间来实现共享。
物理内存管理:物理地址空间全部由操作系统管理,进程无法直接分配和回收,从而系统可以更好的利用内存,平衡进程间对内存的需求。
内核空间和用户空间
下图是一个进程的用户空间和内核空间的内存布局:
内核空间
进程私有的虚拟内存:每个进程都有单独的内核栈、页表、task 结构以及 mem_map 结构等。
进程共享的虚拟内存:属于所有进程共享的内存区域,包括物理存储器、内核数据和内核代码区域。
用户空间
运行时栈:由编译器自动释放,存放函数的参数值,局部变量和方法返回值等。每当一个函数被调用时,该函数的返回类型和一些调用的信息被存储到栈顶,调用结束后调用信息会被弹出并释放掉内存。
栈区是从高地址位向低地址位增长的,是一块连续的内在区域,最大容量是由系统预先定义好的,申请的栈空间超过这个界限时会提示溢出,用户能从栈中获取的空间较小。
运行时堆:用于存放进程运行中被动态分配的内存段,位于 BSS 和栈中间的地址位。由卡发人员申请分配(malloc)和释放(free)。堆是从低地址位向高地址位增长,采用链式存储结构。
频繁地 malloc/free 造成内存空间的不连续,产生大量碎片。当申请堆空间时,库函数按照一定的算法搜索可用的足够大的空间。因此堆的效率比栈要低的多。
代码段:存放 CPU 可以执行的机器指令,该部分内存只能读不能写。通常代码区是共享的,即其他执行程序可调用它。假如机器中有数个进程运行相同的一个程序,那么它们就可以使用同一个代码段。
未初始化的数据段:存放未初始化的全局变量,BSS 的数据在程序开始执行之前被初始化为 0 或 NULL。
已初始化的数据段:存放已初始化的全局变量,包括静态全局变量、静态局部变量以及常量。
内存映射区域:例如将动态库,共享内存等虚拟空间的内存映射到物理空间的内存,一般是 mmap 函数所分配的虚拟内存空间。
Linux 的内部层级结构
内核态可以执行任意命令,调用系统的一切资源,而用户态只能执行简单的运算,不能直接调用系统资源。用户态必须通过系统接口(System Call),才能向内核发出指令。
内核空间可以访问所有的 CPU 指令和所有的内存空间、I/O 空间和硬件设备。
用户空间只能访问受限的资源,如果需要特殊权限,可以通过系统调用获取相应的资源。
用户空间允许页面中断,而内核空间则不允许。
内核空间和用户空间是针对线性地址空间的。
x86 CPU 中用户空间是 0-3G 的地址范围,内核空间是 3G-4G 的地址范围。
x86_64 CPU 用户空间地址范围为0x0000000000000000–0x00007fffffffffff,内核地址空间为 0xffff880000000000-最大地址。
所有内核进程(线程)共用一个地址空间,而用户进程都有各自的地址空间。
有了用户空间和内核空间的划分后,Linux 内部层级结构可以分为三部分,从最底层到最上层依次是硬件、内核空间和用户空间,如下图所示:
Linux I/O 读写方式
I/O 中断原理
在 DMA 技术出现之前,应用程序与磁盘之间的 I/O 操作都是通过 CPU 的中断完成的。
用户进程向 CPU 发起 read 系统调用读取数据,由用户态切换为内核态,然后一直阻塞等待数据的返回。
CPU 在接收到指令以后对磁盘发起 I/O 请求,将磁盘数据先放入磁盘控制器缓冲区。
数据准备完成以后,磁盘向 CPU 发起 I/O 中断。
CPU 收到 I/O 中断以后将磁盘缓冲区中的数据拷贝到内核缓冲区,然后再从内核缓冲区拷贝到用户缓冲区。
用户进程由内核态切换回用户态,解除阻塞状态,然后等待 CPU 的下一个执行时间钟。
DMA 传输原理
目前大多数的硬件设备,包括磁盘控制器、网卡、显卡以及声卡等都支持 DMA 技术。
这样在大部分时间里,CPU 计算和 I/O 操作都处于并行操作,使整个计算机系统的效率大大提高。
用户进程向 CPU 发起 read 系统调用读取数据,由用户态切换为内核态,然后一直阻塞等待数据的返回。
CPU 在接收到指令以后对 DMA 磁盘控制器发起调度指令。
DMA 磁盘控制器对磁盘发起 I/O 请求,将磁盘数据先放入磁盘控制器缓冲区,CPU 全程不参与此过程。
数据读取完成后,DMA 磁盘控制器会接受到磁盘的通知,将数据从磁盘控制器缓冲区拷贝到内核缓冲区。
DMA 磁盘控制器向 CPU 发出数据读完的信号,由 CPU 负责将数据从内核缓冲区拷贝到用户缓冲区。
用户进程由内核态切换回用户态,解除阻塞状态,然后等待 CPU 的下一个执行时间钟。
传统 I/O 方式
伪代码如下:
read(file_fd, tmp_buf, len);
write(socket_fd, tmp_buf, len);
下图分别对应传统 I/O 操作的数据读写流程,整个过程涉及 2 次 CPU 拷贝、2 次 DMA 拷贝,总共 4 次拷贝,以及 4 次上下文切换。
上下文切换:当用户程序向内核发起系统调用时,CPU 将用户进程从用户态切换到内核态;当系统调用返回时,CPU 将用户进程从内核态切换回用户态。
CPU 拷贝:由 CPU 直接处理数据的传送,数据拷贝时会一直占用 CPU 的资源。
DMA 拷贝:由 CPU 向DMA磁盘控制器下达指令,让 DMA 控制器来处理数据的传送,数据传送完毕再把信息反馈给 CPU,从而减轻了 CPU 资源的占有率。
传统读操作
如果数据不存在,则先将数据从磁盘加载数据到内核空间的读缓存(read buffer)中,再从读缓存拷贝到用户进程的页内存中。
read(file_fd, tmp_buf, len);
用户进程通过 read() 函数向内核(kernel)发起系统调用,上下文从用户态(user space)切换为内核态(kernel space)。
CPU 利用 DMA 控制器将数据从主存或硬盘拷贝到内核空间(kernel space)的读缓冲区(read buffer)。
CPU 将读缓冲区(read buffer)中的数据拷贝到用户空间(user space)的用户缓冲区(user buffer)。
上下文从内核态(kernel space)切换回用户态(user space),read 调用执行返回。
传统写操作
当应用程序准备好数据,执行 write 系统调用发送网络数据时,先将数据从用户空间的页缓存拷贝到内核空间的网络缓冲区(socket buffer)中,然后再将写缓存中的数据拷贝到网卡设备完成数据发送。
write(socket_fd, tmp_buf, len);
用户进程通过 write() 函数向内核(kernel)发起系统调用,上下文从用户态(user space)切换为内核态(kernel space)。
CPU 将用户缓冲区(user buffer)中的数据拷贝到内核空间(kernel space)的网络缓冲区(socket buffer)。
CPU 利用 DMA 控制器将数据从网络缓冲区(socket buffer)拷贝到网卡进行数据传输。
上下文从内核态(kernel space)切换回用户态(user space),write 系统调用执行返回。
零拷贝方式
用户态直接 I/O:应用程序可以直接访问硬件存储,操作系统内核只是辅助数据传输。
这种方式依旧存在用户空间和内核空间的上下文切换,硬件上的数据直接拷贝至了用户空间,不经过内核空间。因此,直接 I/O 不存在内核空间缓冲区和用户空间缓冲区之间的数据拷贝。
减少数据拷贝次数:在数据传输过程中,避免数据在用户空间缓冲区和系统内核空间缓冲区之间的 CPU 拷贝,以及数据在系统内核空间内的 CPU 拷贝,这也是当前主流零拷贝技术的实现思路。
写时复制技术:写时复制指的是当多个进程共享同一块数据时,如果其中一个进程需要对这份数据进行修改,那么将其拷贝到自己的进程地址空间中,如果只是数据读取操作则不需要进行拷贝操作。
用户态直接 I/O
数据直接跨过内核进行传输,内核在数据传输过程除了进行必要的虚拟存储配置工作之外,不参与任何其他工作,这种方式能够直接绕过内核,极大提高了性能。
mmap+write
mmap 是 Linux 提供的一种内存映射文件方法,即将一个进程的地址空间中的一段虚拟地址映射到磁盘文件地址,mmap+write 的伪代码如下:
tmp_buf = mmap(file_fd, len);
write(socket_fd, tmp_buf, len);
然而内核读缓冲区(read buffer)仍需将数据拷贝到内核写缓冲区(socket buffer),大致的流程如下图所示:
用户进程通过 mmap() 函数向内核(kernel)发起系统调用,上下文从用户态(user space)切换为内核态(kernel space)。
将用户进程的内核空间的读缓冲区(read buffer)与用户空间的缓存区(user buffer)进行内存地址映射。
CPU 利用 DMA 控制器将数据从主存或硬盘拷贝到内核空间(kernel space)的读缓冲区(read buffer)。
上下文从内核态(kernel space)切换回用户态(user space),mmap 系统调用执行返回。
用户进程通过 write() 函数向内核(kernel)发起系统调用,上下文从用户态(user space)切换为内核态(kernel space)。
CPU 将读缓冲区(read buffer)中的数据拷贝到网络缓冲区(socket buffer)。
CPU 利用 DMA 控制器将数据从网络缓冲区(socket buffer)拷贝到网卡进行数据传输。
上下文从内核态(kernel space)切换回用户态(user space),write 系统调用执行返回。
Sendfile
Sendfile 系统调用的引入,不仅减少了 CPU 拷贝的次数,还减少了上下文切换的次数,它的伪代码如下:
sendfile(socket_fd, file_fd, len);
与 mmap 内存映射方式不同的是, Sendfile 调用中 I/O 数据对用户空间是完全不可见的。也就是说,这是一次完全意义上的数据传输过程。
用户进程通过 sendfile() 函数向内核(kernel)发起系统调用,上下文从用户态(user space)切换为内核态(kernel space)。
CPU 利用 DMA 控制器将数据从主存或硬盘拷贝到内核空间(kernel space)的读缓冲区(read buffer)。
CPU 将读缓冲区(read buffer)中的数据拷贝到的网络缓冲区(socket buffer)。
CPU 利用 DMA 控制器将数据从网络缓冲区(socket buffer)拷贝到网卡进行数据传输。
上下文从内核态(kernel space)切换回用户态(user space),Sendfile 系统调用执行返回。
Sendfile+DMA gather copy
这样就省去了内核空间中仅剩的 1 次 CPU 拷贝操作,Sendfile 的伪代码如下:
sendfile(socket_fd, file_fd, len);
这样 DMA 引擎直接利用 gather 操作将页缓存中数据打包发送到网络中即可,本质就是和虚拟内存映射的思路类似。
用户进程通过 sendfile() 函数向内核(kernel)发起系统调用,上下文从用户态(user space)切换为内核态(kernel space)。
CPU 利用 DMA 控制器将数据从主存或硬盘拷贝到内核空间(kernel space)的读缓冲区(read buffer)。
CPU 把读缓冲区(read buffer)的文件描述符(file descriptor)和数据长度拷贝到网络缓冲区(socket buffer)。
基于已拷贝的文件描述符(file descriptor)和数据长度,CPU 利用 DMA 控制器的 gather/scatter 操作直接批量地将数据从内核的读缓冲区(read buffer)拷贝到网卡进行数据传输。
上下文从内核态(kernel space)切换回用户态(user space),Sendfile 系统调用执行返回。
Splice
Splice 的伪代码如下:
splice(fd_in, off_in, fd_out, off_out, len, flags);
Splice 系统调用可以在内核空间的读缓冲区(read buffer)和网络缓冲区(socket buffer)之间建立管道(pipeline),从而避免了两者之间的 CPU 拷贝操作。
用户进程通过 splice() 函数向内核(kernel)发起系统调用,上下文从用户态(user space)切换为内核态(kernel space)。
CPU 利用 DMA 控制器将数据从主存或硬盘拷贝到内核空间(kernel space)的读缓冲区(read buffer)。
CPU 在内核空间的读缓冲区(read buffer)和网络缓冲区(socket buffer)之间建立管道(pipeline)。
CPU 利用 DMA 控制器将数据从网络缓冲区(socket buffer)拷贝到网卡进行数据传输。
上下文从内核态(kernel space)切换回用户态(user space),Splice 系统调用执行返回。
写时复制
缓冲区共享
fbuf 的思想是每个进程都维护着一个缓冲区池,这个缓冲区池能被同时映射到用户空间(user space)和内核态(kernel space),内核和用户共享这个缓冲区池,这样就避免了一系列的拷贝操作。
Linux 零拷贝对比
下面从 CPU 拷贝次数、DMA 拷贝次数以及系统调用几个方面总结一下上述几种 I/O 拷贝方式的差别:
Java NIO 零拷贝实现
在 Java NIO 中的通道(Channel)就相当于操作系统的内核空间(kernel space)的缓冲区。
而缓冲区(Buffer)对应的相当于操作系统的用户空间(user space)中的用户缓冲区(user buffer):
通道(Channel)是全双工的(双向传输),它既可能是读缓冲区(read buffer),也可能是网络缓冲区(socket buffer)。
缓冲区(Buffer)分为堆内存(HeapBuffer)和堆外内存(DirectBuffer),这是通过 malloc() 分配出来的用户态内存。
MappedByteBuffer
抽象方法 map() 方法在 FileChannel 中的定义如下:
public abstract MappedByteBuffer map(MapMode mode, long position, long size)
throws IOException;
Mode:限定内存映射区域(MappedByteBuffer)对内存映像文件的访问模式,包括只可读(READ_ONLY)、可读可写(READ_WRITE)和写时拷贝(PRIVATE)三种模式。
Position:文件映射的起始地址,对应内存映射区域(MappedByteBuffer)的首地址。
Size:文件映射的字节长度,从 Position 往后的字节数,对应内存映射区域(MappedByteBuffer)的大小。
MappedByteBuffer 相比 ByteBuffer 新增了三个重要的方法:
fore():对于处于 READ_WRITE 模式下的缓冲区,把对缓冲区内容的修改强制刷新到本地文件。
load():将缓冲区的内容载入物理内存中,并返回这个缓冲区的引用。
isLoaded():如果缓冲区的内容在物理内存中,则返回 true,否则返回 false。
下面给出一个利用 MappedByteBuffer 对文件进行读写的使用示例:
private final static String CONTENT = "Zero copy implemented by MappedByteBuffer";
private final static String FILE_NAME = "/mmap.txt";
private final static String CHARSET = "UTF-8";
@Test
public void writeToFileByMappedByteBuffer() {
Path path = Paths.get(getClass().getResource(FILE_NAME).getPath());
byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));
try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
MappedByteBuffer mappedByteBuffer = fileChannel.map(READ_WRITE, 0, bytes.length);
if (mappedByteBuffer != null) {
mappedByteBuffer.put(bytes);
mappedByteBuffer.force();
}
} catch (IOException e) {
e.printStackTrace();
}
}
读文件数据:打开文件通道 fileChannel 并提供只读权限,通过 fileChannel 映射到一个只可读的内存缓冲区 mappedByteBuffer,读取 mappedByteBuffer 中的字节数组即可得到文件数据。
@Test
public void readFromFileByMappedByteBuffer() {
Path path = Paths.get(getClass().getResource(FILE_NAME).getPath());
int length = CONTENT.getBytes(Charset.forName(CHARSET)).length;
try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
MappedByteBuffer mappedByteBuffer = fileChannel.map(READ_ONLY, 0, length);
if (mappedByteBuffer != null) {
byte[] bytes = new byte[length];
mappedByteBuffer.get(bytes);
String content = new String(bytes, StandardCharsets.UTF_8);
assertEquals(content, "Zero copy implemented by MappedByteBuffer");
}
} catch (IOException e) {
e.printStackTrace();
}
}
下面介绍 map() 方法的底层实现原理。map() 方法是 java.nio.channels.FileChannel 的抽象方法,由子类 sun.nio.ch.FileChannelImpl.java 实现。
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
int pagePosition = (int)(position % allocationGranularity);
long mapPosition = position - pagePosition;
long mapSize = size + pagePosition;
try {
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError x) {
System.gc();
try {
Thread.sleep(100);
} catch (InterruptedException y) {
Thread.currentThread().interrupt();
}
try {
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError y) {
throw new IOException("Map failed", y);
}
}
int isize = (int)size;
Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
if ((!writable) || (imode == MAP_RO)) {
return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);
} else {
return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);
}
}
map() 方法通过本地方法 map0() 为文件分配一块虚拟内存,作为它的内存映射区域,然后返回这块内存映射区域的起始地址:
文件映射需要在 Java 堆中创建一个 MappedByteBuffer 的实例。如果第一次文件映射导致 OOM,则手动触发垃圾回收,休眠 100ms 后再尝试映射,如果失败则抛出异常。
通过 Util 的 newMappedByteBuffer(可读可写)方法或者 newMappedByteBufferR(仅读)方法反射创建一个 DirectByteBuffer 实例,其中 DirectByteBuffer 是 MappedByteBuffer 的子类。
这样一定程度上替代了 read() 或 write() 方法,底层直接采用 sun.misc.Unsafe 类的 getByte() 和 putByte() 方法对数据进行读写。
private native long map0(int prot, long position, long mapSize) throws IOException;
这个 native 函数(Java_sun_nio_ch_FileChannelImpl_map0)的实现位于 JDK 源码包下的 native/sun/nio/ch/FileChannelImpl.c 这个源文件里面。
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
jint prot, jlong off, jlong len)
{
void *mapAddress = 0;
jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
jint fd = fdval(env, fdo);
int protections = 0;
int flags = 0;
if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
protections = PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_PRIVATE;
}
mapAddress = mmap64(
0, /* Let OS decide location */
len, /* Number of bytes to map */
protections, /* File permissions */
flags, /* Changes are shared */
fd, /* File descriptor of mapped file */
off); /* Offset into file */
if (mapAddress == MAP_FAILED) {
if (errno == ENOMEM) {
JNU_ThrowOutOfMemoryError(env, "Map failed");
return IOS_THROWN;
}
return handle(env, -1, "Map failed");
}
return ((jlong) (unsigned long) mapAddress);
}
void *mmap64(void *addr, size_t len, int prot, int flags, int fd, off64_t offset);
下面详细介绍一下 mmap64() 函数各个参数的含义以及参数可选值:
addr:文件在用户进程空间的内存映射区中的起始地址,是一个建议的参数,通常可设置为 0 或 NULL,此时由内核去决定真实的起始地址。
当 flags 为 MAP_FIXED 时,addr 就是一个必选的参数,即需要提供一个存在的地址。
len:文件需要进行内存映射的字节长度。
prot:控制用户进程对内存映射区的访问权限:
PROT_READ:读权限。
PROT_WRITE:写权限。
PROT_EXEC:执行权限。
PROT_NONE:无权限。
flags:控制内存映射区的修改是否被多个进程共享:
MAP_PRIVATE:对内存映射区数据的修改不会反映到真正的文件,数据修改发生时采用写时复制机制。
MAP_SHARED:对内存映射区的修改会同步到真正的文件,修改对共享此内存映射区的进程是可见的。
MAP_FIXED:不建议使用,这种模式下 addr 参数指定的必须提供一个存在的 addr 参数。
fd:文件描述符。每次 map 操作会导致文件的引用计数加 1,每次 unmap 操作或者结束进程会导致引用计数减 1。
offset:文件偏移量。进行映射的文件位置,从文件起始地址向后的位移量。
下面总结一下 MappedByteBuffer 的特点和不足之处:
MappedByteBuffer 使用是堆外的虚拟内存,因此分配(map)的内存大小不受 JVM 的 -Xmx 参数限制,但是也是有大小限制的。
如果当文件超出 Integer.MAX_VALUE 字节限制时,可以通过 position 参数重新 map 文件后面的内容。
MappedByteBuffer 在处理大文件时性能的确很高,但也存在内存占用、文件关闭不确定等问题,被其打开的文件只有在垃圾回收的才会被关闭,而且这个时间点是不确定的。
MappedByteBuffer 提供了文件映射内存的 mmap() 方法,也提供了释放映射内存的 unmap() 方法。然而 unmap() 是 FileChannelImpl 中的私有方法,无法直接显示调用。
因此,用户程序需要通过 Java 反射的调用 sun.misc.Cleaner 类的 clean() 方法手动释放映射占用的内存区域。
public static void clean(final Object buffer) throws Exception {
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
try {
Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
getCleanerMethod.setAccessible(true);
Cleaner cleaner = (Cleaner) getCleanerMethod.invoke(buffer, new Object[0]);
cleaner.clean();
} catch(Exception e) {
e.printStackTrace();
}
});
}
DirectByteBuffer
DirectByteBuffer 的对象引用位于 Java 内存模型的堆里面,JVM 可以对 DirectByteBuffer 的对象进行内存分配和回收管理。
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
DirectByteBuffer 内部的字节缓冲区位在于堆外的(用户态)直接内存,它是通过 Unsafe 的本地方法 allocateMemory() 进行内存分配,底层调用的是操作系统的 malloc() 函数。
DirectByteBuffer(int cap) {
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);
long base = 0;
try {
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
private static class Deallocator implements Runnable {
private static Unsafe unsafe = Unsafe.getUnsafe();
private long address;
private long size;
private int capacity;
private Deallocator(long address, long size, int capacity) {
assert (address != 0);
this.address = address;
this.size = size;
this.capacity = capacity;
}
public void run() {
if (address == 0) {
return;
}
unsafe.freeMemory(address);
address = 0;
Bits.unreserveMemory(size, capacity);
}
}
由于使用 DirectByteBuffer 分配的是系统本地的内存,不在 JVM 的管控范围之内,因此直接内存的回收和堆内存的回收不同,直接内存如果使用不当,很容易造成 OutOfMemoryError。
说了这么多,那么 DirectByteBuffer 和零拷贝有什么关系?前面有提到在 MappedByteBuffer 进行内存映射时,它的 map() 方法会通过 Util.newMappedByteBuffer() 来创建一个缓冲区实例。
static MappedByteBuffer newMappedByteBuffer(int size, long addr, FileDescriptor fd,
Runnable unmapper) {
MappedByteBuffer dbb;
if (directByteBufferConstructor == null)
initDBBConstructor();
try {
dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(
new Object[] { new Integer(size), new Long(addr), fd, unmapper });
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new InternalError(e);
}
return dbb;
}
private static void initDBBRConstructor() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
try {
Class<?> cl = Class.forName("java.nio.DirectByteBufferR");
Constructor<?> ctor = cl.getDeclaredConstructor(
new Class<?>[] { int.class, long.class, FileDescriptor.class,
Runnable.class });
ctor.setAccessible(true);
directByteBufferRConstructor = ctor;
} catch (ClassNotFoundException | NoSuchMethodException |
IllegalArgumentException | ClassCastException x) {
throw new InternalError(x);
}
return null;
}});
}
DirectByteBuffer 是 MappedByteBuffer 的具体实现类。
protected DirectByteBuffer(int cap, long addr, FileDescriptor fd, Runnable unmapper) {
super(-1, 0, cap, cap, fd);
address = addr;
cleaner = Cleaner.create(this, unmapper);
att = null;
}
因此,除了允许分配操作系统的直接内存以外,DirectByteBuffer 本身也具有文件内存映射的功能,这里不做过多说明。
我们需要关注的是,DirectByteBuffer 在 MappedByteBuffer 的基础上提供了内存映像文件的随机读取 get() 和写入 write() 的操作。
public byte get() {
return ((unsafe.getByte(ix(nextGetIndex()))));
}
public byte get(int i) {
return ((unsafe.getByte(ix(checkIndex(i)))));
}
public ByteBuffer put(byte x) {
unsafe.putByte(ix(nextPutIndex()), ((x)));
return this;
}
public ByteBuffer put(int i, byte x) {
unsafe.putByte(ix(checkIndex(i)), ((x)));
return this;
}
private long ix(int i) {
return address + ((long)i << 0);
}
FileChannel
FileChannel 是一个用于文件读写、映射和操作的通道,同时它在并发环境下是线程安全的。
基于 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 getChannel() 方法可以创建并打开一个文件通道。
FileChannel 定义了 transferFrom() 和 transferTo() 两个抽象方法,它通过在通道和通道之间建立连接实现数据传输的。
transferTo():通过 FileChannel 把文件里面的源数据写入一个 WritableByteChannel 的目的通道。
public abstract long transferTo(long position, long count, WritableByteChannel target)
throws IOException;
public abstract long transferFrom(ReadableByteChannel src, long position, long count)
throws IOException;
private static final String CONTENT = "Zero copy implemented by FileChannel";
private static final String SOURCE_FILE = "/source.txt";
private static final String TARGET_FILE = "/target.txt";
private static final String CHARSET = "UTF-8";
@Before
public void setup() {
Path source = Paths.get(getClassPath(SOURCE_FILE));
byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));
try (FileChannel fromChannel = FileChannel.open(source, StandardOpenOption.READ,
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
fromChannel.write(ByteBuffer.wrap(bytes));
} catch (IOException e) {
e.printStackTrace();
}
}
对于 transferTo() 方法而言,目的通道 toChannel 可以是任意的单向字节写通道 WritableByteChannel;而对于 transferFrom() 方法而言,源通道 fromChannel 可以是任意的单向字节读通道 ReadableByteChannel。
其中,FileChannel、SocketChannel 和 DatagramChannel 等通道实现了 WritableByteChannel 和 ReadableByteChannel 接口,都是同时支持读写的双向通道。
为了方便测试,下面给出基于 FileChannel 完成 channel-to-channel 的数据传输示例。
public void transferTo() throws Exception {
try (FileChannel fromChannel = new RandomAccessFile(
getClassPath(SOURCE_FILE), "rw").getChannel();
FileChannel toChannel = new RandomAccessFile(
getClassPath(TARGET_FILE), "rw").getChannel()) {
long position = 0L;
long offset = fromChannel.size();
fromChannel.transferTo(position, offset, toChannel);
}
}
public void transferFrom() throws Exception {
try (FileChannel fromChannel = new RandomAccessFile(
getClassPath(SOURCE_FILE), "rw").getChannel();
FileChannel toChannel = new RandomAccessFile(
getClassPath(TARGET_FILE), "rw").getChannel()) {
long position = 0L;
long offset = fromChannel.size();
toChannel.transferFrom(fromChannel, position, offset);
}
}
下面介绍 transferTo() 和 transferFrom() 方法的底层实现原理,这两个方法也是 java.nio.channels.FileChannel 的抽象方法,由子类 sun.nio.ch.FileChannelImpl.java 实现。
private static volatile boolean transferSupported = true;
private static volatile boolean pipeSupported = true;
private static volatile boolean fileSupported = true;
transferSupported:用于标记当前的系统内核是否支持 sendfile() 调用,默认为 true。
pipeSupported:用于标记当前的系统内核是否支持文件描述符(fd)基于管道(pipe)的 sendfile() 调用,默认为 true。
fileSupported:用于标记当前的系统内核是否支持文件描述符(fd)基于文件(file)的 sendfile() 调用,默认为 true。
下面以 transferTo() 的源码实现为例。FileChannelImpl 首先执行 transferToDirectly() 方法,以 Sendfile 的零拷贝方式尝试数据拷贝。
如果系统内核不支持 Sendfile,进一步执行 transferToTrustedChannel() 方法,以 mmap 的零拷贝方式进行内存映射,这种情况下目的通道必须是 FileChannelImpl 或者 SelChImpl 类型。
public long transferTo(long position, long count, WritableByteChannel target)
throws IOException {
// 计算文件的大小
long sz = size();
// 校验起始位置
if (position > sz)
return 0;
int icount = (int)Math.min(count, Integer.MAX_VALUE);
// 校验偏移量
if ((sz - position) < icount)
icount = (int)(sz - position);
long n;
if ((n = transferToDirectly(position, icount, target)) >= 0)
return n;
if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
return n;
return transferToArbitraryChannel(position, icount, target);
}
接下来重点分析一下 transferToDirectly() 方法的实现,也就是 transferTo() 通过 Sendfile 实现零拷贝的精髓所在。
private long transferToDirectly(long position, int icount, WritableByteChannel target)
throws IOException {
// 省略从target获取targetFD的过程
if (nd.transferToDirectlyNeedsPositionLock()) {
synchronized (positionLock) {
long pos = position();
try {
return transferToDirectlyInternal(position, icount,
target, targetFD);
} finally {
position(pos);
}
}
} else {
return transferToDirectlyInternal(position, icount, target, targetFD);
}
}
最终由 transferToDirectlyInternal() 调用本地方法 transferTo0() ,尝试以 Sendfile 的方式进行数据传输。
如果系统内核完全不支持 Sendfile,比如 Windows 操作系统,则返回 UNSUPPORTED 并把 transferSupported 标识为 false。
private long transferToDirectlyInternal(long position, int icount,
WritableByteChannel target,
FileDescriptor targetFD) throws IOException {
assert !nd.transferToDirectlyNeedsPositionLock() ||
Thread.holdsLock(positionLock);
long n = -1;
int ti = -1;
try {
begin();
ti = threads.add();
if (!isOpen())
return -1;
do {
n = transferTo0(fd, position, icount, targetFD);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNSUPPORTED_CASE) {
if (target instanceof SinkChannelImpl)
pipeSupported = false;
if (target instanceof FileChannelImpl)
fileSupported = false;
return IOStatus.UNSUPPORTED_CASE;
}
if (n == IOStatus.UNSUPPORTED) {
transferSupported = false;
return IOStatus.UNSUPPORTED;
}
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
end (n > -1);
}
}
本地方法(native method)transferTo0() 通过 JNI(Java Native Interface)调用底层 C 的函数。
这个 native 函数(Java_sun_nio_ch_FileChannelImpl_transferTo0)同样位于 JDK 源码包下的 native/sun/nio/ch/FileChannelImpl.c 源文件里面。
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
jobject srcFDO,
jlong position, jlong count,
jobject dstFDO)
{
jint srcFD = fdval(env, srcFDO);
jint dstFD = fdval(env, dstFDO);
off64_t offset = (off64_t)position;
jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
return n;
result = sendfilev64(dstFD, &sfv, 1, &numBytes);
return result;
result = sendfile(srcFD, dstFD, position, &numBytes, NULL, 0);
return result;
}
ssize_t sendfile64(int out_fd, int in_fd, off_t *offset, size_t count);
下面简单介绍一下 sendfile64() 函数各个参数的含义:
out_fd:待写入的文件描述符。
in_fd:待读取的文件描述符。
offset:指定 in_fd 对应文件流的读取位置,如果为空,则默认从起始位置开始。
count:指定在文件描述符 in_fd 和 out_fd 之间传输的字节数。
其它的零拷贝实现
Netty 零拷贝
Netty 通过 DefaultFileRegion 类对 java.nio.channels.FileChannel 的 tranferTo() 方法进行包装,在文件传输时可以将文件缓冲区的数据直接发送到目的通道(Channel)。
ByteBuf 可以通过 wrap 操作把字节数组、ByteBuf、ByteBuffer 包装成一个 ByteBuf 对象, 进而避免了拷贝操作。
ByteBuf 支持 Slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf,避免了内存的拷贝。
Netty 提供了 CompositeByteBuf 类,它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免了各个 ByteBuf 之间的拷贝。
RocketMQ 和 Kafka 对比
但是值得注意的一点是,Kafka 的索引文件使用的是 mmap+write 方式,数据文件使用的是 Sendfile 方式。
总结
作者:陈林
简介:五年研发与架构经验,曾任职 SAP 中国研发中心后端研发、上海冰鉴科技信息科技有限公司架构师助理,目前担任成都 ThoughtWorks 有限公司高级咨询师与研发人员。熟悉大数据、高并发、负载均衡、缓存、数据库、消息中间件、搜索引擎、容器和自动化等领域。个人学习能力强,技术热情高,热爱开源和写技术博客,善于沟通和分享。
编辑:陶家龙、孙淑娟
征稿:有投稿、寻求报道意向技术人请联络 editor@51cto.com
精彩文章推荐: