diff --git a/assets/scenes/fr3_simple_pick_up/scene.xml b/assets/scenes/fr3_simple_pick_up/scene.xml index 88b1f269..d077d3bb 100644 --- a/assets/scenes/fr3_simple_pick_up/scene.xml +++ b/assets/scenes/fr3_simple_pick_up/scene.xml @@ -28,7 +28,7 @@ - + diff --git a/examples/teleop/quest_iris_dual_arm.py b/examples/teleop/quest_iris_dual_arm.py index 6deeb5ae..f1beb455 100644 --- a/examples/teleop/quest_iris_dual_arm.py +++ b/examples/teleop/quest_iris_dual_arm.py @@ -3,8 +3,10 @@ from time import sleep import numpy as np +import rcs +from rcs._core import common from rcs._core.common import RPY, Pose, RobotPlatform -from rcs._core.sim import SimConfig +from rcs._core.sim import CameraType, SimCameraConfig, SimConfig from rcs.camera.hw import HardwareCameraSet from rcs.envs.base import ( ControlMode, @@ -19,7 +21,7 @@ from rcs.utils import SimpleFrameRate from rcs_fr3.creators import RCSFR3MultiEnvCreator from rcs_fr3.utils import default_fr3_hw_gripper_cfg, default_fr3_hw_robot_cfg -from rcs_realsense.utils import default_realsense +# from rcs_realsense.utils import default_realsense from simpub.core.simpub_server import SimPublisher from simpub.parser.simdata import SimObject, SimScene from simpub.sim.mj_publisher import MujocoPublisher @@ -39,10 +41,14 @@ # "left": "192.168.102.1", "right": "192.168.101.1", } +ROBOT2ID = { + "left": "0", + "right": "1", +} -# ROBOT_INSTANCE = RobotPlatform.SIMULATION -ROBOT_INSTANCE = RobotPlatform.HARDWARE +ROBOT_INSTANCE = RobotPlatform.SIMULATION +# ROBOT_INSTANCE = RobotPlatform.HARDWARE RECORD_FPS = 30 # set camera dict to none disable cameras # CAMERA_DICT = { @@ -87,7 +93,7 @@ def __init__(self, env: RelativeActionSpace): self._reset_lock = threading.Lock() self._env = env - self.controller_names = ROBOT2IP.keys() if ROBOT_INSTANCE == RobotPlatform.HARDWARE else ["right"] + self.controller_names = ROBOT2IP.keys() if ROBOT_INSTANCE == RobotPlatform.HARDWARE else ROBOT2ID.keys() self._trg_btn = {"left": "index_trigger", "right": "index_trigger"} self._grp_btn = {"left": "hand_trigger", "right": "hand_trigger"} self._start_btn = "A" @@ -100,7 +106,7 @@ def __init__(self, env: RelativeActionSpace): self._last_controller_pose = {key: Pose() for key in self.controller_names} self._offset_pose = {key: Pose() for key in self.controller_names} - for robot in ROBOT2IP: + for robot in ROBOT2IP if ROBOT_INSTANCE == RobotPlatform.HARDWARE else ROBOT2ID: self._env.envs[robot].set_origin_to_current() self._step_env = False @@ -326,22 +332,50 @@ def main(): else: # FR3 - robot_cfg = default_sim_robot_cfg("fr3_empty_world") + rcs.scenes["rcs_icra_scene"] = rcs.Scene( + mjcf_scene="/home/tobi/coding/rcs_clones/prs/models/scenes/rcs_icra_scene/scene.xml", + mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot, + robot_type=common.RobotType.FR3, + ) + rcs.scenes["pick"] = rcs.Scene( + mjcf_scene="/home/tobi/coding/rcs_clones/prs/assets/scenes/fr3_simple_pick_up/scene.xml", + mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot, + robot_type=common.RobotType.FR3, + ) + + # robot_cfg = default_sim_robot_cfg("fr3_empty_world") + # robot_cfg = default_sim_robot_cfg("fr3_simple_pick_up") + robot_cfg = default_sim_robot_cfg("rcs_icra_scene") + # robot_cfg = default_sim_robot_cfg("pick") + + resolution = (256, 256) + cameras = { + cam: SimCameraConfig( + identifier=cam, + type=CameraType.fixed, + resolution_height=resolution[1], + resolution_width=resolution[0], + frame_rate=0, + ) + for cam in ["side", "wrist"] + } sim_cfg = SimConfig() sim_cfg.async_control = True env_rel = SimMultiEnvCreator()( - name2id=ROBOT2IP, + name2id=ROBOT2ID, robot_cfg=robot_cfg, control_mode=ControlMode.CARTESIAN_TQuat, gripper_cfg=default_sim_gripper_cfg(), - # cameras=default_mujoco_cameraset_cfg(), + # cameras=cameras, max_relative_movement=0.5, relative_to=RelativeTo.CONFIGURED_ORIGIN, sim_cfg=sim_cfg, ) - sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim # type: ignore + # sim = env_rel.unwrapped.envs[ROBOT2ID.keys().__iter__().__next__()].sim # type: ignore + sim = env_rel.get_wrapper_attr("sim") sim.open_gui() + # MySimPublisher(MySimScene(), MQ3_ADDR) MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3))) env_rel.reset() diff --git a/extensions/rcs_fr3/src/rcs_fr3/creators.py b/extensions/rcs_fr3/src/rcs_fr3/creators.py index 0f61e2cc..1a490dbb 100644 --- a/extensions/rcs_fr3/src/rcs_fr3/creators.py +++ b/extensions/rcs_fr3/src/rcs_fr3/creators.py @@ -121,7 +121,7 @@ def __call__( # type: ignore # sim_gui=True, # truncate_on_collision=False, # ) - if max_relative_movement is not None: + if relative_to != RelativeTo.NONE: env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to) return env @@ -137,6 +137,7 @@ def __call__( # type: ignore camera_set: HardwareCameraSet | None = None, max_relative_movement: float | tuple[float, float] | None = None, relative_to: RelativeTo = RelativeTo.LAST_STEP, + robot2world = None, ) -> gym.Env: ik = rcs.common.Pin( @@ -163,7 +164,7 @@ def __call__( # type: ignore env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to) envs[key] = env - env = MultiRobotWrapper(envs) + env = MultiRobotWrapper(envs, robot2world) if camera_set is not None: camera_set.start() camera_set.wait_for_frames() diff --git a/python/rcs/__init__.py b/python/rcs/__init__.py index fdfee7bb..4131ea78 100644 --- a/python/rcs/__init__.py +++ b/python/rcs/__init__.py @@ -18,7 +18,7 @@ class Scene: """Scene configuration.""" - mjb: str + mjb: str | None = None """Path to the Mujoco binary scene file.""" mjcf_scene: str """Path to the Mujoco scene XML file.""" diff --git a/python/rcs/_core/sim.pyi b/python/rcs/_core/sim.pyi index 26a187fc..c70d9209 100644 --- a/python/rcs/_core/sim.pyi +++ b/python/rcs/_core/sim.pyi @@ -136,6 +136,8 @@ class SimGripperConfig(rcs._core.common.GripperConfig): min_actuator_width: float min_joint_width: float seconds_between_callbacks: float + def __copy__(self) -> SimGripperConfig: ... + def __deepcopy__(self, arg0: dict) -> SimGripperConfig: ... def __init__(self) -> None: ... def add_id(self, id: str) -> None: ... @@ -169,6 +171,8 @@ class SimRobotConfig(rcs._core.common.RobotConfig): mjcf_scene_path: str seconds_between_callbacks: float trajectory_trace: bool + def __copy__(self) -> SimRobotConfig: ... + def __deepcopy__(self, arg0: dict) -> SimRobotConfig: ... def __init__(self) -> None: ... def add_id(self, id: str) -> None: ... diff --git a/python/rcs/envs/base.py b/python/rcs/envs/base.py index 1518909d..c2734107 100644 --- a/python/rcs/envs/base.py +++ b/python/rcs/envs/base.py @@ -167,6 +167,8 @@ class ArmObsType(TQuatDictType, JointsDictType, TRPYDictType): ... CartOrJointContType: TypeAlias = TQuatDictType | JointsDictType | TRPYDictType LimitedCartOrJointContType: TypeAlias = LimitedTQuatRelDictType | LimitedJointsRelDictType | LimitedTRPYRelDictType +class ArmWithGripper(TQuatDictType, GripperDictType): ... + class ControlMode(Enum): JOINTS = auto() @@ -310,9 +312,26 @@ def close(self): class MultiRobotWrapper(gym.Env): """Wraps a dictionary of environments to allow for multi robot control.""" - def __init__(self, envs: dict[str, gym.Env] | dict[str, gym.Wrapper]): + def __init__(self, envs: dict[str, gym.Env] | dict[str, gym.Wrapper], robot2world: dict[str, common.Pose] | None = None): self.envs = envs self.unwrapped_multi = cast(dict[str, RobotEnv], {key: env.unwrapped for key, env in envs.items()}) + if robot2world is None: + self.robot2world = {} + else: + self.robot2world = robot2world + + def _translate_pose(self, key, dic, to_world=True): + r2w = self.robot2world.get(key, common.Pose()) + if not to_world: + r2w = r2w.inverse() + if "tquat" in dic: + p = r2w * common.Pose(translation=dic["tquat"][:3], quaternion=dic["tquat"][3:]) * r2w.inverse() + dic["tquat"] = np.concatenate([p.translation(), p.rotation_q()]) + if "xyzrpy" in dic: + p = r2w * common.Pose(translation=dic["xyzrpy"][:3], rpy_vector=dic["xyzrpy"][3:]) * r2w.inverse() + dic["xyzrpy"] = p.xyzrpy() + + return dic def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]: # follows gym env by combinding a dict of envs into a single env @@ -322,7 +341,11 @@ def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, boo truncated = False info = {} for key, env in self.envs.items(): - obs[key], r, t, tr, info[key] = env.step(action[key]) + act = self._translate_pose(key, action[key], to_world=False) + ob, r, t, tr, info[key] = env.step(act) + obs[key] = self._translate_pose(key, ob, to_world=True) + # old + # obs[key], r, t, tr, info[key] = env.step(action[key]) reward += float(r) terminated = terminated or t truncated = truncated or tr @@ -358,6 +381,7 @@ def close(self): class RelativeTo(Enum): LAST_STEP = auto() CONFIGURED_ORIGIN = auto() + NONE = auto() class RelativeActionSpace(gym.ActionWrapper): diff --git a/python/rcs/envs/creators.py b/python/rcs/envs/creators.py index 64bcce19..b6f884aa 100644 --- a/python/rcs/envs/creators.py +++ b/python/rcs/envs/creators.py @@ -1,3 +1,4 @@ +import copy import logging import typing from functools import partial @@ -146,33 +147,40 @@ def __call__( # type: ignore max_relative_movement: float | tuple[float, float] | None = None, relative_to: RelativeTo = RelativeTo.LAST_STEP, sim_wrapper: Type[SimWrapper] | None = None, + robot2world=None, ) -> gym.Env: simulation = sim.Sim(robot_cfg.mjcf_scene_path, sim_cfg) + print(robot_cfg.kinematic_model_path) + print(robot_cfg.attachment_site) ik = rcs.common.Pin( robot_cfg.kinematic_model_path, - robot_cfg.attachment_site, + robot_cfg.attachment_site + "_0", urdf=robot_cfg.kinematic_model_path.endswith(".urdf"), ) # ik = rcs_robotics_library._core.rl.RoboticsLibraryIK(robot_cfg.kinematic_model_path) robots: dict[str, rcs.sim.SimRobot] = {} - for key in name2id: - robots[key] = rcs.sim.SimRobot(sim=simulation, ik=ik, cfg=robot_cfg) + for key, mid in name2id.items(): + cfg = copy.copy(robot_cfg) + cfg.add_id(mid) + robots[key] = rcs.sim.SimRobot(sim=simulation, ik=ik, cfg=cfg) envs = {} - for key in name2id: + for key, mid in name2id.items(): env: gym.Env = RobotEnv(robots[key], control_mode) - env = RobotSimWrapper(env, simulation, sim_wrapper) if gripper_cfg is not None: - gripper = rcs.sim.SimGripper(simulation, gripper_cfg) + cfg = copy.copy(gripper_cfg) + cfg.add_id(mid) + gripper = rcs.sim.SimGripper(simulation, cfg) env = GripperWrapper(env, gripper, binary=True) - if max_relative_movement is not None: + if relative_to != RelativeTo.NONE: env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to) envs[key] = env - env = MultiRobotWrapper(envs) + env = MultiRobotWrapper(envs, robot2world) + env = RobotSimWrapper(env, simulation, sim_wrapper) if cameras is not None: camera_set = typing.cast( BaseCameraSet, SimCameraSet(simulation, cameras, physical_units=True, render_on_demand=True) diff --git a/python/rcs/envs/sim.py b/python/rcs/envs/sim.py index 589b49fd..1479230a 100644 --- a/python/rcs/envs/sim.py +++ b/python/rcs/envs/sim.py @@ -3,6 +3,7 @@ import gymnasium as gym import numpy as np +from greenlet import getcurrent, greenlet from rcs.envs.base import ( ControlMode, GripperWrapper, @@ -45,11 +46,16 @@ def __init__(self, env, simulation: sim.Sim, sim_wrapper: Type[SimWrapper] | Non self.sim = simulation cfg = self.sim.get_config() self.frame_rate = SimpleFrameRate(1 / cfg.frequency, "RobotSimWrapper") + self.main_greenlet = None def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, bool, dict]: obs, _, _, _, info = super().step(action) cfg = self.sim.get_config() - if cfg.async_control: + + if self.main_greenlet is not None: + self.sim_robot.clear_collision_flag() + self.main_greenlet.switch() + elif cfg.async_control: self.sim.step(round(1 / cfg.frequency / self.sim.model.opt.timestep)) if cfg.realtime: self.frame_rate.frame_rate = 1 / cfg.frequency @@ -72,10 +78,14 @@ def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, boo def reset( self, *, seed: int | None = None, options: dict[str, Any] | None = None ) -> tuple[dict[str, Any], dict[str, Any]]: - self.sim.reset() + if self.main_greenlet is None: + self.sim.reset() obs, info = super().reset(seed=seed, options=options) - self.sim.step(1) - # todo: an obs method that is recursive over wrappers would be needed + + if self.main_greenlet is not None: + self.main_greenlet.switch() + else: + self.sim.step(1) obs.update(self.unwrapped.get_obs()) return obs, info @@ -88,20 +98,63 @@ def __init__(self, env: MultiRobotWrapper, simulation: sim.Sim): self.env: MultiRobotWrapper self.sim = simulation self.sim_robots = cast(dict[str, sim.SimRobot], {key: e.robot for key, e in self.env.unwrapped_multi.items()}) + self._inject_main_greenlet() + + def _inject_main_greenlet(self): + main_gr = getcurrent() + for env_item in self.env.envs.values(): + curr = env_item + while True: + if isinstance(curr, RobotSimWrapper): + curr.main_greenlet = main_gr + break + if not hasattr(curr, "env"): + break + curr = curr.env def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, bool, dict]: - _, _, _, _, info = super().step(action) + # 1. DOWN: Set actions for all robots + step_greenlets = {} + for key, env in self.env.envs.items(): + + def make_step_gr(env_to_step): + return greenlet(env_to_step.step) + + gr = make_step_gr(env) + step_greenlets[key] = gr + # Translate action + act = self.env._translate_pose(key, action[key], to_world=False) + + # Switch to robot greenlet. It will run until RobotSimWrapper.step switches back. + gr.switch(act) + + # 2. SIM: Step physics once self.sim.step_until_convergence() - info["is_sim_converged"] = self.sim.is_converged() - for key in self.envs.envs.items(): - state = self.sim_robots[key].get_state() - info[key]["collision"] = state.collision - info[key]["ik_success"] = state.ik_success - obs = {key: env.get_obs() for key, env in self.env.unwrapped_multi.items()} - truncated = np.all([info[key]["collision"] or info[key]["ik_success"] for key in info]) - return obs, 0.0, False, bool(truncated), info + # 3. UP: Gather observations + obs = {} + reward = 0.0 + terminated = False + truncated = False + info = {} + + for key in self.env.envs: + # Resume robot greenlet. It returns the step results. + res = step_greenlets[key].switch() + ob, r, t, tr, i = res + + # Translate observation back to world coordinates + obs[key] = self.env._translate_pose(key, ob, to_world=True) + reward += float(r) + terminated = terminated or t + truncated = truncated or tr + info[key] = i + info[key]["terminated"] = t + info[key]["truncated"] = tr + info[key]["is_sim_converged"] = self.sim.is_converged() + + return obs, reward, terminated, truncated, info def reset( # type: ignore self, *, seed: dict[str, int | None] | None = None, options: dict[str, Any] | None = None @@ -113,11 +166,27 @@ def reset( # type: ignore obs = {} info = {} self.sim.reset() + + # 1. DOWN: Reset each robot + reset_greenlets = {} for key, env in self.env.envs.items(): - _, info[key] = env.reset(seed=seed[key], options=options[key]) + + def make_reset_gr(env_to_reset, s, o): + return greenlet(lambda: env_to_reset.reset(seed=s, options=o)) + + gr = make_reset_gr(env, seed[key], options[key]) + reset_greenlets[key] = gr + gr.switch() + + # 2. SIM: Initial step self.sim.step(1) - for key, env in self.env.unwrapped_multi.items(): - obs[key] = cast(dict, env.get_obs()) + + # 3. UP: Gather initial obs + for key in self.env.envs: + ob, i = reset_greenlets[key].switch() + obs[key] = self.env._translate_pose(key, ob, to_world=True) + info[key] = i + return obs, info diff --git a/python/rcs/envs/utils.py b/python/rcs/envs/utils.py index 1d440098..98ec8288 100644 --- a/python/rcs/envs/utils.py +++ b/python/rcs/envs/utils.py @@ -19,6 +19,7 @@ def default_sim_robot_cfg(scene: str = "fr3_empty_world", idx: str = "0") -> sim robot_cfg = rcs.sim.SimRobotConfig() robot_cfg.robot_type = rcs.scenes[scene].robot_type robot_cfg.tcp_offset = common.Pose(common.FrankaHandTCPOffset()) + robot_cfg.arm_collision_geoms = [] robot_cfg.add_id(idx) if rcs.scenes[scene].mjb is not None: robot_cfg.mjcf_scene_path = rcs.scenes[scene].mjb @@ -38,7 +39,9 @@ def default_tilburg_hw_hand_cfg(file: str | PathLike | None = None) -> THConfig: def default_sim_gripper_cfg(idx: str = "0") -> sim.SimGripperConfig: cfg = sim.SimGripperConfig() - cfg.add_id(idx) + cfg.collision_geoms = [] + cfg.collision_geoms_fingers = [] + # cfg.add_id(idx) return cfg diff --git a/python/rcs/operator/__init__.py b/python/rcs/operator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/rcs/operator/franka.py b/python/rcs/operator/franka.py new file mode 100644 index 00000000..d2e73821 --- /dev/null +++ b/python/rcs/operator/franka.py @@ -0,0 +1,183 @@ +import logging +import threading +from time import sleep + +import numpy as np +import rcs +from rcs._core.common import RPY, Pose, RobotPlatform +from rcs._core import common +from rcs._core.sim import SimConfig +from rcs.camera.hw import HardwareCameraSet +from rcs.envs.base import ( + ControlMode, + GripperDictType, + LimitedTQuatRelDictType, + RelativeActionSpace, + RelativeTo, +) +from rcs.envs.creators import SimMultiEnvCreator +from rcs.envs.storage_wrapper import StorageWrapper +from rcs.envs.utils import default_digit, default_sim_gripper_cfg, default_sim_robot_cfg +from rcs.utils import SimpleFrameRate +from rcs_fr3.creators import RCSFR3MultiEnvCreator +from rcs_fr3.utils import default_fr3_hw_gripper_cfg, default_fr3_hw_robot_cfg +from rcs_realsense.utils import default_realsense +from rcs.operator.gello import GelloConfig, GelloOperator, GelloArmConfig +from rcs.operator.interface import TeleopLoop + +from simpub.core.simpub_server import SimPublisher +from simpub.parser.simdata import SimObject, SimScene +from simpub.sim.mj_publisher import MujocoPublisher +from simpub.xr_device.meta_quest3 import MetaQuest3 + +class FakeSimPublisher(SimPublisher): + def get_update(self): + return {} + +class FakeSimScene(SimScene): + def __init__(self): + super().__init__() + self.root = SimObject(name="root") + +logger = logging.getLogger(__name__) + + +ROBOT2IP = { + # "left": "192.168.102.1", + "right": "192.168.101.1", +} +ROBOT2ID = { + "left": "1", + "right": "0", +} + + +ROBOT_INSTANCE = RobotPlatform.SIMULATION +# ROBOT_INSTANCE = RobotPlatform.HARDWARE + +RECORD_FPS = 30 +# set camera dict to none disable cameras +# CAMERA_DICT = { +# "left_wrist": "230422272017", +# "right_wrist": "230422271040", +# "side": "243522070385", +# "bird_eye": "243522070364", +# } +CAMERA_DICT = None +MQ3_ADDR = "10.42.0.1" + +# DIGIT_DICT = { +# "digit_right_left": "D21182", +# "digit_right_right": "D21193" +# } +DIGIT_DICT = None + + +DATASET_PATH = "test_data_iris_dual_arm14" +INSTRUCTION = "build a tower with the blocks in front of you" + +robot2world={"left": rcs.common.Pose(translation=[0, 0, 0], rpy_vector=[0.89360858, -0.17453293, 0.46425758]), + "right": rcs.common.Pose(translation=[0, 0, 0], rpy_vector=[-0.89360858, -0.17453293, -0.46425758])} + +# config = QuestConfig(mq3_addr=MQ3_ADDR, simulation=ROBOT_INSTANCE == RobotPlatform.SIMULATION) +config = GelloConfig( + arms={ + "right": GelloArmConfig(com_port="/dev/ttyACM1"), + "left": GelloArmConfig(com_port="/dev/ttyACM0"), + }, + simulation=ROBOT_INSTANCE == RobotPlatform.SIMULATION, +) + + +def get_env(): + if ROBOT_INSTANCE == RobotPlatform.HARDWARE: + + cams = [] + if CAMERA_DICT is not None: + cams.append(default_realsense(CAMERA_DICT)) + if DIGIT_DICT is not None: + cams.append(default_digit(DIGIT_DICT)) + + camera_set = HardwareCameraSet(cams) if cams else None + + env_rel = RCSFR3MultiEnvCreator()( + name2ip=ROBOT2IP, + camera_set=camera_set, + robot_cfg=default_fr3_hw_robot_cfg(async_control=True), + control_mode=GelloOperator.control_mode[0], + gripper_cfg=default_fr3_hw_gripper_cfg(async_control=True), + max_relative_movement=(0.5, np.deg2rad(90)), + relative_to=GelloOperator.control_mode[1], + robot2world=robot2world, + ) + env_rel = StorageWrapper( + env_rel, DATASET_PATH, INSTRUCTION, batch_size=32, max_rows_per_group=100, max_rows_per_file=1000 + ) + operator = GelloOperator(config) + else: + # FR3 + rcs.scenes["rcs_icra_scene"] = rcs.Scene( + mjcf_scene="/home/tobi/coding/rcs_clones/prs/models/scenes/rcs_icra_scene/scene.xml", + mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot, + robot_type=common.RobotType.FR3, + ) + rcs.scenes["pick"] = rcs.Scene( + mjcf_scene="/home/tobi/coding/rcs_clones/prs/assets/scenes/fr3_simple_pick_up/scene.xml", + mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot, + robot_type=common.RobotType.FR3, + ) + rcs.scenes["duo"] = rcs.Scene( + mjcf_scene="/home/tobi/coding/rcs_repos/clone1/rcs_models/assets/scenes/fr3_duo_empty_world/scene.xml", + mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot, + robot_type=common.RobotType.FR3, + ) + + # robot_cfg = default_sim_robot_cfg("fr3_empty_world") + # robot_cfg = default_sim_robot_cfg("fr3_simple_pick_up") + # robot_cfg = default_sim_robot_cfg("rcs_icra_scene") + # robot_cfg = default_sim_robot_cfg("pick") + robot_cfg = default_sim_robot_cfg("duo") + + # resolution = (256, 256) + # cameras = { + # cam: SimCameraConfig( + # identifier=cam, + # type=CameraType.fixed, + # resolution_height=resolution[1], + # resolution_width=resolution[0], + # frame_rate=0, + # ) + # for cam in ["side", "wrist"] + # } + + sim_cfg = SimConfig() + sim_cfg.async_control = True + env_rel = SimMultiEnvCreator()( + name2id=ROBOT2ID, + robot_cfg=robot_cfg, + control_mode=GelloOperator.control_mode[0], + gripper_cfg=default_sim_gripper_cfg(), + # cameras=default_mujoco_cameraset_cfg(), + max_relative_movement=0.5, + relative_to=GelloOperator.control_mode[1], + sim_cfg=sim_cfg, + robot2world=robot2world, + ) + # sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim # type: ignore + sim = env_rel.get_wrapper_attr("sim") + operator = GelloOperator(config, sim) + sim.open_gui() + MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3))) + return env_rel, operator + + +def main(): + env_rel, operator = get_env() + env_rel.reset() + tele = TeleopLoop(env_rel, operator) + with env_rel, tele: # type: ignore + tele.environment_step_loop() + + +if __name__ == "__main__": + main() diff --git a/python/rcs/operator/gello.py b/python/rcs/operator/gello.py new file mode 100644 index 00000000..26e9d810 --- /dev/null +++ b/python/rcs/operator/gello.py @@ -0,0 +1,433 @@ +import copy +import logging +import threading +import time +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Sequence, Tuple, TypedDict, Iterator + +import numpy as np + +try: + from dynamixel_sdk.group_sync_read import GroupSyncRead + from dynamixel_sdk.group_sync_write import GroupSyncWrite + from dynamixel_sdk.packet_handler import PacketHandler + from dynamixel_sdk.port_handler import PortHandler + from dynamixel_sdk.robotis_def import COMM_SUCCESS + HAS_DYNAMIXEL_SDK = True +except ImportError: + HAS_DYNAMIXEL_SDK = False + +try: + from pynput import keyboard + HAS_PYNPUT = True +except ImportError: + HAS_PYNPUT = False + +from rcs.envs.base import ArmWithGripper, ControlMode, RelativeTo, JointsDictType, GripperDictType +from rcs.operator.interface import BaseOperator, BaseOperatorConfig, TeleopCommands +from rcs.sim.sim import Sim +from rcs.utils import SimpleFrameRate + +logger = logging.getLogger(__name__) + +# --- Dynamixel Driver Constants and Helpers --- + +XL330_CONTROL_TABLE = { + "model_number": {"addr": 0, "len": 2}, + "operating_mode": {"addr": 11, "len": 1}, + "torque_enable": {"addr": 64, "len": 1}, + "kp_d": {"addr": 80, "len": 2}, + "kp_i": {"addr": 82, "len": 2}, + "kp_p": {"addr": 84, "len": 2}, + "goal_current": {"addr": 102, "len": 2}, + "goal_position": {"addr": 116, "len": 4}, + "present_position": {"addr": 132, "len": 4}, +} + + +class DynamixelDriver: + """Simplified Dynamixel driver adapted for RCS.""" + + def __init__( + self, + ids: Sequence[int], + port: str = "/dev/ttyUSB0", + baudrate: int = 57600, + pulses_per_revolution: int = 4095, + ): + if not HAS_DYNAMIXEL_SDK: + raise ImportError("dynamixel_sdk is not installed. Please install it to use GelloOperator.") + + self._ids = ids + self._port = port + self._baudrate = baudrate + self._pulses_per_revolution = pulses_per_revolution + self._lock = threading.Lock() + self._buffered_joint_positions = None + + self._portHandler = PortHandler(self._port) + self._packetHandler = PacketHandler(2.0) + + self._groupSyncReadHandlers = {} + self._groupSyncWriteHandlers = {} + + for key, entry in XL330_CONTROL_TABLE.items(): + self._groupSyncReadHandlers[key] = GroupSyncRead( + self._portHandler, self._packetHandler, entry["addr"], entry["len"] + ) + for dxl_id in self._ids: + self._groupSyncReadHandlers[key].addParam(dxl_id) + + if key != "model_number" and key != "present_position": + self._groupSyncWriteHandlers[key] = GroupSyncWrite( + self._portHandler, self._packetHandler, entry["addr"], entry["len"] + ) + + if not self._portHandler.openPort(): + raise ConnectionError(f"Failed to open port {self._port}") + if not self._portHandler.setBaudRate(self._baudrate): + raise ConnectionError(f"Failed to set baudrate {self._baudrate}") + + self._stop_thread = threading.Event() + self._polling_thread = None + self._is_polling = False + + def write_value_by_name(self, name: str, values: Sequence[int | None]): + if len(values) != len(self._ids): + raise ValueError(f"The length of {name} must match the number of servos") + + handler = self._groupSyncWriteHandlers[name] + value_len = XL330_CONTROL_TABLE[name]["len"] + + with self._lock: + for dxl_id, value in zip(self._ids, values): + if value is None: + continue + param = [(value >> (8 * i)) & 0xFF for i in range(value_len)] + handler.addParam(dxl_id, param) + + comm_result = handler.txPacket() + if comm_result != COMM_SUCCESS: + handler.clearParam() + raise RuntimeError(f"Failed to syncwrite {name}: {self._packetHandler.getTxRxResult(comm_result)}") + handler.clearParam() + + def read_value_by_name(self, name: str) -> List[int]: + handler = self._groupSyncReadHandlers[name] + value_len = XL330_CONTROL_TABLE[name]["len"] + addr = XL330_CONTROL_TABLE[name]["addr"] + + with self._lock: + comm_result = handler.txRxPacket() + if comm_result != COMM_SUCCESS: + raise RuntimeError(f"Failed to sync read {name}: {self._packetHandler.getTxRxResult(comm_result)}") + + values = [] + for dxl_id in self._ids: + if handler.isAvailable(dxl_id, addr, value_len): + value = handler.getData(dxl_id, addr, value_len) + value = int(np.int32(np.uint32(value))) + values.append(value) + else: + raise RuntimeError(f"Failed to get {name} for ID {dxl_id}") + return values + + def start_joint_polling(self): + if self._is_polling: + return + self._stop_thread.clear() + self._polling_thread = threading.Thread(target=self._joint_polling_loop, daemon=True) + self._polling_thread.start() + self._is_polling = True + + def stop_joint_polling(self): + if not self._is_polling: + return + self._stop_thread.set() + if self._polling_thread: + self._polling_thread.join() + self._is_polling = False + + def _joint_polling_loop(self): + while not self._stop_thread.is_set(): + time.sleep(0.001) + try: + self._buffered_joint_positions = np.array(self.read_value_by_name("present_position"), dtype=int) + except RuntimeError as e: + logger.warning(f"Polling error: {e}") + + def get_joints(self) -> np.ndarray: + if self._is_polling: + while self._buffered_joint_positions is None: + time.sleep(0.01) + return self._pulses_to_rad(self._buffered_joint_positions.copy()) + return self._pulses_to_rad(np.array(self.read_value_by_name("present_position"), dtype=int)) + + def _pulses_to_rad(self, pulses) -> np.ndarray: + return np.array(pulses) / self._pulses_per_revolution * 2 * np.pi + + def _rad_to_pulses(self, rad: float) -> int: + return int(rad / (2 * np.pi) * self._pulses_per_revolution) + + def close(self): + self.stop_joint_polling() + if self._portHandler: + self._portHandler.closePort() + + +# --- Gello Hardware Interface Logic --- + +@dataclass +class GelloArmConfig: + com_port: str = "/dev/ttyUSB0" + num_arm_joints: int = 7 + joint_signs: List[int] = field(default_factory=lambda: [1, -1, 1, -1, 1, 1, 1]) + gripper: bool = True + gripper_range_rad: List[float] = field(default_factory=lambda: [2.00, 3.22]) + assembly_offsets: List[float] = field( + default_factory=lambda: [0.000, 0.000, 3.142, 3.142, 3.142, 4.712, 0.000] + ) + dynamixel_kp_p: List[int] = field( + default_factory=lambda: [30, 60, 0, 30, 0, 0, 0, 50] + ) + dynamixel_kp_i: List[int] = field( + default_factory=lambda: [0, 0, 0, 0, 0, 0, 0, 0] + ) + dynamixel_kp_d: List[int] = field( + default_factory=lambda: [250, 100, 80, 60, 30, 10, 5, 0] + ) + dynamixel_torque_enable: List[int] = field( + default_factory=lambda: [0, 0, 0, 0, 0, 0, 0, 0] + ) + dynamixel_goal_position: List[float] = field( + default_factory=lambda: [0.0, 0.0, 0.0, -1.571, 0.0, 1.571, 0.0, 3.509] + ) + + + +@dataclass +class DynamixelControlConfig: + kp_p: List[int] = field(default_factory=list) + kp_i: List[int] = field(default_factory=list) + kp_d: List[int] = field(default_factory=list) + torque_enable: List[int] = field(default_factory=list) + goal_position: List[int] = field(default_factory=list) + goal_current: List[int] = field(default_factory=list) + operating_mode: List[int] = field(default_factory=list) + + _UPDATE_ORDER = [ + "operating_mode", + "goal_current", + "kp_p", + "kp_i", + "kp_d", + "torque_enable", + "goal_position", + ] + + def __iter__(self) -> Iterator[Tuple[str, List[int]]]: + for param_name in self._UPDATE_ORDER: + if hasattr(self, param_name): + yield param_name, getattr(self, param_name) + + def __getitem__(self, param_name: str) -> List[int]: + return getattr(self, param_name) + + def __setitem__(self, param_name: str, value: List[int]) -> None: + setattr(self, param_name, value) + + +class GelloHardware: + JOINT_POSITION_LIMITS = np.array( + [ + [-2.9007, 2.9007], + [-1.8361, 1.8361], + [-2.9007, 2.9007], + [-3.0770, -0.1169], + [-2.8763, 2.8763], + [0.4398, 4.6216], + [-3.0508, 3.0508], + ] + ) + MID_JOINT_POSITIONS = JOINT_POSITION_LIMITS.mean(axis=1) + OPERATING_MODE = 5 + CURRENT_LIMIT = 600 + + def __init__(self, config: GelloArmConfig): + self._com_port = config.com_port + self._num_arm_joints = config.num_arm_joints + self._joint_signs = np.array(config.joint_signs) + self._gripper = config.gripper + self._num_total_joints = self._num_arm_joints + (1 if self._gripper else 0) + self._gripper_range_rad = config.gripper_range_rad + self._assembly_offsets = np.array(config.assembly_offsets) + + self._driver = DynamixelDriver( + ids=list(range(1, self._num_total_joints + 1)), + port=self._com_port, + ) + + self._initial_arm_joints_raw = self._driver.get_joints()[: self._num_arm_joints] + initial_arm_joints = self.normalize_joint_positions( + self._initial_arm_joints_raw, self._assembly_offsets, self._joint_signs + ) + self._prev_arm_joints_raw = self._initial_arm_joints_raw.copy() + self._prev_arm_joints = initial_arm_joints.copy() + + self._dynamixel_control_config = DynamixelControlConfig( + kp_p=config.dynamixel_kp_p.copy(), + kp_i=config.dynamixel_kp_i.copy(), + kp_d=config.dynamixel_kp_d.copy(), + torque_enable=config.dynamixel_torque_enable.copy(), + goal_position=self._goal_position_to_pulses(config.dynamixel_goal_position).copy(), + goal_current=[self.CURRENT_LIMIT] * self._num_total_joints, + operating_mode=[self.OPERATING_MODE] * self._num_total_joints, + ) + + self._initialize_parameters() + self._driver.start_joint_polling() + + @staticmethod + def normalize_joint_positions(raw, offsets, signs): + return (np.mod((raw - offsets) * signs - GelloHardware.MID_JOINT_POSITIONS, 2 * np.pi) + - np.pi + GelloHardware.MID_JOINT_POSITIONS) + + def _initialize_parameters(self): + for name, value in self._dynamixel_control_config: + self._driver.write_value_by_name(name, value) + time.sleep(0.1) + + def get_joint_and_gripper_positions(self) -> Tuple[np.ndarray, float]: + joints_raw = self._driver.get_joints() + arm_joints_raw = joints_raw[: self._num_arm_joints] + + arm_joints_delta = (arm_joints_raw - self._prev_arm_joints_raw) * self._joint_signs + arm_joints = self._prev_arm_joints + arm_joints_delta + self._prev_arm_joints = arm_joints.copy() + self._prev_arm_joints_raw = arm_joints_raw.copy() + + arm_joints_clipped = np.clip(arm_joints, self.JOINT_POSITION_LIMITS[:, 0], self.JOINT_POSITION_LIMITS[:, 1]) + + gripper_pos = 0.0 + if self._gripper: + raw_grp = joints_raw[-1] + gripper_pos = (raw_grp - self._gripper_range_rad[0]) / (self._gripper_range_rad[1] - self._gripper_range_rad[0]) + gripper_pos = max(0.0, min(1.0, gripper_pos)) + + return arm_joints_clipped, gripper_pos + + def _goal_position_to_pulses(self, goals): + arm_goals = np.array(goals[: self._num_arm_joints]) + initial_rotations = np.floor_divide(self._initial_arm_joints_raw - self._assembly_offsets - self.MID_JOINT_POSITIONS, 2 * np.pi) + arm_goals_raw = (initial_rotations * 2 * np.pi + arm_goals + self._assembly_offsets) * self._joint_signs + np.pi + goals_raw = np.append(arm_goals_raw, goals[-1]) if self._gripper else arm_goals_raw + return [self._driver._rad_to_pulses(rad) for rad in goals_raw] + + def close(self): + try: + self._driver.write_value_by_name("torque_enable", [0] * self._num_total_joints) + except: + pass + self._driver.close() + + +# --- RCS Operator Implementation --- + + +@dataclass(kw_only=True) +class GelloConfig(BaseOperatorConfig): + # Dictionary for multi-arm setups: {"left": GelloArmConfig(...), "right": GelloArmConfig(...)} + arms: Dict[str, GelloArmConfig] = field(default_factory=lambda: {"right": GelloArmConfig()}) + + +class GelloOperator(BaseOperator): + control_mode = (ControlMode.JOINTS, RelativeTo.NONE) + + def __init__(self, config: GelloConfig, sim: Sim | None = None): + super().__init__(config, sim) + self.config: GelloConfig + self._resource_lock = threading.Lock() + self._cmd_lock = threading.Lock() + + self._exit_requested = False + self._commands = TeleopCommands() + + self.controller_names = list(self.config.arms.keys()) + + self._last_joints = {name: None for name in self.controller_names} + self._last_gripper = {name: 1.0 for name in self.controller_names} + self._hws: Dict[str, GelloHardware] = {} + + if HAS_PYNPUT: + self._listener = keyboard.Listener(on_press=self._on_press) + self._listener.start() + else: + logger.warning("pynput not found. Keyboard triggers disabled.") + + def _on_press(self, key): + try: + if hasattr(key, "char"): + if key.char == "s": + with self._cmd_lock: + self._commands.sync_position = True + elif key.char == "r": + with self._cmd_lock: + self._commands.failure = True + except AttributeError: + pass + + def consume_commands(self) -> TeleopCommands: + with self._cmd_lock: + cmds = copy.copy(self._commands) + self._commands = TeleopCommands() + return cmds + + def reset_operator_state(self): + # GELLO is absolute, no internal state to reset typically + pass + + def consume_action(self) -> Dict[str, Any]: + actions = {} + with self._resource_lock: + for name in self.controller_names: + if self._last_joints[name] is not None: + actions[name] = { + "joints": self._last_joints[name].copy(), + "gripper": self._last_gripper[name], + } + return actions + + def run(self): + # Initialize all hardware instances + for name, arm_cfg in self.config.arms.items(): + try: + self._hws[name] = GelloHardware(arm_cfg) + except Exception as e: + logger.error(f"Failed to initialize GELLO hardware for {name}: {e}") + + if not self._hws: + logger.error("No GELLO hardware initialized. Exiting.") + return + + rate_limiter = SimpleFrameRate(self.config.read_frequency, "gello readout") + + while not self._exit_requested: + for name, hw in self._hws.items(): + try: + joints, gripper = hw.get_joint_and_gripper_positions() + with self._resource_lock: + self._last_joints[name] = joints + self._last_gripper[name] = gripper + except Exception as e: + logger.warning(f"Error reading GELLO {name}: {e}") + + rate_limiter() + + def close(self): + self._exit_requested = True + if HAS_PYNPUT and hasattr(self, "_listener"): + self._listener.stop() + for hw in self._hws.values(): + hw.close() + if self.is_alive() and threading.current_thread() != self: + self.join(timeout=1.0) diff --git a/python/rcs/operator/interface.py b/python/rcs/operator/interface.py new file mode 100644 index 00000000..0092cb9b --- /dev/null +++ b/python/rcs/operator/interface.py @@ -0,0 +1,227 @@ +from abc import ABC +import copy +from dataclasses import dataclass, field +import logging +import threading +import time +from time import sleep +import gymnasium as gym + +from rcs.envs.base import ArmWithGripper, ControlMode, RelativeTo +from rcs.sim.sim import Sim +from rcs.utils import SimpleFrameRate + +logger = logging.getLogger(__name__) + + +@dataclass +class TeleopCommands: + """Semantic commands decoupled from specific hardware buttons.""" + + record: bool = False + success: bool = False + failure: bool = False + sync_position: bool = False + reset_origin_to_current: dict[str, bool] = field(default_factory=dict) + + +@dataclass(kw_only=True) +class BaseOperatorConfig: + read_frequency: int = 30 + simulation: bool = True + + +class BaseOperator(ABC, threading.Thread): + control_mode: tuple[ControlMode, RelativeTo] + controller_names: list[str] = field(default=["left", "right"]) + + def __init__(self, config: BaseOperatorConfig, sim: Sim | None = None): + threading.Thread.__init__(self) + self.config = config + self.sim = sim + + def consume_commands(self) -> TeleopCommands: + """Returns the current commands and resets them (edge-triggered). Must be thread-safe.""" + raise NotImplementedError() + + def reset_operator_state(self): + """Hook for subclasses to reset their internal poses/offsets on env reset. Must be thread-safe.""" + + def run(self): + """Read out hardware, set states and process buttons.""" + raise NotImplementedError() + + def consume_action(self) -> dict[str, ArmWithGripper]: + """Returns the action dictionary to step the environment. Must be thread-safe.""" + raise NotImplementedError() + + def close(self): + pass + + +class TeleopLoop: + """Interface for an operator device""" + + # Define this as a class attribute so it can be accessed without instantiating + control_mode: tuple[ControlMode, RelativeTo] + + def __init__( + self, + env: gym.Env, + operator: BaseOperator, + env_frequency: int = 30, + key_translation: dict[str, str] | None = None, + ): + super().__init__() + self.env = env + self.operator = operator + self._exit_requested = False + self.env_frequency = env_frequency + if key_translation is None: + # controller to robot translation + self.key_translation = {key: key for key in self.operator.controller_names} + else: + self.key_translation = key_translation + + # Absolute operators (RelativeTo.NONE) need an initial sync + self._synced = (self.operator.control_mode[1] != RelativeTo.NONE) + + def stop(self): + self.operator.close() + self._exit_requested = True + self.operator.join() + + def __enter__(self): + self.operator.start() + # sleep(2) + return self + + def __exit__(self, *_): + self.stop() + + def _translate_keys(self, actions): + translated = {self.key_translation[key]: actions[key] for key in actions} + # Fill in missing robots with "hold" actions from last observation + # This is necessary because absolute environments (like MultiRobotWrapper) + # require actions for all configured robots in every step. + for robot_name in self.env.get_wrapper_attr("envs").keys(): + if robot_name not in translated: + if robot_name in self._last_obs: + translated[robot_name] = { + "joints": self._last_obs[robot_name]["joints"].copy(), + "gripper": self._last_obs[robot_name].get("gripper", 1.0) + } + return translated + + def environment_step_loop(self): + rate_limiter = SimpleFrameRate(self.env_frequency, "env loop") + + # 0. Initial Reset to get current positions for untracked robots + self._last_obs, _ = self.env.reset() + + while True: + if self._exit_requested: + break + + # 1. Process Meta-Commands + cmds = self.operator.consume_commands() + + if cmds.record: + print("Command: Start Recording") + self.env.get_wrapper_attr("start_record")() + + if cmds.success: + print("Command: Success! Resetting env...") + self.env.get_wrapper_attr("success")() + sleep(1) # sleep to let the robot reach the goal + self._last_obs, _ = self.env.reset() + self.operator.reset_operator_state() + self._synced = (self.operator.control_mode[1] != RelativeTo.NONE) + # consume new commands because of potential origin reset + continue + + elif cmds.failure: + print("Command: Failure! Resetting env...") + self._last_obs, _ = self.env.reset() + self.operator.reset_operator_state() + self._synced = (self.operator.control_mode[1] != RelativeTo.NONE) + # consume new commands because of potential origin reset + continue + + if cmds.sync_position: + self.sync_robot_to_operator() + self._synced = True + continue + + if not self._synced: + # Still waiting for sync, step the env with "hold" actions + if int(time.time()) % 5 == 0 and int(time.time() * self.env_frequency) % self.env_frequency == 0: + print("Waiting for sync... (Press 's' on GELLO/Keyboard to sync)") + + hold_actions = {} + for robot_name in self.env.get_wrapper_attr("envs").keys(): + if robot_name in self._last_obs and "joints" in self._last_obs[robot_name]: + hold_actions[robot_name] = { + "joints": self._last_obs[robot_name]["joints"].copy(), + "gripper": self._last_obs[robot_name].get("gripper", 1.0), + } + + self._last_obs, _, _, _, _ = self.env.step(hold_actions) + rate_limiter() + continue + + for controller in cmds.reset_origin_to_current: + if cmds.reset_origin_to_current[controller]: + robot = self.key_translation[controller] + print(f"Command: Resetting origin for {robot}...") + assert ( + self.operator.control_mode[1] == RelativeTo.CONFIGURED_ORIGIN + # TODO the following is a dict and can thus not easily be used like this + # and self.env.get_wrapper_attr("relative_to") == RelativeTo.CONFIGURED_ORIGIN + ), "both robot env and operator must be configured to relative_to.CONFIGURED_ORIGIN" + self.env.get_wrapper_attr("envs")[robot].set_origin_to_current() + + # 2. Step the Environment + actions = self.operator.consume_action() + actions = self._translate_keys(actions) + self._last_obs, _, _, _, _ = self.env.step(actions) + + rate_limiter() + + def sync_robot_to_operator(self, duration: float = 3.0): + print(f"Command: Syncing robot to operator (duration: {duration}s)...") + rate_limiter = SimpleFrameRate(self.env_frequency, "sync loop") + num_steps = int(duration * self.env_frequency) + + # 1. Capture the initial state for interpolation + start_obs = copy.deepcopy(self._last_obs) + + # 2. Interpolation Loop + for i in range(num_steps): + alpha = (i + 1) / num_steps + # Re-consume operator action to follow moving target! + target_actions = self._translate_keys(self.operator.consume_action()) + + interp_actions = {} + for robot_name, target in target_actions.items(): + try: + # Interpolate from FIXED start towards MOVING target + s_joints = start_obs[robot_name]["joints"] + t_joints = target["joints"] + interp_joints = s_joints + alpha * (t_joints - s_joints) + + s_gripper = start_obs[robot_name].get("gripper", 1.0) + t_gripper = target.get("gripper", 1.0) + interp_gripper = s_gripper + alpha * (t_gripper - s_gripper) + + interp_actions[robot_name] = { + "joints": interp_joints, + "gripper": interp_gripper, + } + except (KeyError, TypeError): + continue + + self._last_obs, _, _, _, _ = self.env.step(interp_actions) + rate_limiter() + + print("Sync Complete.") diff --git a/python/rcs/operator/pedals.py b/python/rcs/operator/pedals.py new file mode 100644 index 00000000..e1865f51 --- /dev/null +++ b/python/rcs/operator/pedals.py @@ -0,0 +1,111 @@ +import evdev +from evdev import ecodes +import threading +import time + +class FootPedal: + def __init__(self, device_name_substring="Foot Switch"): + """Initializes the foot pedal and starts the background reading thread.""" + self.device_path = self._find_device(device_name_substring) + + if not self.device_path: + raise FileNotFoundError(f"Could not find a device matching '{device_name_substring}'") + + self.device = evdev.InputDevice(self.device_path) + self.device.grab() # Prevent events from leaking into the OS/terminal + + # Dictionary to hold the current state of each key. + # True = Pressed/Held, False = Released + self._key_states = {} + self._lock = threading.Lock() + + # Start the background thread + self._running = True + self._thread = threading.Thread(target=self._read_events, daemon=True) + self._thread.start() + print(f"Connected to {self.device.name} at {self.device_path}") + + def _find_device(self, substring): + """Finds the device path for the foot pedal.""" + for path in evdev.list_devices(): + dev = evdev.InputDevice(path) + if substring.lower() in dev.name.lower(): + return path + return None + + def _read_events(self): + """Background loop that updates the state dictionary.""" + try: + for event in self.device.read_loop(): + if not self._running: + break + + if event.type == ecodes.EV_KEY: + key_event = evdev.categorize(event) + + with self._lock: + # keystate: 1 is DOWN, 2 is HOLD, 0 is UP + is_pressed = key_event.keystate in [1, 2] + + # Store state using the string name of the key (e.g., 'KEY_A') + # If a key resolves to a list (rare, but happens in evdev), take the first one + key_name = key_event.keycode + if isinstance(key_name, list): + key_name = key_name[0] + + self._key_states[key_name] = is_pressed + + except OSError: + pass # Device disconnected or closed + + def get_states(self): + """ + Returns a snapshot of the latest key states. + Example return: {'KEY_A': True, 'KEY_B': False, 'KEY_C': False} + """ + with self._lock: + # Return a copy to ensure thread safety + return self._key_states.copy() + + def get_key_state(self, key_name): + """Returns the state of a specific key, defaulting to False if never pressed.""" + with self._lock: + return self._key_states.get(key_name, False) + + def close(self): + """Cleans up the device and stops the thread.""" + self._running = False + try: + self.device.ungrab() + self.device.close() + except OSError: + pass + +# ========================================== +# Example Usage +# ========================================== +if __name__ == "__main__": + try: + # Initialize the pedal + pedal = FootPedal("Foot Switch") + + # Simulate a typical robotics control loop running at 10Hz + print("Starting control loop... Press Ctrl+C to exit.") + while True: + # Grab the latest states instantly without blocking + states = pedal.get_states() + + if states: + # Print only the keys that are currently pressed + pressed_keys = [key for key, is_pressed in states.items() if is_pressed] + print(f"Currently pressed: {pressed_keys}") + + # Your teleoperation logic goes here... + + time.sleep(0.1) # 10Hz loop + + except KeyboardInterrupt: + print("\nShutting down...") + finally: + if 'pedal' in locals(): + pedal.close() \ No newline at end of file diff --git a/python/rcs/operator/quest.py b/python/rcs/operator/quest.py new file mode 100644 index 00000000..d498a41b --- /dev/null +++ b/python/rcs/operator/quest.py @@ -0,0 +1,239 @@ +import copy +from dataclasses import dataclass +import logging +import threading +from time import sleep + +import numpy as np +from rcs._core.common import Pose, RPY +from rcs.envs.base import ArmWithGripper, ControlMode, GripperDictType, RelativeActionSpace, RelativeTo, TQuatDictType +from rcs.operator.interface import BaseOperator, BaseOperatorConfig, TeleopCommands +from rcs.sim.sim import Sim +from rcs.utils import SimpleFrameRate + +from simpub.core.simpub_server import SimPublisher +from simpub.parser.simdata import SimObject, SimScene +from simpub.sim.mj_publisher import MujocoPublisher +from simpub.xr_device.meta_quest3 import MetaQuest3 + +try: + from simpub.core.simpub_server import SimPublisher + from simpub.parser.simdata import SimObject, SimScene + from simpub.sim.mj_publisher import MujocoPublisher + from simpub.xr_device.meta_quest3 import MetaQuest3 + HAS_SIMPUB = True +except ImportError: + HAS_SIMPUB = False + +logger = logging.getLogger(__name__) + +# download the iris apk from the following repo release: https://github.com/intuitive-robots/IRIS-Meta-Quest3 +# in order to use usb connection install adb install adb +# sudo apt install android-tools-adb +# install it on your quest with +# adb install IRIS-Meta-Quest3.apk + +if HAS_SIMPUB: + class FakeSimPublisher(SimPublisher): + def get_update(self): + return {} + + class FakeSimScene(SimScene): + def __init__(self): + super().__init__() + self.root = SimObject(name="root") + + +@dataclass(kw_only=True) +class QuestConfig(BaseOperatorConfig): + include_rotation: bool = True + mq3_addr: str = "10.42.0.1" + + +class QuestOperator(BaseOperator): + + control_mode = (ControlMode.CARTESIAN_TQuat, RelativeTo.CONFIGURED_ORIGIN) + controller_names = ["left", "right"] + + def __init__(self, config: QuestConfig, sim: Sim | None = None): + super().__init__(config, sim) + if not HAS_SIMPUB: + raise ImportError("simpub is not installed. Please install it to use QuestOperator.") + + self.config: QuestConfig + + self._resource_lock = threading.Lock() + self._cmd_lock = threading.Lock() + + self._trg_btn = {"left": "index_trigger", "right": "index_trigger"} + self._grp_btn = {"left": "hand_trigger", "right": "hand_trigger"} + self._start_btn = "A" + self._stop_btn = "B" + self._unsuccessful_btn = "Y" + + self._prev_data = None + self._exit_requested = False + self._grp_pos = {key: 1.0 for key in self.controller_names} # start with opened gripper + self._last_controller_pose = {key: Pose() for key in self.controller_names} + self._offset_pose = {key: Pose() for key in self.controller_names} + + self._commands = TeleopCommands() + self._reset_origin_to_current() + + self._step_env = False + self._set_frame = {key: Pose() for key in self.controller_names} + # if self.config.simulation: + # self._publisher = MujocoPublisher(self.sim.model, self.sim.data, self.config.mq3_addr, visible_geoms_groups=list(range(1, 3))) + if not self.config.simulation: + self._publisher = FakeSimPublisher(FakeSimScene(), self.config.mq3_addr) + # robot_cfg = default_sim_robot_cfg("fr3_empty_world") + # sim_cfg = SimConfig() + # sim_cfg.async_control = True + # twin_env = SimMultiEnvCreator()( + # name2id=ROBOT2IP, + # robot_cfg=robot_cfg, + # control_mode=ControlMode.JOINTS, + # gripper_cfg=default_sim_gripper_cfg(), + # sim_cfg=sim_cfg, + # ) + # sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim + # sim.open_gui() + # MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3))) + # env_rel = DigitalTwin(env_rel, twin_env) + self._reader = MetaQuest3("RCSNode") + + def _reset_origin_to_current(self, controller: str | None = None): + with self._cmd_lock: + if controller is None: + self._commands.reset_origin_to_current = {key: True for key in self.controller_names} + else: + self._commands.reset_origin_to_current[controller] = True + + def _reset_state(self): + with self._resource_lock: + for controller in self.controller_names: + self._offset_pose[controller] = Pose() + self._last_controller_pose[controller] = Pose() + self._grp_pos[controller] = 1 + + def consume_commands(self) -> TeleopCommands: + # must be threadsafe + with self._cmd_lock: + cmds = copy.copy(self._commands) + self._commands = TeleopCommands() + return cmds + + def reset_operator_state(self): + """Resets the hardware offsets when the environment resets.""" + self._reset_state() + self._reset_origin_to_current() + + def consume_action(self) -> dict[str, ArmWithGripper]: + transforms = {} + with self._resource_lock: + for controller in self.controller_names: + transform = Pose( + translation=( + self._last_controller_pose[controller].translation() # type: ignore + - self._offset_pose[controller].translation() + ), + quaternion=( + self._last_controller_pose[controller] * self._offset_pose[controller].inverse() + ).rotation_q(), + ) + + set_axes = Pose(quaternion=self._set_frame[controller].rotation_q()) + + transform = set_axes.inverse() * transform * set_axes + if not self.config.include_rotation: + transform = Pose(translation=transform.translation()) # identity rotation + transforms[controller] = TQuatDictType( + tquat=np.concatenate([transform.translation(), transform.rotation_q()]) + ) + transforms[controller].update(GripperDictType(gripper=self._grp_pos[controller])) + return transforms + + def close(self): + self._reader.disconnect() + self._publisher.shutdown() + self._exit_requested = True + self.join() + + def run(self): + rate_limiter = SimpleFrameRate(self.config.read_frequency, "teleop readout") + warning_raised = False + + while not self._exit_requested: + input_data = self._reader.get_controller_data() + + if input_data is None: + if not warning_raised: + logger.warning("[Quest Reader] packets empty") + warning_raised = True + sleep(0.5) + continue + + if warning_raised: + logger.warning("[Quest Reader] packets arriving again") + warning_raised = False + + # === Update Semantic Commands === + with self._cmd_lock: + if input_data[self._start_btn] and (self._prev_data is None or not self._prev_data[self._start_btn]): + self._commands.record = True + + if input_data[self._stop_btn] and (self._prev_data is None or not self._prev_data[self._stop_btn]): + self._commands.success = True + + if input_data[self._unsuccessful_btn] and ( + self._prev_data is None or not self._prev_data[self._unsuccessful_btn] + ): + self._commands.failure = True + + # === Update Poses & Grippers === + for controller in self.controller_names: + last_controller_pose = Pose( + translation=np.array(input_data[controller]["pos"]), + quaternion=np.array(input_data[controller]["rot"]), + ) + # if controller == "left": + # last_controller_pose = ( + # Pose(translation=np.array([0, 0, 0]), rpy=RPY(roll=0, pitch=0, yaw=np.deg2rad(180))) # type: ignore + # * last_controller_pose + # ) + + if input_data[controller][self._trg_btn[controller]] and ( + self._prev_data is None or not self._prev_data[controller][self._trg_btn[controller]] + ): + # trigger just pressed (first data sample with button pressed) + + with self._resource_lock: + self._offset_pose[controller] = last_controller_pose + self._last_controller_pose[controller] = last_controller_pose + + elif not input_data[controller][self._trg_btn[controller]] and ( + self._prev_data is None or self._prev_data[controller][self._trg_btn[controller]] + ): + with self._resource_lock: + self._last_controller_pose[controller] = Pose() + self._offset_pose[controller] = Pose() + self._reset_origin_to_current(controller) + + elif input_data[controller][self._trg_btn[controller]]: + # button is pressed + with self._resource_lock: + self._last_controller_pose[controller] = last_controller_pose + + if input_data[controller][self._grp_btn[controller]] and ( + self._prev_data is None or not self._prev_data[controller][self._grp_btn[controller]] + ): + # just pressed + self._grp_pos[controller] = 0 + if not input_data[controller][self._grp_btn[controller]] and ( + self._prev_data is None or self._prev_data[controller][self._grp_btn[controller]] + ): + # just released + self._grp_pos[controller] = 1 + + self._prev_data = input_data + rate_limiter() diff --git a/repro_multi_robot.py b/repro_multi_robot.py new file mode 100644 index 00000000..57c8cd8e --- /dev/null +++ b/repro_multi_robot.py @@ -0,0 +1,80 @@ + +import gymnasium as gym +import numpy as np +import rcs +from rcs import sim +from rcs.envs.base import RobotEnv, ControlMode, MultiRobotWrapper +from rcs.envs.sim import RobotSimWrapper, MultiSimRobotWrapper +from rcs.envs.utils import default_sim_robot_cfg +import os +import sys + +# Add current directory to sys.path to ensure rcs is importable correctly +sys.path.insert(0, os.path.abspath("python")) + +def test_multi_robot_greenlet(): + scene_name = "fr3_empty_world" + + # Force use of local paths + project_root = os.path.abspath(".") + mjcf_scene = os.path.join(project_root, "assets", "scenes", scene_name, "scene.xml") + mjcf_robot = os.path.join(project_root, "assets", "scenes", scene_name, "robot.xml") + + print(f"Forcing local Scene MJCF: {mjcf_scene}") + + if not os.path.exists(mjcf_scene): + raise FileNotFoundError(f"Could not find local mjcf at {mjcf_scene}") + + # Update rcs.scenes for default_sim_robot_cfg to pick it up if it uses it + rcs.scenes[scene_name].mjcf_scene = mjcf_scene + rcs.scenes[scene_name].mjcf_robot = mjcf_robot + rcs.scenes[scene_name].mjb = None # Force re-parse of XML + + simulation = sim.Sim(mjcf_scene) + + # Setup two robots + envs = {} + for i in range(2): + robot_id = str(i) + cfg = default_sim_robot_cfg(scene_name, robot_id) + cfgkin = default_sim_robot_cfg(scene_name, "0") + + # Kinematic path + # cfg.kinematic_model_path = os.path.join(project_root, "assets", "fr3", "urdf", "fr3.urdf") + # cfg.kinematic_model_path = rcs.scenes["fr3_empty_world"].mjcf_robot + # cfg.arm_collision_geoms = [] + # cfg.attachment_site = "attachment_site_0" + + ik = rcs.common.Pin(cfgkin.kinematic_model_path, cfgkin.attachment_site, urdf=False) + robot = sim.SimRobot(simulation, ik, cfg) + + env = RobotEnv(robot, ControlMode.JOINTS) + # Wrap with RobotSimWrapper + env = RobotSimWrapper(env, simulation) + envs[robot_id] = env + + multi_env = MultiRobotWrapper(envs) + multi_sim_env = MultiSimRobotWrapper(multi_env, simulation) + + print("Resetting multi-robot environment...") + obs, info = multi_sim_env.reset() + print("Reset successful.") + + print("Stepping multi-robot environment...") + actions = { + "0": {"joints": np.zeros(7)}, + "1": {"joints": np.zeros(7)} + } + obs, reward, terminated, truncated, info = multi_sim_env.step(actions) + print("Step successful.") + + multi_sim_env.close() + print("Test passed!") + +if __name__ == "__main__": + try: + test_multi_robot_greenlet() + except Exception as e: + print(f"Test failed with error: {e}") + import traceback + traceback.print_exc() diff --git a/src/pybind/rcs.cpp b/src/pybind/rcs.cpp index 6496ee1f..7a36ed70 100644 --- a/src/pybind/rcs.cpp +++ b/src/pybind/rcs.cpp @@ -472,6 +472,12 @@ PYBIND11_MODULE(_core, m) { .def_readwrite("joints", &rcs::sim::SimRobotConfig::joints) .def_readwrite("actuators", &rcs::sim::SimRobotConfig::actuators) .def_readwrite("base", &rcs::sim::SimRobotConfig::base) + .def("__copy__", [](const rcs::sim::SimRobotConfig &self) { + return rcs::sim::SimRobotConfig(self); + }) + .def("__deepcopy__", [](const rcs::sim::SimRobotConfig &self, py::dict) { + return rcs::sim::SimRobotConfig(self); + }) .def("add_id", &rcs::sim::SimRobotConfig::add_id, py::arg("id")); py::class_(sim, "SimRobotState") @@ -510,6 +516,12 @@ PYBIND11_MODULE(_core, m) { &rcs::sim::SimGripperConfig::max_actuator_width) .def_readwrite("min_actuator_width", &rcs::sim::SimGripperConfig::min_actuator_width) + .def("__copy__", [](const rcs::sim::SimGripperConfig &self) { + return rcs::sim::SimGripperConfig(self); + }) + .def("__deepcopy__", [](const rcs::sim::SimGripperConfig &self, py::dict) { + return rcs::sim::SimGripperConfig(self); + }) .def("add_id", &rcs::sim::SimGripperConfig::add_id, py::arg("id")); py::class_( sim, "SimGripperState")