Skip to content

ロボットAPI¤

このページでは、Robopyのロボット制御に関連するAPIを説明します。

RakudaRobot¤

RakudaRobot ¤

RakudaRobot(cfg: RakudaConfig)

Bases: ComposedRobot

METHOD DESCRIPTION
connect
disconnect
teleoperation

Start teleoperation for Rakuda robot.

record
record_parallel

teleoperate_stepをteleop_hzで回しつつ、fpsごとに最新のarm_obsを記録し、

Source code in src/robopy/robots/rakuda/rakuda_robot.py
def __init__(self, cfg: RakudaConfig):
    self.config = cfg
    self._pair_sys = RakudaPairSys(cfg)
    self._sensor_configs: RakudaSensorConfigs = self._init_config()
    self._sensors: Sensors = self._init_sensors()

connect ¤

connect() -> None
Source code in src/robopy/robots/rakuda/rakuda_robot.py
@override
def connect(self) -> None:
    try:
        self._pair_sys.connect()
    except Exception as e:
        self._pair_sys.disconnect()
        raise e

disconnect ¤

disconnect() -> None
Source code in src/robopy/robots/rakuda/rakuda_robot.py
@override
def disconnect(self) -> None:
    self._pair_sys.disconnect()

    for cam in self._sensors.cameras or []:
        cam.disconnect()

    for tac in self._sensors.tactile or []:
        tac.disconnect()

teleoperation ¤

teleoperation(max_seconds: float | None = None) -> None

Start teleoperation for Rakuda robot.

Source code in src/robopy/robots/rakuda/rakuda_robot.py
@override
def teleoperation(self, max_seconds: float | None = None) -> None:
    """Start teleoperation for Rakuda robot."""
    if not self.is_connected:
        raise ConnectionError("RakudaRobot is not connected. Call connect() first.")

    if max_seconds is not None and max_seconds > 0:
        self._pair_sys.teleoperate(max_seconds=max_seconds)
    else:
        self._pair_sys.teleoperate()

record ¤

record(max_frame: int, fps: int = 5) -> RakudaObs
Source code in src/robopy/robots/rakuda/rakuda_robot.py
def record(self, max_frame: int, fps: int = 5) -> RakudaObs:
    if not self.is_connected:
        self.connect()

    if max_frame <= 0:
        raise ValueError("max_frame must be greater than 0.")

    leader_obs = []
    follower_obs = []
    camera_obs: Dict[str, List] = DefaultDict(list)
    tactile_obs: Dict[str, List] = DefaultDict(list)

    get_obs_interval = 1.0 / fps
    frame_count = 0
    interval_start = time.time()

    try:
        while frame_count < max_frame:
            temp_arm_obs = self.robot_system.teleoperate_step()

            if time.time() - interval_start < get_obs_interval:
                continue

            arm_obs = temp_arm_obs
            leader_obs.append(arm_obs.leader)
            follower_obs.append(arm_obs.follower)

            sensor_data = self.sensors_observation()
            camera_data = sensor_data.cameras
            tactile_data = sensor_data.tactile

            for cam_name, cam_frame in camera_data.items():
                camera_obs[cam_name].append(cam_frame)

            for tac_name, tac_frame in tactile_data.items():
                tactile_obs[tac_name].append(tac_frame)

            frame_count += 1
            interval_start = time.time()

    except KeyboardInterrupt:
        logger.info("Recording interrupted by user.")
    except Exception as e:
        logger.error(f"An error occurred during recording: {e}")
        raise e

    # proccess observations to numpy arrays
    leader_obs_np = np.array(leader_obs)
    follower_obs_np = np.array(follower_obs)
    arms: RakudaArmObs = RakudaArmObs(leader=leader_obs_np, follower=follower_obs_np)
    # process camera observations
    camera_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for cam_name, frames in camera_obs.items():
        if frames:
            camera_obs_np[cam_name] = np.array(frames)
        else:
            camera_obs_np[cam_name] = None

    # process tactile observations
    tactile_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for tac_name, frames in tactile_obs.items():
        if frames:
            tactile_obs_np[tac_name] = np.array(frames)
        else:
            tactile_obs_np[tac_name] = None

    sensors_obs = RakudaSensorObs(cameras=camera_obs_np, tactile=tactile_obs_np)
    return RakudaObs(arms=arms, sensors=sensors_obs)

record_parallel ¤

record_parallel(max_frame: int, fps: int = 30, teleop_hz: int = 100, max_processing_time_ms: float = 25.0) -> RakudaObs

teleoperate_stepをteleop_hzで回しつつ、fpsごとに最新のarm_obsを記録し、 センサデータは並列取得する高速記録。

Source code in src/robopy/robots/rakuda/rakuda_robot.py
def record_parallel(
    self,
    max_frame: int,
    fps: int = 30,
    teleop_hz: int = 100,
    max_processing_time_ms: float = 25.0,
) -> RakudaObs:
    """
    teleoperate_stepをteleop_hzで回しつつ、fpsごとに最新のarm_obsを記録し、
    センサデータは並列取得する高速記録。
    """
    if not self.is_connected:
        self.connect()

    if max_frame <= 0:
        raise ValueError("max_frame must be greater than 0.")

    # arm_obsを高頻度で取得するためのキュー
    arm_obs_queue: queue.Queue[RakudaArmObs] = queue.Queue(maxsize=teleop_hz * 2)
    stop_event = threading.Event()

    def teleop_worker():
        interval = 1.0 / teleop_hz
        while not stop_event.is_set():
            obs = self.robot_system.teleoperate_step()
            try:
                arm_obs_queue.put(obs, timeout=interval)
            except queue.Full:
                pass
            time.sleep(interval)

    teleop_thread = threading.Thread(target=teleop_worker, daemon=True)
    teleop_thread.start()

    leader_obs = []
    follower_obs = []
    camera_obs: Dict[str, List] = DefaultDict(list)
    tactile_obs: Dict[str, List] = DefaultDict(list)

    get_obs_interval = 1.0 / fps
    max_processing_time = max_processing_time_ms / 1000.0
    frame_count = 0
    skipped_frames = 0
    total_processing_time = 0.0

    logger.info(f"Starting parallel recording: {max_frame} frames at {fps}Hz")
    logger.info(
        f"""Target interval: {get_obs_interval * 1000:.1f}ms, 
        Max processing time: {max_processing_time_ms}ms"""
    )

    try:
        while frame_count < max_frame:
            frame_start_time = time.perf_counter()

            # 最新のarm_obsを取得(バッファが空なら待つ)
            try:
                while True:
                    arm_obs = arm_obs_queue.get(timeout=get_obs_interval)
                    while not arm_obs_queue.empty():
                        arm_obs = arm_obs_queue.get_nowait()
                    break
            except queue.Empty:
                logger.warning("No arm_obs available in time.")
                continue

            # センサデータは並列取得
            try:
                with ThreadPoolExecutor(max_workers=4) as executor:
                    # Camera futures
                    camera_futures = {}
                    if self._sensors.cameras:
                        for cam in self._sensors.cameras:
                            if cam.is_connected:
                                camera_futures[cam.name] = executor.submit(
                                    cam.async_read, timeout_ms=5
                                )

                    # Tactile futures
                    tactile_futures = {}
                    if self._sensors.tactile:
                        for tac in self._sensors.tactile:
                            if tac.is_connected:
                                if hasattr(tac, "async_read"):
                                    tactile_futures[tac.name] = executor.submit(
                                        tac.async_read, timeout_ms=5
                                    )
                                else:
                                    tactile_futures[tac.name] = executor.submit(tac.read)

                    timeout = max_processing_time * 0.8

                    camera_data: Dict[str, NDArray | None] = {}
                    for cam_name, future in camera_futures.items():
                        try:
                            camera_data[cam_name] = future.result(timeout=timeout / 2)
                        except Exception as e:
                            logger.warning(
                                f"Camera {cam_name} failed in frame {frame_count}: {e}"
                            )
                            camera_data[cam_name] = None

                    tactile_data: Dict[str, NDArray | None] = {}
                    for tac_name, future in tactile_futures.items():
                        try:
                            tactile_data[tac_name] = future.result(timeout=timeout / 2)
                        except Exception as e:
                            logger.warning(
                                f"Tactile {tac_name} failed in frame {frame_count}: {e}"
                            )
                            tactile_data[tac_name] = None

                # 記録
                leader_obs.append(arm_obs.leader)
                follower_obs.append(arm_obs.follower)

                for cam_name, cam_frame in camera_data.items():
                    camera_obs[cam_name].append(cam_frame)

                for tac_name, tac_frame in tactile_data.items():
                    tactile_obs[tac_name].append(tac_frame)

                frame_count += 1

                # タイミング調整
                processing_time = time.perf_counter() - frame_start_time
                total_processing_time += processing_time
                if processing_time > max_processing_time:
                    skipped_frames += 1
                    logger.warning(
                        f"""Frame {frame_count} took {processing_time * 1000:.1f}ms 
                        (>{max_processing_time_ms}ms), skipping"""
                    )
                    continue

                elapsed = time.perf_counter() - frame_start_time
                sleep_time = max(0, get_obs_interval - elapsed)
                if sleep_time > 0:
                    time.sleep(sleep_time)

            except Exception as e:
                logger.error(f"Unexpected error in frame {frame_count}: {e}")
                continue

    except KeyboardInterrupt:
        logger.info("Recording interrupted by user.")
    except Exception as e:
        logger.error(f"An error occurred during parallel recording: {e}")
        raise e
    finally:
        stop_event.set()
        teleop_thread.join(timeout=1.0)

    avg_processing_time = total_processing_time / max(1, frame_count) * 1000
    logger.info(f"Recording completed: {frame_count} frames, {skipped_frames} skipped")
    logger.info(f"Average processing time: {avg_processing_time:.1f}ms")

    leader_obs_np = np.array(leader_obs)
    follower_obs_np = np.array(follower_obs)
    arms: RakudaArmObs = RakudaArmObs(leader=leader_obs_np, follower=follower_obs_np)

    camera_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for cam_name, frames in camera_obs.items():
        if frames and all(frame is not None for frame in frames):
            camera_obs_np[cam_name] = np.array(frames)
        else:
            camera_obs_np[cam_name] = None

    tactile_obs_np: Dict[str, NDArray[np.float32] | None] = {}
    for tac_name, frames in tactile_obs.items():
        if frames and all(frame is not None for frame in frames):
            tactile_obs_np[tac_name] = np.array(frames).transpose(0, 3, 1, 2)
        else:
            tactile_obs_np[tac_name] = None
    sensors_obs = RakudaSensorObs(cameras=camera_obs_np, tactile=tactile_obs_np)
    return RakudaObs(arms=arms, sensors=sensors_obs)

RakudaConfig dataclass ¤

RakudaConfig(leader_port: str, follower_port: str, sensors: RakudaSensorParams | None = None, slow_mode: bool = False)

Configuration class for Rakuda robot.

RakudaPairSys ¤

RakudaPairSys(cfg: RakudaConfig)

Bases: Robot

Class representing the Rakuda robotic system with both leader and follower arms.

METHOD DESCRIPTION
connect

Connect to both leader and follower arms.

disconnect

Disconnect from both leader and follower arms.

get_observation

Get the current observation from both arms.

Source code in src/robopy/robots/rakuda/rakuda_pair_sys.py
def __init__(self, cfg: RakudaConfig):
    self.config = cfg
    self._leader = RakudaLeader(cfg)
    self._follower = RakudaFollower(cfg)
    self._is_connected = False
    self._motor_mapping = (
        RAKUDA_MOTOR_MAPPING  # key: leader motor name, value: follower motor name
    )

connect ¤

connect() -> None

Connect to both leader and follower arms.

Source code in src/robopy/robots/rakuda/rakuda_pair_sys.py
def connect(self) -> None:
    """Connect to both leader and follower arms."""
    if self.is_connected:
        logger.info("Successfully connected to both leader and follower arms.")
        return

    try:
        self._leader.connect()
        self._follower.connect()
        logger.info("Successfully connected to both leader and follower arms.")
        print("[cyan]Successfully connected to both leader and follower arms.[/cyan]")
        self._is_connected = True
    except (OSError, IOError, PermissionError) as e:
        logger.error(f"Failed to connect to arms: {e}")
        raise ConnectionError(f"Failed to connect to arms: {e}")
    except (pickle.PickleError, EOFError) as e:
        logger.error(f"Calibration data corrupted: {e}")
        raise ConnectionError(f"Calibration data error: {e}")

disconnect ¤

disconnect() -> None

Disconnect from both leader and follower arms.

Source code in src/robopy/robots/rakuda/rakuda_pair_sys.py
def disconnect(self) -> None:
    """Disconnect from both leader and follower arms."""
    self.leader.disconnect()
    self.follower.disconnect()

get_observation ¤

get_observation() -> RakudaArmObs

Get the current observation from both arms.

Source code in src/robopy/robots/rakuda/rakuda_pair_sys.py
def get_observation(self) -> RakudaArmObs:
    """Get the current observation from both arms."""
    if not self.is_connected:
        raise ConnectionError("RakudaPairSys is not connected. Call connect() first.")

    leader_motor_names = list(self._leader.motors.motors.keys())
    follower_motor_names = list(self._follower.motors.motors.keys())
    leader_obs = self._leader.motors.sync_read(
        XControlTable.PRESENT_POSITION, leader_motor_names
    )

    follower_obs = self._follower.motors.sync_read(
        XControlTable.PRESENT_POSITION, follower_motor_names
    )

    leader_obs_array = np.array(list(leader_obs.values()), dtype=np.float32)
    follower_obs_array = np.array(list(follower_obs.values()), dtype=np.float32)
    return RakudaArmObs(leader=leader_obs_array, follower=follower_obs_array)

RakudaLeader ¤

RakudaLeader(cfg: RakudaConfig)

Bases: RakudaArm

Class representing the leader arm of the Rakuda robotic system.

METHOD DESCRIPTION
connect
disconnect

connect ¤

connect() -> None

disconnect ¤

disconnect() -> None

RakudaFollower ¤

RakudaFollower(cfg: RakudaConfig)

Bases: RakudaArm

Class representing the follower arm of the Rakuda robotic system.

METHOD DESCRIPTION
connect

Connect to the follower arm and enable torque.

disconnect

Disconnect from the follower arm and disable torque.

connect ¤

connect() -> None

Connect to the follower arm and enable torque.

disconnect ¤

disconnect() -> None

Disconnect from the follower arm and disable torque.

KochRobot ¤

KochRobot(cfg: KochConfig)

Bases: ComposedRobot

METHOD DESCRIPTION
connect
disconnect
teleoperation

Start teleoperation mode where leader controls follower.

connect ¤

connect() -> None

disconnect ¤

disconnect() -> None

teleoperation ¤

teleoperation(max_seconds: float) -> None

Start teleoperation mode where leader controls follower.

使用例¤

基本的なRakudaRobotの使用¤

from robopy import RakudaConfig, RakudaRobot

# 設定の作成
config = RakudaConfig(
    leader_port="/dev/ttyUSB0",
    follower_port="/dev/ttyUSB1"
)

# ロボットの作成と接続
robot = RakudaRobot(config)
robot.connect()

try:
    # テレオペレーション
    robot.teleoperation(duration=10)

    # データ記録
    obs = robot.record_parallel(max_frame=1000, fps=30)

finally:
    robot.disconnect()

個別アームの制御¤

from robopy.robots.rakuda import RakudaLeader, RakudaFollower

# 個別アームの作成
leader = RakudaLeader("/dev/ttyUSB0")
follower = RakudaFollower("/dev/ttyUSB1")

leader.connect()
follower.connect()

try:
    # Leaderから観測データを取得
    leader_obs = leader.get_obs()

    # Followerに動作指令を送信
    follower.set_action(leader_obs["action"])

finally:
    leader.disconnect()
    follower.disconnect()