Flink Internals | Data Abstraction and the Data Exchange Process in Flink
By 大数据技术与架构
Flink's Data Abstractions
MemorySegment
As a high-performance streaming framework, Flink inevitably had to manage memory on its own in order to avoid the JVM's inherent drawbacks (low storage density of Java objects, full GC pauses hurting throughput and latency, and so on).
MemorySegment is Flink's memory abstraction. By default, a MemorySegment represents a 32 KB block of memory. That block can be either a byte[] on the JVM heap or off-heap memory (a DirectByteBuffer).
If byte[] arrays and direct memory are the lowest-level storage, then MemorySegment is the uniform abstraction layered on top of them. It defines a set of abstract methods for interacting with the underlying memory, for example:
public abstract class MemorySegment {
public abstract byte get(int index);
public abstract void put(int index, byte b);
public int size();
public abstract ByteBuffer wrap(int offset, int length);
......
}
As you can see, besides the many methods that operate on the memory directly, it also provides a wrap() method that packages the segment as a ByteBuffer; we will come back to ByteBuffer in a moment.
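As a quick feel for the API, here is a minimal usage sketch. It assumes Flink's MemorySegmentFactory.wrap() and the usual get/put accessors are available in your version; treat the exact method set as an assumption, since it has shifted between releases:
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class MemorySegmentDemo {
    public static void main(String[] args) {
        // Wrap an ordinary on-heap byte[] in the MemorySegment abstraction.
        MemorySegment segment = MemorySegmentFactory.wrap(new byte[32 * 1024]);

        // Read and write through the uniform accessors instead of touching the array directly.
        segment.putInt(0, 42);
        segment.put(4, (byte) 7);
        System.out.println(segment.getInt(0) + ", " + segment.get(4) + ", size=" + segment.size());
    }
}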
Flink ships two implementations of MemorySegment: HeapMemorySegment and HybridMemorySegment. The difference is that the former can only be backed by heap memory, while the latter can be backed by either heap or off-heap memory. In fact, the Flink runtime only ever uses the latter. Why is that?
It would seem more natural for HybridMemorySegment to handle only off-heap memory. But in the JVM world, if a method is a virtual method, then on every call the JVM has to figure out which subclass's implementation is actually being invoked (method overriding; see the JVM's invokevirtual instruction), which means a vtable lookup on every call. If, however, a virtual method has only one implementation loaded in the entire JVM (that is, only one subclass has been loaded), the JVM is clever enough to devirtualize the call, skipping the vtable lookup and improving performance considerably. Since allocating only heap memory or only off-heap memory does not cover all of Flink's needs, HybridMemorySegment was designed to handle both kinds of memory, so that it can remain the single loaded implementation.
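The devirtualization point can be illustrated with a tiny, self-contained example (hypothetical classes, nothing to do with Flink's actual hierarchy): as long as only one subclass of the abstract base is ever loaded, the hot call site stays monomorphic and the JIT can devirtualize and even inline it.
// Hypothetical classes, for illustration only.
abstract class Base {
    abstract int get(int index);
}

final class OnlyImpl extends Base {
    private final byte[] data = new byte[1024];
    @Override
    int get(int index) { return data[index]; }
}

public class DevirtualizationSketch {
    public static void main(String[] args) {
        Base base = new OnlyImpl();   // the only implementation the JVM ever sees
        int sum = 0;
        for (int i = 0; i < 1024; i++) {
            sum += base.get(i);       // hot virtual call; effectively devirtualized by the JIT
        }
        System.out.println(sum);
    }
}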
Let's look at HybridMemorySegment's constructors:
HybridMemorySegment(ByteBuffer buffer, Object owner) {
super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
}
HybridMemorySegment(byte[] buffer, Object owner) {
super(buffer, owner);
this.offHeapBuffer = null;
}
In the first constructor, checkBufferAndGetAddress() obtains the native memory address of the direct buffer, which is what makes operating on off-heap memory possible.
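To get a feel for what "the memory address of a direct buffer" means, here is a small illustrative snippet. Flink itself goes through sun.misc.Unsafe inside checkBufferAndGetAddress(); the reflective peek below is only for illustration and may be blocked by module encapsulation on newer JDKs:
import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;

public class DirectBufferAddressSketch {
    public static void main(String[] args) throws Exception {
        // A direct ByteBuffer carries its native memory address in the
        // (non-public) java.nio.Buffer#address field.
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);
        Field addressField = Buffer.class.getDeclaredField("address");
        addressField.setAccessible(true);   // may require --add-opens on JDK 16+
        long address = addressField.getLong(direct);
        System.out.println("native address of the direct buffer: " + address);
    }
}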
ByteBuffer and NetworkBufferPool
On top of the MemorySegment abstraction, the abstraction (or memory object) Flink uses while data objects inside an operator are moved out on the TaskManager, ready to be sent to the next node, is the Buffer. Note that this Buffer is a Flink interface, not the Buffer abstract class from java.nio. Flink uses both of these same-named concepts at this layer to hold data, and when reading the code the various xxxBuffer classes are easy to mix up: the java.nio Buffer class is mainly used here to build HeapByteBuffers, chiefly when an object inside the JVM is serialized into a byte array.
Flink's Buffer interface, on the other hand, is Flink's uniform abstraction for transporting data and events; its implementation, NetworkBuffer, is a wrapper around a MemorySegment. This is the abstraction used when Flink passes data between TaskManagers.
Because a Buffer is backed by a MemorySegment, which may not be managed by the JVM, Flink needs some way to know when a Buffer is done with so the memory can be reclaimed. It uses reference counting: once nothing references the buffer, the underlying MemorySegment can be recycled for other uses (why doesn't the JVM's garbage collector use reference counting? Something for the reader to ponder):
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
private volatile int refCnt = 1;
......
}
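The reference-counting idea itself is small enough to sketch in a few lines. This is a toy illustration, not Flink's NetworkBuffer (which inherits the counter from Netty's AbstractReferenceCountedByteBuf as shown above):
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountingSketch {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    RefCountingSketch retain() {
        refCnt.incrementAndGet();      // another reader now holds the buffer
        return this;
    }

    void recycle() {
        if (refCnt.decrementAndGet() == 0) {
            // at this point Flink would hand the MemorySegment back to its pool
            System.out.println("buffer fully released; the segment can be reused");
        }
    }

    public static void main(String[] args) {
        RefCountingSketch buffer = new RefCountingSketch();
        buffer.retain();     // e.g. a second consumer of the same buffer
        buffer.recycle();    // first consumer done
        buffer.recycle();    // second consumer done -> count hits 0, segment reusable
    }
}
The MemorySegments that back these network buffers come from the NetworkBufferPool, whose constructor pre-allocates them as direct memory: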
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
......
try {
this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
}
catch (OutOfMemoryError err) {
throw new OutOfMemoryError("Could not allocate buffer queue of length "
+ numberOfSegmentsToAllocate + " - " + err.getMessage());
}
try {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
}
}
......
final long sizeInLong = (long) segmentSize;  // widened to long to avoid int overflow
long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;
LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
allocatedMb, availableMemorySegments.size(), segmentSize);
}
Since the NetworkBufferPool is only a factory, the actual memory pools are LocalBufferPools. Each TaskManager has exactly one NetworkBufferPool factory, but every task running on it gets its own LocalBufferPool, isolated from the other tasks' pools, which makes good logical sense. The NetworkBufferPool also keeps track of the total number of memory segments it owns, and whenever a new pool is allocated it redistributes how many segments each pool should hold. The steps are:
First, out of all the segments the factory manages, set aside the sum of the minimum number of buffers every pool requires; if that uses up all the segments, we are done.
Next, hand out all the remaining, unassigned segments to the LocalBufferPools in proportion to each pool's remaining desired capacity.
The "remaining desired capacity" works like this: if a pool needs at least 3 buffers and wants at most 10, then its remaining desired capacity is 7.
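Before the real code, here is a small worked example of that proportional hand-out. The numbers and the class name are hypothetical, but the arithmetic mirrors the loop in redistributeBuffers() below:
public class RedistributionSketch {
    public static void main(String[] args) {
        int totalSegments = 20;
        int[] min = {3, 2};              // required buffers per pool
        int[] max = {10, 16};            // maximum buffers per pool
        int required = min[0] + min[1];  // 5 segments set aside as the minimums
        int available = totalSegments - required;            // 15 left to hand out

        long totalCapacity = 0;          // sum of "remaining desired capacity"
        for (int i = 0; i < min.length; i++) {
            totalCapacity += Math.min(available, max[i] - min[i]);   // 7 + 14 = 21
        }

        int toDistribute = (int) Math.min(available, totalCapacity); // 15
        long partsUsed = 0;
        int distributed = 0;
        for (int i = 0; i < min.length; i++) {
            partsUsed += Math.min(available, max[i] - min[i]);
            int share = (int) (toDistribute * partsUsed / totalCapacity - distributed);
            distributed += share;
            System.out.println("pool " + i + " gets " + (min[i] + share) + " buffers");
        }
        // prints: pool 0 gets 8 buffers, pool 1 gets 12 buffers (8 + 12 = 20)
    }
}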
The implementation looks like this:
private void redistributeBuffers() throws IOException {
assert Thread.holdsLock(factoryLock);
// All buffers, which are not among the required ones
final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
if (numAvailableMemorySegment == 0) {
// in this case, we need to redistribute buffers so that every pool gets its minimum
for (LocalBufferPool bufferPool : allBufferPools) {
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
}
return;
}
long totalCapacity = 0; // long to avoid int overflow
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
}
// no capacity to receive additional buffers?
if (totalCapacity == 0) {
return; // necessary to avoid div by zero when nothing to re-distribute
}
final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
Math.min(numAvailableMemorySegment, totalCapacity));
long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
// shortcut
if (excessMax == 0) {
continue;
}
totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);
final int mySize = MathUtils.checkedDownCast(
memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);
numDistributedMemorySegment += mySize;
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}
assert (totalPartsUsed == totalCapacity);
assert (numDistributedMemorySegment == memorySegmentsToDistribute);
}
Next, the LocalBufferPool. Its logic is essentially just create/read/update/delete; what is worth a look are its fields:
/** The minimum number of memory segments this pool requires */
private final int numberOfRequiredMemorySegments;
/**
 * Segments the pool currently holds that have no data written into them yet
 */
private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
/**
 * All registered listeners watching for buffer availability
 */
private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();
/** The maximum number of segments that may be assigned to this pool */
private final int maxNumberOfMemorySegments;
/** The current size of this pool */
private int currentPoolSize;
/**
 * The number of segments allocated through the NetworkBufferPool and referenced by this pool (i.e. not obtained directly)
 */
private int numberOfRequestedMemorySegments;
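To make the roles of these fields concrete, here is a toy, self-contained sketch of the request/recycle cycle they imply. The class and names are simplified stand-ins, not Flink's actual LocalBufferPool code (a real request also has to block or register a listener when the pool is exhausted):
import java.util.ArrayDeque;

public class ToyLocalPool {
    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();
    private final int currentPoolSize;
    private int requestedSegments;

    ToyLocalPool(int currentPoolSize) {
        this.currentPoolSize = currentPoolSize;
    }

    synchronized byte[] request() {
        byte[] segment = availableSegments.poll();
        if (segment != null) {
            return segment;                    // reuse a free segment we already own
        }
        if (requestedSegments < currentPoolSize) {
            requestedSegments++;               // stand-in for asking the NetworkBufferPool
            return new byte[32 * 1024];
        }
        return null;                           // pool exhausted; a real pool would wait
    }

    synchronized void recycle(byte[] segment) {
        availableSegments.add(segment);        // back onto the local free list
    }

    public static void main(String[] args) {
        ToyLocalPool pool = new ToyLocalPool(2);
        byte[] first = pool.request();
        pool.request();                                // second segment; pool is at its size limit
        System.out.println(pool.request() == null);    // true: nothing left to hand out
        pool.recycle(first);
        System.out.println(pool.request() != null);    // true: the recycled segment is reused
    }
}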
Following on from the NetworkBufferPool's redistribution method, let's look at LocalBufferPool's setNumBuffers(). The code is short and the logic quite simple, so we won't dwell on it:
public void setNumBuffers(int numBuffers) throws IOException {
synchronized (availableMemorySegments) {
checkArgument(numBuffers >= numberOfRequiredMemorySegments,
"Buffer pool needs at least %s buffers, but tried to set to %s",
numberOfRequiredMemorySegments, numBuffers);
if (numBuffers > maxNumberOfMemorySegments) {
currentPoolSize = maxNumberOfMemorySegments;
} else {
currentPoolSize = numBuffers;
}
returnExcessMemorySegments();
// If there is a registered owner and we have still requested more buffers than our
// size, trigger a recycle via the owner.
if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);
}
}
}
RecordWriter and Record
Let's keep moving up the layers of abstraction. We just saw that the lowest-level memory abstraction is MemorySegment and that Buffer is used for data transport. So what is the intermediate object that bridges the two, turning a Java object into a Buffer? It is StreamRecord.
As the name StreamRecord<T> suggests, this class is simply a wrapper that holds the original Java object. A StreamRecord also carries a timestamp.
So how does this object turn into a big byte array inside a LocalBufferPool? With the help of the RecordWriter class.
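For orientation, here is a minimal sketch of the shape of StreamRecord<T>; this is a simplified stand-in, and the real class (in org.apache.flink.streaming.runtime.streamrecord) has more state and accessors:
public class StreamRecordSketch<T> {
    private T value;            // the wrapped user object
    private long timestamp;     // event timestamp attached to the record
    private boolean hasTimestamp;

    public StreamRecordSketch(T value) {
        this.value = value;
    }

    public StreamRecordSketch(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = true;
    }

    public T getValue() { return value; }
    public long getTimestamp() { return timestamp; }
    public boolean hasTimestamp() { return hasTimestamp; }
}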
Let's go straight to the method that serializes the data and hands it off:
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer<T> serializer = serializers[targetChannel];
SerializationResult result = serializer.addRecord(record);
while (result.isFullBuffer()) {
if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
// pipeline).
if (result.isFullRecord()) {
break;
}
}
BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
targetPartition.flush(targetChannel);
}
}
About the last line first: if flushAlways is configured, every element is sent out immediately, but throughput suffers. Flink's default behaviour is not to send element by element either: a dedicated thread flushes all channels at a fixed interval, which, strictly speaking, amounts to a form of mini-batching.
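A bare-bones sketch of that "flush every N milliseconds" idea looks like the following. The names and the timeout value are illustrative assumptions; Flink's real flusher is a dedicated thread inside the RecordWriter that flushes the target partition rather than printing:
public class PeriodicFlusherSketch {
    public static void main(String[] args) throws InterruptedException {
        final long bufferTimeoutMillis = 100;     // stand-in for the configured buffer timeout
        Thread flusher = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(bufferTimeoutMillis);
                } catch (InterruptedException e) {
                    return;
                }
                flushAllChannels();               // in Flink this would flush the result partition
            }
        }, "output-flusher-sketch");
        flusher.setDaemon(true);
        flusher.start();
        Thread.sleep(350);                        // let the sketch flush a few times before exiting
    }

    private static void flushAllChannels() {
        // placeholder: here the RecordWriter would flush every subpartition
        System.out.println("flushing all channels");
    }
}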
Now the serialization line, SerializationResult result = serializer.addRecord(record);. Here Flink serializes the object into a byte array using the serializer that belongs to the object's type.
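The serializer's job can also be sketched in miniature: the record becomes one byte array, which is then copied into fixed-size buffers, so a record may span several buffers and a buffer may hold several records. This toy version leaves out the length prefix and the real BufferBuilder machinery:
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SpanningSerializationSketch {
    public static void main(String[] args) {
        byte[] serializedRecord = "a record that is longer than one buffer".getBytes(StandardCharsets.UTF_8);
        int bufferSize = 16;                       // pretend the memory segments are 16 bytes

        List<byte[]> buffers = new ArrayList<>();
        int position = 0;
        while (position < serializedRecord.length) {
            int length = Math.min(bufferSize, serializedRecord.length - position);
            byte[] buffer = new byte[length];
            System.arraycopy(serializedRecord, position, buffer, 0, length);
            buffers.add(buffer);                   // in Flink: finish one BufferBuilder, request the next
            position += length;
        }
        System.out.println("record spans " + buffers.size() + " buffers");
    }
}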
The Data Flow Process
The previous section covered the data abstractions at each layer; this section looks at how data is exchanged between tasks.
Overall Process
The overall process (shown as a diagram in the original post) is:
1. The first step is, naturally, to prepare a ResultPartition;
2. Notify the JobMaster;
3. The JobMaster notifies the downstream nodes; if a downstream node is not yet deployed, it gets deployed;
4. The downstream nodes request data from upstream;
5. Data transfer begins.
Passing Data Across Tasks
This section looks at the concrete data transfer process between operators (also shown as a diagram in the original post).
Data passes between tasks in the following steps:
1. After the local operator has finished processing the data, it hands it to the RecordWriter. Every record has to pick a downstream node, so it goes through the ChannelSelector.
2. Each channel has its own serializer (I suspect this is to avoid the headaches of multi-threaded writes), which serializes the record into a ByteBuffer.
3. The data is then written into the subpartitions under the ResultPartition; at this point the data already lives in a DirectBuffer (a MemorySegment).
4. A dedicated thread controls the flush rate; once a flush is triggered, the data is written to the remote side through Netty's NIO channel.
5. The Netty client on the other end receives the data, decodes it, copies it into a buffer, and then notifies the InputChannel.
6. When data becomes available, the downstream operator wakes up from blocking, takes the buffer from the InputChannel, deserializes it back into a record, and hands it to the operator to run the user code.
Those are the steps data goes through when travelling between operators on different machines.
With the steps in mind, let's look at some of the key code.
First, handing the data to the RecordWriter.
//RecordWriterOutput.java
@Override
public void collect(StreamRecord<OUT> record) {
if (this.outputTag != null) {
// we are only responsible for emitting to the main input
return;
}
//here you can see the record being handed to the RecordWriter
pushToRecordWriter(record);
}
Then the RecordWriter sends the data to the corresponding channels.
//RecordWriter.java
public void emit(T record) throws IOException, InterruptedException {
//the ChannelSelector comes into play here
for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
sendToTarget(record, targetChannel);
}
}
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
//pick the serializer and serialize the record
RecordSerializer<T> serializer = serializers[targetChannel];
SerializationResult result = serializer.addRecord(record);
while (result.isFullBuffer()) {
if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
// pipeline).
if (result.isFullRecord()) {
break;
}
}
BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
//write into the channel
result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
targetPartition.flush(targetChannel);
}
}
Next comes pushing the data down to the underlying infrastructure (Netty):
//ResultPartition.java
@Override
public void flushAll() {
for (ResultSubpartition subpartition : subpartitions) {
subpartition.flush();
}
}
//PartitionRequestQueue.java
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
//here the push is handed off to the netty server thread
ctx.executor().execute(new Runnable() {
@Override
public void run() {
ctx.pipeline().fireUserEventTriggered(reader);
}
});
}
The Netty-related part:
//AbstractChannelHandlerContext.java
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
if (event == null) {
throw new NullPointerException("event");
} else {
final AbstractChannelHandlerContext next = this.findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event);
} else {
executor.execute(new OneTimeTask() {
public void run() {
next.invokeUserEventTriggered(event);
}
});
}
return this;
}
}
And finally the actual write:
//PartitionRequestQueue.java
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
return;
}
// Queue an available reader for consumption. If the queue is empty,
// we try trigger the actual write. Otherwise this will be handled by
// the writeAndFlushNextMessageIfPossible calls.
boolean triggerWrite = availableReaders.isEmpty();
registerAvailableReader(reader);
if (triggerWrite) {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
}
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
......
next = reader.getNextBuffer();
if (next == null) {
if (!reader.isReleased()) {
continue;
}
markAsReleased(reader.getReceiverId());
Throwable cause = reader.getFailureCause();
if (cause != null) {
ErrorResponse msg = new ErrorResponse(
new ProducerFailedException(cause),
reader.getReceiverId());
ctx.writeAndFlush(msg);
}
} else {
// This channel was now removed from the available reader queue.
// We re-add it into the queue if it is still available
if (next.moreAvailable()) {
registerAvailableReader(reader);
}
BufferResponse msg = new BufferResponse(
next.buffer(),
reader.getSequenceNumber(),
reader.getReceiverId(),
next.buffersInBacklog());
if (isEndOfPartitionEvent(next.buffer())) {
reader.notifySubpartitionConsumed();
reader.releaseAllResources();
markAsReleased(reader.getReceiverId());
}
// Write and flush and wait until this is done before
// trying to continue with the next buffer.
channel.writeAndFlush(msg).addListener(writeListener);
return;
}
......
}
The writeAndFlush(msg) call in the second method above is where data is actually written into Netty's NIO channel. What is written here corresponds to a RemoteInputChannel on the downstream node, i.e. one of the channels of its InputGate.
Where there is a write there is a read: the other end of the NIO channel has to read the buffer in. The code is as follows:
//CreditBasedPartitionRequestClientHandler.java
private void decodeMsg(Object msg) throws Throwable {
final Class<?> msgClazz = msg.getClass();
// ---- Buffer --------------------------------------------------------
if (msgClazz == NettyMessage.BufferResponse.class) {
NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
if (inputChannel == null) {
bufferOrEvent.releaseBuffer();
cancelRequestFor(bufferOrEvent.receiverId);
return;
}
decodeBufferOrEvent(inputChannel, bufferOrEvent);
}
......
}
As an aside, the way Flink blocks and fetches data is very natural: it relies on the producer-consumer model. When no data is available, the consumer simply blocks; when data is added to the queue, the consumer is notified. Flink's backpressure mechanism is also built on this.
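A tiny producer/consumer sketch of that blocking behaviour, with a plain BlockingQueue standing in for the InputChannel's buffer queue (illustration only, not Flink code); note how a bounded queue also gives a crude form of backpressure, because the producer blocks in put() once the consumer falls behind:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingExchangeSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> channelQueue = new ArrayBlockingQueue<>(2);

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String buffer = channelQueue.take();   // blocks while the queue is empty
                    System.out.println("consumed " + buffer);
                }
            } catch (InterruptedException ignored) {
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        for (int i = 0; i < 5; i++) {
            channelQueue.put("buffer-" + i);               // blocks when the queue is full
        }
        Thread.sleep(200);                                  // let the consumer drain the queue
    }
}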
The bytes are then deserialized back into a StreamRecord here:
//StreamElementSerializer.java
public StreamElement deserialize(DataInputView source) throws IOException {
int tag = source.readByte();
if (tag == TAG_REC_WITH_TIMESTAMP) {
long timestamp = source.readLong();
return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
}
else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
return new StreamRecord<T>(typeSerializer.deserialize(source));
}
else if (tag == TAG_WATERMARK) {
return new Watermark(source.readLong());
}
else if (tag == TAG_STREAM_STATUS) {
return new StreamStatus(source.readInt());
}
else if (tag == TAG_LATENCY_MARKER) {
return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());
}
else {
throw new IOException("Corrupt stream, found tag: " + tag);
}
}
The record is then picked up and processed in the StreamInputProcessor.processInput() loop.
And that concludes how data flows between nodes across JVMs.
Source: jianshu.com/p/eaafc9db3f74