其他
说说Flink DataStream的八种物理分区逻辑
By 大数据技术与架构
GlobalPartitioner
/**
 * Channel selection for {@code dataStream.global()}.
 *
 * <p>The record is never inspected: every call yields the same channel index.
 *
 * @param record the record about to be emitted (unused)
 * @return always {@code 0}
 */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    final int onlyChannel = 0;
    return onlyChannel;
}
ShufflePartitioner
// Pseudo-random source that selectChannel draws the target channel index from.
private Random random = new Random();
/**
 * Channel selection for {@code dataStream.shuffle()}.
 *
 * <p>Draws a pseudo-random index in {@code [0, numberOfChannels)} on every call;
 * the record itself does not influence the choice.
 *
 * @param record the record about to be emitted (unused)
 * @return a randomly chosen channel index
 */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    final int chosenChannel = random.nextInt(numberOfChannels);
    return chosenChannel;
}
RebalancePartitioner
// Round-robin cursor: seeded with a random channel in setup(), then advanced by one
// (modulo numberOfChannels) on every selectChannel call, which returns the advanced value.
private int nextChannelToSendTo;
/**
 * Initializes the partitioner for the given number of output channels.
 *
 * <p>After delegating to the superclass, the round-robin cursor is seeded with a
 * random index in {@code [0, numberOfChannels)}, so the rotation does not start
 * from a fixed channel.
 *
 * @param numberOfChannels number of output channels; also the exclusive bound of the random seed
 */
@Override
public void setup(int numberOfChannels) {
    super.setup(numberOfChannels);
    final ThreadLocalRandom rng = ThreadLocalRandom.current();
    nextChannelToSendTo = rng.nextInt(numberOfChannels);
}
/**
 * Channel selection for {@code dataStream.rebalance()}.
 *
 * <p>Moves the cursor to the next channel, wrapping around via modulo
 * {@code numberOfChannels}, and returns the new cursor position.
 *
 * @param record the record about to be emitted (unused)
 * @return the channel index the cursor now points at
 */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    final int advanced = nextChannelToSendTo + 1;
    nextChannelToSendTo = advanced % numberOfChannels;
    return nextChannelToSendTo;
}
KeyGroupStreamPartitioner
// Extracts the key from each record's value; used by selectChannel.
private final KeySelector<T, K> keySelector;
// Forwarded to KeyGroupRangeAssignment as maxParallelism, where it is the modulus
// applied to the hashed key when computing the key group.
private int maxParallelism;
/**
 * Channel selection for {@code dataStream.keyBy()}.
 *
 * <p>Extracts the key from the record's value with {@code keySelector} and lets
 * {@code KeyGroupRangeAssignment.assignKeyToParallelOperator} turn it into a
 * channel index, using {@code maxParallelism} and {@code numberOfChannels}.
 *
 * @param record the record about to be emitted
 * @return the channel index computed for the record's key
 * @throws RuntimeException if key extraction throws; the original exception is kept as the cause
 */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    final K extractedKey;
    try {
        // Only the extraction is guarded: failures of the assignment below are not wrapped.
        extractedKey = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception cause) {
        final String message = "Could not extract key from " + record.getInstance().getValue();
        throw new RuntimeException(message, cause);
    }
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(extractedKey, maxParallelism, numberOfChannels);
}
/**
 * Maps a key to the index of a parallel operator instance.
 *
 * <p>Two steps: the key is first assigned to a key group, then the key group is
 * mapped to an operator index.
 *
 * @param key the key to assign
 * @param maxParallelism modulus used when computing the key group
 * @param parallelism number of parallel operator instances
 * @return the operator index computed for the key
 */
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    final int keyGroupId = assignToKeyGroup(key, maxParallelism);
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupId);
}
/**
 * Assigns a key to a key group based on the key's {@code hashCode()}.
 *
 * @param key the key to assign; its {@code hashCode()} is invoked, so it must not be null
 * @param maxParallelism modulus used when computing the key group
 * @return the key group computed from the key's hash
 */
public static int assignToKeyGroup(Object key, int maxParallelism) {
    final int keyHash = key.hashCode();
    return computeKeyGroupForKeyHash(keyHash, maxParallelism);
}
/**
 * Maps a key hash to a key group.
 *
 * <p>The hash is first passed through {@code MathUtils.murmurHash} and the result
 * is reduced modulo {@code maxParallelism}.
 *
 * @param keyHash hash code of the key
 * @param maxParallelism modulus applied to the re-hashed value
 * @return the re-hashed value modulo {@code maxParallelism}
 */
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    final int rehashed = MathUtils.murmurHash(keyHash);
    return rehashed % maxParallelism;
}
/**
 * Maps a key group to the index of the parallel operator instance that owns it.
 *
 * <p>The index is {@code keyGroupId * parallelism / maxParallelism} using integer
 * division, so consecutive key groups are split into {@code parallelism}
 * contiguous ranges.
 *
 * @param maxParallelism divisor of the scaling; a value of 0 raises {@link ArithmeticException}
 * @param parallelism number of parallel operator instances
 * @param keyGroupId the key group to map
 * @return the operator index for the key group
 */
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    // Widen before multiplying: the product of two ints can exceed the int range and
    // would silently wrap, yielding a wrong index. Results for non-overflowing inputs
    // are unchanged.
    final long scaled = (long) keyGroupId * parallelism;
    return (int) (scaled / maxParallelism);
}
BroadcastPartitioner
/**
 * Channel selection for {@code dataStream.broadcast()}.
 *
 * <p>This partitioner does not select a single channel; calling this method
 * always fails.
 *
 * @param record the record about to be emitted (unused)
 * @return never returns normally
 * @throws UnsupportedOperationException on every call
 */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    final String reason = "Broadcast partitioner does not support select channels.";
    throw new UnsupportedOperationException(reason);
}
/**
 * Reports whether this partitioner broadcasts records.
 *
 * @return always {@code true}
 */
@Override
public boolean isBroadcast() {
    final boolean broadcasts = true;
    return broadcasts;
}
RescalePartitioner
// Round-robin cursor. Starts at -1 so that the pre-increment in the first
// selectChannel call yields channel 0.
private int nextChannelToSendTo = -1;
/**
 * Channel selection for {@code dataStream.rescale()}.
 *
 * <p>Advances the cursor by one and returns it; once the advanced value reaches
 * {@code numberOfChannels}, the cursor restarts at channel 0.
 *
 * @param record the record about to be emitted (unused)
 * @return the channel index the cursor now points at
 */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    final int candidate = nextChannelToSendTo + 1;
    nextChannelToSendTo = (candidate >= numberOfChannels) ? 0 : candidate;
    return nextChannelToSendTo;
}
// Choose how the downstream vertex is wired to headVertex's output:
// forward and rescale partitioners get a POINTWISE edge, every other
// partitioner gets an ALL_TO_ALL edge.
if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
resultPartitionType);
} else {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.ALL_TO_ALL,
resultPartitionType);
ForwardPartitioner
/**
 * Channel selection for {@code dataStream.forward()}.
 *
 * <p>The record is never inspected: every call yields channel index 0.
 *
 * @param record the record about to be emitted (unused)
 * @return always {@code 0}
 */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    final int forwardChannel = 0;
    return forwardChannel;
}
CustomPartitionerWrapper
// User-supplied partitioner; selectChannel calls partition(key, numberOfChannels) on it.
Partitioner<K> partitioner;
// Extracts the key that is handed to the partitioner from each record's value.
KeySelector<T, K> keySelector;
/**
 * Channel selection for {@code dataStream.partitionCustom()}.
 *
 * <p>Extracts the key from the record's value with {@code keySelector} and returns
 * whatever the wrapped {@code partitioner} computes for that key and
 * {@code numberOfChannels}.
 *
 * @param record the record about to be emitted
 * @return the channel index returned by the wrapped partitioner
 * @throws RuntimeException if key extraction throws; the original exception is kept as the cause
 */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    final K extractedKey;
    try {
        // Only the extraction is guarded: exceptions from the partitioner itself are not wrapped.
        extractedKey = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception cause) {
        final String message = "Could not extract key from " + record.getInstance();
        throw new RuntimeException(message, cause);
    }
    return partitioner.partition(extractedKey, numberOfChannels);
}
// Usage example: a custom partitioner that derives the partition from the length of the String key.
sourceStream.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
// Keys of equal length always map to the same partition index.
return key.length() % numPartitions;
}
}, 0); // NOTE(review): the 0 presumably identifies the key field by position — confirm against the partitionCustom API
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧! 👇