我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:admin@137x.com
目录
深入理解缓冲的本质
缓冲(Buffer)通过对数据进行暂存,然后批量进行传输或者操作,多采用顺序方式,来缓解不同设备之间次数频繁但速度缓慢的随机读写。
你可以把缓冲区,想象成一个蓄水池。放水的水龙头一直开着,如果池子里有水,它就以恒定的速度流淌,不需要暂停;供水的水龙头速度却不确定,有时候会快一些,有时候会特别慢。它通过判断水池里水的状态,就可以自由控制进水的速度。
或者再想象一下包饺子的过程,包馅的需要等着擀皮的。如果擀皮的每擀一个就交给包馅的,速度就会很慢;但如果中间放一个盆子,擀皮的只管往里扔,包馅的只管从盆里取,这个过程就快得多。许多工厂流水线也经常使用这种方法,可见“缓冲”这个理念的普及性和实用性。
从宏观上来说,JVM 的堆就是一个大的缓冲区,代码不停地在堆空间中生产对象,而垃圾回收器进程则在背后默默地进行垃圾回收。
通过上述比喻和释意,你可以发现缓冲区的好处:
缓冲双方能各自保持自己的操作节奏,操作处理顺序也不会打乱,可以 one by one 顺序进行;
以批量的方式处理,减少网络交互和繁重的 I/O 操作,从而减少性能损耗;
优化用户体验,比如常见的音频/视频缓冲加载,通过提前缓冲数据,达到流畅的播放效果。
Java 语言广泛应用了缓冲,在 IDEA 中搜索 Buffer,可以看到长长的类列表,其中最典型的就是文件读取和写入字符流。
文件读写流
接下来,本文将以文件读取和写入字符流为例进行讲解。
Java 的 I/O 流设计,采用的是装饰器模式,当需要给类添加新的功能时,就可以将被装饰者通过参数传递到装饰者,封装成新的功能方法。下图是装饰器模式的典型示意图,就增加功能来说,装饰模式比生成子类更为灵活。
在读取和写入流的 API 中,BufferedInputStream 和 BufferedReader 可以加快读取字符的速度,BufferedOutputStream 和 BufferedWriter 可以加快写入的速度。
下面是直接读取文件的代码实现:
int result = 0;
try(Reader reader=new FileReader(FILE_PATH)){
int value;
while((value=reader.read())!=-1){
result+=value;
}
}
return result;
int result = 0;
try (Reader reader = new BufferedReader(new FileReader(FILE_PATH))) {
int value;
while ((value = reader.read()) != -1) {
result += value;
}
}
return result;
//代码来自JDK
public synchronized int read () throws IOException {
if (pos >= count) {
fill();
if (pos >= count)
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}
//代码来自JDK
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)
pos = 0; /* no mark: throw away the buffer */
else if (pos >= buffer.length) /* no room left in buffer */
if (markpos > 0) { /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
} else if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
} else { /* grow buffer */
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit)
nsz = marklimit;
byte[] nbuf = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!U.compareAndSetObject(this, BUF_OFFSET, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
package cn.wja;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.Reader;
import java.util.concurrent.TimeUnit;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
public class BenchmarkReader {
private static final String FILE_PATH = "F:\\农民工老王\\servertools.zip";
@Benchmark
public int bufferedReaderTest() throws Exception {
int result = 0;
try (Reader reader = new BufferedReader(new FileReader(FILE_PATH))) {
int value;
while ((value = reader.read()) != -1) {
result += value;
}
}
return result;
}
@Benchmark
public int fileReadTest() throws Exception {
int result = 0;
try (Reader reader = new FileReader(FILE_PATH)) {
int value;
while ((value = reader.read()) != -1) {
result += value;
}
}
return result;
}
public static void main(String[] args) throws Exception {
Options opts = new OptionsBuilder().include(BenchmarkReader.class.getSimpleName()).build();
new Runner(opts).run();
}
}
日志缓冲
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_HOME" value="."/>
<property name="ENCODER_PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%10.10thread] [%X{X-B3-TraceId}] %logger{20} - %msg%n"/>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/test.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/backup/log.log.%d{yyyy-MM-dd}</fileNamePattern>
<maxHistory>100</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${ENCODER_PATTERN}</pattern>
</encoder>
</appender>
<appender name="FILE2" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/test2.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/backup/log2.log.%d{yyyy-MM-dd}</fileNamePattern>
<maxHistory>100</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${ENCODER_PATTERN}</pattern>
</encoder>
</appender>
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<discardingThreshold>0</discardingThreshold>
<queueSize>512</queueSize>
<appender-ref ref="FILE"/>
</appender>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>${ENCODER_PATTERN}</pattern>
</layout>
</appender>
<logger name="cn.wja.log.synclog" level="INFO">
<appender-ref ref="ASYNC"/>
</logger>
<logger name="cn.wja.log.asynclog" level="INFO">
<appender-ref ref="FILE2"/>
</logger>
</configuration>
缓冲区优化思路
毫无疑问缓冲区是可以提高性能的,但它通常会引入一个异步的问题,使得编程模型变复杂。
通过文件读写流和 Logback 两个例子,我们来看一下对于缓冲区设计的一些常规操作。
如下图所示,资源 A 读取或写入一些操作到资源 B,这本是一个正常的操作流程,但由于中间插入了一个额外的存储层,所以这个流程被生生截断了,这时就需要你手动处理被截断两方的资源协调问题。
同步操作的编程模型相对简单,在一个线程中就可完成,你只需要控制缓冲区的大小,并把握处理的时机。比如,缓冲区大小达到阈值,或者缓冲区的元素在缓冲区的停留时间超时,这时就会触发批量操作。
由于所有的操作又都在单线程,或者同步方法块中完成,再加上资源 B 的处理能力有限,那么很多操作就会阻塞并等待在调用线程上。比如写文件时,需要等待前面的数据写入完毕,才能处理后面的请求。
异步操作就复杂很多。
缓冲区的生产者一般是同步调用,但也可以采用异步方式进行填充,一旦采用异步操作,就涉及缓冲区满了以后,生产者的一些响应策略。
此时,应该将这些策略抽象出来,根据业务的属性选择,比如直接抛弃、抛出异常,或者直接在用户的线程进行等待。你会发现它与线程池的饱和策略是类似的,这部分的详细概念将在后续的文章中讲解。
许多应用系统还会有更复杂的策略,比如在用户线程等待,设置一个超时时间,以及成功进入缓冲区之后的回调函数等。
对缓冲区的消费,一般采用开启线程的方式,如果有多个线程消费缓冲区,还会存在信息同步和顺序问题。
这里以一个常见的面试题来讲解上面的知识点:Kafka 的生产者,有可能会丢数据吗?
如图,要想解答这个问题,需要先了解 Kafka 对生产者的一些封装,其中有一个对性能影响非常大的点,就是缓冲。
生产者会把发送到同一个 partition 的多条消息,封装在一个 batch(缓冲区)中。当 batch 满了(参数 batch.size),或者消息达到了超时时间(参数 linger.ms),缓冲区中的消息就会被发送到 broker 上。
这个缓冲区默认是 16KB,如果生产者的业务突然断电,这 16KB 数据是没有机会发送出去的。此时,就造成了消息丢失。
解决的办法有两种:
把缓冲区设置得非常小,此时消息会退化成单条发送,这会严重影响性能;
消息发送前记录一条日志,消息发送成功后,通过回调再记录一条日志,通过扫描生成的日志,就可以判断哪些消息丢失了。
另外一个面试的问题是:Kafka 生产者会影响业务的高可用吗?
缓冲区的其他案例
缓冲区的注意事项
虽然缓冲区可以帮我们大大地提高应用程序的性能,但同时它也有不少问题,在我们设计时,要注意这些异常情况。
其中,比较严重就是缓冲区内容的丢失。即使你使用 addShutdownHook 做了优雅关闭,有些情形依旧难以防范避免,比如机器突然间断电,应用程序进程突然死亡等。这时,缓冲区内未处理完的信息便会丢失,尤其金融信息,电商订单信息的丢失都是比较严重的。
所以,内容写入缓冲区之前,需要先预写日志,故障后重启时,就会根据这些日志进行数据恢复。在数据库领域,文件缓冲的场景非常多,一般都是采用 WAL 日志(Write-Ahead Logging)解决。对数据完整性比较严格的系统,甚至会通过电池或者 UPS 来保证缓冲区的落地。这就是性能优化带来的新问题,必须要解决。
小结
可以看到,缓冲区优化是对正常的业务流程进行截断,然后加入缓冲组件的一个操作,它分为同步和异步方式,其中异步方式的实现难度相对更高。
大多数组件,从操作系统到数据库,从 Java 的 API 到一些中间件,都可以通过设置一些参数,来控制缓冲区大小,从而取得较大的性能提升。但需要注意的是,某些极端场景(断电、异常退出、kill -9等)可能会造成数据丢失,若你的业务对此容忍度较低,那么你需要花更多精力来应对这些异常。
在我们面试的时候,除了考察大家对知识细节的掌握程度,还会考察总结能力,以及遇到相似问题的分析能力。大家在平常的工作中,也要多多总结,多多思考,窥一斑而知全貌。如此回答,必会让面试官眼前一亮。
如喜欢本文,请点击右上角,把文章分享到朋友圈
如有想了解学习的技术点,请留言给若飞安排分享
因公众号更改推送规则,请点“在看”并加“星标”第一时间获取精彩技术分享
·END·
相关阅读:
作者:农民工老王
来源:blog.csdn.net/monarch91/article/details/123688424
版权申明:内容来源网络,仅供分享学习,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢!
我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:admin@137x.com