diff --git a/assets/scenes/fr3_simple_pick_up/scene.xml b/assets/scenes/fr3_simple_pick_up/scene.xml
index 88b1f269..d077d3bb 100644
--- a/assets/scenes/fr3_simple_pick_up/scene.xml
+++ b/assets/scenes/fr3_simple_pick_up/scene.xml
@@ -28,7 +28,7 @@
-
+
diff --git a/examples/teleop/quest_iris_dual_arm.py b/examples/teleop/quest_iris_dual_arm.py
index 6deeb5ae..f1beb455 100644
--- a/examples/teleop/quest_iris_dual_arm.py
+++ b/examples/teleop/quest_iris_dual_arm.py
@@ -3,8 +3,10 @@
from time import sleep
import numpy as np
+import rcs
+from rcs._core import common
from rcs._core.common import RPY, Pose, RobotPlatform
-from rcs._core.sim import SimConfig
+from rcs._core.sim import CameraType, SimCameraConfig, SimConfig
from rcs.camera.hw import HardwareCameraSet
from rcs.envs.base import (
ControlMode,
@@ -19,7 +21,7 @@
from rcs.utils import SimpleFrameRate
from rcs_fr3.creators import RCSFR3MultiEnvCreator
from rcs_fr3.utils import default_fr3_hw_gripper_cfg, default_fr3_hw_robot_cfg
-from rcs_realsense.utils import default_realsense
+# from rcs_realsense.utils import default_realsense
from simpub.core.simpub_server import SimPublisher
from simpub.parser.simdata import SimObject, SimScene
from simpub.sim.mj_publisher import MujocoPublisher
@@ -39,10 +41,14 @@
# "left": "192.168.102.1",
"right": "192.168.101.1",
}
+ROBOT2ID = {
+ "left": "0",
+ "right": "1",
+}
-# ROBOT_INSTANCE = RobotPlatform.SIMULATION
-ROBOT_INSTANCE = RobotPlatform.HARDWARE
+ROBOT_INSTANCE = RobotPlatform.SIMULATION
+# ROBOT_INSTANCE = RobotPlatform.HARDWARE
RECORD_FPS = 30
# set camera dict to none disable cameras
# CAMERA_DICT = {
@@ -87,7 +93,7 @@ def __init__(self, env: RelativeActionSpace):
self._reset_lock = threading.Lock()
self._env = env
- self.controller_names = ROBOT2IP.keys() if ROBOT_INSTANCE == RobotPlatform.HARDWARE else ["right"]
+ self.controller_names = ROBOT2IP.keys() if ROBOT_INSTANCE == RobotPlatform.HARDWARE else ROBOT2ID.keys()
self._trg_btn = {"left": "index_trigger", "right": "index_trigger"}
self._grp_btn = {"left": "hand_trigger", "right": "hand_trigger"}
self._start_btn = "A"
@@ -100,7 +106,7 @@ def __init__(self, env: RelativeActionSpace):
self._last_controller_pose = {key: Pose() for key in self.controller_names}
self._offset_pose = {key: Pose() for key in self.controller_names}
- for robot in ROBOT2IP:
+ for robot in ROBOT2IP if ROBOT_INSTANCE == RobotPlatform.HARDWARE else ROBOT2ID:
self._env.envs[robot].set_origin_to_current()
self._step_env = False
@@ -326,22 +332,50 @@ def main():
else:
# FR3
- robot_cfg = default_sim_robot_cfg("fr3_empty_world")
+ rcs.scenes["rcs_icra_scene"] = rcs.Scene(
+ mjcf_scene="/home/tobi/coding/rcs_clones/prs/models/scenes/rcs_icra_scene/scene.xml",
+ mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot,
+ robot_type=common.RobotType.FR3,
+ )
+ rcs.scenes["pick"] = rcs.Scene(
+ mjcf_scene="/home/tobi/coding/rcs_clones/prs/assets/scenes/fr3_simple_pick_up/scene.xml",
+ mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot,
+ robot_type=common.RobotType.FR3,
+ )
+
+ # robot_cfg = default_sim_robot_cfg("fr3_empty_world")
+ # robot_cfg = default_sim_robot_cfg("fr3_simple_pick_up")
+ robot_cfg = default_sim_robot_cfg("rcs_icra_scene")
+ # robot_cfg = default_sim_robot_cfg("pick")
+
+ resolution = (256, 256)
+ cameras = {
+ cam: SimCameraConfig(
+ identifier=cam,
+ type=CameraType.fixed,
+ resolution_height=resolution[1],
+ resolution_width=resolution[0],
+ frame_rate=0,
+ )
+ for cam in ["side", "wrist"]
+ }
sim_cfg = SimConfig()
sim_cfg.async_control = True
env_rel = SimMultiEnvCreator()(
- name2id=ROBOT2IP,
+ name2id=ROBOT2ID,
robot_cfg=robot_cfg,
control_mode=ControlMode.CARTESIAN_TQuat,
gripper_cfg=default_sim_gripper_cfg(),
- # cameras=default_mujoco_cameraset_cfg(),
+ # cameras=cameras,
max_relative_movement=0.5,
relative_to=RelativeTo.CONFIGURED_ORIGIN,
sim_cfg=sim_cfg,
)
- sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim # type: ignore
+ # sim = env_rel.unwrapped.envs[ROBOT2ID.keys().__iter__().__next__()].sim # type: ignore
+ sim = env_rel.get_wrapper_attr("sim")
sim.open_gui()
+ # MySimPublisher(MySimScene(), MQ3_ADDR)
MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3)))
env_rel.reset()
diff --git a/extensions/rcs_fr3/src/rcs_fr3/creators.py b/extensions/rcs_fr3/src/rcs_fr3/creators.py
index 0f61e2cc..1a490dbb 100644
--- a/extensions/rcs_fr3/src/rcs_fr3/creators.py
+++ b/extensions/rcs_fr3/src/rcs_fr3/creators.py
@@ -121,7 +121,7 @@ def __call__( # type: ignore
# sim_gui=True,
# truncate_on_collision=False,
# )
- if max_relative_movement is not None:
+ if relative_to != RelativeTo.NONE:
env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to)
return env
@@ -137,6 +137,7 @@ def __call__( # type: ignore
camera_set: HardwareCameraSet | None = None,
max_relative_movement: float | tuple[float, float] | None = None,
relative_to: RelativeTo = RelativeTo.LAST_STEP,
+ robot2world = None,
) -> gym.Env:
ik = rcs.common.Pin(
@@ -163,7 +164,7 @@ def __call__( # type: ignore
env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to)
envs[key] = env
- env = MultiRobotWrapper(envs)
+ env = MultiRobotWrapper(envs, robot2world)
if camera_set is not None:
camera_set.start()
camera_set.wait_for_frames()
diff --git a/python/rcs/__init__.py b/python/rcs/__init__.py
index fdfee7bb..4131ea78 100644
--- a/python/rcs/__init__.py
+++ b/python/rcs/__init__.py
@@ -18,7 +18,7 @@
class Scene:
"""Scene configuration."""
- mjb: str
+ mjb: str | None = None
"""Path to the Mujoco binary scene file."""
mjcf_scene: str
"""Path to the Mujoco scene XML file."""
diff --git a/python/rcs/_core/sim.pyi b/python/rcs/_core/sim.pyi
index 26a187fc..c70d9209 100644
--- a/python/rcs/_core/sim.pyi
+++ b/python/rcs/_core/sim.pyi
@@ -136,6 +136,8 @@ class SimGripperConfig(rcs._core.common.GripperConfig):
min_actuator_width: float
min_joint_width: float
seconds_between_callbacks: float
+ def __copy__(self) -> SimGripperConfig: ...
+ def __deepcopy__(self, arg0: dict) -> SimGripperConfig: ...
def __init__(self) -> None: ...
def add_id(self, id: str) -> None: ...
@@ -169,6 +171,8 @@ class SimRobotConfig(rcs._core.common.RobotConfig):
mjcf_scene_path: str
seconds_between_callbacks: float
trajectory_trace: bool
+ def __copy__(self) -> SimRobotConfig: ...
+ def __deepcopy__(self, arg0: dict) -> SimRobotConfig: ...
def __init__(self) -> None: ...
def add_id(self, id: str) -> None: ...
diff --git a/python/rcs/envs/base.py b/python/rcs/envs/base.py
index 1518909d..c2734107 100644
--- a/python/rcs/envs/base.py
+++ b/python/rcs/envs/base.py
@@ -167,6 +167,8 @@ class ArmObsType(TQuatDictType, JointsDictType, TRPYDictType): ...
CartOrJointContType: TypeAlias = TQuatDictType | JointsDictType | TRPYDictType
LimitedCartOrJointContType: TypeAlias = LimitedTQuatRelDictType | LimitedJointsRelDictType | LimitedTRPYRelDictType
+class ArmWithGripper(TQuatDictType, GripperDictType): ...
+
class ControlMode(Enum):
JOINTS = auto()
@@ -310,9 +312,26 @@ def close(self):
class MultiRobotWrapper(gym.Env):
"""Wraps a dictionary of environments to allow for multi robot control."""
- def __init__(self, envs: dict[str, gym.Env] | dict[str, gym.Wrapper]):
+ def __init__(self, envs: dict[str, gym.Env] | dict[str, gym.Wrapper], robot2world: dict[str, common.Pose] | None = None):
self.envs = envs
self.unwrapped_multi = cast(dict[str, RobotEnv], {key: env.unwrapped for key, env in envs.items()})
+ if robot2world is None:
+ self.robot2world = {}
+ else:
+ self.robot2world = robot2world
+
+ def _translate_pose(self, key, dic, to_world=True):
+ r2w = self.robot2world.get(key, common.Pose())
+ if not to_world:
+ r2w = r2w.inverse()
+ if "tquat" in dic:
+ p = r2w * common.Pose(translation=dic["tquat"][:3], quaternion=dic["tquat"][3:]) * r2w.inverse()
+ dic["tquat"] = np.concatenate([p.translation(), p.rotation_q()])
+ if "xyzrpy" in dic:
+ p = r2w * common.Pose(translation=dic["xyzrpy"][:3], rpy_vector=dic["xyzrpy"][3:]) * r2w.inverse()
+ dic["xyzrpy"] = p.xyzrpy()
+
+ return dic
def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
# follows gym env by combinding a dict of envs into a single env
@@ -322,7 +341,11 @@ def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, boo
truncated = False
info = {}
for key, env in self.envs.items():
- obs[key], r, t, tr, info[key] = env.step(action[key])
+ act = self._translate_pose(key, action[key], to_world=False)
+ ob, r, t, tr, info[key] = env.step(act)
+ obs[key] = self._translate_pose(key, ob, to_world=True)
+ # old
+ # obs[key], r, t, tr, info[key] = env.step(action[key])
reward += float(r)
terminated = terminated or t
truncated = truncated or tr
@@ -358,6 +381,7 @@ def close(self):
class RelativeTo(Enum):
LAST_STEP = auto()
CONFIGURED_ORIGIN = auto()
+ NONE = auto()
class RelativeActionSpace(gym.ActionWrapper):
diff --git a/python/rcs/envs/creators.py b/python/rcs/envs/creators.py
index 64bcce19..b6f884aa 100644
--- a/python/rcs/envs/creators.py
+++ b/python/rcs/envs/creators.py
@@ -1,3 +1,4 @@
+import copy
import logging
import typing
from functools import partial
@@ -146,33 +147,40 @@ def __call__( # type: ignore
max_relative_movement: float | tuple[float, float] | None = None,
relative_to: RelativeTo = RelativeTo.LAST_STEP,
sim_wrapper: Type[SimWrapper] | None = None,
+ robot2world=None,
) -> gym.Env:
simulation = sim.Sim(robot_cfg.mjcf_scene_path, sim_cfg)
+ print(robot_cfg.kinematic_model_path)
+ print(robot_cfg.attachment_site)
ik = rcs.common.Pin(
robot_cfg.kinematic_model_path,
- robot_cfg.attachment_site,
+ robot_cfg.attachment_site + "_0",
urdf=robot_cfg.kinematic_model_path.endswith(".urdf"),
)
# ik = rcs_robotics_library._core.rl.RoboticsLibraryIK(robot_cfg.kinematic_model_path)
robots: dict[str, rcs.sim.SimRobot] = {}
- for key in name2id:
- robots[key] = rcs.sim.SimRobot(sim=simulation, ik=ik, cfg=robot_cfg)
+ for key, mid in name2id.items():
+ cfg = copy.copy(robot_cfg)
+ cfg.add_id(mid)
+ robots[key] = rcs.sim.SimRobot(sim=simulation, ik=ik, cfg=cfg)
envs = {}
- for key in name2id:
+ for key, mid in name2id.items():
env: gym.Env = RobotEnv(robots[key], control_mode)
- env = RobotSimWrapper(env, simulation, sim_wrapper)
if gripper_cfg is not None:
- gripper = rcs.sim.SimGripper(simulation, gripper_cfg)
+ cfg = copy.copy(gripper_cfg)
+ cfg.add_id(mid)
+ gripper = rcs.sim.SimGripper(simulation, cfg)
env = GripperWrapper(env, gripper, binary=True)
- if max_relative_movement is not None:
+ if relative_to != RelativeTo.NONE:
env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to)
envs[key] = env
- env = MultiRobotWrapper(envs)
+ env = MultiRobotWrapper(envs, robot2world)
+ env = RobotSimWrapper(env, simulation, sim_wrapper)
if cameras is not None:
camera_set = typing.cast(
BaseCameraSet, SimCameraSet(simulation, cameras, physical_units=True, render_on_demand=True)
diff --git a/python/rcs/envs/sim.py b/python/rcs/envs/sim.py
index 589b49fd..1479230a 100644
--- a/python/rcs/envs/sim.py
+++ b/python/rcs/envs/sim.py
@@ -3,6 +3,7 @@
import gymnasium as gym
import numpy as np
+from greenlet import getcurrent, greenlet
from rcs.envs.base import (
ControlMode,
GripperWrapper,
@@ -45,11 +46,16 @@ def __init__(self, env, simulation: sim.Sim, sim_wrapper: Type[SimWrapper] | Non
self.sim = simulation
cfg = self.sim.get_config()
self.frame_rate = SimpleFrameRate(1 / cfg.frequency, "RobotSimWrapper")
+ self.main_greenlet = None
def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, bool, dict]:
obs, _, _, _, info = super().step(action)
cfg = self.sim.get_config()
- if cfg.async_control:
+
+ if self.main_greenlet is not None:
+ self.sim_robot.clear_collision_flag()
+ self.main_greenlet.switch()
+ elif cfg.async_control:
self.sim.step(round(1 / cfg.frequency / self.sim.model.opt.timestep))
if cfg.realtime:
self.frame_rate.frame_rate = 1 / cfg.frequency
@@ -72,10 +78,14 @@ def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, boo
def reset(
self, *, seed: int | None = None, options: dict[str, Any] | None = None
) -> tuple[dict[str, Any], dict[str, Any]]:
- self.sim.reset()
+ if self.main_greenlet is None:
+ self.sim.reset()
obs, info = super().reset(seed=seed, options=options)
- self.sim.step(1)
- # todo: an obs method that is recursive over wrappers would be needed
+
+ if self.main_greenlet is not None:
+ self.main_greenlet.switch()
+ else:
+ self.sim.step(1)
obs.update(self.unwrapped.get_obs())
return obs, info
@@ -88,20 +98,63 @@ def __init__(self, env: MultiRobotWrapper, simulation: sim.Sim):
self.env: MultiRobotWrapper
self.sim = simulation
self.sim_robots = cast(dict[str, sim.SimRobot], {key: e.robot for key, e in self.env.unwrapped_multi.items()})
+ self._inject_main_greenlet()
+
+ def _inject_main_greenlet(self):
+ main_gr = getcurrent()
+ for env_item in self.env.envs.values():
+ curr = env_item
+ while True:
+ if isinstance(curr, RobotSimWrapper):
+ curr.main_greenlet = main_gr
+ break
+ if not hasattr(curr, "env"):
+ break
+ curr = curr.env
def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, bool, dict]:
- _, _, _, _, info = super().step(action)
+ # 1. DOWN: Set actions for all robots
+ step_greenlets = {}
+ for key, env in self.env.envs.items():
+
+ def make_step_gr(env_to_step):
+ return greenlet(env_to_step.step)
+
+ gr = make_step_gr(env)
+ step_greenlets[key] = gr
+ # Translate action
+ act = self.env._translate_pose(key, action[key], to_world=False)
+
+ # Switch to robot greenlet. It will run until RobotSimWrapper.step switches back.
+ gr.switch(act)
+
+ # 2. SIM: Step physics once
self.sim.step_until_convergence()
- info["is_sim_converged"] = self.sim.is_converged()
- for key in self.envs.envs.items():
- state = self.sim_robots[key].get_state()
- info[key]["collision"] = state.collision
- info[key]["ik_success"] = state.ik_success
- obs = {key: env.get_obs() for key, env in self.env.unwrapped_multi.items()}
- truncated = np.all([info[key]["collision"] or info[key]["ik_success"] for key in info])
- return obs, 0.0, False, bool(truncated), info
+ # 3. UP: Gather observations
+ obs = {}
+ reward = 0.0
+ terminated = False
+ truncated = False
+ info = {}
+
+ for key in self.env.envs:
+ # Resume robot greenlet. It returns the step results.
+ res = step_greenlets[key].switch()
+ ob, r, t, tr, i = res
+
+ # Translate observation back to world coordinates
+ obs[key] = self.env._translate_pose(key, ob, to_world=True)
+ reward += float(r)
+ terminated = terminated or t
+ truncated = truncated or tr
+ info[key] = i
+ info[key]["terminated"] = t
+ info[key]["truncated"] = tr
+ info[key]["is_sim_converged"] = self.sim.is_converged()
+
+ return obs, reward, terminated, truncated, info
def reset( # type: ignore
self, *, seed: dict[str, int | None] | None = None, options: dict[str, Any] | None = None
@@ -113,11 +166,27 @@ def reset( # type: ignore
obs = {}
info = {}
self.sim.reset()
+
+ # 1. DOWN: Reset each robot
+ reset_greenlets = {}
for key, env in self.env.envs.items():
- _, info[key] = env.reset(seed=seed[key], options=options[key])
+
+ def make_reset_gr(env_to_reset, s, o):
+ return greenlet(lambda: env_to_reset.reset(seed=s, options=o))
+
+ gr = make_reset_gr(env, seed[key], options[key])
+ reset_greenlets[key] = gr
+ gr.switch()
+
+ # 2. SIM: Initial step
self.sim.step(1)
- for key, env in self.env.unwrapped_multi.items():
- obs[key] = cast(dict, env.get_obs())
+
+ # 3. UP: Gather initial obs
+ for key in self.env.envs:
+ ob, i = reset_greenlets[key].switch()
+ obs[key] = self.env._translate_pose(key, ob, to_world=True)
+ info[key] = i
+
return obs, info
diff --git a/python/rcs/envs/utils.py b/python/rcs/envs/utils.py
index 1d440098..98ec8288 100644
--- a/python/rcs/envs/utils.py
+++ b/python/rcs/envs/utils.py
@@ -19,6 +19,7 @@ def default_sim_robot_cfg(scene: str = "fr3_empty_world", idx: str = "0") -> sim
robot_cfg = rcs.sim.SimRobotConfig()
robot_cfg.robot_type = rcs.scenes[scene].robot_type
robot_cfg.tcp_offset = common.Pose(common.FrankaHandTCPOffset())
+ robot_cfg.arm_collision_geoms = []
robot_cfg.add_id(idx)
if rcs.scenes[scene].mjb is not None:
robot_cfg.mjcf_scene_path = rcs.scenes[scene].mjb
@@ -38,7 +39,9 @@ def default_tilburg_hw_hand_cfg(file: str | PathLike | None = None) -> THConfig:
def default_sim_gripper_cfg(idx: str = "0") -> sim.SimGripperConfig:
cfg = sim.SimGripperConfig()
- cfg.add_id(idx)
+ cfg.collision_geoms = []
+ cfg.collision_geoms_fingers = []
+ # cfg.add_id(idx)
return cfg
diff --git a/python/rcs/operator/__init__.py b/python/rcs/operator/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/rcs/operator/franka.py b/python/rcs/operator/franka.py
new file mode 100644
index 00000000..d2e73821
--- /dev/null
+++ b/python/rcs/operator/franka.py
@@ -0,0 +1,183 @@
+import logging
+import threading
+from time import sleep
+
+import numpy as np
+import rcs
+from rcs._core.common import RPY, Pose, RobotPlatform
+from rcs._core import common
+from rcs._core.sim import SimConfig
+from rcs.camera.hw import HardwareCameraSet
+from rcs.envs.base import (
+ ControlMode,
+ GripperDictType,
+ LimitedTQuatRelDictType,
+ RelativeActionSpace,
+ RelativeTo,
+)
+from rcs.envs.creators import SimMultiEnvCreator
+from rcs.envs.storage_wrapper import StorageWrapper
+from rcs.envs.utils import default_digit, default_sim_gripper_cfg, default_sim_robot_cfg
+from rcs.utils import SimpleFrameRate
+from rcs_fr3.creators import RCSFR3MultiEnvCreator
+from rcs_fr3.utils import default_fr3_hw_gripper_cfg, default_fr3_hw_robot_cfg
+from rcs_realsense.utils import default_realsense
+from rcs.operator.gello import GelloConfig, GelloOperator, GelloArmConfig
+from rcs.operator.interface import TeleopLoop
+
+from simpub.core.simpub_server import SimPublisher
+from simpub.parser.simdata import SimObject, SimScene
+from simpub.sim.mj_publisher import MujocoPublisher
+from simpub.xr_device.meta_quest3 import MetaQuest3
+
+class FakeSimPublisher(SimPublisher):
+ def get_update(self):
+ return {}
+
+class FakeSimScene(SimScene):
+ def __init__(self):
+ super().__init__()
+ self.root = SimObject(name="root")
+
+logger = logging.getLogger(__name__)
+
+
+ROBOT2IP = {
+ # "left": "192.168.102.1",
+ "right": "192.168.101.1",
+}
+ROBOT2ID = {
+ "left": "1",
+ "right": "0",
+}
+
+
+ROBOT_INSTANCE = RobotPlatform.SIMULATION
+# ROBOT_INSTANCE = RobotPlatform.HARDWARE
+
+RECORD_FPS = 30
+# set camera dict to none disable cameras
+# CAMERA_DICT = {
+# "left_wrist": "230422272017",
+# "right_wrist": "230422271040",
+# "side": "243522070385",
+# "bird_eye": "243522070364",
+# }
+CAMERA_DICT = None
+MQ3_ADDR = "10.42.0.1"
+
+# DIGIT_DICT = {
+# "digit_right_left": "D21182",
+# "digit_right_right": "D21193"
+# }
+DIGIT_DICT = None
+
+
+DATASET_PATH = "test_data_iris_dual_arm14"
+INSTRUCTION = "build a tower with the blocks in front of you"
+
+robot2world={"left": rcs.common.Pose(translation=[0, 0, 0], rpy_vector=[0.89360858, -0.17453293, 0.46425758]),
+ "right": rcs.common.Pose(translation=[0, 0, 0], rpy_vector=[-0.89360858, -0.17453293, -0.46425758])}
+
+# config = QuestConfig(mq3_addr=MQ3_ADDR, simulation=ROBOT_INSTANCE == RobotPlatform.SIMULATION)
+config = GelloConfig(
+ arms={
+ "right": GelloArmConfig(com_port="/dev/ttyACM1"),
+ "left": GelloArmConfig(com_port="/dev/ttyACM0"),
+ },
+ simulation=ROBOT_INSTANCE == RobotPlatform.SIMULATION,
+)
+
+
+def get_env():
+ if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
+
+ cams = []
+ if CAMERA_DICT is not None:
+ cams.append(default_realsense(CAMERA_DICT))
+ if DIGIT_DICT is not None:
+ cams.append(default_digit(DIGIT_DICT))
+
+ camera_set = HardwareCameraSet(cams) if cams else None
+
+ env_rel = RCSFR3MultiEnvCreator()(
+ name2ip=ROBOT2IP,
+ camera_set=camera_set,
+ robot_cfg=default_fr3_hw_robot_cfg(async_control=True),
+ control_mode=GelloOperator.control_mode[0],
+ gripper_cfg=default_fr3_hw_gripper_cfg(async_control=True),
+ max_relative_movement=(0.5, np.deg2rad(90)),
+ relative_to=GelloOperator.control_mode[1],
+ robot2world=robot2world,
+ )
+ env_rel = StorageWrapper(
+ env_rel, DATASET_PATH, INSTRUCTION, batch_size=32, max_rows_per_group=100, max_rows_per_file=1000
+ )
+ operator = GelloOperator(config)
+ else:
+ # FR3
+ rcs.scenes["rcs_icra_scene"] = rcs.Scene(
+ mjcf_scene="/home/tobi/coding/rcs_clones/prs/models/scenes/rcs_icra_scene/scene.xml",
+ mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot,
+ robot_type=common.RobotType.FR3,
+ )
+ rcs.scenes["pick"] = rcs.Scene(
+ mjcf_scene="/home/tobi/coding/rcs_clones/prs/assets/scenes/fr3_simple_pick_up/scene.xml",
+ mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot,
+ robot_type=common.RobotType.FR3,
+ )
+ rcs.scenes["duo"] = rcs.Scene(
+ mjcf_scene="/home/tobi/coding/rcs_repos/clone1/rcs_models/assets/scenes/fr3_duo_empty_world/scene.xml",
+ mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot,
+ robot_type=common.RobotType.FR3,
+ )
+
+ # robot_cfg = default_sim_robot_cfg("fr3_empty_world")
+ # robot_cfg = default_sim_robot_cfg("fr3_simple_pick_up")
+ # robot_cfg = default_sim_robot_cfg("rcs_icra_scene")
+ # robot_cfg = default_sim_robot_cfg("pick")
+ robot_cfg = default_sim_robot_cfg("duo")
+
+ # resolution = (256, 256)
+ # cameras = {
+ # cam: SimCameraConfig(
+ # identifier=cam,
+ # type=CameraType.fixed,
+ # resolution_height=resolution[1],
+ # resolution_width=resolution[0],
+ # frame_rate=0,
+ # )
+ # for cam in ["side", "wrist"]
+ # }
+
+ sim_cfg = SimConfig()
+ sim_cfg.async_control = True
+ env_rel = SimMultiEnvCreator()(
+ name2id=ROBOT2ID,
+ robot_cfg=robot_cfg,
+ control_mode=GelloOperator.control_mode[0],
+ gripper_cfg=default_sim_gripper_cfg(),
+ # cameras=default_mujoco_cameraset_cfg(),
+ max_relative_movement=0.5,
+ relative_to=GelloOperator.control_mode[1],
+ sim_cfg=sim_cfg,
+ robot2world=robot2world,
+ )
+ # sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim # type: ignore
+ sim = env_rel.get_wrapper_attr("sim")
+ operator = GelloOperator(config, sim)
+ sim.open_gui()
+ MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3)))
+ return env_rel, operator
+
+
+def main():
+ env_rel, operator = get_env()
+ env_rel.reset()
+ tele = TeleopLoop(env_rel, operator)
+ with env_rel, tele: # type: ignore
+ tele.environment_step_loop()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/python/rcs/operator/gello.py b/python/rcs/operator/gello.py
new file mode 100644
index 00000000..26e9d810
--- /dev/null
+++ b/python/rcs/operator/gello.py
@@ -0,0 +1,433 @@
+import copy
+import logging
+import threading
+import time
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional, Sequence, Tuple, TypedDict, Iterator
+
+import numpy as np
+
+try:
+ from dynamixel_sdk.group_sync_read import GroupSyncRead
+ from dynamixel_sdk.group_sync_write import GroupSyncWrite
+ from dynamixel_sdk.packet_handler import PacketHandler
+ from dynamixel_sdk.port_handler import PortHandler
+ from dynamixel_sdk.robotis_def import COMM_SUCCESS
+ HAS_DYNAMIXEL_SDK = True
+except ImportError:
+ HAS_DYNAMIXEL_SDK = False
+
+try:
+ from pynput import keyboard
+ HAS_PYNPUT = True
+except ImportError:
+ HAS_PYNPUT = False
+
+from rcs.envs.base import ArmWithGripper, ControlMode, RelativeTo, JointsDictType, GripperDictType
+from rcs.operator.interface import BaseOperator, BaseOperatorConfig, TeleopCommands
+from rcs.sim.sim import Sim
+from rcs.utils import SimpleFrameRate
+
+logger = logging.getLogger(__name__)
+
+# --- Dynamixel Driver Constants and Helpers ---
+
+XL330_CONTROL_TABLE = {
+ "model_number": {"addr": 0, "len": 2},
+ "operating_mode": {"addr": 11, "len": 1},
+ "torque_enable": {"addr": 64, "len": 1},
+ "kp_d": {"addr": 80, "len": 2},
+ "kp_i": {"addr": 82, "len": 2},
+ "kp_p": {"addr": 84, "len": 2},
+ "goal_current": {"addr": 102, "len": 2},
+ "goal_position": {"addr": 116, "len": 4},
+ "present_position": {"addr": 132, "len": 4},
+}
+
+
+class DynamixelDriver:
+ """Simplified Dynamixel driver adapted for RCS."""
+
+ def __init__(
+ self,
+ ids: Sequence[int],
+ port: str = "/dev/ttyUSB0",
+ baudrate: int = 57600,
+ pulses_per_revolution: int = 4095,
+ ):
+ if not HAS_DYNAMIXEL_SDK:
+ raise ImportError("dynamixel_sdk is not installed. Please install it to use GelloOperator.")
+
+ self._ids = ids
+ self._port = port
+ self._baudrate = baudrate
+ self._pulses_per_revolution = pulses_per_revolution
+ self._lock = threading.Lock()
+ self._buffered_joint_positions = None
+
+ self._portHandler = PortHandler(self._port)
+ self._packetHandler = PacketHandler(2.0)
+
+ self._groupSyncReadHandlers = {}
+ self._groupSyncWriteHandlers = {}
+
+ for key, entry in XL330_CONTROL_TABLE.items():
+ self._groupSyncReadHandlers[key] = GroupSyncRead(
+ self._portHandler, self._packetHandler, entry["addr"], entry["len"]
+ )
+ for dxl_id in self._ids:
+ self._groupSyncReadHandlers[key].addParam(dxl_id)
+
+ if key != "model_number" and key != "present_position":
+ self._groupSyncWriteHandlers[key] = GroupSyncWrite(
+ self._portHandler, self._packetHandler, entry["addr"], entry["len"]
+ )
+
+ if not self._portHandler.openPort():
+ raise ConnectionError(f"Failed to open port {self._port}")
+ if not self._portHandler.setBaudRate(self._baudrate):
+ raise ConnectionError(f"Failed to set baudrate {self._baudrate}")
+
+ self._stop_thread = threading.Event()
+ self._polling_thread = None
+ self._is_polling = False
+
+ def write_value_by_name(self, name: str, values: Sequence[int | None]):
+ if len(values) != len(self._ids):
+ raise ValueError(f"The length of {name} must match the number of servos")
+
+ handler = self._groupSyncWriteHandlers[name]
+ value_len = XL330_CONTROL_TABLE[name]["len"]
+
+ with self._lock:
+ for dxl_id, value in zip(self._ids, values):
+ if value is None:
+ continue
+ param = [(value >> (8 * i)) & 0xFF for i in range(value_len)]
+ handler.addParam(dxl_id, param)
+
+ comm_result = handler.txPacket()
+ if comm_result != COMM_SUCCESS:
+ handler.clearParam()
+ raise RuntimeError(f"Failed to syncwrite {name}: {self._packetHandler.getTxRxResult(comm_result)}")
+ handler.clearParam()
+
+ def read_value_by_name(self, name: str) -> List[int]:
+ handler = self._groupSyncReadHandlers[name]
+ value_len = XL330_CONTROL_TABLE[name]["len"]
+ addr = XL330_CONTROL_TABLE[name]["addr"]
+
+ with self._lock:
+ comm_result = handler.txRxPacket()
+ if comm_result != COMM_SUCCESS:
+ raise RuntimeError(f"Failed to sync read {name}: {self._packetHandler.getTxRxResult(comm_result)}")
+
+ values = []
+ for dxl_id in self._ids:
+ if handler.isAvailable(dxl_id, addr, value_len):
+ value = handler.getData(dxl_id, addr, value_len)
+ value = int(np.int32(np.uint32(value)))
+ values.append(value)
+ else:
+ raise RuntimeError(f"Failed to get {name} for ID {dxl_id}")
+ return values
+
+ def start_joint_polling(self):
+ if self._is_polling:
+ return
+ self._stop_thread.clear()
+ self._polling_thread = threading.Thread(target=self._joint_polling_loop, daemon=True)
+ self._polling_thread.start()
+ self._is_polling = True
+
+ def stop_joint_polling(self):
+ if not self._is_polling:
+ return
+ self._stop_thread.set()
+ if self._polling_thread:
+ self._polling_thread.join()
+ self._is_polling = False
+
+ def _joint_polling_loop(self):
+ while not self._stop_thread.is_set():
+ time.sleep(0.001)
+ try:
+ self._buffered_joint_positions = np.array(self.read_value_by_name("present_position"), dtype=int)
+ except RuntimeError as e:
+ logger.warning(f"Polling error: {e}")
+
+ def get_joints(self) -> np.ndarray:
+ if self._is_polling:
+ while self._buffered_joint_positions is None:
+ time.sleep(0.01)
+ return self._pulses_to_rad(self._buffered_joint_positions.copy())
+ return self._pulses_to_rad(np.array(self.read_value_by_name("present_position"), dtype=int))
+
+ def _pulses_to_rad(self, pulses) -> np.ndarray:
+ return np.array(pulses) / self._pulses_per_revolution * 2 * np.pi
+
+ def _rad_to_pulses(self, rad: float) -> int:
+ return int(rad / (2 * np.pi) * self._pulses_per_revolution)
+
+ def close(self):
+ self.stop_joint_polling()
+ if self._portHandler:
+ self._portHandler.closePort()
+
+
+# --- Gello Hardware Interface Logic ---
+
+@dataclass
+class GelloArmConfig:
+ com_port: str = "/dev/ttyUSB0"
+ num_arm_joints: int = 7
+ joint_signs: List[int] = field(default_factory=lambda: [1, -1, 1, -1, 1, 1, 1])
+ gripper: bool = True
+ gripper_range_rad: List[float] = field(default_factory=lambda: [2.00, 3.22])
+ assembly_offsets: List[float] = field(
+ default_factory=lambda: [0.000, 0.000, 3.142, 3.142, 3.142, 4.712, 0.000]
+ )
+ dynamixel_kp_p: List[int] = field(
+ default_factory=lambda: [30, 60, 0, 30, 0, 0, 0, 50]
+ )
+ dynamixel_kp_i: List[int] = field(
+ default_factory=lambda: [0, 0, 0, 0, 0, 0, 0, 0]
+ )
+ dynamixel_kp_d: List[int] = field(
+ default_factory=lambda: [250, 100, 80, 60, 30, 10, 5, 0]
+ )
+ dynamixel_torque_enable: List[int] = field(
+ default_factory=lambda: [0, 0, 0, 0, 0, 0, 0, 0]
+ )
+ dynamixel_goal_position: List[float] = field(
+ default_factory=lambda: [0.0, 0.0, 0.0, -1.571, 0.0, 1.571, 0.0, 3.509]
+ )
+
+
+
+@dataclass
+class DynamixelControlConfig:
+ kp_p: List[int] = field(default_factory=list)
+ kp_i: List[int] = field(default_factory=list)
+ kp_d: List[int] = field(default_factory=list)
+ torque_enable: List[int] = field(default_factory=list)
+ goal_position: List[int] = field(default_factory=list)
+ goal_current: List[int] = field(default_factory=list)
+ operating_mode: List[int] = field(default_factory=list)
+
+ _UPDATE_ORDER = [
+ "operating_mode",
+ "goal_current",
+ "kp_p",
+ "kp_i",
+ "kp_d",
+ "torque_enable",
+ "goal_position",
+ ]
+
+ def __iter__(self) -> Iterator[Tuple[str, List[int]]]:
+ for param_name in self._UPDATE_ORDER:
+ if hasattr(self, param_name):
+ yield param_name, getattr(self, param_name)
+
+ def __getitem__(self, param_name: str) -> List[int]:
+ return getattr(self, param_name)
+
+ def __setitem__(self, param_name: str, value: List[int]) -> None:
+ setattr(self, param_name, value)
+
+
+class GelloHardware:
+ JOINT_POSITION_LIMITS = np.array(
+ [
+ [-2.9007, 2.9007],
+ [-1.8361, 1.8361],
+ [-2.9007, 2.9007],
+ [-3.0770, -0.1169],
+ [-2.8763, 2.8763],
+ [0.4398, 4.6216],
+ [-3.0508, 3.0508],
+ ]
+ )
+ MID_JOINT_POSITIONS = JOINT_POSITION_LIMITS.mean(axis=1)
+ OPERATING_MODE = 5
+ CURRENT_LIMIT = 600
+
+ def __init__(self, config: GelloArmConfig):
+ self._com_port = config.com_port
+ self._num_arm_joints = config.num_arm_joints
+ self._joint_signs = np.array(config.joint_signs)
+ self._gripper = config.gripper
+ self._num_total_joints = self._num_arm_joints + (1 if self._gripper else 0)
+ self._gripper_range_rad = config.gripper_range_rad
+ self._assembly_offsets = np.array(config.assembly_offsets)
+
+ self._driver = DynamixelDriver(
+ ids=list(range(1, self._num_total_joints + 1)),
+ port=self._com_port,
+ )
+
+ self._initial_arm_joints_raw = self._driver.get_joints()[: self._num_arm_joints]
+ initial_arm_joints = self.normalize_joint_positions(
+ self._initial_arm_joints_raw, self._assembly_offsets, self._joint_signs
+ )
+ self._prev_arm_joints_raw = self._initial_arm_joints_raw.copy()
+ self._prev_arm_joints = initial_arm_joints.copy()
+
+ self._dynamixel_control_config = DynamixelControlConfig(
+ kp_p=config.dynamixel_kp_p.copy(),
+ kp_i=config.dynamixel_kp_i.copy(),
+ kp_d=config.dynamixel_kp_d.copy(),
+ torque_enable=config.dynamixel_torque_enable.copy(),
+ goal_position=self._goal_position_to_pulses(config.dynamixel_goal_position).copy(),
+ goal_current=[self.CURRENT_LIMIT] * self._num_total_joints,
+ operating_mode=[self.OPERATING_MODE] * self._num_total_joints,
+ )
+
+ self._initialize_parameters()
+ self._driver.start_joint_polling()
+
+ @staticmethod
+ def normalize_joint_positions(raw, offsets, signs):
+ return (np.mod((raw - offsets) * signs - GelloHardware.MID_JOINT_POSITIONS, 2 * np.pi)
+ - np.pi + GelloHardware.MID_JOINT_POSITIONS)
+
+ def _initialize_parameters(self):
+ for name, value in self._dynamixel_control_config:
+ self._driver.write_value_by_name(name, value)
+ time.sleep(0.1)
+
+ def get_joint_and_gripper_positions(self) -> Tuple[np.ndarray, float]:
+ joints_raw = self._driver.get_joints()
+ arm_joints_raw = joints_raw[: self._num_arm_joints]
+
+ arm_joints_delta = (arm_joints_raw - self._prev_arm_joints_raw) * self._joint_signs
+ arm_joints = self._prev_arm_joints + arm_joints_delta
+ self._prev_arm_joints = arm_joints.copy()
+ self._prev_arm_joints_raw = arm_joints_raw.copy()
+
+ arm_joints_clipped = np.clip(arm_joints, self.JOINT_POSITION_LIMITS[:, 0], self.JOINT_POSITION_LIMITS[:, 1])
+
+ gripper_pos = 0.0
+ if self._gripper:
+ raw_grp = joints_raw[-1]
+ gripper_pos = (raw_grp - self._gripper_range_rad[0]) / (self._gripper_range_rad[1] - self._gripper_range_rad[0])
+ gripper_pos = max(0.0, min(1.0, gripper_pos))
+
+ return arm_joints_clipped, gripper_pos
+
+ def _goal_position_to_pulses(self, goals):
+ arm_goals = np.array(goals[: self._num_arm_joints])
+ initial_rotations = np.floor_divide(self._initial_arm_joints_raw - self._assembly_offsets - self.MID_JOINT_POSITIONS, 2 * np.pi)
+ arm_goals_raw = (initial_rotations * 2 * np.pi + arm_goals + self._assembly_offsets) * self._joint_signs + np.pi
+ goals_raw = np.append(arm_goals_raw, goals[-1]) if self._gripper else arm_goals_raw
+ return [self._driver._rad_to_pulses(rad) for rad in goals_raw]
+
+ def close(self):
+ try:
+ self._driver.write_value_by_name("torque_enable", [0] * self._num_total_joints)
+ except:
+ pass
+ self._driver.close()
+
+
+# --- RCS Operator Implementation ---
+
+
+@dataclass(kw_only=True)
+class GelloConfig(BaseOperatorConfig):
+ # Dictionary for multi-arm setups: {"left": GelloArmConfig(...), "right": GelloArmConfig(...)}
+ arms: Dict[str, GelloArmConfig] = field(default_factory=lambda: {"right": GelloArmConfig()})
+
+
+class GelloOperator(BaseOperator):
+ control_mode = (ControlMode.JOINTS, RelativeTo.NONE)
+
+ def __init__(self, config: GelloConfig, sim: Sim | None = None):
+ super().__init__(config, sim)
+ self.config: GelloConfig
+ self._resource_lock = threading.Lock()
+ self._cmd_lock = threading.Lock()
+
+ self._exit_requested = False
+ self._commands = TeleopCommands()
+
+ self.controller_names = list(self.config.arms.keys())
+
+ self._last_joints = {name: None for name in self.controller_names}
+ self._last_gripper = {name: 1.0 for name in self.controller_names}
+ self._hws: Dict[str, GelloHardware] = {}
+
+ if HAS_PYNPUT:
+ self._listener = keyboard.Listener(on_press=self._on_press)
+ self._listener.start()
+ else:
+ logger.warning("pynput not found. Keyboard triggers disabled.")
+
+ def _on_press(self, key):
+ try:
+ if hasattr(key, "char"):
+ if key.char == "s":
+ with self._cmd_lock:
+ self._commands.sync_position = True
+ elif key.char == "r":
+ with self._cmd_lock:
+ self._commands.failure = True
+ except AttributeError:
+ pass
+
+ def consume_commands(self) -> TeleopCommands:
+ with self._cmd_lock:
+ cmds = copy.copy(self._commands)
+ self._commands = TeleopCommands()
+ return cmds
+
+ def reset_operator_state(self):
+ # GELLO is absolute, no internal state to reset typically
+ pass
+
+ def consume_action(self) -> Dict[str, Any]:
+ actions = {}
+ with self._resource_lock:
+ for name in self.controller_names:
+ if self._last_joints[name] is not None:
+ actions[name] = {
+ "joints": self._last_joints[name].copy(),
+ "gripper": self._last_gripper[name],
+ }
+ return actions
+
+ def run(self):
+ # Initialize all hardware instances
+ for name, arm_cfg in self.config.arms.items():
+ try:
+ self._hws[name] = GelloHardware(arm_cfg)
+ except Exception as e:
+ logger.error(f"Failed to initialize GELLO hardware for {name}: {e}")
+
+ if not self._hws:
+ logger.error("No GELLO hardware initialized. Exiting.")
+ return
+
+ rate_limiter = SimpleFrameRate(self.config.read_frequency, "gello readout")
+
+ while not self._exit_requested:
+ for name, hw in self._hws.items():
+ try:
+ joints, gripper = hw.get_joint_and_gripper_positions()
+ with self._resource_lock:
+ self._last_joints[name] = joints
+ self._last_gripper[name] = gripper
+ except Exception as e:
+ logger.warning(f"Error reading GELLO {name}: {e}")
+
+ rate_limiter()
+
+ def close(self):
+ self._exit_requested = True
+ if HAS_PYNPUT and hasattr(self, "_listener"):
+ self._listener.stop()
+ for hw in self._hws.values():
+ hw.close()
+ if self.is_alive() and threading.current_thread() != self:
+ self.join(timeout=1.0)
diff --git a/python/rcs/operator/interface.py b/python/rcs/operator/interface.py
new file mode 100644
index 00000000..0092cb9b
--- /dev/null
+++ b/python/rcs/operator/interface.py
@@ -0,0 +1,227 @@
+from abc import ABC
+import copy
+from dataclasses import dataclass, field
+import logging
+import threading
+import time
+from time import sleep
+import gymnasium as gym
+
+from rcs.envs.base import ArmWithGripper, ControlMode, RelativeTo
+from rcs.sim.sim import Sim
+from rcs.utils import SimpleFrameRate
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TeleopCommands:
+ """Semantic commands decoupled from specific hardware buttons."""
+
+ record: bool = False
+ success: bool = False
+ failure: bool = False
+ sync_position: bool = False
+ reset_origin_to_current: dict[str, bool] = field(default_factory=dict)
+
+
+@dataclass(kw_only=True)
+class BaseOperatorConfig:
+ read_frequency: int = 30
+ simulation: bool = True
+
+
+class BaseOperator(ABC, threading.Thread):
+ control_mode: tuple[ControlMode, RelativeTo]
+ controller_names: list[str] = field(default=["left", "right"])
+
+ def __init__(self, config: BaseOperatorConfig, sim: Sim | None = None):
+ threading.Thread.__init__(self)
+ self.config = config
+ self.sim = sim
+
+ def consume_commands(self) -> TeleopCommands:
+ """Returns the current commands and resets them (edge-triggered). Must be thread-safe."""
+ raise NotImplementedError()
+
+ def reset_operator_state(self):
+ """Hook for subclasses to reset their internal poses/offsets on env reset. Must be thread-safe."""
+
+ def run(self):
+ """Read out hardware, set states and process buttons."""
+ raise NotImplementedError()
+
+ def consume_action(self) -> dict[str, ArmWithGripper]:
+ """Returns the action dictionary to step the environment. Must be thread-safe."""
+ raise NotImplementedError()
+
+ def close(self):
+ pass
+
+
+class TeleopLoop:
+ """Interface for an operator device"""
+
+ # Define this as a class attribute so it can be accessed without instantiating
+ control_mode: tuple[ControlMode, RelativeTo]
+
+ def __init__(
+ self,
+ env: gym.Env,
+ operator: BaseOperator,
+ env_frequency: int = 30,
+ key_translation: dict[str, str] | None = None,
+ ):
+ super().__init__()
+ self.env = env
+ self.operator = operator
+ self._exit_requested = False
+ self.env_frequency = env_frequency
+ if key_translation is None:
+ # controller to robot translation
+ self.key_translation = {key: key for key in self.operator.controller_names}
+ else:
+ self.key_translation = key_translation
+
+ # Absolute operators (RelativeTo.NONE) need an initial sync
+ self._synced = (self.operator.control_mode[1] != RelativeTo.NONE)
+
+ def stop(self):
+ self.operator.close()
+ self._exit_requested = True
+ self.operator.join()
+
+ def __enter__(self):
+ self.operator.start()
+ # sleep(2)
+ return self
+
+ def __exit__(self, *_):
+ self.stop()
+
+ def _translate_keys(self, actions):
+ translated = {self.key_translation[key]: actions[key] for key in actions}
+ # Fill in missing robots with "hold" actions from last observation
+ # This is necessary because absolute environments (like MultiRobotWrapper)
+ # require actions for all configured robots in every step.
+ for robot_name in self.env.get_wrapper_attr("envs").keys():
+ if robot_name not in translated:
+ if robot_name in self._last_obs:
+ translated[robot_name] = {
+ "joints": self._last_obs[robot_name]["joints"].copy(),
+ "gripper": self._last_obs[robot_name].get("gripper", 1.0)
+ }
+ return translated
+
+ def environment_step_loop(self):
+ rate_limiter = SimpleFrameRate(self.env_frequency, "env loop")
+
+ # 0. Initial Reset to get current positions for untracked robots
+ self._last_obs, _ = self.env.reset()
+
+ while True:
+ if self._exit_requested:
+ break
+
+ # 1. Process Meta-Commands
+ cmds = self.operator.consume_commands()
+
+ if cmds.record:
+ print("Command: Start Recording")
+ self.env.get_wrapper_attr("start_record")()
+
+ if cmds.success:
+ print("Command: Success! Resetting env...")
+ self.env.get_wrapper_attr("success")()
+ sleep(1) # sleep to let the robot reach the goal
+ self._last_obs, _ = self.env.reset()
+ self.operator.reset_operator_state()
+ self._synced = (self.operator.control_mode[1] != RelativeTo.NONE)
+ # consume new commands because of potential origin reset
+ continue
+
+ elif cmds.failure:
+ print("Command: Failure! Resetting env...")
+ self._last_obs, _ = self.env.reset()
+ self.operator.reset_operator_state()
+ self._synced = (self.operator.control_mode[1] != RelativeTo.NONE)
+ # consume new commands because of potential origin reset
+ continue
+
+ if cmds.sync_position:
+ self.sync_robot_to_operator()
+ self._synced = True
+ continue
+
+ if not self._synced:
+ # Still waiting for sync, step the env with "hold" actions
+ if int(time.time()) % 5 == 0 and int(time.time() * self.env_frequency) % self.env_frequency == 0:
+ print("Waiting for sync... (Press 's' on GELLO/Keyboard to sync)")
+
+ hold_actions = {}
+ for robot_name in self.env.get_wrapper_attr("envs").keys():
+ if robot_name in self._last_obs and "joints" in self._last_obs[robot_name]:
+ hold_actions[robot_name] = {
+ "joints": self._last_obs[robot_name]["joints"].copy(),
+ "gripper": self._last_obs[robot_name].get("gripper", 1.0),
+ }
+
+ self._last_obs, _, _, _, _ = self.env.step(hold_actions)
+ rate_limiter()
+ continue
+
+ for controller in cmds.reset_origin_to_current:
+ if cmds.reset_origin_to_current[controller]:
+ robot = self.key_translation[controller]
+ print(f"Command: Resetting origin for {robot}...")
+ assert (
+ self.operator.control_mode[1] == RelativeTo.CONFIGURED_ORIGIN
+ # TODO the following is a dict and can thus not easily be used like this
+ # and self.env.get_wrapper_attr("relative_to") == RelativeTo.CONFIGURED_ORIGIN
+ ), "both robot env and operator must be configured to relative_to.CONFIGURED_ORIGIN"
+ self.env.get_wrapper_attr("envs")[robot].set_origin_to_current()
+
+ # 2. Step the Environment
+ actions = self.operator.consume_action()
+ actions = self._translate_keys(actions)
+ self._last_obs, _, _, _, _ = self.env.step(actions)
+
+ rate_limiter()
+
+ def sync_robot_to_operator(self, duration: float = 3.0):
+ print(f"Command: Syncing robot to operator (duration: {duration}s)...")
+ rate_limiter = SimpleFrameRate(self.env_frequency, "sync loop")
+ num_steps = int(duration * self.env_frequency)
+
+ # 1. Capture the initial state for interpolation
+ start_obs = copy.deepcopy(self._last_obs)
+
+ # 2. Interpolation Loop
+ for i in range(num_steps):
+ alpha = (i + 1) / num_steps
+ # Re-consume operator action to follow moving target!
+ target_actions = self._translate_keys(self.operator.consume_action())
+
+ interp_actions = {}
+ for robot_name, target in target_actions.items():
+ try:
+ # Interpolate from FIXED start towards MOVING target
+ s_joints = start_obs[robot_name]["joints"]
+ t_joints = target["joints"]
+ interp_joints = s_joints + alpha * (t_joints - s_joints)
+
+ s_gripper = start_obs[robot_name].get("gripper", 1.0)
+ t_gripper = target.get("gripper", 1.0)
+ interp_gripper = s_gripper + alpha * (t_gripper - s_gripper)
+
+ interp_actions[robot_name] = {
+ "joints": interp_joints,
+ "gripper": interp_gripper,
+ }
+ except (KeyError, TypeError):
+ continue
+
+ self._last_obs, _, _, _, _ = self.env.step(interp_actions)
+ rate_limiter()
+
+ print("Sync Complete.")
diff --git a/python/rcs/operator/pedals.py b/python/rcs/operator/pedals.py
new file mode 100644
index 00000000..e1865f51
--- /dev/null
+++ b/python/rcs/operator/pedals.py
@@ -0,0 +1,111 @@
+import evdev
+from evdev import ecodes
+import threading
+import time
+
+class FootPedal:
+ def __init__(self, device_name_substring="Foot Switch"):
+ """Initializes the foot pedal and starts the background reading thread."""
+ self.device_path = self._find_device(device_name_substring)
+
+ if not self.device_path:
+ raise FileNotFoundError(f"Could not find a device matching '{device_name_substring}'")
+
+ self.device = evdev.InputDevice(self.device_path)
+ self.device.grab() # Prevent events from leaking into the OS/terminal
+
+ # Dictionary to hold the current state of each key.
+ # True = Pressed/Held, False = Released
+ self._key_states = {}
+ self._lock = threading.Lock()
+
+ # Start the background thread
+ self._running = True
+ self._thread = threading.Thread(target=self._read_events, daemon=True)
+ self._thread.start()
+ print(f"Connected to {self.device.name} at {self.device_path}")
+
+ def _find_device(self, substring):
+ """Finds the device path for the foot pedal."""
+ for path in evdev.list_devices():
+ dev = evdev.InputDevice(path)
+ if substring.lower() in dev.name.lower():
+ return path
+ return None
+
+ def _read_events(self):
+ """Background loop that updates the state dictionary."""
+ try:
+ for event in self.device.read_loop():
+ if not self._running:
+ break
+
+ if event.type == ecodes.EV_KEY:
+ key_event = evdev.categorize(event)
+
+ with self._lock:
+ # keystate: 1 is DOWN, 2 is HOLD, 0 is UP
+ is_pressed = key_event.keystate in [1, 2]
+
+ # Store state using the string name of the key (e.g., 'KEY_A')
+ # If a key resolves to a list (rare, but happens in evdev), take the first one
+ key_name = key_event.keycode
+ if isinstance(key_name, list):
+ key_name = key_name[0]
+
+ self._key_states[key_name] = is_pressed
+
+ except OSError:
+ pass # Device disconnected or closed
+
+ def get_states(self):
+ """
+ Returns a snapshot of the latest key states.
+ Example return: {'KEY_A': True, 'KEY_B': False, 'KEY_C': False}
+ """
+ with self._lock:
+ # Return a copy to ensure thread safety
+ return self._key_states.copy()
+
+ def get_key_state(self, key_name):
+ """Returns the state of a specific key, defaulting to False if never pressed."""
+ with self._lock:
+ return self._key_states.get(key_name, False)
+
+ def close(self):
+ """Cleans up the device and stops the thread."""
+ self._running = False
+ try:
+ self.device.ungrab()
+ self.device.close()
+ except OSError:
+ pass
+
+# ==========================================
+# Example Usage
+# ==========================================
+if __name__ == "__main__":
+ try:
+ # Initialize the pedal
+ pedal = FootPedal("Foot Switch")
+
+ # Simulate a typical robotics control loop running at 10Hz
+ print("Starting control loop... Press Ctrl+C to exit.")
+ while True:
+ # Grab the latest states instantly without blocking
+ states = pedal.get_states()
+
+ if states:
+ # Print only the keys that are currently pressed
+ pressed_keys = [key for key, is_pressed in states.items() if is_pressed]
+ print(f"Currently pressed: {pressed_keys}")
+
+ # Your teleoperation logic goes here...
+
+ time.sleep(0.1) # 10Hz loop
+
+ except KeyboardInterrupt:
+ print("\nShutting down...")
+ finally:
+ if 'pedal' in locals():
+ pedal.close()
\ No newline at end of file
diff --git a/python/rcs/operator/quest.py b/python/rcs/operator/quest.py
new file mode 100644
index 00000000..d498a41b
--- /dev/null
+++ b/python/rcs/operator/quest.py
@@ -0,0 +1,239 @@
+import copy
+from dataclasses import dataclass
+import logging
+import threading
+from time import sleep
+
+import numpy as np
+from rcs._core.common import Pose, RPY
+from rcs.envs.base import ArmWithGripper, ControlMode, GripperDictType, RelativeActionSpace, RelativeTo, TQuatDictType
+from rcs.operator.interface import BaseOperator, BaseOperatorConfig, TeleopCommands
+from rcs.sim.sim import Sim
+from rcs.utils import SimpleFrameRate
+
+from simpub.core.simpub_server import SimPublisher
+from simpub.parser.simdata import SimObject, SimScene
+from simpub.sim.mj_publisher import MujocoPublisher
+from simpub.xr_device.meta_quest3 import MetaQuest3
+
+try:
+ from simpub.core.simpub_server import SimPublisher
+ from simpub.parser.simdata import SimObject, SimScene
+ from simpub.sim.mj_publisher import MujocoPublisher
+ from simpub.xr_device.meta_quest3 import MetaQuest3
+ HAS_SIMPUB = True
+except ImportError:
+ HAS_SIMPUB = False
+
+logger = logging.getLogger(__name__)
+
+# download the iris apk from the following repo release: https://github.com/intuitive-robots/IRIS-Meta-Quest3
+# in order to use usb connection install adb install adb
+# sudo apt install android-tools-adb
+# install it on your quest with
+# adb install IRIS-Meta-Quest3.apk
+
+if HAS_SIMPUB:
+ class FakeSimPublisher(SimPublisher):
+ def get_update(self):
+ return {}
+
+ class FakeSimScene(SimScene):
+ def __init__(self):
+ super().__init__()
+ self.root = SimObject(name="root")
+
+
+@dataclass(kw_only=True)
+class QuestConfig(BaseOperatorConfig):
+ include_rotation: bool = True
+ mq3_addr: str = "10.42.0.1"
+
+
+class QuestOperator(BaseOperator):
+
+ control_mode = (ControlMode.CARTESIAN_TQuat, RelativeTo.CONFIGURED_ORIGIN)
+ controller_names = ["left", "right"]
+
+ def __init__(self, config: QuestConfig, sim: Sim | None = None):
+ super().__init__(config, sim)
+ if not HAS_SIMPUB:
+ raise ImportError("simpub is not installed. Please install it to use QuestOperator.")
+
+ self.config: QuestConfig
+
+ self._resource_lock = threading.Lock()
+ self._cmd_lock = threading.Lock()
+
+ self._trg_btn = {"left": "index_trigger", "right": "index_trigger"}
+ self._grp_btn = {"left": "hand_trigger", "right": "hand_trigger"}
+ self._start_btn = "A"
+ self._stop_btn = "B"
+ self._unsuccessful_btn = "Y"
+
+ self._prev_data = None
+ self._exit_requested = False
+ self._grp_pos = {key: 1.0 for key in self.controller_names} # start with opened gripper
+ self._last_controller_pose = {key: Pose() for key in self.controller_names}
+ self._offset_pose = {key: Pose() for key in self.controller_names}
+
+ self._commands = TeleopCommands()
+ self._reset_origin_to_current()
+
+ self._step_env = False
+ self._set_frame = {key: Pose() for key in self.controller_names}
+ # if self.config.simulation:
+ # self._publisher = MujocoPublisher(self.sim.model, self.sim.data, self.config.mq3_addr, visible_geoms_groups=list(range(1, 3)))
+ if not self.config.simulation:
+ self._publisher = FakeSimPublisher(FakeSimScene(), self.config.mq3_addr)
+ # robot_cfg = default_sim_robot_cfg("fr3_empty_world")
+ # sim_cfg = SimConfig()
+ # sim_cfg.async_control = True
+ # twin_env = SimMultiEnvCreator()(
+ # name2id=ROBOT2IP,
+ # robot_cfg=robot_cfg,
+ # control_mode=ControlMode.JOINTS,
+ # gripper_cfg=default_sim_gripper_cfg(),
+ # sim_cfg=sim_cfg,
+ # )
+ # sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim
+ # sim.open_gui()
+ # MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3)))
+ # env_rel = DigitalTwin(env_rel, twin_env)
+ self._reader = MetaQuest3("RCSNode")
+
+ def _reset_origin_to_current(self, controller: str | None = None):
+ with self._cmd_lock:
+ if controller is None:
+ self._commands.reset_origin_to_current = {key: True for key in self.controller_names}
+ else:
+ self._commands.reset_origin_to_current[controller] = True
+
+ def _reset_state(self):
+ with self._resource_lock:
+ for controller in self.controller_names:
+ self._offset_pose[controller] = Pose()
+ self._last_controller_pose[controller] = Pose()
+ self._grp_pos[controller] = 1
+
+ def consume_commands(self) -> TeleopCommands:
+ # must be threadsafe
+ with self._cmd_lock:
+ cmds = copy.copy(self._commands)
+ self._commands = TeleopCommands()
+ return cmds
+
+ def reset_operator_state(self):
+ """Resets the hardware offsets when the environment resets."""
+ self._reset_state()
+ self._reset_origin_to_current()
+
+ def consume_action(self) -> dict[str, ArmWithGripper]:
+ transforms = {}
+ with self._resource_lock:
+ for controller in self.controller_names:
+ transform = Pose(
+ translation=(
+ self._last_controller_pose[controller].translation() # type: ignore
+ - self._offset_pose[controller].translation()
+ ),
+ quaternion=(
+ self._last_controller_pose[controller] * self._offset_pose[controller].inverse()
+ ).rotation_q(),
+ )
+
+ set_axes = Pose(quaternion=self._set_frame[controller].rotation_q())
+
+ transform = set_axes.inverse() * transform * set_axes
+ if not self.config.include_rotation:
+ transform = Pose(translation=transform.translation()) # identity rotation
+ transforms[controller] = TQuatDictType(
+ tquat=np.concatenate([transform.translation(), transform.rotation_q()])
+ )
+ transforms[controller].update(GripperDictType(gripper=self._grp_pos[controller]))
+ return transforms
+
+ def close(self):
+ self._reader.disconnect()
+ self._publisher.shutdown()
+ self._exit_requested = True
+ self.join()
+
+ def run(self):
+ rate_limiter = SimpleFrameRate(self.config.read_frequency, "teleop readout")
+ warning_raised = False
+
+ while not self._exit_requested:
+ input_data = self._reader.get_controller_data()
+
+ if input_data is None:
+ if not warning_raised:
+ logger.warning("[Quest Reader] packets empty")
+ warning_raised = True
+ sleep(0.5)
+ continue
+
+ if warning_raised:
+ logger.warning("[Quest Reader] packets arriving again")
+ warning_raised = False
+
+ # === Update Semantic Commands ===
+ with self._cmd_lock:
+ if input_data[self._start_btn] and (self._prev_data is None or not self._prev_data[self._start_btn]):
+ self._commands.record = True
+
+ if input_data[self._stop_btn] and (self._prev_data is None or not self._prev_data[self._stop_btn]):
+ self._commands.success = True
+
+ if input_data[self._unsuccessful_btn] and (
+ self._prev_data is None or not self._prev_data[self._unsuccessful_btn]
+ ):
+ self._commands.failure = True
+
+ # === Update Poses & Grippers ===
+ for controller in self.controller_names:
+ last_controller_pose = Pose(
+ translation=np.array(input_data[controller]["pos"]),
+ quaternion=np.array(input_data[controller]["rot"]),
+ )
+ # if controller == "left":
+ # last_controller_pose = (
+ # Pose(translation=np.array([0, 0, 0]), rpy=RPY(roll=0, pitch=0, yaw=np.deg2rad(180))) # type: ignore
+ # * last_controller_pose
+ # )
+
+ if input_data[controller][self._trg_btn[controller]] and (
+ self._prev_data is None or not self._prev_data[controller][self._trg_btn[controller]]
+ ):
+ # trigger just pressed (first data sample with button pressed)
+
+ with self._resource_lock:
+ self._offset_pose[controller] = last_controller_pose
+ self._last_controller_pose[controller] = last_controller_pose
+
+ elif not input_data[controller][self._trg_btn[controller]] and (
+ self._prev_data is None or self._prev_data[controller][self._trg_btn[controller]]
+ ):
+ with self._resource_lock:
+ self._last_controller_pose[controller] = Pose()
+ self._offset_pose[controller] = Pose()
+ self._reset_origin_to_current(controller)
+
+ elif input_data[controller][self._trg_btn[controller]]:
+ # button is pressed
+ with self._resource_lock:
+ self._last_controller_pose[controller] = last_controller_pose
+
+ if input_data[controller][self._grp_btn[controller]] and (
+ self._prev_data is None or not self._prev_data[controller][self._grp_btn[controller]]
+ ):
+ # just pressed
+ self._grp_pos[controller] = 0
+ if not input_data[controller][self._grp_btn[controller]] and (
+ self._prev_data is None or self._prev_data[controller][self._grp_btn[controller]]
+ ):
+ # just released
+ self._grp_pos[controller] = 1
+
+ self._prev_data = input_data
+ rate_limiter()
diff --git a/repro_multi_robot.py b/repro_multi_robot.py
new file mode 100644
index 00000000..57c8cd8e
--- /dev/null
+++ b/repro_multi_robot.py
@@ -0,0 +1,80 @@
+
+import gymnasium as gym
+import numpy as np
+import rcs
+from rcs import sim
+from rcs.envs.base import RobotEnv, ControlMode, MultiRobotWrapper
+from rcs.envs.sim import RobotSimWrapper, MultiSimRobotWrapper
+from rcs.envs.utils import default_sim_robot_cfg
+import os
+import sys
+
+# Add current directory to sys.path to ensure rcs is importable correctly
+sys.path.insert(0, os.path.abspath("python"))
+
+def test_multi_robot_greenlet():
+ scene_name = "fr3_empty_world"
+
+ # Force use of local paths
+ project_root = os.path.abspath(".")
+ mjcf_scene = os.path.join(project_root, "assets", "scenes", scene_name, "scene.xml")
+ mjcf_robot = os.path.join(project_root, "assets", "scenes", scene_name, "robot.xml")
+
+ print(f"Forcing local Scene MJCF: {mjcf_scene}")
+
+ if not os.path.exists(mjcf_scene):
+ raise FileNotFoundError(f"Could not find local mjcf at {mjcf_scene}")
+
+ # Update rcs.scenes for default_sim_robot_cfg to pick it up if it uses it
+ rcs.scenes[scene_name].mjcf_scene = mjcf_scene
+ rcs.scenes[scene_name].mjcf_robot = mjcf_robot
+ rcs.scenes[scene_name].mjb = None # Force re-parse of XML
+
+ simulation = sim.Sim(mjcf_scene)
+
+ # Setup two robots
+ envs = {}
+ for i in range(2):
+ robot_id = str(i)
+ cfg = default_sim_robot_cfg(scene_name, robot_id)
+ cfgkin = default_sim_robot_cfg(scene_name, "0")
+
+ # Kinematic path
+ # cfg.kinematic_model_path = os.path.join(project_root, "assets", "fr3", "urdf", "fr3.urdf")
+ # cfg.kinematic_model_path = rcs.scenes["fr3_empty_world"].mjcf_robot
+ # cfg.arm_collision_geoms = []
+ # cfg.attachment_site = "attachment_site_0"
+
+ ik = rcs.common.Pin(cfgkin.kinematic_model_path, cfgkin.attachment_site, urdf=False)
+ robot = sim.SimRobot(simulation, ik, cfg)
+
+ env = RobotEnv(robot, ControlMode.JOINTS)
+ # Wrap with RobotSimWrapper
+ env = RobotSimWrapper(env, simulation)
+ envs[robot_id] = env
+
+ multi_env = MultiRobotWrapper(envs)
+ multi_sim_env = MultiSimRobotWrapper(multi_env, simulation)
+
+ print("Resetting multi-robot environment...")
+ obs, info = multi_sim_env.reset()
+ print("Reset successful.")
+
+ print("Stepping multi-robot environment...")
+ actions = {
+ "0": {"joints": np.zeros(7)},
+ "1": {"joints": np.zeros(7)}
+ }
+ obs, reward, terminated, truncated, info = multi_sim_env.step(actions)
+ print("Step successful.")
+
+ multi_sim_env.close()
+ print("Test passed!")
+
+if __name__ == "__main__":
+ try:
+ test_multi_robot_greenlet()
+ except Exception as e:
+ print(f"Test failed with error: {e}")
+ import traceback
+ traceback.print_exc()
diff --git a/src/pybind/rcs.cpp b/src/pybind/rcs.cpp
index 6496ee1f..7a36ed70 100644
--- a/src/pybind/rcs.cpp
+++ b/src/pybind/rcs.cpp
@@ -472,6 +472,12 @@ PYBIND11_MODULE(_core, m) {
.def_readwrite("joints", &rcs::sim::SimRobotConfig::joints)
.def_readwrite("actuators", &rcs::sim::SimRobotConfig::actuators)
.def_readwrite("base", &rcs::sim::SimRobotConfig::base)
+ .def("__copy__", [](const rcs::sim::SimRobotConfig &self) {
+ return rcs::sim::SimRobotConfig(self);
+ })
+ .def("__deepcopy__", [](const rcs::sim::SimRobotConfig &self, py::dict) {
+ return rcs::sim::SimRobotConfig(self);
+ })
.def("add_id", &rcs::sim::SimRobotConfig::add_id, py::arg("id"));
py::class_(sim,
"SimRobotState")
@@ -510,6 +516,12 @@ PYBIND11_MODULE(_core, m) {
&rcs::sim::SimGripperConfig::max_actuator_width)
.def_readwrite("min_actuator_width",
&rcs::sim::SimGripperConfig::min_actuator_width)
+ .def("__copy__", [](const rcs::sim::SimGripperConfig &self) {
+ return rcs::sim::SimGripperConfig(self);
+ })
+ .def("__deepcopy__", [](const rcs::sim::SimGripperConfig &self, py::dict) {
+ return rcs::sim::SimGripperConfig(self);
+ })
.def("add_id", &rcs::sim::SimGripperConfig::add_id, py::arg("id"));
py::class_(
sim, "SimGripperState")