Shortcuts

Source code for torchtune.utils._device

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import os
from enum import Enum
from typing import Any, Optional, Tuple

import torch

from torchtune.utils._import_guard import _SUPPORTS_FLEX_ATTENTION
from torchtune.utils._logging import get_logger

# Module-level logger used by the helpers in this file.
# NOTE(review): "DEBUG" is presumably the log level passed to get_logger — confirm.
logger = get_logger("DEBUG")

# Import the real BlockMask only when the import guard reports flex attention
# support; otherwise alias the name to torch.Tensor so that references to
# ``BlockMask`` elsewhere in this module (e.g. isinstance checks) still resolve.
if _SUPPORTS_FLEX_ATTENTION:
    from torch.nn.attention.flex_attention import BlockMask
else:
    BlockMask = torch.Tensor

def get_world_size_and_rank() -> Tuple[int, int]:
    """Return the world size and rank of the current process.

    The values are read from the default process group. When
    ``torch.distributed`` is unavailable or has not been initialized, the
    process is treated as a single-process run.

    Returns:
        Tuple[int, int]: ``(world_size, rank)``; ``(1, 0)`` when no process
        group is initialized.
    """
    dist = torch.distributed
    # Guard clause: without an initialized process group there is one rank.
    if not (dist.is_available() and dist.is_initialized()):
        return 1, 0
    return dist.get_world_size(), dist.get_rank()
def is_torch_npu_available() -> bool:
    """Check the availability of NPU.

    Returns:
        bool: The result of ``torch.npu.is_available()`` when ``torch_npu``
        can be imported, ``False`` when an ``ImportError`` is raised.
    """
    try:
        import torch_npu  # noqa: F401

        return torch.npu.is_available()
    except ImportError:
        return False


# Evaluated once at import time and reused by ``_get_device_type_from_env``.
is_npu_available = is_torch_npu_available()


def _get_local_rank() -> Optional[int]:
    """Get the local rank from the ``LOCAL_RANK`` environment variable.

    Returns:
        Optional[int]: The local rank as an int, or ``None`` if the variable
        is not set.
    """
    local_rank = os.environ.get("LOCAL_RANK")
    if local_rank is not None:
        local_rank = int(local_rank)
    return local_rank


def _setup_device(device: torch.device) -> torch.device:
    """Set the CUDA-like device and infer the device index if not set.

    Args:
        device (torch.device): The device to set.

    Raises:
        RuntimeError: If device index is not available.
        AttributeError: If ``set_device`` is not supported for the device
            type (e.g. on MPS).

    Returns:
        torch.device: The device that was set, with its index filled in.
    """
    # A missing LOCAL_RANK (and rank 0) both map to index 0.
    local_rank = _get_local_rank() or 0
    device_support = get_device_support()
    device_type = device_support.device_type
    device_name = device_support.device_name
    torch_device = get_torch_device_namespace()
    if device.index is None:
        # The device type comes from the environment-detected accelerator.
        device = torch.device(type=device_type, index=local_rank)

    # Ensure index is available before setting device
    if device.index >= torch_device.device_count():
        raise RuntimeError(
            f"The local rank is larger than the number of available {device_name}s."
        )
    if not hasattr(torch_device, "set_device"):
        raise AttributeError(
            f"The device type {device_type} does not support the `set_device` method."
        )
    torch_device.set_device(device)
    return device


def _get_device_type_from_env() -> str:
    """Get the device type string based on the current machine.

    Backends are probed in the order CUDA, NPU, XPU, falling back to CPU.

    Returns:
        str: One of ``"cuda"``, ``"npu"``, ``"xpu"`` or ``"cpu"``.
    """
    if torch.cuda.is_available():
        device = "cuda"
    elif is_npu_available:
        device = "npu"
    # ``torch.xpu`` is absent on some torch builds; treat that as "no XPU"
    # instead of raising AttributeError.
    elif hasattr(torch, "xpu") and torch.xpu.is_available():
        device = "xpu"
    else:
        device = "cpu"
    return device


def _validate_device_from_env(device: torch.device) -> None:
    """Validate that the device is correct given the current machine.

    This will raise an error if the device is not available or doesn't match
    the assigned process device on distributed runs.

    Args:
        device (torch.device): The device to validate.

    Raises:
        RuntimeError: If the device is not available or doesn't match the
            assigned process device.
    """
    local_rank = _get_local_rank()

    # Check if the device index is correct
    if device.type != "cpu" and local_rank is not None:
        # Ensure device index matches assigned index when distributed training
        if device.index != local_rank:
            raise RuntimeError(
                f"You can't specify a device index when using distributed training. "
                f"Device specified is {device} but local rank is:{local_rank}"
            )

    # Check if the device is available on this machine
    try:
        torch.empty(0, device=device)
    except RuntimeError as e:
        raise RuntimeError(
            f"The device {device} is not available on this machine."
        ) from e
def get_device(device: Optional[str] = None) -> torch.device:
    """Resolve, set up and validate the device to run on.

    Takes an optional device string, verifies it is correct and available
    given the machine and distributed settings, and returns a
    :func:`~torch.device`. When no device string is given, the device type is
    inferred from the environment. For CUDA-like devices ("cuda", "npu",
    "xpu") this function also sets the current device.

    Args:
        device (Optional[str]): The name of the device to use, e.g. "cuda" or
            "cpu" or "npu" or "xpu".

    Example:
        >>> device = get_device("cuda")
        >>> device
        device(type='cuda', index=0)

    Returns:
        torch.device: Device
    """
    requested = _get_device_type_from_env() if device is None else device
    resolved = torch.device(requested)
    # Accelerators need an index and an explicit ``set_device`` call.
    if resolved.type in ["cuda", "npu", "xpu"]:
        resolved = _setup_device(resolved)
    _validate_device_from_env(resolved)
    return resolved
def batch_to_device(batch: dict, device: torch.device) -> None:
    """Move every tensor in a (possibly nested) batch dict to ``device``.

    Intended for moving batches of data to a device. The dictionary is
    updated in place and nested dictionaries are handled recursively.

    Args:
        batch (dict): dict of Tensors or more nested dicts of tensors.
        device (torch.device): torch device to move the tensors to.

    Raises:
        ValueError: if batch dict contains anything other than ``torch.Tensor``.
    """
    for key, value in batch.items():
        if isinstance(value, dict):
            # Recurse; the nested dict is mutated in place.
            batch_to_device(value, device)
            continue

        # Tensors always qualify; BlockMask only when flex attention exists.
        is_movable = isinstance(value, torch.Tensor) or (
            _SUPPORTS_FLEX_ATTENTION and isinstance(value, BlockMask)
        )
        if not is_movable:
            raise ValueError(
                f"""To use batch_to_device, all elements in the batch must be a dict or Tensor. Got key "{key}" with value of type {type(value)}"""
            )
        batch[key] = value.to(device)
class DeviceSupport(Enum):
    """
    This is a simple enum for compute devices,
    This currently only supports CPU, CUDA, NPU, and XPU.
    The following enumeration defines various device configurations with attributes:
    1. `device_type` (str): The type of device (e.g., "cpu", "cuda", "npu", "xpu").
    2. `device_name` (str): A user-friendly name for the device (e.g., "CPU", "GPU", "NPU", "XPU").
    3. `communication_backend` (str): Specifies the backend used for communication on this device
    (e.g., "gloo", "nccl", "hccl", "ccl").
    """

    CPU = ("cpu", "CPU", "gloo")
    CUDA = ("cuda", "GPU", "nccl")
    NPU = ("npu", "NPU", "hccl")
    XPU = ("xpu", "XPU", "ccl")

    def __init__(
        self,
        device_type: str,
        device_name: str,
        communication_backend: str,
    ):
        # Each member's value tuple is unpacked into these three attributes.
        self.device_type = device_type
        self.device_name = device_name
        self.communication_backend = communication_backend

    @staticmethod
    def from_type(device_type: str) -> "DeviceSupport":
        """Look up the enum member whose ``device_type`` matches.

        Args:
            device_type (str): Device type string, e.g. "cpu" or "cuda".

        Returns:
            DeviceSupport: The matching member.

        Raises:
            ValueError: If no member has the given ``device_type``.
        """
        for member in DeviceSupport:
            if member.device_type == device_type:
                return member
        raise ValueError(f"Unknown device type: {device_type}.")


def get_device_support() -> DeviceSupport:
    """Get the DeviceSupport member for the compute device of the current machine.

    This currently only supports CPU, CUDA, NPU, XPU.

    Returns:
        DeviceSupport: The member matching the device type detected from the
        environment.
    """
    device_type = _get_device_type_from_env()
    return DeviceSupport.from_type(device_type)


def get_torch_device_namespace() -> Any:
    """Return the corresponding torch attribute based on the device type string.

    Returns:
        Any: The corresponding torch device namespace (e.g. ``torch.cuda``),
        or ``torch.cuda`` if ``torch`` has no attribute for the device type.
    """
    device_type = get_device_support().device_type
    try:
        return getattr(torch, device_type)
    except AttributeError:
        # Fall back to torch.cuda rather than failing; the warning records it.
        logger.warning(
            f"Device namespace '{device_type}' not found in torch, try to load torch.cuda."
        )
        return torch.cuda

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources