Multiprocessing
A library that launches and manages n copies of worker subprocesses, specified either by a function or by a binary.
For functions, it uses torch.multiprocessing (and therefore Python
multiprocessing) to spawn/fork worker processes. For binaries, it uses Python
subprocess.Popen to create worker processes.
Usage 1: Launching two trainers as a function
from torch.distributed.elastic.multiprocessing import Std, start_processes

def trainer(a, b, c):
    pass  # train

# runs two trainers
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
    name="trainer",
    entrypoint=trainer,
    args={0: (1, 2, 3), 1: (4, 5, 6)},
    envs={0: {"LOCAL_RANK": "0"}, 1: {"LOCAL_RANK": "1"}},  # env values must be strings
    log_dir="/tmp/foobar",
    redirects=Std.ALL,  # write all worker stdout/stderr to a log file
    tee={0: Std.ERR},  # tee only local rank 0's stderr to console
)

# waits for all copies of trainer to finish
ctx.wait()
Usage 2: Launching 2 echo workers as a binary
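# same as invoking
# echo hello
# echo world > stdout.log
ctx = start_processes(
    name="echo",
    entrypoint="echo",
    log_dir="/tmp/foobar",
    args={0: ("hello",), 1: ("world",)},  # one tuple of arguments per local rank
    envs={0: {}, 1: {}},  # required; must have the same keys as args
    redirects={1: Std.OUT},
)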
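The binary path can be approximated with the standard library alone. The sketch below is not the library's implementation: `launch_binaries` is a hypothetical helper that starts one `subprocess.Popen` per local rank, casts every argument to `str`, and (as an assumption) layers the per-rank variables on top of the parent's environment. It uses `sys.executable` as a portable stand-in for a binary such as `echo`.

```python
import os
import subprocess
import sys


def launch_binaries(entrypoint, args, envs):
    """Hypothetical helper: start one copy of `entrypoint` per local rank."""
    if set(args) != set(envs):
        raise ValueError("args and envs must have the same local ranks as keys")
    procs = {}
    for rank in sorted(args):
        # Binaries only accept strings, so every argument is cast with str().
        argv = [entrypoint, *(str(a) for a in args[rank])]
        # Assumption: per-rank variables are merged into the parent's environment.
        env = {**os.environ, **{k: str(v) for k, v in envs[rank].items()}}
        procs[rank] = subprocess.Popen(
            argv, env=env, stdout=subprocess.PIPE, text=True
        )
    return procs


# A tiny child program that prints its LOCAL_RANK followed by its arguments.
child = "import os, sys; print(os.environ['LOCAL_RANK'], *sys.argv[1:])"
procs = launch_binaries(
    sys.executable,
    args={0: ("-c", child, 1, 2, 3), 1: ("-c", child, [1, 2, 3])},
    envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
)
# communicate() waits for each child and returns its captured stdout.
outputs = {rank: p.communicate()[0].strip() for rank, p in procs.items()}
```

Rank 0 receives three separate string arguments, while rank 1 receives the single string form of the list, which is the same casting behavior to watch for when passing non-string arguments to a binary entrypoint.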
As with torch.multiprocessing, start_processes() returns a process
context (api.PContext). If a function was launched, an
api.MultiprocessContext is returned, and if a binary was launched, an
api.SubprocessContext is returned. Both are specific implementations
of the parent api.PContext class.
Starting Multiple Workers
- torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, log_dir, log_line_prefixes=None, start_method='spawn', redirects=Std.NONE, tee=Std.NONE)
- Start n copies of entrypoint processes with the provided options. entrypoint is either a Callable (function) or a str (binary). The number of copies is determined by the number of entries in the args and envs arguments, which need to have the same key set.
args and envs are the arguments and environment variables to pass down to the entrypoint, mapped by the replica index (local rank). All local ranks must be accounted for. That is, the key set should be {0,1,...,(nprocs-1)}.
Note: When the entrypoint is a binary (str), args can only be strings. If any other type is given, it is cast to its string representation (e.g. str(arg1)). Furthermore, a binary failure will only write an error.json error file if the main function is annotated with torch.distributed.elastic.multiprocessing.errors.record. For function launches, this is done by default and there is no need to manually annotate with the @record annotation.
redirects and tee are bitmasks specifying which std stream(s) to redirect to a log file in the log_dir. Valid mask values are defined in Std. To redirect/tee only certain local ranks, pass redirects as a map with the key as the local rank to specify the redirect behavior for. Any missing local ranks will default to Std.NONE.
tee acts like the unix "tee" command in that it redirects + prints to console. To keep worker stdout/stderr from printing to console, use the redirects parameter.
For each process, the log_dir will contain:
- {local_rank}/error.json: if the process failed, a file with the error info
- {local_rank}/stdout.log: if redirects & Std.OUT == Std.OUT
- {local_rank}/stderr.log: if redirects & Std.ERR == Std.ERR
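The mask checks in the list above can be tried out without launching anything. The sketch below defines `StdMask`, a stand-in for `Std` whose member values (`NONE=0`, `OUT=1`, `ERR=2`, `ALL=OUT|ERR`) are an assumption made for illustration, plus two hypothetical helpers: `per_rank` expands a single mask or a partial map into one entry per local rank (missing ranks default to `NONE`), and `redirected_streams` applies the bitmask test.

```python
from enum import IntFlag
from typing import Dict, List, Union


class StdMask(IntFlag):
    """Stand-in for Std; the numeric values are assumed for illustration."""
    NONE = 0
    OUT = 1
    ERR = 2
    ALL = OUT | ERR


def per_rank(
    mask: Union[StdMask, Dict[int, StdMask]], nprocs: int
) -> Dict[int, StdMask]:
    """Expand a mask (or a partial rank -> mask map) to every local rank."""
    if isinstance(mask, dict):
        return {rank: mask.get(rank, StdMask.NONE) for rank in range(nprocs)}
    return {rank: mask for rank in range(nprocs)}


def redirected_streams(mask: StdMask) -> List[str]:
    """Name the streams that a given mask sends to a log file."""
    streams = []
    if (mask & StdMask.OUT) == StdMask.OUT:
        streams.append("stdout")
    if (mask & StdMask.ERR) == StdMask.ERR:
        streams.append("stderr")
    return streams


# Only local rank 1 redirects stdout; rank 0 falls back to NONE.
redirects = per_rank({1: StdMask.OUT}, nprocs=2)
# A bare mask applies to every local rank.
everything = per_rank(StdMask.ALL, nprocs=2)
```

Here `redirected_streams(redirects[0])` is empty, while every rank in `everything` redirects both streams.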
Note: It is expected that the log_dir exists, is empty, and is a directory.
Example:
log_dir = "/tmp/test"

# ok; two copies of foo: foo("bar0"), foo("bar1")
start_processes(
    name="trainer",
    entrypoint=foo,
    args={0: ("bar0",), 1: ("bar1",)},
    envs={0: {}, 1: {}},
    log_dir=log_dir,
)

# invalid; envs missing for local rank 1
start_processes(
    name="trainer",
    entrypoint=foo,
    args={0: ("bar0",), 1: ("bar1",)},
    envs={0: {}},
    log_dir=log_dir,
)

# ok; two copies of /usr/bin/touch: touch file1, touch file2
start_processes(
    name="trainer",
    entrypoint="/usr/bin/touch",
    args={0: ("file1",), 1: ("file2",)},
    envs={0: {}, 1: {}},
    log_dir=log_dir,
)

# caution; arguments cast to string, runs:
# echo "1" "2" "3" and echo "[1, 2, 3]"
start_processes(
    name="trainer",
    entrypoint="/usr/bin/echo",
    args={0: (1, 2, 3), 1: ([1, 2, 3],)},
    envs={0: {}, 1: {}},
    log_dir=log_dir,
)
- Parameters
- name (str) – a human readable short name that describes what the processes are (used as header when tee’ing stdout/stderr outputs) 
- entrypoint (Union[Callable, str]) – either a Callable (function) or cmd (binary)
- log_dir (str) – directory used to write log files
- start_method (str) – multiprocessing start method (spawn, fork, forkserver); ignored for binaries
- redirects (Union[Std, Dict[int, Std]]) – which std streams to redirect to a log file 
- tee (Union[Std, Dict[int, Std]]) – which std streams to redirect + print to console 
 
- Return type
- PContext
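The key-set rule for args and envs described in this entry can be checked before launching anything. The sketch below is a hypothetical pre-flight helper and not part of the library; `check_rank_maps` is an illustrative name.

```python
from typing import Any, Dict, Tuple


def check_rank_maps(
    args: Dict[int, Tuple[Any, ...]], envs: Dict[int, Dict[str, str]]
) -> int:
    """Return nprocs if both maps cover exactly {0, ..., nprocs-1}, else raise."""
    nprocs = len(args)
    expected = set(range(nprocs))
    if set(args) != expected:
        raise ValueError(f"args keys {sorted(args)} != {sorted(expected)}")
    if set(envs) != expected:
        raise ValueError(f"envs keys {sorted(envs)} != {sorted(expected)}")
    return nprocs


# Two replicas, both local ranks accounted for in args and envs.
nprocs = check_rank_maps(
    args={0: ("bar0",), 1: ("bar1",)},
    envs={0: {}, 1: {}},
)
```

Calling the helper with `envs={0: {}}` for the same `args` raises `ValueError`, which mirrors the "invalid" case in the example above.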
 
Process Context
- class torch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, stdouts, stderrs, tee_stdouts, tee_stderrs, error_files, log_line_prefixes=None)
- The base class that standardizes operations over a set of processes that are launched via different mechanisms. The name PContext is intentional, to disambiguate it from torch.multiprocessing.ProcessContext.
Warning: stdouts and stderrs should ALWAYS be a superset of tee_stdouts and tee_stderrs (respectively), because tee is implemented as a redirect + tail -f <stdout/stderr.log>.
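The warning follows from how tee works: the stream is first redirected to a log file, and that file is then echoed to the console. A rough standard-library sketch of the idea is shown below. It assumes a hypothetical `run_with_tee` helper and an illustrative line prefix, and it reads the log once the child has exited rather than following it live as `tail -f` would.

```python
import os
import subprocess
import sys
import tempfile


def run_with_tee(argv, log_path, prefix):
    """Redirect a child's stdout to log_path, then echo the log with a prefix."""
    # Step 1: redirect. The child writes straight into the log file.
    with open(log_path, "w") as log:
        subprocess.run(argv, stdout=log, check=True)
    # Step 2: "tee". Echo the log to the console. Without the redirect in
    # step 1 there would be no file to read from, hence the superset rule.
    echoed = []
    with open(log_path) as log:
        for line in log:
            echoed.append(f"[{prefix}]:{line.rstrip()}")
            print(echoed[-1])
    return echoed


log_dir = tempfile.mkdtemp()
lines = run_with_tee(
    [sys.executable, "-c", "print('hello'); print('world')"],
    os.path.join(log_dir, "stdout.log"),
    prefix="trainer0",
)
```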
- class torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, stdouts, stderrs, tee_stdouts, tee_stderrs, error_files, start_method, log_line_prefixes=None)
- PContext holding worker processes invoked as a function.
- class torch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, stdouts, stderrs, tee_stdouts, tee_stderrs, error_files, log_line_prefixes=None)
- PContext holding worker processes invoked as a binary.
- class torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)
- Results of a completed run of processes started with start_processes(). Returned by PContext.
Note the following:
- All fields are mapped by local rank
- return_values - only populated for functions (not the binaries).
- stdouts - path to stdout.log (empty string if no redirect)
- stderrs - path to stderr.log (empty string if no redirect)
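To make the per-rank layout concrete, the sketch below defines `FakeRunResult`, a stand-in dataclass with the same four field names as the signature above. It is not the library's class: the `is_failed` helper, the field types, and the log path are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeRunResult:
    """Stand-in mirroring the field names of RunProcsResult."""
    return_values: Dict[int, Any] = field(default_factory=dict)
    failures: Dict[int, Any] = field(default_factory=dict)
    stdouts: Dict[int, str] = field(default_factory=dict)
    stderrs: Dict[int, str] = field(default_factory=dict)

    def is_failed(self) -> bool:
        # Assumed convention: any recorded failure marks the run as failed.
        return len(self.failures) > 0


# Two function workers; only local rank 1 had stdout redirected.
result = FakeRunResult(
    return_values={0: "ok", 1: "ok"},
    stdouts={0: "", 1: "/tmp/foobar/1/stdout.log"},  # illustrative path
    stderrs={0: "", 1: ""},
)
# An empty string means the stream was not redirected for that rank.
redirected = [rank for rank, path in result.stdouts.items() if path]
```

Every field is keyed by local rank, so looking up one worker's return value and its log path uses the same index.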