Source code for ignite.handlers.ema_handler
import warnings
from copy import deepcopy
from typing import Optional, Union
import torch.nn as nn
from ignite.engine import CallableEventWithFilter, Engine, Events, EventsList
from ignite.handlers.param_scheduler import BaseParamScheduler
from ignite.handlers.state_param_scheduler import LambdaStateScheduler
__all__ = ["EMAHandler"]
class EMAWarmUp:
    def __init__(self, momentum_warmup: float, warmup_iters: int, momentum: float) -> None:
        self.momentum_warmup = momentum_warmup
        self.warmup_iters = warmup_iters
        self.momentum = momentum
    def __call__(self, event_index: int) -> float:
        denominator = max(1, self.warmup_iters - 1)
        curr_momentum = self.momentum_warmup + (self.momentum - self.momentum_warmup) * (event_index - 1) / denominator
        if self.momentum >= self.momentum_warmup:
            return min(self.momentum, curr_momentum)
        else:
            return max(self.momentum, curr_momentum)
[docs]class EMAHandler:
    r"""Exponential moving average (EMA) handler can be used to compute a smoothed version of model.
    The EMA model is updated as follows:
    .. math:: \theta_{\text{EMA}, t+1} = (1 - \lambda) \cdot \theta_{\text{EMA}, t} + \lambda \cdot \theta_{t}
    where :math:`\theta_{\text{EMA}, t}` and :math:`\theta_{t}` are the EMA weights and online model weights at
    :math:`t`-th iteration, respectively; :math:`\lambda` is the update momentum. Current momentum can be retrieved
    from ``Engine.state.ema_momentum``.
    Args:
          model: the online model for which an EMA model will be computed. If ``model`` is ``DataParallel`` or
              ``DistributedDataParallel``, the EMA smoothing will be applied to ``model.module`` .
          momentum: the update momentum after warmup phase, should be float in range :math:`\left(0, 1 \right)`.
          momentum_warmup: the initial update momentum during warmup phase.
          warmup_iters: iterations of warmup.
    Attributes:
          ema_model: the exponential moving averaged model.
          model: the online model that is tracked by EMAHandler. It is ``model.module`` if ``model`` in
              the initialization method is an instance of ``DistributedDataParallel``.
          momentum: the update momentum.
    Note:
          The EMA model is already in ``eval`` mode. If model in the arguments is an ``nn.Module`` or
          ``DistributedDataParallel``, the EMA model is an ``nn.Module`` and it is on the same device as the online
          model. If the model is an ``nn.DataParallel``, then the EMA model is an ``nn.DataParallel``.
    Note:
          It is recommended to initialize and use an EMA handler in following steps:
          1. Initialize ``model`` (``nn.Module`` or ``DistributedDataParallel``) and ``ema_handler`` (``EMAHandler``).
          2. Build ``trainer`` (``ignite.engine.Engine``).
          3. Resume from checkpoint for ``model`` and ``ema_handler.ema_model``.
          4. Attach ``ema_handler`` to ``trainer``.
    Examples:
          .. code-block:: python
              device = torch.device("cuda:0")
              model = nn.Linear(2, 1).to(device)
              # update the ema every 5 iterations
              ema_handler = EMAHandler(model, momentum=0.0002)
              # get the ema model, which is an instance of nn.Module
              ema_model = ema_handler.ema_model
              trainer = Engine(train_step_fn)
              to_load = {"model": model, "ema_model", ema_model, "trainer", trainer}
              if resume_from is not None:
                  Checkpoint.load_objects(to_load, checkpoint=resume_from)
              # update the EMA model every 5 iterations
              ema_handler.attach(trainer, name="ema_momentum", event=Events.ITERATION_COMPLETED(every=5))
              # add other handlers
              to_save = to_load
              ckpt_handler = Checkpoint(to_save, DiskSaver(...), ...)
              trainer.add_event_handler(Events.EPOCH_COMPLETED, ckpt_handler)
              # current momentum can be retrieved from engine.state,
              # the attribute name is the `name` parameter used in the attach function
              @trainer.on(Events.ITERATION_COMPLETED):
              def print_ema_momentum(engine):
                  print(f"current momentum: {engine.state.ema_momentum}"
              # use ema model for validation
              val_step_fn = get_val_step_fn(ema_model)
              evaluator = Engine(val_step_fn)
              @trainer.on(Events.EPOCH_COMPLETED)
              def run_validation(engine):
                  engine.run(val_data_loader)
              trainer.run(...)
          The following example shows how to perform warm-up to the EMA momentum:
          .. code-block:: python
              device = torch.device("cuda:0")
              model = nn.Linear(2, 1).to(device)
              # linearly change the EMA momentum from 0.2 to 0.002 in the first 100 iterations,
              # then keep a constant EMA momentum of 0.002 afterwards
              ema_handler = EMAHandler(model, momentum=0.002, momentum_warmup=0.2, warmup_iters=100)
              engine = Engine(step_fn)
              ema_handler.attach(engine, name="ema_momentum")
              engine.run(...)
          The following example shows how to attach two handlers to the same trainer:
          .. code-block:: python
              generator = build_generator(...)
              discriminator = build_discriminator(...)
              gen_handler = EMAHandler(generator)
              disc_handler = EMAHandler(discriminator)
              step_fn = get_step_fn(...)
              engine = Engine(step_fn)
              # update EMA model of generator every 1 iteration
              gen_handler.attach(engine, "gen_ema_momentum", event=Events.ITERATION_COMPLETED)
              # update EMA model of discriminator every 2 iteration
              disc_handler.attach(engine, "dis_ema_momentum", event=Events.ITERATION_COMPLETED(every=2))
              @engine.on(Events.ITERATION_COMPLETED)
              def print_ema_momentum(engine):
                  print(f"current momentum for generator: {engine.state.gen_ema_momentum}")
                  print(f"current momentum for discriminator: {engine.state.disc_ema_momentum}")
              engine.run(...)
    .. versionadded:: 0.4.6
    """
    def __init__(
        self,
        model: nn.Module,
        momentum: float = 0.0002,
        momentum_warmup: Optional[float] = None,
        warmup_iters: Optional[int] = None,
    ) -> None:
        if not 0 < momentum < 1:
            raise ValueError(f"Invalid momentum: {momentum}")
        self.momentum = momentum
        self._momentum_lambda_obj: Optional[EMAWarmUp] = None
        if momentum_warmup is not None and warmup_iters is not None:
            self.momentum_scheduler: Optional[BaseParamScheduler] = None
            self._momentum_lambda_obj = EMAWarmUp(momentum_warmup, warmup_iters, momentum)
        if not isinstance(model, nn.Module):
            raise ValueError(
                f"model should be an instance of nn.Module or its subclasses, but got"
                f"model: {model.__class__.__name__}"
            )
        if isinstance(model, nn.parallel.DistributedDataParallel):
            model = model.module
        self.model = model
        self.ema_model = deepcopy(self.model)
        for param in self.ema_model.parameters():
            param.detach_()
        self.ema_model.eval()
    def _update_ema_model(self, engine: Engine, name: str) -> None:
        """Update weights of ema model"""
        momentum = getattr(engine.state, name)
        for ema_p, model_p in zip(self.ema_model.parameters(), self.model.parameters()):
            ema_p.mul_(1.0 - momentum).add_(model_p.data, alpha=momentum)
        # assign the buffers
        for ema_b, model_b in zip(self.ema_model.buffers(), self.model.buffers()):
            ema_b.data = model_b.data
[docs]    def attach(
        self,
        engine: Engine,
        name: str = "ema_momentum",
        warn_if_exists: bool = True,
        event: Union[str, Events, CallableEventWithFilter, EventsList] = Events.ITERATION_COMPLETED,
    ) -> None:
        """Attach the handler to engine. After the handler is attached, the ``Engine.state`` will add an new attribute
        with name ``name`` if the attribute does not exist. Then, the current momentum can be retrieved from
        ``Engine.state`` when the engine runs.
        Note:
            There are two cases where a momentum with name ``name`` already exists: 1. the engine has loaded its
            state dict after resuming. In this case, there is no need to initialize the momentum again, and users
            can set ``warn_if_exists`` to False to suppress the warning message; 2. another handler has created
            a state attribute with the same name. In this case, users should choose another name for the ema momentum.
        Args:
            engine: trainer to which the handler will be attached.
            name: attribute name for retrieving EMA momentum from ``Engine.state``. It should be a unique name since a
                trainer can have multiple EMA handlers.
            warn_if_exists: if True, a warning will be thrown if the momentum with name ``name`` already exists.
            event: event when the EMA momentum and EMA model are updated.
        """
        if hasattr(engine.state, name):
            if warn_if_exists:
                warnings.warn(
                    f"Attribute '{name}' already exists in Engine.state. It might because 1. the engine has loaded its "
                    f"state dict or 2. {name} is already created by other handlers. Turn off this warning by setting"
                    f"warn_if_exists to False.",
                    category=UserWarning,
                )
        else:
            setattr(engine.state, name, self.momentum)
        if self._momentum_lambda_obj is not None:
            self.momentum_scheduler = LambdaStateScheduler(self._momentum_lambda_obj, param_name="ema_momentum")
            # first update the momentum and then update the EMA model
            self.momentum_scheduler.attach(engine, event)
        engine.add_event_handler(event, self._update_ema_model, name)