Shortcuts

VLLMDoubleBufferSyncScheme

class torchrl.weight_update.llm.VLLMDoubleBufferSyncScheme(remote_addr: str, local_addr: str | None = None, num_threads: int = 1, strategy: Literal['tensordict', 'state_dict'] = 'tensordict')[source]

Weight synchronization scheme for vLLM using double-buffered storage.

This scheme uses memory-mapped TensorDict storage to transfer weights from a trainer to vLLM inference workers. It’s simpler than NCCL-based approaches and doesn’t require process group coordination.

Parameters:
  • remote_addr – Directory path where sender writes weights.

  • local_addr – Directory path where receiver reads weights. If None, uses same path as remote_addr (for local testing).

  • num_threads – Number of threads for memmap operations. Defaults to 1.

  • strategy – Weight extraction strategy (“tensordict” or “state_dict”).

Example

>>> # Local testing (same machine)
>>> scheme = VLLMDoubleBufferSyncScheme(
...     remote_addr="/tmp/weights",
...     strategy="tensordict"
... )
>>>
>>> # Distributed setup (different machines)
>>> # On trainer node:
>>> scheme = VLLMDoubleBufferSyncScheme(
...     remote_addr="/mnt/shared/weights",  # NFS mount
...     num_threads=4
... )
>>>
>>> # On vLLM worker node:
>>> scheme = VLLMDoubleBufferSyncScheme(
...     remote_addr="/mnt/shared/weights",  # Same NFS mount
...     num_threads=4
... )
apply_weights(weights: TensorDictBase, inplace: bool = True) None

Apply weights to the model.

Parameters:
  • weights – The weights to apply.

  • inplace – Whether to apply weights in place. Default is True.

connect(*, worker_idx: int | None = None, weights: Any | None = None) None

Method to be called once the workers have started.

Triggers a rendez-vous for the workers to receive their copy of the weights.

Dispatches to _setup_connection_and_weights_on_sender_impl() or _setup_connection_and_weights_on_receiver_impl() based on which initialization was performed.

property context: Any | None

Get the context object (e.g., collector), if available.

Returns:

The context object if available, None otherwise.

create_receiver(vllm_engine) VLLMDoubleBufferWeightReceiver[source]

Create a weight receiver for a vLLM worker process.

Parameters:

vllm_engine – The vLLM engine instance (must have .llm_engine.model_executor attribute).

create_sender() VLLMDoubleBufferWeightSender[source]

Create a weight sender for the trainer process.

create_transport(**kwargs) VLLMDoubleBufferTransport[source]

Create transport for double-buffered storage.

Parameters:

**kwargs – Not used for file-based transport (kept for API compatibility).

Returns:

A VLLMDoubleBufferTransport instance.

init_on_receiver(*, model_id: str, context: Any = None, **kwargs) None

Initialize on worker process (receiver side).

This method is called once in each worker’s initialization.

Parameters:
  • model_id – Identifier for the model being synchronized

  • context – Optional context object (e.g., inner collector)

  • **kwargs – Alternative to context (model, etc.)

init_on_sender(*args, **kwargs) None

Initialize on the main process (sender side).

This method is called once in the collector’s _run_processes() method, after workers have been started and are ready to receive messages.

property model: Any | None

Get the model object, if available.

Returns:

The model object if available, None otherwise.

property model_id: str | None

Get the model ID for this scheme.

Returns:

The model ID if set, None otherwise.

prepare_weights(weights: Any, model_id: str, strategy: WeightStrategy, context: Any = None) Any

Prepare weights for sending.

This method handles weight extraction, conversion, and any scheme-specific preparation (e.g., cache lookups for SharedMemWeightSyncScheme).

Parameters:
  • weights – Raw weights input (can be None, nn.Module, TensorDict, dict, str reference, etc.)

  • model_id – The model identifier (e.g., “policy”)

  • strategy – WeightStrategy for extracting/converting weights

  • context – Optional context (e.g., collector) for model resolution

Returns:

Prepared weights ready to send via transport

receive(timeout: float | None = None) tensordict.base.TensorDictBase | None

Check for and apply new weights (non-blocking).

This method is called in the worker’s main loop to check if new weights have been sent. If weights are available, they are applied to the registered model immediately, and the update is cascaded to any sub-collectors via context.update_policy_weights_().

Parameters:

timeout – Maximum time to wait for weights (seconds). None means no timeout (blocking). Some transports may not support timeout and will raise ValueError if specified.

Returns:

The received weights if available, None otherwise.

Note: For SharedMemWeightSyncScheme, this always returns None since workers automatically see updates via shared memory.

property receiver_transport: torchrl.weight_update.weight_sync_schemes.TransportBackend | None

Get the receiver transport.

Returns:

The receiver transport.

send(weights: Any = None, worker_ids: int | list[int] | None = None) None

Send weights synchronously to workers.

This method: 1. Prepares weights (extracts from model if weights=None) 2. Sends to specified workers (or all if worker_ids=None) 3. Waits for acknowledgments from those workers 4. Returns when workers have applied the weights

Parameters:
  • weights – Weights to send. Can be: - None: Extract from model via context.get_model(model_id) - nn.Module: Extract weights from module - TensorDict: Use directly - dict: Convert to TensorDict

  • worker_ids – Which workers to send to: - None: Send to all workers (default) - int: Send to single worker - list[int]: Send to specific workers

Note: This is a blocking call that ensures specified workers are updated before returning.

property sender_transports: dict[int, torchrl.weight_update.weight_sync_schemes.TransportBackend]

Get the sender transports.

Returns:

The sender transports.

property shared_transport: torchrl.weight_update.weight_sync_schemes.TransportBackend | None

Get the shared transport.

Returns:

The shared transport.

shutdown() None

Shutdown the scheme and release resources.

This method stops any background threads and cleans up connections. It is safe to call multiple times. Subclasses should override this method to add custom cleanup logic, but should call super().shutdown() to ensure base cleanup is performed.

property weights: Any | None

Get the current weights, if available.

Returns:

The weights as TensorDict if available, None otherwise.

property worker_idx: int | None

Get the worker index for this scheme.

Returns:

The worker index if set, None otherwise.

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources