SlotTransport¶
- class torchrl.modules.inference_server.SlotTransport(num_slots: int, *, preallocate: bool = False)[source]¶
Lock-free, in-process transport using per-env slots.
Each actor thread owns a dedicated slot. Submitting an observation writes to the slot without any lock (each slot is accessed by exactly one writer thread). The server sweeps slots to find ready ones, collects observations, runs the model, and writes actions back via per-slot events.
This eliminates:
- The shared threading.Lock that ThreadingTransport uses for every submit() and drain().
- The concurrent.futures.Future allocations (one per inference request).
The trade-off is that the number of slots is fixed at construction time (equal to the number of environments).
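The per-slot mechanism described above can be sketched with plain Python threading primitives. This is an illustrative miniature, not the torchrl implementation: the class and attribute names below are invented, and plain Python objects stand in for tensordicts.

```python
import threading

class MiniSlotTransport:
    """Illustrative miniature of a per-slot transport (names are invented)."""

    def __init__(self, num_slots):
        self.obs = [None] * num_slots      # one observation cell per env thread
        self.ready = [False] * num_slots   # set by the writer, cleared by the server
        self.actions = [None] * num_slots
        self.events = [threading.Event() for _ in range(num_slots)]

    def submit(self, slot, obs):
        # Exactly one env thread ever writes a given slot, so no lock is needed.
        self.obs[slot] = obs
        self.events[slot].clear()
        self.ready[slot] = True            # publish after the payload is in place

    def drain(self, max_items):
        # Server-side sweep: collect up to max_items ready slots.
        out_obs, slot_ids = [], []
        for i, flag in enumerate(self.ready):
            if flag and len(slot_ids) < max_items:
                self.ready[i] = False
                out_obs.append(self.obs[i])
                slot_ids.append(i)
        return out_obs, slot_ids

    def resolve(self, slot, action):
        # Write the action back and wake the env thread blocked in wait().
        self.actions[slot] = action
        self.events[slot].set()

    def wait(self, slot):
        self.events[slot].wait()
        return self.actions[slot]
```

The server loop would alternate drain() / model call / resolve(); the fixed-size slot arrays are why num_slots must be known at construction time.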
- Parameters:
num_slots (int) – number of slots (one per environment / actor thread).
- Keyword Arguments:
preallocate (bool, optional) – if True, a contiguous observation buffer of shape [num_slots, ...] is allocated on the first submit. Subsequent submits copy into the buffer in-place (update_). Defaults to False because the extra copy into the buffer is not currently compensated by the batching path (lazy_stack still calls torch.stack).
Note
This transport is only suitable for in-process threading scenarios (the default for AsyncBatchedCollector with policy_backend="threading").
- drain(max_items: int) → tuple[list[TensorDictBase], list[int]][source]¶
Sweep slots and return (observations, slot_ids) for ready ones.
- resolve(callback: int, result: TensorDictBase) → None[source]¶
Write the action into the slot and wake the waiting env thread.
- resolve_exception(callback: int, exc: BaseException) → None[source]¶
Propagate an exception to the waiting env thread.
- submit(td: TensorDictBase)[source]¶
Not supported – use client() to get a slot-bound callable.