Single Node Collectors
TorchRL provides several collector classes for single-node data collection, each with different execution strategies.
Single node data collectors

| Class | Description |
|---|---|
| BaseCollector | Base class for data collectors. |
| Collector | Generic data collector for RL problems. |
| AsyncCollector | Runs a single DataCollector on a separate process. |
| MultiCollector | Runs a given number of DataCollectors on separate processes. |
| MultiSyncCollector | Runs a given number of DataCollectors on separate processes synchronously. |
| MultiAsyncCollector | Runs a given number of DataCollectors on separate processes asynchronously. |
Note

The following legacy names are also available for backward compatibility:

- DataCollectorBase → BaseCollector
- SyncDataCollector → Collector
- aSyncDataCollector → AsyncCollector
- _MultiDataCollector → MultiCollector
- MultiSyncDataCollector → MultiSyncCollector
- MultiaSyncDataCollector → MultiAsyncCollector
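For the simplest case, the single-process Collector can be used directly. A minimal sketch, assuming the new class names from the table above; passing policy=None falls back to a random policy drawn from the environment's action spec:

from torchrl.collectors import Collector
from torchrl.envs import GymEnv

collector = Collector(
    create_env_fn=lambda: GymEnv("CartPole-v1"),
    policy=None,  # None falls back to a random policy over the env's action spec
    frames_per_batch=200,
    total_frames=2000,
)
for data in collector:
    # data is a TensorDict holding 200 frames of experience
    pass
collector.shutdown()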
Using MultiCollector
The MultiCollector class is the recommended way to run parallel data collection.
It uses a sync parameter to dispatch to either MultiSyncCollector or MultiAsyncCollector:
from torchrl.collectors import MultiCollector
from torchrl.envs import GymEnv

def make_env():
    return GymEnv("CartPole-v1")

# Synchronous multi-worker collection (recommended for on-policy algorithms)
sync_collector = MultiCollector(
    create_env_fn=[make_env] * 4,  # 4 parallel workers
    policy=my_policy,  # your policy module, e.g. a TensorDictModule
    frames_per_batch=1000,
    total_frames=100000,
    sync=True,  # ← all workers complete before delivering a batch
)

# Asynchronous multi-worker collection (recommended for off-policy algorithms)
async_collector = MultiCollector(
    create_env_fn=[make_env] * 4,
    policy=my_policy,
    frames_per_batch=1000,
    total_frames=100000,
    sync=False,  # ← first-come-first-served delivery
)

# Iterate over collected data
for data in sync_collector:
    # Train on data...
    pass

sync_collector.shutdown()
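With sync=False, workers keep acting with the weights they were last given, so collected data can lag behind the learner. A common pattern is therefore to push the learner's latest weights to the workers between batches via update_policy_weights_(). A minimal sketch, where train_step stands in for your optimization step:

for data in async_collector:
    train_step(data)  # hypothetical: your loss computation and optimizer update
    # Broadcast the learner's current weights to all collection workers
    async_collector.update_policy_weights_()
async_collector.shutdown()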
Comparison:

| Feature | sync=True (MultiSyncCollector) | sync=False (MultiAsyncCollector) |
|---|---|---|
| Batch delivery | All workers complete first | First available worker |
| Policy consistency | All data from same policy version | Data may be from older policy |
| Best for | On-policy (PPO, A2C) | Off-policy (SAC, DQN) |
| Throughput | Limited by slowest worker | Higher throughput |
Running the Collector Asynchronously

Passing a replay buffer to a collector lets the collector write collected frames directly into the buffer, removing the need to iterate over the collector.
If you want to run a data collector in the background, simply call start():
>>> import time
>>> collector = Collector(..., replay_buffer=rb)  # pass your replay buffer
>>> collector.start()
>>> # give collection a little head start
>>> time.sleep(10)
>>> # Start training
>>> for i in range(optim_steps):
...     data = rb.sample()  # sampling from the replay buffer
...     # rest of the training loop
Single-process collectors (Collector) run the collection in a background thread, so be mindful of Python's GIL and the usual multithreading restrictions.
Multiprocessed collectors, on the other hand, let the child processes fill the buffer on their own, which truly decouples data collection from training.
Data collectors that have been started with start() should be shut down using
async_shutdown().
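Putting the pieces together, a minimal end-to-end sketch, assuming the new Collector name; policy=None (random policy) and the fixed loop count stand in for your own policy module and training schedule:

import time
from torchrl.collectors import Collector
from torchrl.data import LazyTensorStorage, ReplayBuffer
from torchrl.envs import GymEnv

rb = ReplayBuffer(storage=LazyTensorStorage(100_000), batch_size=256)
collector = Collector(
    create_env_fn=lambda: GymEnv("CartPole-v1"),
    policy=None,  # substitute your own policy module
    frames_per_batch=200,
    total_frames=-1,  # endless collection, runs until shut down
    replay_buffer=rb,  # frames are written straight into the buffer
)
collector.start()  # collection now runs in the background
time.sleep(10)  # give the buffer time to fill
for _ in range(100):  # stand-in for your optimization loop
    sample = rb.sample()  # 256 transitions, per the buffer's batch_size
    # ...training step on sample...
collector.async_shutdown()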
Warning
Running a collector asynchronously decouples the collection from training, which means that the training performance may be drastically different depending on the hardware, load and other factors (although it is generally expected to provide significant speed-ups). Make sure you understand how this may affect your algorithm and if it is a legitimate thing to do! (For example, on-policy algorithms such as PPO should not be run asynchronously unless properly benchmarked).