Skip to content

ロボットAPI¤

このページでは、Robopyのロボット制御に関連するAPIを説明します。

RakudaRobot¤

RakudaRobot ¤

RakudaRobot(cfg: RakudaConfig)

Bases: ComposedRobot[RakudaPairSys, Sensors, RakudaObs]

METHOD DESCRIPTION
connect
disconnect
teleoperation

Start teleoperation for Rakuda robot.

record
record_parallel

teleoperate_stepをteleop_hzで回しつつ、fpsごとに最新のarm_obsを記録し、

Source code in src/robopy/robots/rakuda/rakuda_robot.py
def __init__(self, cfg: RakudaConfig):
    cfg = apply_rakuda_dotconfig(cfg)
    self.config = cfg
    self._pair_sys = RakudaPairSys(cfg)
    self._sensor_configs: RakudaSensorConfigs = self._init_config()
    self._sensors: Sensors = self._init_sensors()

connect ¤

connect() -> None
Source code in src/robopy/robots/rakuda/rakuda_robot.py
def connect(self) -> None:
    try:
        self._pair_sys.connect()
    except Exception as e:
        self._pair_sys.disconnect()
        raise e

disconnect ¤

disconnect() -> None
Source code in src/robopy/robots/rakuda/rakuda_robot.py
def disconnect(self) -> None:
    self._pair_sys.disconnect()

    for cam in self._sensors.cameras or []:
        cam.disconnect()

    for tac in self._sensors.tactile or []:
        tac.disconnect()

    for audio in self._sensors.audio or []:
        audio.disconnect()

teleoperation ¤

teleoperation(max_seconds: float | None = None) -> None

Start teleoperation for Rakuda robot.

Source code in src/robopy/robots/rakuda/rakuda_robot.py
def teleoperation(self, max_seconds: float | None = None) -> None:
    """Start teleoperation for Rakuda robot."""
    if not self.is_connected:
        raise ConnectionError("RakudaRobot is not connected. Call connect() first.")

    if max_seconds is not None and max_seconds > 0:
        self._pair_sys.teleoperate(max_seconds=max_seconds)
    else:
        self._pair_sys.teleoperate()

record ¤

record(max_frame: int, fps: int = 5) -> RakudaObs
Source code in src/robopy/robots/rakuda/rakuda_robot.py
def record(self, max_frame: int, fps: int = 5) -> RakudaObs:
    if not self.is_connected:
        self.connect()

    if max_frame <= 0:
        raise ValueError("max_frame must be greater than 0.")

    leader_obs: List[NDArray[np.float32]] = []
    follower_obs: List[NDArray[np.float32]] = []
    camera_obs: Dict[str, List[NDArray[np.float32] | None]] = defaultdict(list)
    tactile_obs: Dict[str, List[NDArray[np.float32] | None]] = defaultdict(list)
    audio_obs: Dict[str, List[NDArray[np.float32] | None]] = defaultdict(list)

    get_obs_interval = 1.0 / fps
    frame_count = 0
    interval_start = time.time()

    try:
        while frame_count < max_frame:
            temp_arm_obs = self.robot_system.teleoperate_step()

            if time.time() - interval_start < get_obs_interval:
                continue

            arm_obs = temp_arm_obs
            leader_obs.append(arm_obs.leader)
            follower_obs.append(arm_obs.follower)

            sensor_data = self.sensors_observation()
            camera_data = sensor_data.cameras
            tactile_data = sensor_data.tactile
            audio_data = sensor_data.audio

            for cam_name, cam_frame in camera_data.items():
                camera_obs[cam_name].append(cam_frame)

            for tac_name, tac_frame in tactile_data.items():
                tactile_obs[tac_name].append(tac_frame)

            for audio_name, audio_frame in audio_data.items():
                audio_obs[audio_name].append(audio_frame)

            frame_count += 1
            interval_start = time.time()

    except KeyboardInterrupt:
        logger.info("Recording interrupted by user.")
    except Exception as e:
        logger.error(f"An error occurred during recording: {e}")
        raise e

    # process observations to numpy arrays
    leader_obs_np = np.array(leader_obs)
    follower_obs_np = np.array(follower_obs)
    arms: RakudaArmObs = RakudaArmObs(leader=leader_obs_np, follower=follower_obs_np)
    # process camera observations
    camera_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for cam_name, frames in camera_obs.items():
        if frames:
            camera_obs_np[cam_name] = np.array(frames)
        else:
            camera_obs_np[cam_name] = None

    # process tactile observations
    tactile_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for tac_name, frames in tactile_obs.items():
        if frames:
            tactile_obs_np[tac_name] = np.array(frames)
        else:
            tactile_obs_np[tac_name] = None

    # process audio observations
    audio_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for audio_name, frames in audio_obs.items():
        if frames:
            audio_obs_np[audio_name] = np.array(frames)
        else:
            audio_obs_np[audio_name] = None

    sensors_obs = RakudaSensorObs(
        cameras=camera_obs_np, tactile=tactile_obs_np, audio=audio_obs_np
    )
    return RakudaObs(arms=arms, sensors=sensors_obs)

record_parallel ¤

record_parallel(max_frame: int, fps: int = 20, teleop_hz: int = 25, max_processing_time_ms: float = 40) -> RakudaObs

teleoperate_stepをteleop_hzで回しつつ、fpsごとに最新のarm_obsを記録し、 センサデータは並列取得する高速記録。

Source code in src/robopy/robots/rakuda/rakuda_robot.py
def record_parallel(
    self,
    max_frame: int,
    fps: int = 20,
    teleop_hz: int = 25,
    max_processing_time_ms: float = 40,
) -> RakudaObs:
    """
    teleoperate_stepをteleop_hzで回しつつ、fpsごとに最新のarm_obsを記録し、
    センサデータは並列取得する高速記録。
    """
    if not self.is_connected:
        self.connect()

    if max_frame <= 0:
        raise ValueError("max_frame must be greater than 0.")

    # arm_obsを高頻度で取得するためのキュー
    arm_obs_queue: queue.Queue[RakudaArmObs] = queue.Queue(maxsize=teleop_hz * 2)
    stop_event = threading.Event()

    def teleop_worker() -> None:
        interval = 1.0 / teleop_hz
        while not stop_event.is_set():
            start_time = time.perf_counter()
            obs = self.robot_system.teleoperate_step()

            try:
                arm_obs_queue.put(obs, timeout=interval)
            except queue.Full:
                pass
            elapsed = time.perf_counter() - start_time
            sleep_time = max(0, interval - elapsed)
            time.sleep(sleep_time)

    teleop_thread = threading.Thread(target=teleop_worker, daemon=True)
    teleop_thread.start()

    leader_obs: List[NDArray[np.float32]] = []
    follower_obs: List[NDArray[np.float32]] = []
    camera_obs: Dict[str, List[NDArray[np.float32] | None]] = defaultdict(list)
    tactile_obs: Dict[str, List[NDArray[np.float32] | None]] = defaultdict(list)
    audio_obs: Dict[str, List[NDArray[np.float32] | None]] = defaultdict(list)

    get_obs_interval = 1.0 / fps
    max_processing_time = max_processing_time_ms / 1000.0
    frame_count = 0
    skipped_frames = 0
    total_processing_time = 0.0

    logger.info(f"Starting parallel recording: {max_frame} frames at {fps}Hz")
    logger.info(
        f"""Target interval: {get_obs_interval * 1000:.1f}ms,
        Max processing time: {max_processing_time_ms}ms"""
    )

    try:
        while frame_count < max_frame:
            frame_start_time = time.perf_counter()

            # 最新のarm_obsを取得(バッファが空なら待つ)
            try:
                while True:
                    arm_obs = arm_obs_queue.get(timeout=get_obs_interval)
                    while not arm_obs_queue.empty():
                        arm_obs = arm_obs_queue.get_nowait()
                    break
            except queue.Empty:
                logger.warning("No arm_obs available in time.")
                continue

            # センサデータは並列取得
            try:
                with ThreadPoolExecutor(max_workers=4) as executor:
                    # Camera futures
                    camera_futures: Dict[str, Future[NDArray[np.float32] | None]] = {}
                    if self._sensors.cameras:
                        for cam in self._sensors.cameras:
                            if cam.is_connected:
                                camera_futures[cam.name] = executor.submit(
                                    cam.async_read, timeout_ms=5
                                )

                    # Tactile futures
                    tactile_futures: Dict[str, Future[NDArray[np.float32] | None]] = {}
                    if self._sensors.tactile:
                        for tac in self._sensors.tactile:
                            if tac.is_connected:
                                tactile_futures[tac.name] = executor.submit(
                                    tac.async_read, timeout_ms=5
                                )

                    # Audio futures
                    audio_futures: Dict[str, Future[NDArray[np.float32] | None]] = {}
                    if self._sensors.audio:
                        for audio in self._sensors.audio:
                            if audio.is_connected:
                                audio_futures[audio.name] = executor.submit(
                                    audio.async_read, timeout_ms=5
                                )

                    timeout = max_processing_time * 0.5

                    camera_data: Dict[str, NDArray[np.float32] | None] = {}
                    for cam_name, future in camera_futures.items():
                        try:
                            camera_data[cam_name] = future.result(timeout=timeout / 2)
                        except Exception as e:
                            logger.warning(
                                f"Camera {cam_name} failed in frame {frame_count}: {e}"
                            )
                            camera_data[cam_name] = None

                    tactile_data: Dict[str, NDArray[np.float32] | None] = {}
                    for tac_name, future in tactile_futures.items():
                        try:
                            tactile_data[tac_name] = future.result(timeout=timeout / 2)
                        except Exception as e:
                            logger.warning(
                                f"Tactile {tac_name} failed in frame {frame_count}: {e}"
                            )
                            tactile_data[tac_name] = None

                    audio_data: Dict[str, NDArray[np.float32] | None] = {}
                    for audio_name, future in audio_futures.items():
                        try:
                            audio_data[audio_name] = future.result(timeout=timeout / 2)
                        except Exception as e:
                            logger.warning(
                                f"Audio {audio_name} failed in frame {frame_count}: {e}"
                            )
                            audio_data[audio_name] = None

                # 記録
                leader_obs.append(arm_obs.leader)
                follower_obs.append(arm_obs.follower)

                for cam_name, cam_frame in camera_data.items():
                    camera_obs[cam_name].append(cam_frame)

                for tac_name, tac_frame in tactile_data.items():
                    tactile_obs[tac_name].append(tac_frame)

                for audio_name, audio_frame in audio_data.items():
                    audio_obs[audio_name].append(audio_frame)

                frame_count += 1
                logger.info("Recording progress: %s/%s frames", frame_count, max_frame)

                # タイミング調整
                processing_time = time.perf_counter() - frame_start_time
                total_processing_time += processing_time

                if processing_time > max_processing_time:
                    skipped_frames += 1
                    logger.warning(
                        f"""Frame {frame_count} took {processing_time * 1000:.1f}ms
                        (>{max_processing_time_ms}ms), skipping"""
                    )
                    continue

                elapsed = time.perf_counter() - frame_start_time
                sleep_time = max(0, get_obs_interval - elapsed)
                if sleep_time > 0:
                    time.sleep(sleep_time)

            except Exception as e:
                logger.error(f"Unexpected error in frame {frame_count}: {e}")
                continue

    except KeyboardInterrupt:
        logger.info("Recording interrupted by user.")
    except Exception as e:
        logger.error(f"An error occurred during parallel recording: {e}")
        raise e
    finally:
        stop_event.set()
        teleop_thread.join(timeout=1.0)

    avg_processing_time = total_processing_time / max(1, frame_count) * 1000
    logger.info(f"Recording completed: {frame_count} frames, {skipped_frames} skipped")
    logger.info(f"Average processing time: {avg_processing_time:.1f}ms")

    leader_obs_np = np.array(leader_obs)
    follower_obs_np = np.array(follower_obs)
    arms: RakudaArmObs = RakudaArmObs(leader=leader_obs_np, follower=follower_obs_np)

    camera_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for cam_name, frames in camera_obs.items():
        if frames and all(frame is not None for frame in frames):
            camera_obs_np[cam_name] = np.array(frames)
        else:
            camera_obs_np[cam_name] = None

    tactile_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for tac_name, frames in tactile_obs.items():
        if frames and all(frame is not None for frame in frames):
            tactile_obs_np[tac_name] = np.array(frames).transpose(0, 3, 1, 2)
        else:
            tactile_obs_np[tac_name] = None

    audio_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for audio_name, frames in audio_obs.items():
        if frames and all(frame is not None for frame in frames):
            audio_obs_np[audio_name] = np.array(frames)
        else:
            audio_obs_np[audio_name] = None

    sensors_obs = RakudaSensorObs(
        cameras=camera_obs_np, tactile=tactile_obs_np, audio=audio_obs_np
    )
    return RakudaObs(arms=arms, sensors=sensors_obs)

RakudaConfig dataclass ¤

RakudaConfig(leader_port: str, follower_port: str, sensors: RakudaSensorParams | None = None, slow_mode: bool = False, leader_torque_enabled: List[str] | None = None, follower_torque_enabled: List[str] | None = None)

Configuration class for Rakuda robot.

RakudaPairSys ¤

RakudaPairSys(cfg: RakudaConfig)

Bases: Robot

Class representing the Rakuda robotic system with both leader and follower arms.

METHOD DESCRIPTION
connect

Connect to both leader and follower arms.

disconnect

Disconnect from both leader and follower arms.

get_observation

Get the current observation from both arms.

Source code in src/robopy/robots/rakuda/rakuda_pair_sys.py
def __init__(self, cfg: RakudaConfig) -> None:
    cfg = apply_rakuda_dotconfig(cfg)
    self.config = cfg
    self._leader = RakudaLeader(cfg)
    self._follower = RakudaFollower(cfg)
    self._is_connected = False
    self._motor_mapping = (
        RAKUDA_MOTOR_MAPPING  # key: leader motor name, value: follower motor name
    )
    self._leader_motor_names = list(self._leader.motors.motors.keys())
    self._follower_motor_names = list(self._follower.motors.motors.keys())

    # Cache torque-enabled joints for safe write filtering.
    self._leader_torque_enabled: set[str] = (
        {"l_arm_grip", "r_arm_grip"}
        if cfg.leader_torque_enabled is None
        else set(cfg.leader_torque_enabled)
    )
    self._follower_torque_enabled: set[str] = (
        set(self._follower_motor_names)
        if cfg.follower_torque_enabled is None
        else set(cfg.follower_torque_enabled)
    )

connect ¤

connect() -> None

Connect to both leader and follower arms.

Source code in src/robopy/robots/rakuda/rakuda_pair_sys.py
def connect(self) -> None:
    """Connect to both leader and follower arms."""
    if self.is_connected:
        logger.info("Successfully connected to both leader and follower arms.")
        return

    try:
        self._leader.connect()
        self._follower.connect()
        logger.info("Successfully connected to both leader and follower arms.")
        print("[cyan]Successfully connected to both leader and follower arms.[/cyan]")
        self._is_connected = True
    except (OSError, IOError, PermissionError) as e:
        logger.error(f"Failed to connect to arms: {e}")
        raise ConnectionError(f"Failed to connect to arms: {e}")
    except (pickle.PickleError, EOFError) as e:
        logger.error(f"Calibration data corrupted: {e}")
        raise ConnectionError(f"Calibration data error: {e}")

disconnect ¤

disconnect() -> None

Disconnect from both leader and follower arms.

Source code in src/robopy/robots/rakuda/rakuda_pair_sys.py
def disconnect(self) -> None:
    """Disconnect from both leader and follower arms."""
    self.leader.disconnect()
    self.follower.disconnect()

get_observation ¤

get_observation() -> RakudaArmObs

Get the current observation from both arms.

Source code in src/robopy/robots/rakuda/rakuda_pair_sys.py
def get_observation(self) -> RakudaArmObs:
    """Get the current observation from both arms."""
    if not self.is_connected:
        raise ConnectionError("RakudaPairSys is not connected. Call connect() first.")

    leader_motor_names = self._leader_motor_names
    follower_motor_names = self._follower_motor_names
    leader_obs = self._leader.motors.sync_read(
        XControlTable.PRESENT_POSITION, leader_motor_names
    )

    follower_obs = self._follower.motors.sync_read(
        XControlTable.PRESENT_POSITION, follower_motor_names
    )

    leader_obs_array = np.array(list(leader_obs.values()), dtype=np.float32)
    follower_obs_array = np.array(list(follower_obs.values()), dtype=np.float32)
    return RakudaArmObs(leader=leader_obs_array, follower=follower_obs_array)

RakudaLeader ¤

RakudaLeader(cfg: RakudaConfig)

Bases: RakudaArm

Class representing the leader arm of the Rakuda robotic system.

METHOD DESCRIPTION
connect
disconnect

connect ¤

connect() -> None

disconnect ¤

disconnect() -> None

RakudaFollower ¤

RakudaFollower(cfg: RakudaConfig)

Bases: RakudaArm

Class representing the follower arm of the Rakuda robotic system.

METHOD DESCRIPTION
connect

Connect to the follower arm and enable torque.

disconnect

Disconnect from the follower arm and disable torque.

connect ¤

connect() -> None

Connect to the follower arm and enable torque.

disconnect ¤

disconnect() -> None

Disconnect from the follower arm and disable torque.

KochRobot ¤

KochRobot(cfg: KochConfig)

Bases: ComposedRobot[KochPairSys, Sensors, KochObs]

METHOD DESCRIPTION
connect
disconnect
teleoperation

Start teleoperation mode where leader controls follower.

forward_kinematics

Compute FK from joint angles (degrees) to end-effector pose.

inverse_kinematics

Solve IK for a target end-effector pose.

send_frame_action
send_ee_frame_action

Send a single end-effector pose action to the robot.

send_ee

Send a trajectory of end-effector poses to the robot.

connect ¤

connect() -> None

disconnect ¤

disconnect() -> None

teleoperation ¤

teleoperation(max_seconds: float | None = None) -> None

Start teleoperation mode where leader controls follower.

forward_kinematics classmethod ¤

forward_kinematics(joint_angles_deg: NDArray[float32]) -> EEPose

Compute FK from joint angles (degrees) to end-effector pose.

PARAMETER DESCRIPTION
joint_angles_deg

(5,) or (6,) array of joint angles in degrees. If 6 values are given the last element (gripper) is ignored.

TYPE: NDArray[float32]

RETURNS DESCRIPTION
EEPose

EEPose with position in metres and orientation in radians.

inverse_kinematics ¤

inverse_kinematics(target_pose: EEPose | NDArray[float32], current_joint_angles_deg: NDArray[float32], ik_config: IKConfig | None = None) -> IKResult

Solve IK for a target end-effector pose.

PARAMETER DESCRIPTION
target_pose

Desired EE pose (EEPose or (5,) array [x, y, z, pitch, roll]).

TYPE: EEPose | NDArray[float32]

current_joint_angles_deg

(5,) or (6,) current joint angles in degrees used as the initial guess.

TYPE: NDArray[float32]

ik_config

Optional override for solver parameters.

TYPE: IKConfig | None DEFAULT: None

RETURNS DESCRIPTION
IKResult

IKResult with joint_angles_rad in radians.

send_frame_action ¤

send_frame_action(leader_action: NDArray[float32]) -> None

send_ee_frame_action ¤

send_ee_frame_action(ee_action: NDArray[float32], current_joint_angles_deg: NDArray[float32] | None = None, gripper_deg: float = 0.0) -> None

Send a single end-effector pose action to the robot.

PARAMETER DESCRIPTION
ee_action

(5,) array [x, y, z, pitch, roll] or (6,) array [x, y, z, pitch, roll, gripper_deg].

TYPE: NDArray[float32]

current_joint_angles_deg

(5,) or (6,) current joint angles in degrees. If None the current follower positions are read.

TYPE: NDArray[float32] | None DEFAULT: None

gripper_deg

Gripper angle in degrees (used when ee_action is 5-dimensional).

TYPE: float DEFAULT: 0.0

send_ee ¤

send_ee(max_frame: int, fps: int, ee_actions: NDArray[float32], teleop_hz: int = 100) -> None

Send a trajectory of end-effector poses to the robot.

Each row of ee_actions is converted to joint angles via IK. The previous frame's solution is used as the seed for the next frame so that the resulting joint trajectory is smooth.

PARAMETER DESCRIPTION
max_frame

Number of frames in the trajectory.

TYPE: int

fps

Playback frame-rate.

TYPE: int

ee_actions

(max_frame, 5) or (max_frame, 6) end-effector poses. If 6 columns, the last column is the gripper angle in degrees.

TYPE: NDArray[float32]

teleop_hz

Motor command rate.

TYPE: int DEFAULT: 100

So101Robot¤

So101Robot ¤

So101Robot(cfg: So101Config)

Bases: ComposedRobot[So101PairSys, Sensors, So101Obs]

METHOD DESCRIPTION
connect
disconnect
teleoperation

Start teleoperation mode where leader controls follower.

forward_kinematics

Compute FK from joint angles (degrees) to end-effector pose.

inverse_kinematics

Solve IK for a target end-effector pose.

send_frame_action
send_ee_frame_action

Send a single end-effector pose action to the robot.

send_ee

Send a trajectory of end-effector poses to the robot.

connect ¤

connect() -> None

disconnect ¤

disconnect() -> None

teleoperation ¤

teleoperation(max_seconds: float | None = None) -> None

Start teleoperation mode where leader controls follower.

forward_kinematics classmethod ¤

forward_kinematics(joint_angles_deg: NDArray[float32]) -> EEPose

Compute FK from joint angles (degrees) to end-effector pose.

PARAMETER DESCRIPTION
joint_angles_deg

(5,) or (6,) array of joint angles in degrees. If 6 values are given the last element (gripper) is ignored.

TYPE: NDArray[float32]

RETURNS DESCRIPTION
EEPose

EEPose with position in metres and orientation in radians.

inverse_kinematics ¤

inverse_kinematics(target_pose: EEPose | NDArray[float32], current_joint_angles_deg: NDArray[float32], ik_config: IKConfig | None = None) -> IKResult

Solve IK for a target end-effector pose.

PARAMETER DESCRIPTION
target_pose

Desired EE pose (EEPose or (5,) array [x, y, z, pitch, roll]).

TYPE: EEPose | NDArray[float32]

current_joint_angles_deg

(5,) or (6,) current joint angles in degrees used as the initial guess.

TYPE: NDArray[float32]

ik_config

Optional override for solver parameters.

TYPE: IKConfig | None DEFAULT: None

RETURNS DESCRIPTION
IKResult

IKResult with joint_angles_rad in radians.

send_frame_action ¤

send_frame_action(leader_action: NDArray[float32]) -> None

send_ee_frame_action ¤

send_ee_frame_action(ee_action: NDArray[float32], current_joint_angles_deg: NDArray[float32] | None = None, gripper_deg: float = 0.0) -> None

Send a single end-effector pose action to the robot.

PARAMETER DESCRIPTION
ee_action

(5,) array [x, y, z, pitch, roll] or (6,) array [x, y, z, pitch, roll, gripper_deg].

TYPE: NDArray[float32]

current_joint_angles_deg

(5,) or (6,) current joint angles in degrees. If None the current follower positions are read.

TYPE: NDArray[float32] | None DEFAULT: None

gripper_deg

Gripper angle in degrees (used when ee_action is 5-dimensional).

TYPE: float DEFAULT: 0.0

send_ee ¤

send_ee(max_frame: int, fps: int, ee_actions: NDArray[float32], teleop_hz: int = 100) -> None

Send a trajectory of end-effector poses to the robot.

Each row of ee_actions is converted to joint angles via IK. The previous frame's solution is used as the seed for the next frame so that the resulting joint trajectory is smooth.

PARAMETER DESCRIPTION
max_frame

Number of frames in the trajectory.

TYPE: int

fps

Playback frame-rate.

TYPE: int

ee_actions

(max_frame, 5) or (max_frame, 6) end-effector poses. If 6 columns, the last column is the gripper angle in degrees.

TYPE: NDArray[float32]

teleop_hz

Motor command rate.

TYPE: int DEFAULT: 100

So101Config dataclass ¤

So101Config(follower_port: str, calibration_path: str, leader_port: str | None = None, sensors: So101SensorConfig = So101SensorConfig())

Configuration class for SO-101 robot.

The SO-101 uses Feetech STS3215 motors with 6 joints per arm: shoulder_pan, shoulder_lift, elbow_flex, wrist_flex, wrist_roll, gripper.

So101SpaceMouseController¤

SpaceMouseを使用したEE空間テレオペレーション。詳細はSpaceMouse APIを参照。

So101SpaceMouseController ¤

So101SpaceMouseController(robot: So101Robot, spacemouse_config: SpaceMouseConfig | None = None)

Control an SO-101 follower arm via a 3Dconnexion SpaceMouse.

The SpaceMouse 6-DOF input (x, y, z, roll, pitch, yaw — all normalised to [-1, 1]) is mapped to incremental end-effector velocity commands. At each control step the current EE pose is updated by the velocity delta, then Inverse Kinematics converts the target pose to joint angles that are sent to the follower arm.

The controller operates without a physical leader arm — the SpaceMouse replaces it. Gripper open/close is driven by the left/right buttons.

PARAMETER DESCRIPTION
robot

A configured :class:So101Robot instance. Only the follower arm is used; a leader arm is not required.

TYPE: So101Robot

spacemouse_config

Optional tuning parameters. Defaults are reasonable for desktop-scale manipulation.

TYPE: SpaceMouseConfig | None DEFAULT: None

METHOD DESCRIPTION
connect

Connect the robot and start the SpaceMouse reader.

disconnect

Stop the SpaceMouse reader and disconnect the robot.

teleoperation

Run real-time SpaceMouse → EE-space teleoperation.

record_parallel

Record a SpaceMouse-driven trajectory with camera observations.

connect ¤

connect() -> None

Connect the robot and start the SpaceMouse reader.

disconnect ¤

disconnect() -> None

Stop the SpaceMouse reader and disconnect the robot.

teleoperation ¤

teleoperation(max_seconds: float | None = None) -> None

Run real-time SpaceMouse → EE-space teleoperation.

The control loop reads the SpaceMouse at control_hz, computes the incremental EE delta, runs IK and sends joint angles to the follower.

PARAMETER DESCRIPTION
max_seconds

If given, stop after this many seconds. Otherwise the loop runs until interrupted (KeyboardInterrupt).

TYPE: float | None DEFAULT: None

record_parallel ¤

record_parallel(max_frame: int, fps: int = 20, control_hz: int | None = None, is_async: bool = True) -> So101Obs

Record a SpaceMouse-driven trajectory with camera observations.

Architecture mirrors :meth:So101Robot.record_parallel — a fast control thread drives the arm while the main thread captures camera frames at the requested fps.

The returned So101Obs.arms.leader contains the commanded joint angles (IK output), so the trajectory can be replayed verbatim with So101Robot.send(). So101Obs.arms.follower contains the actual motor readings.

PARAMETER DESCRIPTION
max_frame

Number of sensor frames to record.

TYPE: int

fps

Camera / observation capture rate.

TYPE: int DEFAULT: 20

control_hz

SpaceMouse control loop rate (defaults to config).

TYPE: int | None DEFAULT: None

is_async

Use asynchronous camera reads when available.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
So101Obs

So101Obs with arm and camera observations.

使用例¤

基本的なRakudaRobotの使用¤

from robopy import RakudaConfig, RakudaRobot

# 設定の作成
config = RakudaConfig(
    leader_port="/dev/ttyUSB0",
    follower_port="/dev/ttyUSB1"
)

# ロボットの作成と接続
robot = RakudaRobot(config)
robot.connect()

try:
    # テレオペレーション
    robot.teleoperation(duration=10)

    # データ記録
    obs = robot.record_parallel(max_frame=1000,20)

finally:
    robot.disconnect()

個別アームの制御¤

from robopy.robots.rakuda import RakudaLeader, RakudaFollower

# 個別アームの作成
leader = RakudaLeader("/dev/ttyUSB0")
follower = RakudaFollower("/dev/ttyUSB1")

leader.connect()
follower.connect()

try:
    # Leaderから観測データを取得
    leader_obs = leader.get_obs()

    # Followerに動作指令を送信
    follower.set_action(leader_obs["action"])

finally:
    leader.disconnect()
    follower.disconnect()