torch.distributed.elastic.rendezvous.dynamic_rendezvous 的源代码
# 版权所有 (c) Facebook, Inc. 及其附属公司。
# 保留所有权利。
#
# 本源代码根据在此源树根目录下的LICENSE文件中找到的BSD风格许可证授权。
import inspect
import logging
import os
import pickle
import socket
import threading
import time
import weakref
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, cast, Dict, List, Optional, Set, Tuple
from torch.distributed import PrefixStore, Store
from torch.distributed.elastic.events import construct_and_record_rdzv_event, NodeState
from .api import (
RendezvousClosedError,
RendezvousError,
RendezvousGracefulExitError,
RendezvousHandler,
RendezvousParameters,
RendezvousStateError,
RendezvousTimeoutError,
)
from .utils import _delay, _PeriodicTimer
__all__ = ['RendezvousBackend', 'RendezvousTimeout', 'RendezvousSettings', 'DynamicRendezvousHandler', 'create_handler']
log = logging.getLogger(__name__)
def get_method_name(depth=2):
if len(inspect.stack()) > depth:
return inspect.stack()[depth].function
return "no_method_name"
Token = Any
"""表示由rendezvous后端使用的透明围栏令牌。"""
[docs]class RendezvousBackend(ABC):
"""表示一个持有rendezvous状态的后端。"""
@property
@abstractmethod
def name(self) -> str:
"""获取后端的名称。"""
[docs] @abstractmethod
def get_state(self) -> Optional[Tuple[bytes, Token]]:
"""获取rendezvous状态。
返回:
编码的rendezvous状态及其围栏令牌的元组,
如果后端中未找到状态,则返回``None``。
引发:
RendezvousConnectionError:
与后端的连接失败。
RendezvousStateError:
rendezvous状态已损坏。
"""
[docs] @abstractmethod
def set_state(
self, state: bytes, token: Optional[Token] = None
) -> Optional[Tuple[bytes, Token, bool]]:
"""设置rendezvous状态。
新的rendezvous状态是有条件地设置的:
- 如果指定的``token``与后端中存储的围栏令牌匹配,状态将被更新。
新的状态将返回给调用者及其围栏令牌。
- 如果指定的``token``与后端中存储的围栏令牌不匹配,状态将不会更新;
而是将现有的状态及其围栏令牌返回给调用者。
- 如果指定的``token``为``None``,新的状态将仅在
后端中没有现有状态时设置。新的状态或现有的状态及其围栏令牌将
返回给调用者。
参数:
state:
编码的rendezvous状态。
token:
一个可选的围栏令牌,通过之前的调用
获取,例如:py:meth:`get_state`或``set_state()``。
返回:
序列化的rendezvous状态、其围栏令牌以及
一个布尔值,指示我们的设置尝试是否成功。
引发:
RendezvousConnectionError:
与后端的连接失败。
RendezvousStateError:
rendezvous状态已损坏。
"""
[docs]class RendezvousTimeout:
"""持有rendezvous的超时配置。
参数:
join:
rendezvous预期完成的时间。
last_call:
在达到rendezvous所需的最小参与者数量后,
完成rendezvous之前的额外等待时间。
close:
在调用:py:meth:`RendezvousHandler.set_closed`或
:py:meth:`RendezvousHandler.shutdown`后,
rendezvous预期关闭的时间。
keep_alive:
预期完成keep-alive心跳的时间。
"""
_ZERO = timedelta(0)
_DEFAULT_TIMEOUTS = {
"join": timedelta(seconds=600),
"last_call": timedelta(seconds=30),
"close": timedelta(seconds=30),
"heartbeat": timedelta(seconds=5),
}
_join: timedelta
_last_call: timedelta
_close: timedelta
_heartbeat: timedelta
def __init__(
self,
join: Optional[timedelta] = None,
last_call: Optional[timedelta] = <span