OneFlow Source Code Analysis: Global Tensor
# Terminal 1
export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=2 RANK=0 LOCAL_RANK=0
# Terminal 2
export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=2 RANK=1 LOCAL_RANK=1
import oneflow as flow
p = flow.placement("cpu", ranks=[0, 1])
sbp = flow.sbp.split(0)
x = flow.tensor([[1,2,3],[4,5,6]], placement=p, sbp=sbp)
print(x.shape)
print(x.to_local())
# Terminal 1
oneflow.Size([2, 3])
tensor([[1, 2, 3]], dtype=oneflow.int64)
# Terminal 2
oneflow.Size([2, 3])
tensor([[4, 5, 6]], dtype=oneflow.int64)
The export commands set environment variables that tell OneFlow the IP and port to use for communication, that there are 2 ranks in total (WORLD_SIZE=2), and that terminal 1 runs rank 0 while terminal 2 runs rank 1. p = flow.placement("cpu", ranks=[0, 1]) specifies that the global tensor will be placed on rank 0 and rank 1. sbp = flow.sbp.split(0) sets the global tensor's SBP attribute to split, i.e. the tensor is split along dim 0. x = flow.tensor([[1,2,3],[4,5,6]], placement=p, sbp=sbp) constructs a global tensor x from Python list data together with the sbp and placement.
split means the global tensor has a shard on every rank (physical device) in the placement; each shard can be viewed as the slice of the global tensor obtained by cutting it along a given dimension (the ranks are specified by the placement). broadcast means the global tensor is identical on every rank, which is equivalent to copying it from one rank and broadcasting it to all ranks. partial means the tensor on each physical device has the same shape as the global tensor, but the values on each device are only part of the whole; the value of the global tensor is obtained by reducing the local tensors on these ranks with an operation such as sum, max, or mean.
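A minimal sketch (assuming the same two-terminal setup as above) that contrasts split and broadcast; partial is not constructed directly here, since it usually appears as the output SBP of an op (for example, the matmul of two split tensors):
import oneflow as flow

p = flow.placement("cpu", ranks=[0, 1])

# split(0): the logical (4, 3) tensor is cut along dim 0, each rank holds a (2, 3) shard
x_split = flow.randn(4, 3, placement=p, sbp=flow.sbp.split(0))
print(x_split.shape, x_split.to_local().shape)   # logical (4, 3), local (2, 3)

# broadcast: every rank holds the full, identical (4, 3) tensor
x_bcast = x_split.to_global(placement=p, sbp=flow.sbp.broadcast)
print(x_bcast.shape, x_bcast.to_local().shape)   # logical (4, 3), local (4, 3)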
A rank is mapped to a physical machine and device as follows:
machine_id = rank / NumOfProcessPerNode (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/rpc/lib/global_process_ctx.cpp#L56)
device_id = rank % NumOfProcessPerNode (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/rpc/lib/global_process_ctx.cpp#L84)
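As a small illustration (plain Python, not the OneFlow implementation, and assuming a hypothetical 4 processes per node), the integer division and modulo work out as follows:
num_proc_per_node = 4  # hypothetical value, for illustration only
for rank in range(8):  # ranks 0..7 spread over 2 nodes
    machine_id = rank // num_proc_per_node
    device_id = rank % num_proc_per_node
    print(f"rank={rank} -> machine_id={machine_id}, device_id={device_id}")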
DataConsistencyCheck (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L251) copies the data between the nodes involved in the tensor's placement and verifies that it is consistent.
functional::Empty (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L256) constructs a local tensor from the shape and dtype, to be filled with data afterwards (the same process as in the earlier discussion of local tensors).
SwitchCopyLocalTensorFromUntypedArray (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L257) fills the empty local tensor with data; the data can be a Python list, as in this example, or a numpy ndarray.
functional::Cast (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L267) performs the dtype conversion.
functional::LocalToGlobal (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L272-L274) converts the local tensor to a global tensor, but only a temporary one used to broadcast to the specified placement (its sbp list is all broadcast).
functional::ToGlobal (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/api/python/utils/tensor_utils.cpp#L277-L279) converts the temporary global tensor into the final global tensor according to the placement and sbp.
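A rough user-level analogy of this pipeline (Python API calls, not the C++ code itself): build a local tensor filled with data, lift it to a temporary broadcast global tensor on the target placement, then redistribute it to the requested sbp:
import oneflow as flow

p = flow.placement("cpu", ranks=[0, 1])

local = flow.tensor([[1, 2, 3], [4, 5, 6]], dtype=flow.int64)  # local tensor filled with data
tmp = local.to_global(placement=p, sbp=flow.sbp.broadcast)     # temporary broadcast global tensor
x = tmp.to_global(placement=p, sbp=flow.sbp.split(0))          # final global tensor with the target sbp
print(x.to_local())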
# Terminal 1
# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=2 RANK=0 LOCAL_RANK=0
# Terminal 2
# export MASTER_ADDR=127.0.0.1 MASTER_PORT=17789 WORLD_SIZE=2 RANK=1 LOCAL_RANK=1
import oneflow as flow
p = flow.placement("cpu", ranks=[0, 1])
sbp = flow.sbp.split(0)
x = flow.randn(4, 5, placement=p, sbp=sbp)
print(x.shape) # (4,5)
print(x.to_local().shape) # (2,5)
The randn op corresponds to different functor implementations in local and global mode:
# oneflow/core/functional/functional_api.yaml
- name: "randn"
  signature: [
    "Tensor (Shape size, *, DataType dtype=None, Device device=None,
             Generator generator=None, Bool requires_grad=False) => RandN",
    "Tensor (Shape size, *, Placement placement, SbpList sbp, DataType dtype=None,
             Generator generator=None, Bool requires_grad=False) => GlobalRandN",
  ]
  bind_python: True
GlobalRandNFunctor (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/functional/impl/random_functor.cpp#L194) mainly dispatches the "normal" op. In Eager Global mode, the op is handed to EagerGlobalInterpreter, which performs the various inferences and preparation work (Interpret, https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/op_interpreter/eager_global_op_interpreter.cpp#L110); inside Interpret, PhysicalRun hands the instructions for executing the normal op to the virtual machine to schedule and execute. EagerGlobalTensorImpl::New (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/op_interpreter/eager_global_op_interpreter.cpp#L138) calls GetPhysicalShape (https://github.com/Oneflow-Inc/oneflow/blob/fca713f45a2f55379eb4284848a8f62d0f266283/oneflow/core/framework/tensor_impl.cpp#L207) to obtain the shape of the local tensor.
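A minimal sketch (plain Python, not OneFlow's C++ GetPhysicalShape, and simplified to assume the split dimension divides evenly across the ranks) of how the per-rank physical shape relates to the logical shape and the sbp:
def physical_shape(logical_shape, split_axis, num_ranks):
    # split: the extent along split_axis is divided across the ranks of the placement;
    # broadcast / partial (split_axis=None) keep the logical shape unchanged.
    shape = list(logical_shape)
    if split_axis is not None:
        shape[split_axis] //= num_ranks
    return tuple(shape)

print(physical_shape((4, 5), 0, 2))     # (2, 5): split(0) over 2 ranks, as in the example above
print(physical_shape((4, 5), None, 2))  # (4, 5): broadcast or partial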
References:
OneFlow source code: https://github.com/Oneflow-Inc/oneflow/commit/fca713f45a2f55379eb4284848a8f62d0f266283
Global Tensor: https://docs.oneflow.org/master/parallelism/03_consistent_tensor.html
Global view of the cluster: https://docs.oneflow.org/master/parallelism/02_sbp.html