Rate this Page

MuJoCo scripted manipulation with human-readable robot actions#

The first step in controlling a complex robot is rarely an end-to-end RL policy. Closed-loop controllers, scripted motion primitives, and imitation data are often used first to make the task reliable enough for later reward-driven fine tuning. This tutorial uses a MuJoCo Menagerie UR5e arm with a Robotiq 2F-85 gripper and shows how to ask the robot to do human-readable things such as “reach this pose”, “close the gripper”, or “go home” while TorchRL handles the low-level action plumbing.

We will control a cube-to-bowl task without learning. A policy emits a single RobotMacroAction object under the normal "action" key, and a TorchRL transform expands and executes that high-level command through MuJoCo. The same pattern can later provide demonstrations, curricula, or a safe initialization for residual RL.

What you will learn#

  • how to instantiate a Menagerie-backed custom MuJoCo environment;

  • how to use RobotMacroAction as a readable action object;

  • how URScriptPrimitiveTransform lets env.step(td) consume those actions directly;

  • how to write a scripted contact-rich cube-to-bowl policy as an explicit list of poses and gripper commands;

  • how to request rendered pixels from the environment and record them with VideoRecorder;

  • how to reset and reuse one environment across scripted scenes.

from __future__ import annotations

import importlib.util
import os

from collections.abc import Iterator

import torch
from tensordict import TensorDict, TensorDictBase
from torchrl.envs import CubeBowlEnv, RobotMacroAction, RobotMacroActionMode, step_mdp
from torchrl.record import VideoRecorder

_has_mujoco = importlib.util.find_spec("mujoco") is not None

if not _has_mujoco:
    raise ImportError("This tutorial requires the `mujoco` Python package.")

Environment#

The scene uses the MuJoCo Menagerie UR5e and Robotiq 2F-85 models. Set TORCHRL_MUJOCO_MENAGERIE_PATH to a checkout containing the universal_robots_ur5e and robotiq_2f85 assets:

git clone --depth=1 --filter=blob:none --sparse \
    https://github.com/google-deepmind/mujoco_menagerie.git /tmp/menagerie
git -C /tmp/menagerie sparse-checkout set \
    universal_robots_ur5e robotiq_2f85
export TORCHRL_MUJOCO_MENAGERIE_PATH=/tmp/menagerie

CubeBowlEnv exposes the state needed for scripted manipulation: robot joints, gripper joints, the pinch site, the cube pose, the bowl target and a success flag. The task reward is intentionally sparse. It is 1 only when the cube center is within the environment’s placement tolerance of the bowl target coordinate, and 0 otherwise.

MENAGERIE_PATH = os.environ.get(CubeBowlEnv.MENAGERIE_ENV_VAR)
if MENAGERIE_PATH is None:
    raise RuntimeError(
        f"Set {CubeBowlEnv.MENAGERIE_ENV_VAR} to a MuJoCo Menagerie checkout "
        "before running this tutorial."
    )

MAX_EPISODE_STEPS = 12000
RENDER_WIDTH = 480
RENDER_HEIGHT = 360
VIDEO_FRAME_SKIP = 16
VIDEO_INTERVAL_MS = 55
IK_KWARGS = {
    "iterations": 220,
    "orientation_weight": 1.0,
    "step_size": 0.7,
    "damping": 1e-4,
}

RobotMacroAction as the action#

A readable manipulation policy should not have to emit magic flat tensors or loose collections of root TensorDict keys. It can emit one structured object under "action". RobotMacroAction is that object:

  • RobotMacroAction.reach_pose(position=..., quaternion=...) moves the arm;

  • RobotMacroAction.close_gripper() closes only the gripper and keeps the arm where it is;

  • RobotMacroAction.reach_pose(..., gripper="closed") moves the arm and closes the gripper in one macro action;

  • RobotMacroAction.RESET asks the environment to return to its configured home joint posture.

The symbolic gripper commands "open", "closed" and "keep" are resolved by the transform. User code never has to know the Robotiq actuator’s numeric command range. When we want a task-specific grasp, we can still make the target explicit in object units: close about one millimeter past the cube width, ask the environment to convert that width to a gripper command, and pass that command to the RobotMacroAction.

Create the MuJoCo scene#

We start by constructing the cube-and-bowl task once. from_pixels=True asks the environment to include rendered frames in the TensorDict, which a standard VideoRecorder can capture. We then append one primitive-control transform with execute=True. After that, env.step accepts a RobotMacroAction directly under td["action"] and executes the expanded low-level sequence internally.

env = CubeBowlEnv(
    seed=0,
    max_episode_steps=MAX_EPISODE_STEPS,
    menagerie_path=MENAGERIE_PATH,
    from_pixels=True,
    pixels_only=False,
    render_width=RENDER_WIDTH,
    render_height=RENDER_HEIGHT,
)
assert env.action_spec.shape[-1] == 7
assert env.robot_home_qpos is not None

primitive_control = env.make_urscript_transform(
    execute=True,
    macro_steps=250,
    settle_steps=1600,
    ik_kwargs=IK_KWARGS,
    stack_rewards=True,
    stack_observations=False,
)
recorder = VideoRecorder(
    logger=None,
    tag="scripted_macro",
    skip=VIDEO_FRAME_SKIP,
    make_grid=False,
)
env = env.append_transform(recorder)
env = env.append_transform(primitive_control)

cube_width = 2 * env.OBJECT_HALF_SIZE
grasp_width = cube_width - 0.001
gripper_close_command = env.gripper_ctrl_for_width(grasp_width)

workspace_layouts = torch.tensor(
    [
        # Cube in front of the robot, bowl behind it and slightly to the right.
        [[0.44, -0.21], [0.48, 0.10]],
        # Cube on the robot's right-front side, bowl on the left-back side.
        [[0.51, -0.18], [0.39, 0.04]],
        # Cube in front of the robot, bowl on the left-back side.
        [[0.45, -0.20], [0.38, 0.02]],
        # Cube on the robot's right-front side, bowl near the center-back.
        [[0.52, -0.18], [0.45, 0.02]],
    ],
)


def random_scene_reset(rollout_index: int) -> TensorDict:
    layout = workspace_layouts[rollout_index % len(workspace_layouts)]
    cube_xy = layout[0].view(1, 2)
    bowl_xy = layout[1].view(1, 2)
    cube_pos = torch.cat(
        [cube_xy, torch.full_like(cube_xy[..., :1], env.cube_position[2])],
        dim=-1,
    )
    bowl_target_z = env.bowl_position[2] + env.MENAGERIE_BOWL_TARGET_OFFSET[2]
    bowl_pos = torch.cat(
        [bowl_xy, torch.full_like(bowl_xy[..., :1], bowl_target_z)], -1
    )
    return TensorDict(
        {
            "cube_pos": cube_pos.to(dtype=env.dtype, device=env.device),
            "bowl_pos": bowl_pos.to(dtype=env.dtype, device=env.device),
        },
        batch_size=[1],
        device=env.device,
    )


td = env.reset()
assert td["robot_qpos"].shape[-1] == 6
assert env.low_level_action(td["robot_qpos"]).shape[-1] == 7
assert td["pixels"].shape[-3:] == torch.Size([RENDER_HEIGHT, RENDER_WIDTH, 3])

Policy shape#

A scripted policy can be just a generator of explicit robot commands. This is the minimal shape we want users to see: build a list of poses and gripper commands, place the next RobotMacroAction under the normal action key, and call env.step.

def gen_small_actions(td: TensorDictBase) -> Iterator[RobotMacroAction]:
    joints = td["robot_qpos"].clone()
    joints[..., 0] = joints[..., 0] + 0.35
    yield RobotMacroAction.reach_joints(joints=joints, gripper="open", steps=18)
    yield RobotMacroAction.close_gripper(steps=18)


small_actions = gen_small_actions(td)
small_action = next(small_actions)
td["action"] = small_action
assert td["action"].mode.shape == torch.Size([1, 1])

A scripted cube-to-bowl macro#

We can now write the cube-to-bowl controller as a readable sequence of manual robot commands. The generator below is deliberately explicit: it names the poses we want the pinch site to reach and interleaves them with gripper commands.

home_qpos = torch.as_tensor(
    env.robot_home_qpos,
    dtype=td["robot_qpos"].dtype,
    device=td["robot_qpos"].device,
).view(1, 6)
policy_state: dict[str, TensorDictBase | torch.Tensor] = {
    "td": td,
    "gripper_quat": td["pinch_quat"].clone(),
}


def pose_at(
    xyz: torch.Tensor, quat: torch.Tensor | None = None
) -> tuple[torch.Tensor, torch.Tensor]:
    if quat is None:
        quat = torch.zeros(xyz.shape[:-1] + (4,), dtype=xyz.dtype, device=xyz.device)
        quat[..., 0] = 1.0
    else:
        quat = quat.to(dtype=xyz.dtype, device=xyz.device).expand(xyz.shape[:-1] + (4,))
    return xyz, quat


def carry_action(alpha: float, bowl: torch.Tensor) -> RobotMacroAction:
    current_td = policy_state["td"]
    gripper_quat = policy_state["gripper_quat"]
    cube = current_td["cube_pos"].clone()
    carry_start_cube = policy_state["carry_start_cube"]
    pinch_to_cube = current_td["pinch_pos"].clone() - cube
    desired_xy = carry_start_cube[..., :2] + alpha * (
        bowl[..., :2] - carry_start_cube[..., :2]
    )
    desired_cube = torch.cat(
        [
            desired_xy,
            torch.full_like(bowl[..., 2:3], 0.24),
        ],
        dim=-1,
    )
    position, quaternion = pose_at(desired_cube + pinch_to_cube, gripper_quat)
    return RobotMacroAction.reach_pose(
        position=position,
        quaternion=quaternion,
        gripper="closed",
        gripper_command=gripper_close_command,
        steps=80,
        settle_steps=20,
    )


def gen_actions(td: TensorDictBase) -> Iterator:
    cube = td["cube_pos"].clone()
    bowl = td["bowl_pos"].clone()
    quat = td["pinch_quat"].clone()
    grasp_offset = cube.new_tensor([[0.0, 0.0, -0.016]])

    # Action 0: Keep the arm at the reset joint target and let the scene settle.
    yield RobotMacroAction.wait(gripper="open", steps=1, settle_steps=19)

    # Action 1: Fully open the gripper before approaching the cube.
    yield RobotMacroAction.open_gripper(steps=100, settle_steps=20)

    current_td = policy_state["td"]
    cube = current_td["cube_pos"].clone()
    bowl = current_td["bowl_pos"].clone()
    quat = current_td["pinch_quat"].clone()
    position, quaternion = pose_at(cube + cube.new_tensor([[0.0, 0.0, 0.18]]), quat)
    # Action 2: Move the open gripper above the cube.
    yield RobotMacroAction.reach_pose(
        position=position,
        quaternion=quaternion,
        gripper="open",
        steps=180,
        settle_steps=60,
    )

    position, quaternion = pose_at(cube + grasp_offset, quat)
    # Action 3: Lower the open gripper to the grasp pose around the cube.
    yield RobotMacroAction.reach_pose(
        position=position,
        quaternion=quaternion,
        gripper="open",
        steps=180,
        settle_steps=60,
    )

    # Action 4: Close the gripper to a grasp about one millimeter tighter than
    # the cube width.
    yield RobotMacroAction.close_gripper(
        command=gripper_close_command, steps=160, settle_steps=80
    )

    current_td = policy_state["td"]
    cube = current_td["cube_pos"].clone()
    pinch_to_cube = current_td["pinch_pos"].clone() - cube
    position, quaternion = pose_at(
        cube + pinch_to_cube + cube.new_tensor([[0.0, 0.0, 0.20]]),
        quat,
    )
    # Action 5: Lift the grasped cube above the table.
    yield RobotMacroAction.reach_pose(
        position=position,
        quaternion=quaternion,
        gripper="closed",
        gripper_command=gripper_close_command,
        steps=120,
        settle_steps=60,
    )

    # The cube is not welded to the gripper. It is held by contact and
    # friction between the pads. If we ask for one large move directly to the
    # bowl, the fingers can drag faster than the cube can safely follow: the
    # cube may slide inside the grasp, get pushed out, or arrive shifted enough
    # that the final drop misses the bowl. We therefore carry it through a few
    # intermediate waypoints in the table plane. After each waypoint, the next
    # command uses the latest observed cube and finger positions.
    policy_state["carry_start_cube"] = policy_state["td"]["cube_pos"].clone()

    # Action 6: Carry the cube one quarter of the way toward the bowl.
    yield carry_action(0.25, bowl)

    # Action 7: Carry the cube halfway toward the bowl.
    yield carry_action(0.50, bowl)

    # Action 8: Carry the cube three quarters of the way toward the bowl.
    yield carry_action(0.75, bowl)

    # Action 9: Carry the cube above the bowl center.
    yield carry_action(1.00, bowl)

    drop_cube = torch.cat(
        [
            bowl[..., :1],
            bowl[..., 1:2],
            torch.full_like(bowl[..., 2:3], 0.13),
        ],
        dim=-1,
    )
    current_td = policy_state["td"]
    cube = current_td["cube_pos"].clone()
    pinch_to_cube = current_td["pinch_pos"].clone() - cube
    position, quaternion = pose_at(drop_cube + pinch_to_cube, quat)
    # Action 10: Lower the grasped cube into the bowl.
    yield RobotMacroAction.reach_pose(
        position=position,
        quaternion=quaternion,
        gripper="closed",
        gripper_command=gripper_close_command,
        steps=100,
        settle_steps=40,
    )

    # Action 11: Open the gripper to release the cube.
    yield RobotMacroAction.open_gripper(steps=100, settle_steps=20)

    # Action 12: Wait with the gripper open so the cube can settle in the bowl.
    yield RobotMacroAction.wait(gripper="open", steps=240)

    current_td = policy_state["td"]
    retreat_xyz = torch.cat(
        [
            current_td["pinch_pos"][..., :2],
            torch.full_like(current_td["pinch_pos"][..., 2:3], 0.26),
        ],
        dim=-1,
    )
    position, quaternion = pose_at(retreat_xyz, quat)
    # Action 13: Retreat upward and away from the released cube.
    yield RobotMacroAction.reach_pose(
        position=position,
        quaternion=quaternion,
        gripper="open",
        steps=120,
        settle_steps=60,
    )

    # Action 14: Return the arm to the environment's reset joint configuration.
    yield RobotMacroAction.RESET

    # Action 15: Hold the reset pose while the final reward is measured.
    yield RobotMacroAction.wait(gripper="open", steps=800)


def action_mode(action) -> int | None:
    if not hasattr(action, "mode"):
        return None
    return int(action.mode.reshape(-1)[0].item())


def action_gripper(action) -> int:
    if not hasattr(action, "gripper"):
        return RobotMacroAction.GRIPPER_OPEN
    return int(action.gripper.reshape(-1)[0].item())


def run_scripted_rollout(td: TensorDictBase) -> TensorDictBase:
    policy_state["td"] = td
    policy_state["gripper_quat"] = td["pinch_quat"].clone()
    policy_state.pop("carry_start_cube", None)
    grasp_distance = torch.full_like(td["cube_pos"][..., :1], float("inf"))
    cube_motion_while_closed = torch.zeros_like(grasp_distance)
    cube_lift_while_closed = torch.zeros_like(grasp_distance)
    max_reward = torch.zeros_like(grasp_distance)
    last_reward = torch.zeros_like(grasp_distance)
    closed_reference_cube: torch.Tensor | None = None

    script_actions = gen_actions(td)
    while True:
        try:
            td["action"] = next(script_actions)
        except StopIteration:
            break

        action = td["action"]
        transition = env.step(td)
        reward = transition["next", "reward"]
        max_reward = torch.maximum(max_reward, reward.amax(dim=-2))
        last_reward = reward[..., -1, :]
        td = step_mdp(transition)
        policy_state["td"] = td

        mode = action_mode(action)
        gripper = action_gripper(action)
        if mode == int(RobotMacroActionMode.CLOSE_GRIPPER):
            grasp_distance = torch.minimum(
                grasp_distance, env.gripper_cube_distance(td)
            )
            closed_reference_cube = td["cube_pos"].clone()
        if (
            closed_reference_cube is not None
            and gripper == RobotMacroAction.GRIPPER_CLOSED
        ):
            cube_motion = (td["cube_pos"] - closed_reference_cube).norm(
                dim=-1, keepdim=True
            )
            cube_lift = td["cube_pos"][..., 2:3] - closed_reference_cube[..., 2:3]
            cube_motion_while_closed = torch.maximum(
                cube_motion_while_closed, cube_motion
            )
            cube_lift_while_closed = torch.maximum(cube_lift_while_closed, cube_lift)

    robot_home_error = (td["robot_qpos"] - home_qpos).norm(dim=-1, keepdim=True)

    assert grasp_distance.item() <= 0.025, grasp_distance.item()
    assert cube_motion_while_closed.item() >= 0.05, cube_motion_while_closed.item()
    assert cube_lift_while_closed.item() >= 0.08, cube_lift_while_closed.item()
    assert robot_home_error.item() <= 0.04, robot_home_error.item()
    assert max_reward.item() == 1.0, (
        max_reward.item(),
        td["cube_pos"],
        td["bowl_pos"],
    )
    assert last_reward.item() == 1.0, last_reward.item()
    assert td["success"].all()
    return td

Fixed scene video#

First, render the default task once. This is the simple cube-to-bowl scene used above to introduce the scripted policy.

td = run_scripted_rollout(td)

scripted_macro_animation = recorder.to_animation(
    title="Scripted RobotMacroAction rollout",
    interval=VIDEO_INTERVAL_MS,
    clear=True,
)

Randomized workspace video#

Next, reset the same environment onto new reachable layouts sampled from a square in the table plane. For readability, each rollout starts from a different pair of workspace regions: front-to-back, right-front to left-back, front-to-left-back, and right-front to center-back. The VideoRecorder keeps appending frames until we ask it to render, so the final video is one long clip containing all four randomized rollouts.

rollout_count = len(workspace_layouts)
for rollout_index in range(rollout_count):
    td = env.reset(random_scene_reset(rollout_index))
    td = run_scripted_rollout(td)

Rendered randomized rollouts#

The second animation shows the same hand-written policy on the randomized scenes.

randomized_macro_animation = recorder.to_animation(
    title="Randomized RobotMacroAction rollouts",
    interval=VIDEO_INTERVAL_MS,
    clear=True,
)

Reset and reuse the same environment#

The same transformed environment can be reset and reused for another scripted rollout.

reset_td = env.reset()
assert reset_td["robot_qpos"].shape == td["robot_qpos"].shape

Conclusion and further reading#

Macro actions are useful when a policy should choose what to do at a slower time scale while a scripted controller handles short-horizon actuation. They are not a replacement for learning in difficult contact-rich settings: they still depend on calibration, solver quality and reachable waypoints. Their practical value is that they provide a strong baseline, a source of demonstrations, and a structured action space for residual policies or RL fine tuning.

See also

env.close()

Total running time of the script: (45 minutes 8.488 seconds)

Gallery generated by Sphinx-Gallery