其他
云原生的弹性 AI 训练系列之二:PyTorch 1.9.0 弹性分布式训练的设计与实现
高策,腾讯高级工程师,Kubeflow 社区训练和自动机器学习工作组 Tech Lead,负责腾讯云 TKE 在 AI 场景的产品研发和支持工作。
背景
torch.distributed.run
。PyTorch 1.9.0 之前的设计
python -m torch.distributed.launch
--nnodes=NODE_SIZE
--nproc_per_node=TRAINERS_PER_NODE
--node_rank=NODE_RANK
--master_port=HOST_PORT
--master_addr=HOST_NODE_ADDR
YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
nnodes
是参与训练的节点个数,nproc_per_node
是每个节点上运行的进程数量。node_rank
是当前节点的标识符,master_addr
和 master_port
是 master 监听的地址和端口。torch.distributed.launch
会设置一些环境变量,其中包括 WORLD_SIZE
和 MASTER_PORT
、MASTER_ADDR
等。TRAINERS_PER_NODE
个进程,这些进程组成了一个 local worker group。一共有 NODE_SIZE
个机器参与训练,一共有 NODE_SIZE * TRAINERS_PER_NODE
个进程。如果想要发起一个分布式训练任务,需要在所有的机器上执行相应的命令。PyTorch 1.9.0 中的新设计
torch.distributed.launch
即将被废弃[2],取而代之的是基于 pytorch/elastic[3] 的 torch.distributed.run
。这一新的方式与之前相比有一些使用上的改动,如下所示。python -m torch.distributed.run
--nnodes=MIN_SIZE:MAX_SIZE
--nproc_per_node=TRAINERS_PER_NODE
--rdzv_id=JOB_ID
--rdzv_backend=c10d
--rdzv_endpoint=HOST_NODE_ADDR
YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
nnodes
的设置不再是一个固定的值,而是一个区间。训练任务可以容忍在这一区间范围内的 worker 数量变化。如果要支持弹性能力,训练代码也需要进行一些修改。def main():
args = parse_args(sys.argv[1:])
state = load_checkpoint(args.checkpoint_path)
initialize(state)
# torch.distributed.run ensure that this will work
# by exporting all the env vars needed to initialize the process group
torch.distributed.init_process_group(backend=args.backend)
for i in range(state.epoch, state.total_num_epochs)
for batch in iter(state.dataset)
train(batch, state.model)
state.epoch += 1
save_checkpoint(state)
torch.distributed.run
开始,介绍这些新的设计。def run(args):
if args.standalone:
args.rdzv_backend = "c10d"
args.rdzv_endpoint = "localhost:29400"
args.rdzv_id = str(uuid.uuid4())
log.info(
f"\n**************************************\n"
f"Rendezvous info:\n"
f"--rdzv_backend={args.rdzv_backend} "
f"--rdzv_endpoint={args.rdzv_endpoint} "
f"--rdzv_id={args.rdzv_id}\n"
f"**************************************\n"
)
config, cmd, cmd_args = config_from_args(args)
elastic_launch(
config=config,
entrypoint=cmd,
)(*cmd_args)
rdzv_backend
和 rdzv_endpoint
等。class elastic_launch:
...
def __call__(self, *args):
return launch_agent(self._config, self._entrypoint, list(args))
def launch_agent(
config: LaunchConfig,
entrypoint: Union[Callable, str, None],
args: List[Any],
) -> Dict[int, Any]:
...
agent = LocalElasticAgent(
spec=spec, start_method=config.start_method, log_dir=config.log_dir
)
...
result = agent.run()
...
return result.return_values
Elastic Agent 的设计:如何管理多个 worker 进程
rendezvous
进行 worker 之间的相互发现和对成员变动的同步。与此同时,通过对 worker 进程的监控,来捕获训练过程中的失效。其中核心的逻辑都包装在 LocalElasticAgent.run()
这一函数调用中。 def run(self, role: str = DEFAULT_ROLE) -> RunResult:
...
result = self._invoke_run(role)
return result
def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult:
...
self._initialize_workers(self._worker_group)
while True:
...
run_result = self._monitor_workers(self._worker_group)
state = run_result.state
...
if state == WorkerState.SUCCEEDED:
...
return run_result
elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
if self._remaining_restarts > 0:
...
self._restart_workers(self._worker_group)
else:
...
return run_result
elif state == WorkerState.HEALTHY:
...
if num_nodes_waiting > 0:
...
self._restart_workers(self._worker_group)
else:
raise Exception(f"[{role}] Worker group in {state.name} state")
_invoke_run
中。其中 _initialize_workers
执行了大部分初始化的工作,其中包括为每个 worker 分配 RANK 等。在默认的实现中 elastic agent 和 workers 进程在同一机器上,因此 self._monitor_workers(self._worker_group)
通过 multiprocessing
对 workers 的运行状态进行了监控。并且根据不同的状态,进行不同的处理。ElasticAgent
、SimpleElasticAgent
和 LocalElasticAgent
。ElasticAgent
是一个 Abstract Class,SimpleElasticAgent
对其中的某些函数进行了实现,而 LocalElasticAgent
则实现了管理单机上所有 worker 进程的 elastic agent。SimpleElasticAgent
这一个抽象主要是为了方便扩展新的 agent 实现,比如如果你想通过一个 agent 管理多机上所有的 worker,而不只是本机上的 worker,则可以通过扩展 SimpleElasticAgent
来实现。rendezvous 的设计:如何在不同的节点间确定 RANK
rendezvous
。为了实现弹性训练,worker 之间要能够动态地进行 membership 的变更。rendezvous
就是实现这一特性的用于同步的组件。rendezvous
最核心的方法是: @abstractmethod
def next_rendezvous(
self,
) -> Tuple[Store, int, int]:
"""Main entry-point into the rendezvous barrier.
Blocks until the rendezvous is complete and the current process is
included in the formed worker group, or a timeout occurs, or the
rendezvous was marked closed.
Returns:
A tuple of :py:class:`torch.distributed.Store`, ``rank``, and
``world size``.
Raises:
RendezvousClosedError:
The rendezvous is closed.
RendezvousConnectionError:
The connection to the rendezvous backend has failed.
RendezvousStateError:
The rendezvous state is corrupt.
RendezvousTimeoutError:
The rendezvous did not complete on time.
"""
rendezvous
一共有四个实现,分别是 etcd
、etcd-v2
、c10d
和 static
。class EtcdRendezvousHandler(RendezvousHandler):
def next_rendezvous(self):
rdzv_version, rank, world_size = self._rdzv_impl.rendezvous_barrier()
log.info("Creating EtcdStore as the c10d::Store implementation")
store = self._rdzv_impl.setup_kv_store(rdzv_version)
return store, rank, world_size
etcd
相关的是之前推荐使用的实现,在 c10d
出现后就不再推荐了。etcd
的实现中,不同 worker 之间的状态通过 etcd 的 KV 接口存储。RANK
的过程如下图所示。/rdzv/active_version
下尝试写一个值 status: setup
。在整个过程中,/rdzv/active_version
会作为存储 rendezvous
过程中间状态的 KV store,以及 rendezvous
过程中的排他锁来使用。rendezvous
过程正在进行中。/rdzv/version_counter
为原值加一。然后会创建一个目录 /rdzv/v_${version_counter}
。这些操作做完后,会将 /rdzv/active_version
的状态写为 joinable,这时就进入了 join 阶段。/rdzv/active_version
下的 paticipants
,分配到递增的 rank,这里的 rank 并不是每个 worker 进程分配到的 global rank,而是 agent 自己的 rank。worker 进程的 rank 会根据 agent rank 经过一定的计算得到。这也是一个非常容易混淆的设计,窃以为有优化的空间。 def init_phase(self):
try:
active_version = self.try_create_rendezvous()
state = json.loads(active_version.value)
log.info("New rendezvous state created: " + str(state))
except etcd.EtcdAlreadyExist:
# 已经有了一个新的 rendezvous 过程
active_version, state = self.get_rdzv_state()
# Note: it is possible for above query to fail (etcd.EtcdKeyNotFound),
# but this is ok for us - just means we'll restart from beginning.
log.info("Observed existing rendezvous state: " + str(state))
if state["status"] == "closed":
raise RendezvousClosedError()
if state["status"] == "joinable":
return self.join_phase(state["version"])
if state["status"] == "final":
self.handle_existing_rendezvous(state["version"])
raise EtcdRendezvousRetryImmediately()
self.try_wait_for_state_change(etcd_index=active_version.etcd_index + 1)
raise EtcdRendezvousRetryableFailure()
/rdzv/v_${version_counter}/rank_${agent_rank}
下写值的方式进行确认。在所有节点都确认完毕后,会进入最后的 final 阶段。rendezvous
的节点上的 agent 会为其管理的 worker 进程分配 RANK
。RANK 0
的实例会作为 master 的角色存在。随后就会直接创建对应的 worker 进程。在默认的 LocalElasticAgent
中,会利用 python.multiprocessing
在本地创建多个进程。 @prof
def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
spec = worker_group.spec
store = worker_group.store
...
for worker in worker_group.workers:
local_rank = worker.local_rank
worker_env = {
"LOCAL_RANK": str(local_rank),
"RANK": str(worker.global_rank),
...
}
...
args[local_rank] = tuple(worker_args)
...
self._pcontext = start_processes(
name=spec.role,
entrypoint=spec.entrypoint,
args=args,
envs=envs,
log_dir=attempt_log_dir,
start_method=self._start_method,
redirects=spec.redirects,
tee=spec.tee,
)
return self._pcontext.pids()
c10d 新的设计
rendezvous
实现,它可以保证多个实例之间对于参与训练的节点共识的强一致,但是这也为 PyTorch 运行训练任务引入了额外的依赖。因此 PyTorch 也提供了一个内置的实现 c10d。相比于基于 etcd 的实现,c10d 基于 TCP 来进行同步。def create_backend(params: RendezvousParameters) -> Tuple[C10dRendezvousBackend, Store]:
...
if store_type == "file":
store = _create_file_store(params)
elif store_type == "tcp":
store = _create_tcp_store(params)
...
backend = C10dRendezvousBackend(store, params.run_id)
def _create_tcp_store(params: RendezvousParameters) -> TCPStore:
host, port = parse_rendezvous_endpoint(params.endpoint, default_port=29400)
...
for is_server in [is_host, False]:
...
store = TCPStore(
host, port, is_master=is_server, timeout=timedelta(seconds=read_timeout)
)
...
break
return store
compareAndSet
、add
等原语。它也可以被理解为一个简化的,提供 KV 接口的内存数据库,类似于 Redis。有关 rendezvous
的同步,都是由各个 agent 通过一个中心化的 agent 上的 c10d TCPServer 完成的。可以预见这样的实现在可用性上相比于 etcd 是有一定差距的,但是胜在易用性。用户如果使用 c10d,那么不再需要运维一个 etcd 集群。PyTorch Elastic on Kubernetes
apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
name: imagenet
namespace: elastic-job
spec:
# Use "etcd-service:2379" if you already apply etcd.yaml
rdzvEndpoint: "<your_etcd_endpoint>:<your_etcd_port>"
minReplicas: 1
maxReplicas: 2
replicaSpecs:
Worker:
replicas: 2
restartPolicy: ExitCode
template:
apiVersion: v1
kind: Pod
spec:
containers:
- name: elasticjob-worker
image: torchelastic/examples:0.2.0
imagePullPolicy: Always
args:
- "--nproc_per_node=1"
- "/workspace/examples/imagenet/main.py"
- "--arch=resnet18"
- "--epochs=20"
- "--batch-size=32"
# number of data loader workers (NOT trainers)
# zero means load the data on the same process as the trainer
# this is set so that the container does not OOM since
# pytorch data loaders use shm
- "--workers=0"
- "/workspace/data/tiny-imagenet-200"
resources:
limits:
nvidia.com/gpu: 1
c10d
的 rendezvous
还没有被支持,所以 CRD 中需要定义 rdzvEndpoint,指向一个已经部署好的 etcd 集群。同时,用户需要指定 minReplicas
和 maxReplicas
。其他就与 Kubeflow PyTorchJob 并无二致。PyTorch Elastic 与 Horovod Elastic
agent
、rendezvous
等接口,用户可以根据需要进行扩展。discovery_hosts.sh
,帮助其在运行时获得正在参与训练的节点。$ horovodrun -np 8 --host-discovery-script discover_hosts.sh python train.py
...
$ ./discover_hosts.sh
host-1:29500
host-2:29500
host-3:29500
总结
展望
参考资料
torch.distributed.launch
即将被废弃:【 https://github.com/pytorch/pytorch/issues/60754】
即将被废弃: 【 https://github.com/pytorch/pytorch/issues/60754】
[3]pytorch/elastic:【 https://github.com/pytorch/elastic】
[4]基于 Gossip 的协议: 【 https://ieeexplore.ieee.org/document/1028914】
互动赢好礼
精读文章,回答问题赢好礼
Q1: PyTorch 新设计中 rendezvous 的达成经过了哪些阶段?
Q2: 如果希望通过一个 agent 管理多机上所有的训练 worker 进程,而不只是本机上的 worker 进程,要怎样去实现?8月26日上午12点,由作者选出回答最佳的5位读者,送定制T恤一件。
往期精选推荐