diff --git a/docs/source/_static/atomic_actions/press.gif b/docs/source/_static/atomic_actions/press.gif new file mode 100644 index 00000000..1eafdc31 Binary files /dev/null and b/docs/source/_static/atomic_actions/press.gif differ diff --git a/docs/source/api_reference/embodichain/embodichain.lab.sim.atomic_actions.rst b/docs/source/api_reference/embodichain/embodichain.lab.sim.atomic_actions.rst index c2982d04..90077698 100644 --- a/docs/source/api_reference/embodichain/embodichain.lab.sim.atomic_actions.rst +++ b/docs/source/api_reference/embodichain/embodichain.lab.sim.atomic_actions.rst @@ -33,6 +33,8 @@ embodichain.lab.sim.atomic_actions MoveHeldObject PlaceCfg Place + PressCfg + Press AtomicActionEngine .. currentmodule:: embodichain.lab.sim.atomic_actions @@ -153,6 +155,15 @@ Actions :members: :show-inheritance: +.. autoclass:: PressCfg + :members: + :exclude-members: __init__, copy, replace, to_dict + :show-inheritance: + +.. autoclass:: Press + :members: + :show-inheritance: + Engine & Registry ----------------- diff --git a/docs/source/overview/sim/atomic_actions/builtin_actions.md b/docs/source/overview/sim/atomic_actions/builtin_actions.md index 4507d28b..818cd172 100644 --- a/docs/source/overview/sim/atomic_actions/builtin_actions.md +++ b/docs/source/overview/sim/atomic_actions/builtin_actions.md @@ -14,6 +14,7 @@ The following actions are available out of the box: | `PickUp` | Single | `GraspTarget` — object semantics | Approach → close gripper → lift | PickUp | | `MoveHeldObject` | Single | `HeldObjectPoseTarget` — held-object pose | Move held object while keeping gripper closed | MoveHeldObject | | `Place` | Single | `EndEffectorPoseTarget` — EEF release pose | Lower → open gripper → retract | Place | +| `Press` | Single | `EndEffectorPoseTarget` — EEF press pose | Close gripper → press down → return | Press | --- @@ -123,3 +124,26 @@ down to the target pose. On success, the returned `WorldState` clears `held_obje `(4, 4)` or `(n_envs, 4, 4)`. ![Place demo](../../../_static/atomic_actions/place.gif) + +--- + +## `Press` + +Three-phase contact motion: *close gripper → press down → return*. This is useful +for button-like or contact-based interactions where the end-effector should reach a +target pose and then return to the pre-press arm pose. + +`Press` does not create or clear `WorldState.held_object`; it preserves the state +threaded into it. + +| Config field | Default | Description | +|---|---|---| +| `hand_close_qpos` | `None` | **Required.** Gripper closed joint positions | +| `hand_control_part` | `"hand"` | Robot control part for the gripper | +| `hand_interp_steps` | `5` | Waypoints for the gripper close phase | +| `sample_interval` | `80` | Total waypoints across all three phases | + +**Target:** `EndEffectorPoseTarget(xpos=...)` — the EEF pose to press, a `torch.Tensor` +of shape `(4, 4)` or `(n_envs, 4, 4)`. + +![Press demo](../../../_static/atomic_actions/press.gif) diff --git a/docs/source/overview/sim/atomic_actions/index.md b/docs/source/overview/sim/atomic_actions/index.md index cee58179..c5c2e0ba 100644 --- a/docs/source/overview/sim/atomic_actions/index.md +++ b/docs/source/overview/sim/atomic_actions/index.md @@ -57,7 +57,7 @@ and every action declares the target type, or tuple of target types, it accepts | Target | Constructor | Used by | |---|---|---| -| `EndEffectorPoseTarget` | `EndEffectorPoseTarget(xpos)` | `MoveEndEffector`, `Place` | +| `EndEffectorPoseTarget` | `EndEffectorPoseTarget(xpos)` | `MoveEndEffector`, `Place`, `Press` | | `JointPositionTarget` | `JointPositionTarget(qpos)` | `MoveJoints` | | `NamedJointPositionTarget` | `NamedJointPositionTarget(name)` | `MoveJoints` | | `GraspTarget` | `GraspTarget(semantics)` | `PickUp` | @@ -94,7 +94,7 @@ action's `TargetType` before calling `execute`: | Target | Holds | Accepted by | |---|---|---| -| `EndEffectorPoseTarget(xpos)` | EEF pose tensor `(4,4)` or `(n_envs,4,4)` | `MoveEndEffector`, `Place` | +| `EndEffectorPoseTarget(xpos)` | EEF pose tensor `(4,4)` or `(n_envs,4,4)` | `MoveEndEffector`, `Place`, `Press` | | `JointPositionTarget(qpos)` | Control-part qpos tensor `(control_dof,)` or `(n_envs, control_dof)` | `MoveJoints` | | `NamedJointPositionTarget(name)` | Name resolved from `MoveJointsCfg.named_joint_positions` | `MoveJoints` | | `GraspTarget(semantics)` | `ObjectSemantics` (affordance + entity) | `PickUp` | @@ -110,6 +110,7 @@ action's `TargetType` before calling `execute`: | `Place` | Clears it to `None` | | `MoveEndEffector` | Leaves it unchanged | | `MoveJoints` | Leaves it unchanged | +| `Press` | Leaves it unchanged | If a step fails, `run()` returns `success=False` with the partial trajectory concatenated up to (but not including) the failed step, and the `WorldState` going into that step. @@ -136,6 +137,8 @@ from embodichain.lab.sim.atomic_actions import ( MoveHeldObjectCfg, Place, PlaceCfg, + Press, + PressCfg, MoveEndEffector, MoveEndEffectorCfg, ) @@ -159,6 +162,11 @@ move_held_object_cfg = MoveHeldObjectCfg( hand_close_qpos=torch.tensor([0.025, 0.025]), ) move_cfg = MoveEndEffectorCfg(control_part="arm") +press_cfg = PressCfg( + control_part="arm", + hand_control_part="hand", + hand_close_qpos=torch.tensor([0.025, 0.025]), +) move_joints_cfg = MoveJointsCfg( control_part="arm", named_joint_positions={"home": torch.zeros(6)}, @@ -170,6 +178,7 @@ engine.register(PickUp(motion_gen, cfg=pickup_cfg)) engine.register(MoveHeldObject(motion_gen, cfg=move_held_object_cfg)) engine.register(Place(motion_gen, cfg=place_cfg)) engine.register(MoveEndEffector(motion_gen, cfg=move_cfg)) +engine.register(Press(motion_gen, cfg=press_cfg)) engine.register(MoveJoints(motion_gen, cfg=move_joints_cfg)) # 3. Describe the object to pick @@ -297,6 +306,7 @@ builtin_actions - `scripts/tutorials/atomic_action/pickup.py` - `scripts/tutorials/atomic_action/move_held_object.py` - `scripts/tutorials/atomic_action/place.py` + - `scripts/tutorials/atomic_action/press.py` Run a demo in headless CPU mode with `--auto_play --headless --device cpu` to record an MP4 under `outputs/videos`. For example: diff --git a/docs/source/tutorial/atomic_actions.rst b/docs/source/tutorial/atomic_actions.rst index b65f5679..95df93bb 100644 --- a/docs/source/tutorial/atomic_actions.rst +++ b/docs/source/tutorial/atomic_actions.rst @@ -5,7 +5,7 @@ Atomic Actions EmbodiChain's **atomic action** layer provides a high-level, composable interface for common manipulation primitives such as *move end-effector*, *move joints*, *pick up*, -*move held object*, and *place*. Each action +*move held object*, *place*, and *press*. Each action encapsulates the full planning pipeline — grasp-pose estimation, IK, trajectory generation, and gripper interpolation — behind a single ``execute(target, state)`` call, making it straightforward to chain multiple actions together into complex robot behaviours. @@ -18,7 +18,7 @@ Key Features ``GraspTarget`` (wrapping an ``ObjectSemantics``), or ``HeldObjectPoseTarget``. The engine checks each step's target against the action's declared ``TargetType`` before running. - **Built-in primitives** — ``MoveEndEffector``, ``MoveJoints``, ``PickUp``, ``MoveHeldObject``, - and ``Place`` + ``Place``, and ``Press`` cover the most common tabletop manipulation workflows out of the box. See :doc:`/overview/sim/atomic_actions/index` for configs and target types. - **Extensible registry** — custom action *classes* can be registered globally with @@ -34,7 +34,7 @@ For the full design overview, architecture diagram, and extension guide see The Code -------- -Focused demo scripts are available for the five built-in primitives in the +Focused demo scripts are available for the built-in primitives in the ``scripts/tutorials/atomic_action`` directory: - ``move_end_effector.py`` @@ -42,6 +42,7 @@ Focused demo scripts are available for the five built-in primitives in the - ``pickup.py`` - ``move_held_object.py`` - ``place.py`` +- ``press.py`` Each script supports interactive inspection by default. Add ``--auto_play`` to skip keyboard prompts, and combine it with ``--headless --device cpu`` to record an MP4 under diff --git a/embodichain/lab/sim/atomic_actions/__init__.py b/embodichain/lab/sim/atomic_actions/__init__.py index bd96d7be..c2e7a2cc 100644 --- a/embodichain/lab/sim/atomic_actions/__init__.py +++ b/embodichain/lab/sim/atomic_actions/__init__.py @@ -18,7 +18,7 @@ This module provides a unified interface for the atomic motion primitives (``move_end_effector``, ``move_joints``, ``pick_up``, ``move_held_object``, -``place``), with typed targets, a ``WorldState`` threaded across sequenced +``place``, ``press``), with typed targets, a ``WorldState`` threaded across sequenced actions, and extensible custom action registration. """ @@ -47,11 +47,13 @@ MoveHeldObject, PickUp, Place, + Press, MoveEndEffectorCfg, MoveJointsCfg, MoveHeldObjectCfg, PickUpCfg, PlaceCfg, + PressCfg, ) from .engine import ( AtomicActionEngine, @@ -84,11 +86,13 @@ "MoveHeldObject", "PickUp", "Place", + "Press", "MoveEndEffectorCfg", "MoveJointsCfg", "MoveHeldObjectCfg", "PickUpCfg", "PlaceCfg", + "PressCfg", # Engine "AtomicActionEngine", "register_action", diff --git a/embodichain/lab/sim/atomic_actions/actions.py b/embodichain/lab/sim/atomic_actions/actions.py index dd7991d9..eb245264 100644 --- a/embodichain/lab/sim/atomic_actions/actions.py +++ b/embodichain/lab/sim/atomic_actions/actions.py @@ -16,10 +16,11 @@ """Concrete atomic actions built on :class:`AtomicAction` and :class:`TrajectoryBuilder`. -Five sibling actions live here: :class:`MoveEndEffector`, :class:`MoveJoints`, -:class:`PickUp`, :class:`MoveHeldObject`, and :class:`Place`. Each inherits -:class:`AtomicAction` directly and composes a :class:`TrajectoryBuilder` for -shared trajectory math. ``execute`` takes a typed target plus a +Six sibling actions live here: :class:`MoveEndEffector`, :class:`MoveJoints`, +:class:`PickUp`, :class:`MoveHeldObject`, :class:`Place`, and :class:`Press`. +Each inherits :class:`AtomicAction` directly and composes a +:class:`TrajectoryBuilder` for shared trajectory math. ``execute`` takes a typed +target plus a :class:`WorldState` and returns an :class:`ActionResult` whose trajectory is full-robot DoF shaped ``(n_envs, n_waypoints, robot.dof)``. """ @@ -144,6 +145,24 @@ class PlaceCfg(ActionCfg): """Height (m) to retract the end-effector after opening the gripper.""" +@configclass +class PressCfg(ActionCfg): + name: str = "press" + """Name of the action, used for identification and logging.""" + + sample_interval: int = 80 + """Number of waypoints for the full trajectory (hand close + down + back).""" + + hand_interp_steps: int = 5 + """Number of waypoints for closing the gripper before pressing.""" + + hand_control_part: str = "hand" + """Name of the robot part that controls the hand joints.""" + + hand_close_qpos: torch.Tensor | None = None + """Joint positions for the closed hand state, shape ``[hand_dof,]``.""" + + # ============================================================================= # Shared helpers private to this module # ============================================================================= @@ -747,6 +766,135 @@ def _fail(self, state: WorldState) -> ActionResult: ) +# ============================================================================= +# Press +# ============================================================================= + + +class Press(AtomicAction): + """Close the gripper, press down to a target pose, then return.""" + + TargetType: ClassVar[type] = EndEffectorPoseTarget + + def __init__( + self, + motion_generator, + cfg: PressCfg | None = None, + ) -> None: + super().__init__(motion_generator, cfg or PressCfg()) + self.builder = TrajectoryBuilder(motion_generator) + self.n_envs = self.robot.get_qpos().shape[0] + self.arm_joint_ids = self.robot.get_joint_ids(name=self.cfg.control_part) + self.hand_joint_ids = self.robot.get_joint_ids(name=self.cfg.hand_control_part) + self.arm_dof = len(self.arm_joint_ids) + self.hand_dof = len(self.hand_joint_ids) + self.robot_dof = self.robot.dof + + if self.cfg.hand_close_qpos is None: + logger.log_error( + "hand_close_qpos must be specified in PressCfg", ValueError + ) + self.hand_close_qpos = self.builder.expand_hand_qpos( + self.cfg.hand_close_qpos, + n_envs=self.n_envs, + hand_dof=self.hand_dof, + ) + + def execute(self, target: EndEffectorPoseTarget, state: WorldState) -> ActionResult: + press_xpos = self.builder.resolve_pose_target(target.xpos, n_envs=self.n_envs) + start_arm_qpos = self.builder.resolve_start_qpos( + _arm_qpos_from_state(state, self.arm_joint_ids, self.robot_dof), + n_envs=self.n_envs, + arm_dof=self.arm_dof, + control_part=self.cfg.control_part, + ) + start_hand_qpos = state.last_qpos[:, self.hand_joint_ids] + + n_close, n_down, n_back = self._compute_phase_waypoints() + + # Phase 1: close the hand while holding the current EEF pose. + hand_close_path = self.builder.interpolate_hand_qpos( + start_hand_qpos, + self.hand_close_qpos, + n_waypoints=n_close, + ) + + # Phase 2: press down to the target pose. + target_states_list = [ + [PlanState(xpos=press_xpos[i], move_type=MoveType.EEF_MOVE)] + for i in range(self.n_envs) + ] + ok, down_arm = self.builder.plan_arm_traj( + target_states_list, + start_arm_qpos, + n_down, + control_part=self.cfg.control_part, + arm_dof=self.arm_dof, + ) + if not ok: + logger.log_warning("Press failed to plan the down trajectory.") + return self._fail(state) + + # Phase 3: return to the arm pose from before pressing. + press_arm_qpos = down_arm[:, -1, :] + back_arm = self.builder.plan_joint_traj(press_arm_qpos, start_arm_qpos, n_back) + + full = torch.empty( + (self.n_envs, n_close + n_down + n_back, self.robot_dof), + dtype=torch.float32, + device=self.device, + ) + full[:, :, :] = state.last_qpos.unsqueeze(1) + full[:, :n_close, self.arm_joint_ids] = start_arm_qpos.unsqueeze(1) + full[:, :n_close, self.hand_joint_ids] = hand_close_path + full[:, n_close : n_close + n_down, self.arm_joint_ids] = down_arm + full[:, n_close : n_close + n_down, self.hand_joint_ids] = ( + self.hand_close_qpos.unsqueeze(1) + ) + full[:, n_close + n_down :, self.arm_joint_ids] = back_arm + full[:, n_close + n_down :, self.hand_joint_ids] = ( + self.hand_close_qpos.unsqueeze(1) + ) + + return ActionResult( + success=True, + trajectory=full, + next_state=WorldState( + last_qpos=full[:, -1, :].clone(), + held_object=state.held_object, + ), + ) + + def _compute_phase_waypoints(self) -> tuple[int, int, int]: + n_close = self.cfg.hand_interp_steps + if n_close < 1: + logger.log_error( + "hand_interp_steps must be at least 1 for PressCfg.", ValueError + ) + + motion_waypoints = self.cfg.sample_interval - n_close + n_down = motion_waypoints // 2 + n_back = motion_waypoints - n_down + if n_down < 2 or n_back < 2: + logger.log_error( + "Not enough waypoints for press trajectory. Increase " + "sample_interval or decrease hand_interp_steps.", + ValueError, + ) + return n_close, n_down, n_back + + def _fail(self, state: WorldState) -> ActionResult: + return ActionResult( + success=False, + trajectory=torch.empty( + (self.n_envs, 0, self.robot_dof), + dtype=torch.float32, + device=self.device, + ), + next_state=state, + ) + + __all__ = [ "MoveEndEffector", "MoveEndEffectorCfg", @@ -758,4 +906,6 @@ def _fail(self, state: WorldState) -> ActionResult: "PickUpCfg", "Place", "PlaceCfg", + "Press", + "PressCfg", ] diff --git a/scripts/tutorials/atomic_action/press.py b/scripts/tutorials/atomic_action/press.py new file mode 100644 index 00000000..9190e524 --- /dev/null +++ b/scripts/tutorials/atomic_action/press.py @@ -0,0 +1,445 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +"""Demonstrate Press on the center of a regular wooden block.""" + +from __future__ import annotations + +import argparse +import sys +import time +from pathlib import Path + +_REPO_ROOT = Path(__file__).resolve().parents[3] +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +import torch + +from embodichain.data import get_data_path +from embodichain.lab.gym.utils.gym_utils import add_env_launcher_args_to_parser +from embodichain.lab.sim import SimulationManager, SimulationManagerCfg +from embodichain.lab.sim.atomic_actions import ( + AtomicActionEngine, + EndEffectorPoseTarget, + MoveEndEffector, + MoveEndEffectorCfg, + Press, + PressCfg, +) +from embodichain.lab.sim.cfg import ( + JointDrivePropertiesCfg, + LightCfg, + RenderCfg, + RigidBodyAttributesCfg, + RigidObjectCfg, + RobotCfg, + URDFCfg, +) +from embodichain.lab.sim.material import VisualMaterialCfg +from embodichain.lab.sim.objects import RigidObject, Robot +from embodichain.lab.sim.planners import MotionGenerator, MotionGenCfg, ToppraPlannerCfg +from embodichain.lab.sim.shapes import CubeCfg +from embodichain.lab.sim.solvers import PytorchSolverCfg +from embodichain.utils import logger +from scripts.tutorials.atomic_action.tutorial_utils import ( + draw_axis_marker, + get_tutorial_window_size, + start_auto_play_recording, + stop_auto_play_recording, +) + +GRIPPER_URDF_PATH = "DH_PGI_140_80/DH_PGI_140_80.urdf" +GRIPPER_HAND_JOINT_PATTERN = "GRIPPER_FINGER1_JOINT_1" +GRIPPER_TCP_Z = 0.15 + +MOVE_SAMPLE_INTERVAL = 60 +PRESS_SAMPLE_INTERVAL = 90 +HAND_INTERP_STEPS = 12 +POST_TRAJECTORY_STEPS = 180 +TABLE_SIZE = [1.0, 1.4, 0.05] +TABLE_TOP_Z = -0.045 +BLOCK_SIZE = [0.12, 0.12, 0.06] +BLOCK_CENTER = [-0.30, -0.12, TABLE_TOP_Z + 0.5 * BLOCK_SIZE[2]] +PRESS_CLEARANCE = 0.13 +PRESS_SURFACE_OFFSET = 0.003 +DEFAULT_PRESS_TOLERANCE = 0.01 + + +def parse_arguments() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Demonstrate Press on the center of a regular wooden block." + ) + add_env_launcher_args_to_parser(parser) + parser.add_argument( + "--auto_play", + action="store_true", + help="Run the viewer demo without waiting for keyboard input.", + ) + parser.add_argument( + "--debug_state", + action="store_true", + help="Log the block pose during replay.", + ) + parser.add_argument( + "--press_tolerance", + type=float, + default=DEFAULT_PRESS_TOLERANCE, + help="XY tolerance in meters for checking whether press reaches block center.", + ) + parser.add_argument( + "--block_pos", + type=float, + nargs=2, + default=BLOCK_CENTER[:2], + metavar=("X", "Y"), + help="Initial XY position of the wooden block center.", + ) + parser.add_argument( + "--no_vis_eef_axis", + action="store_true", + help="Do not draw the press target coordinate frame before planning.", + ) + return parser.parse_args() + + +def initialize_simulation(args: argparse.Namespace) -> SimulationManager: + width, height = get_tutorial_window_size(args) + cfg = SimulationManagerCfg( + width=width, + height=height, + headless=True, + sim_device=args.device, + render_cfg=RenderCfg(renderer=args.renderer), + physics_dt=1.0 / 100.0, + arena_space=2.5, + ) + sim = SimulationManager(cfg) + sim.add_light( + cfg=LightCfg( + uid="main_light", + color=(0.6, 0.6, 0.6), + intensity=30.0, + init_pos=(1.0, 0.0, 3.0), + ) + ) + return sim + + +def create_robot(sim: SimulationManager, position=(0.0, 0.0, 0.0)) -> Robot: + ur5_urdf_path = get_data_path("UniversalRobots/UR5/UR5.urdf") + gripper_urdf_path = get_data_path(GRIPPER_URDF_PATH) + cfg = RobotCfg( + uid="UR5", + urdf_cfg=URDFCfg( + components=[ + {"component_type": "arm", "urdf_path": ur5_urdf_path}, + {"component_type": "hand", "urdf_path": gripper_urdf_path}, + ] + ), + drive_pros=JointDrivePropertiesCfg( + stiffness={"JOINT[0-9]": 1e4, GRIPPER_HAND_JOINT_PATTERN: 1e3}, + damping={"JOINT[0-9]": 1e3, GRIPPER_HAND_JOINT_PATTERN: 1e2}, + max_effort={"JOINT[0-9]": 1e5, GRIPPER_HAND_JOINT_PATTERN: 1e4}, + drive_type="force", + ), + control_parts={ + "arm": ["JOINT[0-9]"], + "hand": [GRIPPER_HAND_JOINT_PATTERN], + }, + solver_cfg={ + "arm": PytorchSolverCfg( + end_link_name="ee_link", + root_link_name="base_link", + tcp=[ + [1.0, 0.0, 0.0, 0.0], + [0.0, 1.0, 0.0, 0.0], + [0.0, 0.0, 1.0, GRIPPER_TCP_Z], + [0.0, 0.0, 0.0, 1.0], + ], + ) + }, + init_qpos=[0.0, -1.57, 1.57, -1.57, -1.57, 0.0, 0.0, 0.0], + init_pos=position, + ) + return sim.add_robot(cfg=cfg) + + +def create_table(sim: SimulationManager) -> RigidObject: + cfg = RigidObjectCfg( + uid="table", + shape=CubeCfg(size=TABLE_SIZE), + body_type="static", + attrs=RigidBodyAttributesCfg( + dynamic_friction=0.8, + static_friction=0.9, + ), + init_pos=[-0.30, 0.10, TABLE_TOP_Z - 0.5 * TABLE_SIZE[2]], + ) + return sim.add_rigid_object(cfg=cfg) + + +def create_wooden_block( + sim: SimulationManager, + block_center: list[float], +) -> RigidObject: + cfg = RigidObjectCfg( + uid="wooden_block", + shape=CubeCfg( + size=BLOCK_SIZE, + visual_material=VisualMaterialCfg( + uid="wooden_block_mat", + base_color=[0.58, 0.32, 0.14, 1.0], + roughness=0.85, + ), + ), + body_type="static", + attrs=RigidBodyAttributesCfg( + dynamic_friction=0.8, + static_friction=0.9, + ), + init_pos=block_center, + ) + return sim.add_rigid_object(cfg=cfg) + + +def settle_object(sim: SimulationManager, obj: RigidObject, step: int = 5) -> None: + if sim.device.type == "cuda": + sim.init_gpu_physics() + obj.reset() + sim.update(step=step) + obj.clear_dynamics() + + +def get_hand_close_qpos(robot: Robot, device: torch.device) -> torch.Tensor: + hand_limits = robot.get_qpos_limits(name="hand")[0].to( + device=device, dtype=torch.float32 + ) + return hand_limits[:, 1] + + +def make_top_down_eef_pose(position: torch.Tensor) -> torch.Tensor: + pose = torch.eye(4, dtype=torch.float32, device=position.device) + pose[:3, :3] = torch.tensor( + [ + [-0.0539, -0.9985, -0.0022], + [-0.9977, 0.0540, -0.0401], + [0.0401, 0.0000, -0.9992], + ], + dtype=torch.float32, + device=position.device, + ) + pose[:3, 3] = position + return pose + + +def format_tensor(tensor: torch.Tensor) -> str: + rounded = (tensor.detach().cpu() * 10000.0).round() / 10000.0 + return str(rounded.tolist()) + + +def log_block_state(block: RigidObject, label: str) -> None: + block_pose = block.get_local_pose(to_matrix=True) + logger.log_info( + f"{label}: pos={format_tensor(block_pose[0, :3, 3])}, " + f"z_axis={format_tensor(block_pose[0, :3, 2])}" + ) + + +def draw_press_target_axis(sim: SimulationManager, press_target: torch.Tensor) -> None: + if press_target.dim() == 2: + press_target = press_target.unsqueeze(0) + draw_axis_marker(sim, "press_target_axis", press_target) + + +def compute_press_center_check( + robot: Robot, + traj: torch.Tensor, + block: RigidObject, + tolerance: float, +) -> tuple[bool, float, int, torch.Tensor, torch.Tensor]: + arm_joint_ids = robot.get_joint_ids(name="arm") + press_segment_start = MOVE_SAMPLE_INTERVAL + HAND_INTERP_STEPS + press_segment_end = MOVE_SAMPLE_INTERVAL + PRESS_SAMPLE_INTERVAL + arm_traj = traj[:, press_segment_start:press_segment_end, arm_joint_ids] + fk_pose = torch.stack( + [ + robot.compute_fk( + qpos=waypoint.unsqueeze(0), + name="arm", + to_matrix=True, + )[0] + for waypoint in arm_traj[0] + ], + dim=0, + ) + + block_pose = block.get_local_pose(to_matrix=True) + block_center = block_pose[0, :3, 3] + block_top_z = block_center[2] + 0.5 * BLOCK_SIZE[2] + target_xy = block_center[:2] + target_z = block_top_z + PRESS_SURFACE_OFFSET + + xy_error = torch.linalg.norm(fk_pose[:, :2, 3] - target_xy, dim=1) + z_error = torch.abs(fk_pose[:, 2, 3] - target_z) + combined_error = xy_error + z_error + best_idx = int(torch.argmin(combined_error).item()) + best_pos = fk_pose[best_idx, :3, 3] + center_error = float(torch.linalg.norm(best_pos[:2] - target_xy).item()) + return ( + center_error <= tolerance, + center_error, + press_segment_start + best_idx, + best_pos, + torch.tensor( + [target_xy[0], target_xy[1], target_z], + dtype=torch.float32, + device=traj.device, + ), + ) + + +def run_press_demo(args: argparse.Namespace) -> None: + sim = initialize_simulation(args) + robot = create_robot(sim) + create_table(sim) + block_center = [ + args.block_pos[0], + args.block_pos[1], + TABLE_TOP_Z + 0.5 * BLOCK_SIZE[2], + ] + block = create_wooden_block(sim, block_center) + + settle_object(sim, block, step=5) + motion_gen = MotionGenerator( + cfg=MotionGenCfg(planner_cfg=ToppraPlannerCfg(robot_uid=robot.uid)) + ) + hand_close = get_hand_close_qpos(robot, sim.device) + + atomic_engine = AtomicActionEngine(motion_generator=motion_gen) + atomic_engine.register( + MoveEndEffector( + motion_gen, + cfg=MoveEndEffectorCfg( + control_part="arm", + sample_interval=MOVE_SAMPLE_INTERVAL, + ), + ) + ) + atomic_engine.register( + Press( + motion_gen, + cfg=PressCfg( + control_part="arm", + hand_control_part="hand", + hand_close_qpos=hand_close, + sample_interval=PRESS_SAMPLE_INTERVAL, + hand_interp_steps=HAND_INTERP_STEPS, + ), + ) + ) + + if not args.headless: + sim.open_window() + if not args.auto_play: + input("Inspect the wooden block, then press Enter to plan...") + + block_pose = block.get_local_pose(to_matrix=True) + block_top_z = block_pose[0, 2, 3] + 0.5 * BLOCK_SIZE[2] + press_position = block_pose[0, :3, 3].clone() + press_position[2] = block_top_z + PRESS_SURFACE_OFFSET + move_position = press_position.clone() + move_position[2] = block_top_z + PRESS_CLEARANCE + + move_target = make_top_down_eef_pose(move_position) + press_target = make_top_down_eef_pose(press_position) + if not args.no_vis_eef_axis: + draw_press_target_axis(sim, press_target) + + logger.log_info("Planning MoveEndEffector -> Press") + start_time = time.time() + is_success, traj, _ = atomic_engine.run( + steps=[ + ("move_end_effector", EndEffectorPoseTarget(xpos=move_target)), + ("press", EndEffectorPoseTarget(xpos=press_target)), + ] + ) + cost_time = time.time() - start_time + logger.log_info(f"Plan trajectory cost time: {cost_time:.2f} seconds") + if not is_success: + logger.log_warning("Failed to plan Press demo trajectory.") + return + + is_center_hit, center_error, hit_step, hit_pos, expected_pos = ( + compute_press_center_check( + robot=robot, + traj=traj, + block=block, + tolerance=args.press_tolerance, + ) + ) + logger.log_info( + "Press center check: " + f"success={is_center_hit}, " + f"xy_error={center_error:.4f} m, " + f"hit_step={hit_step}, " + f"hit_pos={format_tensor(hit_pos)}, " + f"expected={format_tensor(expected_pos)}" + ) + if not is_center_hit: + logger.log_warning( + "Press planned trajectory did not reach the block center within " + f"{args.press_tolerance:.4f} m." + ) + return + + if not args.auto_play: + input("Press Enter to replay the Press demo...") + + recording_started = start_auto_play_recording( + sim, args, video_prefix="press_auto_play" + ) + try: + log_stride = max(1, traj.shape[1] // 10) + for i in range(traj.shape[1]): + robot.set_qpos(traj[:, i, :]) + sim.update(step=4) + if args.debug_state and (i % log_stride == 0 or i == traj.shape[1] - 1): + log_block_state(block, f"replay step {i}/{traj.shape[1] - 1}") + time.sleep(1e-2) + + logger.log_info("Press returned the end-effector to the starting pose.") + final_qpos = traj[:, -1, :] + for i in range(POST_TRAJECTORY_STEPS): + robot.set_qpos(final_qpos) + sim.update(step=2) + if args.debug_state and i % max(1, POST_TRAJECTORY_STEPS // 5) == 0: + log_block_state(block, f"post step {i}/{POST_TRAJECTORY_STEPS - 1}") + time.sleep(1e-2) + finally: + stop_auto_play_recording(sim, recording_started) + + if not args.auto_play: + input("Press Enter to exit the simulation...") + + +def main() -> None: + args = parse_arguments() + run_press_demo(args) + + +if __name__ == "__main__": + main() diff --git a/tests/sim/atomic_actions/test_actions.py b/tests/sim/atomic_actions/test_actions.py index fb5a1bf0..f0b07653 100644 --- a/tests/sim/atomic_actions/test_actions.py +++ b/tests/sim/atomic_actions/test_actions.py @@ -47,6 +47,8 @@ PickUpCfg, Place, PlaceCfg, + Press, + PressCfg, ) NUM_ENVS = 2 @@ -380,3 +382,57 @@ def test_execute_clears_held_object(self): assert result.success is True assert result.trajectory.shape[2] == TOTAL_DOF assert result.next_state.held_object is None + + +# --------------------------------------------------------------------------- +# Press +# --------------------------------------------------------------------------- + + +class TestPressAction: + def setup_method(self): + self.mg = _make_mock_motion_generator() + + def test_target_type_is_pose_target(self): + assert Press.TargetType is EndEffectorPoseTarget + + def test_default_name_is_explicit(self): + assert PressCfg(hand_close_qpos=_hand_close()).name == "press" + + def test_execute_closes_hand_and_preserves_held_object(self): + cfg = PressCfg( + hand_close_qpos=_hand_close(), + sample_interval=12, + hand_interp_steps=4, + ) + action = Press(self.mg, cfg) + sem = ObjectSemantics( + affordance=AntipodalAffordance(), geometry={}, label="mug" + ) + held = HeldObjectState( + semantics=sem, + object_to_eef=torch.eye(4).unsqueeze(0).repeat(NUM_ENVS, 1, 1), + grasp_xpos=torch.eye(4).unsqueeze(0).repeat(NUM_ENVS, 1, 1), + ) + start_hand_qpos = torch.full((NUM_ENVS, HAND_DOF), 0.01) + last_qpos = torch.cat([torch.zeros(NUM_ENVS, ARM_DOF), start_hand_qpos], dim=1) + state = WorldState(last_qpos=last_qpos, held_object=held) + + def interpolate(trajectory, interp_num, device): + return trajectory[:, -1:, :].repeat(1, interp_num, 1) + + with patch( + "embodichain.lab.sim.atomic_actions.trajectory.interpolate_with_distance", + side_effect=interpolate, + ): + result = action.execute(EndEffectorPoseTarget(xpos=torch.eye(4)), state) + + assert result.success is True + assert result.trajectory.shape == (NUM_ENVS, 12, TOTAL_DOF) + expected_hand_qpos = _hand_close().unsqueeze(0).repeat(NUM_ENVS, 1) + assert torch.allclose(result.trajectory[:, -1, ARM_DOF:], expected_hand_qpos) + assert torch.allclose( + result.next_state.last_qpos[:, :ARM_DOF], + last_qpos[:, :ARM_DOF], + ) + assert result.next_state.held_object is held