Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Test beep+collision sync using a Marionette-format move played via SDK. | |
| Intermediate test between the direct-control script and the full Marionette | |
| frontend. Creates a synthetic move (JSON + WAV) matching the beep+collision | |
| test, plays it on the robot via reachy_mini SDK's play_move(), and records | |
| with the laptop mic. | |
| This tests the SDK's built-in audio-motion sync mechanism (play_sound + | |
| motion loop) rather than our manual push_audio_sample approach. | |
| Usage: | |
| python tests/test_move_sync.py [--host reachy-mini.local] | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| import numpy as np | |
| import sounddevice as sd | |
| import soundfile as sf | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
| from audio_analysis import detect_beep_onsets, detect_transient_onsets | |
| # Timing — same as test_beep_collision_sync.py | |
| BEEP_TIMES = [1.0, 2.3, 4.0, 6.3, 9.4] | |
| BEEP_COLLISION_OFFSET = 1.0 | |
| COLLISION_TIMES = [t + BEEP_COLLISION_OFFSET for t in BEEP_TIMES] | |
| # Audio | |
| BEEP_FREQ = 2000.0 | |
| BEEP_DURATION = 0.2 | |
| BEEP_AMPLITUDE = 0.9 | |
| ROBOT_SR = 16000 | |
| # Collision | |
| RIGHT_REST = -0.68 | |
| LEFT_REST = -0.1745 | |
| LEFT_COLLISION = 0.70 | |
| HOLD_DURATION = 0.2 | |
| MOTION_SR = 100 # 100Hz motion sampling, standard for Marionette | |
| ROBOT_USER = "pollen" | |
| ROBOT_PYTHON = "/venvs/apps_venv/bin/python" | |
| LAPTOP_SR = 48000 | |
| MIC_DURATION = 30.0 | |
| REMOTE_DIR = "/tmp/sync_test_move" | |
| REMOTE_RESULTS = "/tmp/move_sync_positions.json" | |
| def generate_move_files(tmpdir: Path) -> tuple[Path, Path]: | |
| """Generate Marionette-format JSON + WAV for the beep+collision test.""" | |
| total_duration = max(COLLISION_TIMES) + HOLD_DURATION + 1.0 | |
| dt = 1.0 / MOTION_SR | |
| n_frames = int(total_duration * MOTION_SR) | |
| identity_head = np.eye(4).tolist() | |
| # Build collision timeline | |
| left_targets = np.full(n_frames, LEFT_REST, dtype=np.float64) | |
| for ct in COLLISION_TIMES: | |
| start = int(ct * MOTION_SR) | |
| end = int((ct + HOLD_DURATION) * MOTION_SR) | |
| end = min(end, n_frames) | |
| left_targets[start:end] = LEFT_COLLISION | |
| timestamps = [] | |
| frames = [] | |
| for i in range(n_frames): | |
| t = i * dt | |
| timestamps.append(round(t, 4)) | |
| frames.append({ | |
| "head": identity_head, | |
| "antennas": [float(left_targets[i]), RIGHT_REST], | |
| "body_yaw": 0.0, | |
| "check_collision": False, | |
| }) | |
| move_data = { | |
| "description": "Sync test: beeps + antenna collisions", | |
| "time": timestamps, | |
| "set_target_data": frames, | |
| } | |
| json_path = tmpdir / "sync-test.json" | |
| json_path.write_text(json.dumps(move_data), encoding="utf-8") | |
| # Generate WAV with beeps | |
| n_audio = int(total_duration * ROBOT_SR) | |
| audio = np.zeros(n_audio, dtype=np.float32) | |
| for bt in BEEP_TIMES: | |
| start = int(bt * ROBOT_SR) | |
| n_beep = int(BEEP_DURATION * ROBOT_SR) | |
| if start + n_beep > n_audio: | |
| continue | |
| t_arr = np.arange(n_beep, dtype=np.float32) / ROBOT_SR | |
| beep = BEEP_AMPLITUDE * np.sin(2 * np.pi * BEEP_FREQ * t_arr).astype(np.float32) | |
| fade = int(0.005 * ROBOT_SR) | |
| if fade > 0 and 2 * fade < n_beep: | |
| beep[:fade] *= np.linspace(0, 1, fade, dtype=np.float32) | |
| beep[-fade:] *= np.linspace(1, 0, fade, dtype=np.float32) | |
| audio[start:start + n_beep] += beep | |
| wav_path = tmpdir / "sync-test.wav" | |
| sf.write(str(wav_path), audio, ROBOT_SR) | |
| print(f" Generated move: {n_frames} frames at {MOTION_SR}Hz, {total_duration:.1f}s") | |
| print(f" Generated WAV: {n_audio} samples at {ROBOT_SR}Hz, {len(BEEP_TIMES)} beeps") | |
| return json_path, wav_path | |
| def scp_to_robot(local_path: Path, remote_path: str, host: str) -> None: | |
| target = f"{ROBOT_USER}@{host}:{remote_path}" | |
| result = subprocess.run( | |
| ["scp", "-o", "ConnectTimeout=5", str(local_path), target], | |
| capture_output=True, text=True, timeout=15, | |
| ) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"SCP failed: {result.stderr}") | |
| def scp_from_robot(remote_path: str, local_path: Path, host: str) -> None: | |
| source = f"{ROBOT_USER}@{host}:{remote_path}" | |
| result = subprocess.run( | |
| ["scp", "-o", "ConnectTimeout=5", source, str(local_path)], | |
| capture_output=True, text=True, timeout=15, | |
| ) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"SCP failed: {result.stderr}") | |
| # Robot-side playback script: loads the move, plays via SDK, records positions | |
| ROBOT_PLAY_SCRIPT = """\ | |
| import json, os, sys, time | |
| import numpy as np | |
| from pathlib import Path | |
| move_dir = sys.argv[1] | |
| results_path = sys.argv[2] | |
| print("robot: connecting to ReachyMini", flush=True) | |
| from reachy_mini import ReachyMini | |
| from reachy_mini.motion.recorded_move import RecordedMove | |
| r = ReachyMini() | |
| # Load move | |
| json_path = Path(move_dir) / "sync-test.json" | |
| wav_path = Path(move_dir) / "sync-test.wav" | |
| move_data = json.loads(json_path.read_text()) | |
| sound_path = wav_path if wav_path.exists() else None | |
| move = RecordedMove(move_data, sound_path=sound_path) | |
| print(f"robot: loaded move: {move.duration:.1f}s, sound={sound_path is not None}", flush=True) | |
| # Record present positions during playback at 50Hz | |
| # We do this in a background thread while play_move runs | |
| import threading | |
| timestamps = [] | |
| left_present = [] | |
| right_present = [] | |
| recording = True | |
| def record_positions(): | |
| t0 = time.monotonic() | |
| while recording: | |
| pos = r.get_present_antenna_joint_positions() | |
| timestamps.append(time.monotonic() - t0) | |
| left_present.append(pos[0]) | |
| right_present.append(pos[1]) | |
| time.sleep(0.02) # 50Hz | |
| recorder = threading.Thread(target=record_positions, daemon=True) | |
| print("robot: MARK_START", flush=True) | |
| recorder.start() | |
| r.play_move(move, initial_goto_duration=1.0) | |
| recording = False | |
| recorder.join(timeout=1.0) | |
| print(f"robot: playback done, recorded {len(timestamps)} position samples", flush=True) | |
| # Save results | |
| results = { | |
| "beep_times": move_data.get("_beep_times", []), | |
| "collision_times": move_data.get("_collision_times", []), | |
| "timestamps": timestamps, | |
| "left_present": left_present, | |
| "right_present": right_present, | |
| } | |
| with open(results_path, "w") as f: | |
| json.dump(results, f) | |
| print(f"robot: saved to {results_path}", flush=True) | |
| print("robot: done", flush=True) | |
| os._exit(0) | |
| """ | |
| def start_robot(host: str) -> subprocess.Popen: | |
| with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: | |
| f.write(ROBOT_PLAY_SCRIPT) | |
| local_script = Path(f.name) | |
| remote_script = "/tmp/move_sync_play.py" | |
| try: | |
| target = f"{ROBOT_USER}@{host}:{remote_script}" | |
| subprocess.run( | |
| ["scp", "-o", "ConnectTimeout=5", str(local_script), target], | |
| capture_output=True, text=True, timeout=15, check=True, | |
| ) | |
| finally: | |
| local_script.unlink() | |
| args_str = f"{ROBOT_PYTHON} {remote_script} {REMOTE_DIR} {REMOTE_RESULTS}" | |
| proc = subprocess.Popen( | |
| ["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{host}", args_str], | |
| stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, | |
| ) | |
| return proc | |
| def plot_combined( | |
| mic_audio, mic_sr, mic_start, mark_start, | |
| robot_data, detected_beeps, detected_collisions, pairs, | |
| output_path, | |
| ): | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| mic_t = np.arange(len(mic_audio)) / mic_sr | |
| robot_offset = mark_start - mic_start | |
| fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(18, 10), sharex=True) | |
| # Mic waveform | |
| ax1.plot(mic_t, mic_audio, "k-", linewidth=0.3, alpha=0.5) | |
| ax1.set_ylabel("Mic amplitude") | |
| ax1.set_title("Move Sync Test (SDK play_move) — Laptop Mic Recording") | |
| ax1.grid(True, alpha=0.3) | |
| for i, bt in enumerate(detected_beeps): | |
| ax1.axvline(bt, color="blue", linestyle="-", linewidth=1.2, alpha=0.7, | |
| label="Detected beep" if i == 0 else None) | |
| for i, ct in enumerate(detected_collisions): | |
| ax1.axvline(ct, color="red", linestyle="-", linewidth=1.2, alpha=0.7, | |
| label="Detected collision" if i == 0 else None) | |
| for p in pairs: | |
| mid = (p["beep_mic_t"] + p["collision_mic_t"]) / 2 | |
| ax1.annotate(f'{p["interval_ms"]:.0f}ms', xy=(mid, 0), | |
| ha="center", fontsize=9, color="purple", fontweight="bold", | |
| bbox=dict(boxstyle="round,pad=0.2", facecolor="lightyellow", alpha=0.8)) | |
| ax1.legend(loc="upper right", fontsize=9) | |
| # Robot trajectory | |
| if robot_data.get("timestamps"): | |
| robot_ts = np.array(robot_data["timestamps"]) | |
| left_pos = np.array(robot_data["left_present"]) | |
| right_pos = np.array(robot_data["right_present"]) | |
| # Note: robot timestamps start from MARK_START, which includes goto_duration | |
| ax2.plot(robot_ts + robot_offset, left_pos, "b-", linewidth=1.5, | |
| label="Left antenna (present)") | |
| ax2.plot(robot_ts + robot_offset, right_pos, "r-", linewidth=1.5, | |
| label="Right antenna (present)") | |
| # Command times (approximate, via MARK_START) | |
| for i, bt in enumerate(BEEP_TIMES): | |
| # Beep times are relative to move start, not MARK_START | |
| # play_move does a 1.0s goto first, so beeps start ~1s after MARK_START | |
| mic_bt = robot_offset + 1.0 + bt # +1.0 for initial_goto_duration | |
| ax2.axvline(mic_bt, color="blue", linestyle="--", linewidth=0.8, alpha=0.4, | |
| label="Beep cmd (+goto)" if i == 0 else None) | |
| for i, ct in enumerate(COLLISION_TIMES): | |
| mic_ct = robot_offset + 1.0 + ct | |
| ax2.axvline(mic_ct, color="red", linestyle="--", linewidth=0.8, alpha=0.4, | |
| label="Collision cmd (+goto)" if i == 0 else None) | |
| for bt in detected_beeps: | |
| ax2.axvline(bt, color="blue", linestyle="-", linewidth=0.8, alpha=0.4) | |
| for ct in detected_collisions: | |
| ax2.axvline(ct, color="red", linestyle="-", linewidth=0.8, alpha=0.4) | |
| ax2.set_xlabel("Time since mic start (s)") | |
| ax2.set_ylabel("Position (rad)") | |
| ax2.set_title("Robot Antenna Trajectory (aligned to mic clock)") | |
| ax2.legend(loc="upper right", fontsize=9) | |
| ax2.grid(True, alpha=0.3) | |
| # Auto-zoom to active region | |
| all_events = detected_beeps + detected_collisions | |
| if all_events: | |
| ax1.set_xlim(min(all_events) - 1.0, max(all_events) + 1.0) | |
| fig.tight_layout() | |
| fig.savefig(str(output_path), dpi=150) | |
| plt.close(fig) | |
| print(f" Plot saved to {output_path}") | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Move sync test via SDK play_move") | |
| parser.add_argument("--host", default="reachy-mini.local") | |
| args = parser.parse_args() | |
| print(f"\n{'='*60}") | |
| print("Move Sync Test (SDK play_move)") | |
| print(f"{'='*60}") | |
| print(f" Beep times: {BEEP_TIMES}") | |
| print(f" Collision times: {COLLISION_TIMES}") | |
| print(f" Expected interval: {BEEP_COLLISION_OFFSET:.1f}s\n") | |
| # Step 1: Stop running apps | |
| print("[1/6] Stopping any running app...") | |
| subprocess.run( | |
| ["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{args.host}", | |
| "curl -sf -X POST http://127.0.0.1:8000/api/apps/stop-current-app >/dev/null 2>&1 || true"], | |
| capture_output=True, timeout=10, | |
| ) | |
| time.sleep(1) | |
| # Step 2: Generate move files | |
| print("[2/6] Generating Marionette-format move...") | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| tmpdir = Path(tmpdir) | |
| json_path, wav_path = generate_move_files(tmpdir) | |
| # Embed beep/collision times in JSON for the robot script to pass back | |
| move_data = json.loads(json_path.read_text()) | |
| move_data["_beep_times"] = BEEP_TIMES | |
| move_data["_collision_times"] = COLLISION_TIMES | |
| json_path.write_text(json.dumps(move_data)) | |
| # Step 3: SCP to robot | |
| print("[3/6] Copying move files to robot...") | |
| subprocess.run( | |
| ["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{args.host}", | |
| f"mkdir -p {REMOTE_DIR}"], | |
| capture_output=True, timeout=10, | |
| ) | |
| scp_to_robot(json_path, f"{REMOTE_DIR}/sync-test.json", args.host) | |
| scp_to_robot(wav_path, f"{REMOTE_DIR}/sync-test.wav", args.host) | |
| print(" Done") | |
| # Step 4: Start mic recording | |
| print(f"[4/6] Starting mic recording ({MIC_DURATION}s)...") | |
| mic_start = time.monotonic() | |
| mic_data = sd.rec( | |
| int(MIC_DURATION * LAPTOP_SR), | |
| samplerate=LAPTOP_SR, channels=1, dtype="float32", | |
| ) | |
| # Step 5: Start robot playback | |
| time.sleep(0.3) | |
| print("[5/6] Starting SDK play_move on robot...") | |
| proc = start_robot(args.host) | |
| mark_start = None | |
| print("\n--- Robot output ---") | |
| for line in iter(proc.stdout.readline, ""): | |
| line = line.rstrip() | |
| if not line: | |
| continue | |
| laptop_time = time.monotonic() | |
| print(f" {line}") | |
| if "MARK_START" in line: | |
| mark_start = laptop_time | |
| proc.wait() | |
| print("--- End robot output ---") | |
| sd.wait() | |
| captured = mic_data.flatten() | |
| print(f"\n Mic recording done") | |
| if mark_start is None: | |
| print("\nFAILED: Never received MARK_START") | |
| return 1 | |
| robot_offset = mark_start - mic_start | |
| print(f" MARK_START at mic_t={robot_offset:.3f}s") | |
| mic_path = Path("tests/move_sync_mic.wav") | |
| sf.write(str(mic_path), captured, LAPTOP_SR) | |
| print(f" Saved mic to {mic_path}") | |
| # Fetch robot position data | |
| print("\n[6/6] Fetching robot data + analyzing...") | |
| local_results = Path("tests/move_sync_positions.json") | |
| scp_from_robot(REMOTE_RESULTS, local_results, args.host) | |
| with open(local_results) as f: | |
| robot_data = json.load(f) | |
| # Detect beeps and collisions | |
| detected_beeps = detect_beep_onsets( | |
| captured, LAPTOP_SR, freq=BEEP_FREQ, bandwidth=150.0, threshold_db=-12.0, | |
| min_separation=1.0, | |
| ) | |
| detected_collisions = detect_transient_onsets( | |
| captured, LAPTOP_SR, highpass_freq=3000.0, | |
| ) | |
| print(f" Detected {len(detected_beeps)} beeps at: " | |
| f"{[f'{t:.3f}' for t in detected_beeps]}") | |
| print(f" Detected {len(detected_collisions)} collisions at: " | |
| f"{[f'{t:.3f}' for t in detected_collisions]}") | |
| # Match pairs | |
| print(f"\n{'='*60}") | |
| print("Beep → Collision Interval Analysis (SDK play_move)") | |
| print(f" (Expected interval: {BEEP_COLLISION_OFFSET*1000:.0f}ms)") | |
| print(f"{'='*60}") | |
| pairs = [] | |
| for i, bt in enumerate(detected_beeps): | |
| candidates = [ct for ct in detected_collisions if 0.3 < (ct - bt) < 2.0] | |
| if not candidates: | |
| print(f" Beep {i+1} at {bt:.3f}s: NO COLLISION FOUND") | |
| continue | |
| nearest = min(candidates, key=lambda ct: abs((ct - bt) - BEEP_COLLISION_OFFSET)) | |
| interval_ms = (nearest - bt) * 1000 | |
| error_ms = interval_ms - BEEP_COLLISION_OFFSET * 1000 | |
| pairs.append({ | |
| "beep_mic_t": bt, | |
| "collision_mic_t": nearest, | |
| "interval_ms": interval_ms, | |
| "error_ms": error_ms, | |
| }) | |
| print(f" Pair {len(pairs)}: beep {bt:.3f}s → collision {nearest:.3f}s = " | |
| f"{interval_ms:.0f}ms (error {error_ms:+.0f}ms)") | |
| if pairs: | |
| errors = [p["error_ms"] for p in pairs] | |
| intervals = [p["interval_ms"] for p in pairs] | |
| print(f"\n Pairs matched: {len(pairs)}/{len(BEEP_TIMES)}") | |
| print(f" Mean interval: {np.mean(intervals):.0f}ms (expected {BEEP_COLLISION_OFFSET*1000:.0f}ms)") | |
| print(f" Mean error: {np.mean(errors):+.0f}ms") | |
| print(f" Std error: {np.std(errors):.0f}ms") | |
| print(f" Min/Max error: {min(errors):+.0f}ms / {max(errors):+.0f}ms") | |
| plot_path = Path("tests/move_sync_plot.png") | |
| plot_combined( | |
| captured, LAPTOP_SR, mic_start, mark_start, | |
| robot_data, detected_beeps, detected_collisions, pairs, plot_path, | |
| ) | |
| success = len(pairs) >= len(BEEP_TIMES) - 1 | |
| print(f"\n{'='*60}") | |
| if success: | |
| print("RESULT: PASS — Beep-collision pairs detected via SDK play_move") | |
| else: | |
| print("RESULT: FAIL — Could not reliably detect pairs") | |
| print(f"{'='*60}\n") | |
| return 0 if success else 1 | |
| if __name__ == "__main__": | |
| sys.exit(main()) | |