#!/usr/bin/env python3 """Test beep+collision sync using a Marionette-format move played via SDK. Intermediate test between the direct-control script and the full Marionette frontend. Creates a synthetic move (JSON + WAV) matching the beep+collision test, plays it on the robot via reachy_mini SDK's play_move(), and records with the laptop mic. This tests the SDK's built-in audio-motion sync mechanism (play_sound + motion loop) rather than our manual push_audio_sample approach. Usage: python tests/test_move_sync.py [--host reachy-mini.local] """ from __future__ import annotations import argparse import json import subprocess import sys import tempfile import time from pathlib import Path import numpy as np import sounddevice as sd import soundfile as sf sys.path.insert(0, str(Path(__file__).parent)) from audio_analysis import detect_beep_onsets, detect_transient_onsets # Timing — same as test_beep_collision_sync.py BEEP_TIMES = [1.0, 2.3, 4.0, 6.3, 9.4] BEEP_COLLISION_OFFSET = 1.0 COLLISION_TIMES = [t + BEEP_COLLISION_OFFSET for t in BEEP_TIMES] # Audio BEEP_FREQ = 2000.0 BEEP_DURATION = 0.2 BEEP_AMPLITUDE = 0.9 ROBOT_SR = 16000 # Collision RIGHT_REST = -0.68 LEFT_REST = -0.1745 LEFT_COLLISION = 0.70 HOLD_DURATION = 0.2 MOTION_SR = 100 # 100Hz motion sampling, standard for Marionette ROBOT_USER = "pollen" ROBOT_PYTHON = "/venvs/apps_venv/bin/python" LAPTOP_SR = 48000 MIC_DURATION = 30.0 REMOTE_DIR = "/tmp/sync_test_move" REMOTE_RESULTS = "/tmp/move_sync_positions.json" def generate_move_files(tmpdir: Path) -> tuple[Path, Path]: """Generate Marionette-format JSON + WAV for the beep+collision test.""" total_duration = max(COLLISION_TIMES) + HOLD_DURATION + 1.0 dt = 1.0 / MOTION_SR n_frames = int(total_duration * MOTION_SR) identity_head = np.eye(4).tolist() # Build collision timeline left_targets = np.full(n_frames, LEFT_REST, dtype=np.float64) for ct in COLLISION_TIMES: start = int(ct * MOTION_SR) end = int((ct + HOLD_DURATION) * MOTION_SR) end = min(end, n_frames) left_targets[start:end] = LEFT_COLLISION timestamps = [] frames = [] for i in range(n_frames): t = i * dt timestamps.append(round(t, 4)) frames.append({ "head": identity_head, "antennas": [float(left_targets[i]), RIGHT_REST], "body_yaw": 0.0, "check_collision": False, }) move_data = { "description": "Sync test: beeps + antenna collisions", "time": timestamps, "set_target_data": frames, } json_path = tmpdir / "sync-test.json" json_path.write_text(json.dumps(move_data), encoding="utf-8") # Generate WAV with beeps n_audio = int(total_duration * ROBOT_SR) audio = np.zeros(n_audio, dtype=np.float32) for bt in BEEP_TIMES: start = int(bt * ROBOT_SR) n_beep = int(BEEP_DURATION * ROBOT_SR) if start + n_beep > n_audio: continue t_arr = np.arange(n_beep, dtype=np.float32) / ROBOT_SR beep = BEEP_AMPLITUDE * np.sin(2 * np.pi * BEEP_FREQ * t_arr).astype(np.float32) fade = int(0.005 * ROBOT_SR) if fade > 0 and 2 * fade < n_beep: beep[:fade] *= np.linspace(0, 1, fade, dtype=np.float32) beep[-fade:] *= np.linspace(1, 0, fade, dtype=np.float32) audio[start:start + n_beep] += beep wav_path = tmpdir / "sync-test.wav" sf.write(str(wav_path), audio, ROBOT_SR) print(f" Generated move: {n_frames} frames at {MOTION_SR}Hz, {total_duration:.1f}s") print(f" Generated WAV: {n_audio} samples at {ROBOT_SR}Hz, {len(BEEP_TIMES)} beeps") return json_path, wav_path def scp_to_robot(local_path: Path, remote_path: str, host: str) -> None: target = f"{ROBOT_USER}@{host}:{remote_path}" result = subprocess.run( ["scp", "-o", "ConnectTimeout=5", str(local_path), target], capture_output=True, text=True, timeout=15, ) if result.returncode != 0: raise RuntimeError(f"SCP failed: {result.stderr}") def scp_from_robot(remote_path: str, local_path: Path, host: str) -> None: source = f"{ROBOT_USER}@{host}:{remote_path}" result = subprocess.run( ["scp", "-o", "ConnectTimeout=5", source, str(local_path)], capture_output=True, text=True, timeout=15, ) if result.returncode != 0: raise RuntimeError(f"SCP failed: {result.stderr}") # Robot-side playback script: loads the move, plays via SDK, records positions ROBOT_PLAY_SCRIPT = """\ import json, os, sys, time import numpy as np from pathlib import Path move_dir = sys.argv[1] results_path = sys.argv[2] print("robot: connecting to ReachyMini", flush=True) from reachy_mini import ReachyMini from reachy_mini.motion.recorded_move import RecordedMove r = ReachyMini() # Load move json_path = Path(move_dir) / "sync-test.json" wav_path = Path(move_dir) / "sync-test.wav" move_data = json.loads(json_path.read_text()) sound_path = wav_path if wav_path.exists() else None move = RecordedMove(move_data, sound_path=sound_path) print(f"robot: loaded move: {move.duration:.1f}s, sound={sound_path is not None}", flush=True) # Record present positions during playback at 50Hz # We do this in a background thread while play_move runs import threading timestamps = [] left_present = [] right_present = [] recording = True def record_positions(): t0 = time.monotonic() while recording: pos = r.get_present_antenna_joint_positions() timestamps.append(time.monotonic() - t0) left_present.append(pos[0]) right_present.append(pos[1]) time.sleep(0.02) # 50Hz recorder = threading.Thread(target=record_positions, daemon=True) print("robot: MARK_START", flush=True) recorder.start() r.play_move(move, initial_goto_duration=1.0) recording = False recorder.join(timeout=1.0) print(f"robot: playback done, recorded {len(timestamps)} position samples", flush=True) # Save results results = { "beep_times": move_data.get("_beep_times", []), "collision_times": move_data.get("_collision_times", []), "timestamps": timestamps, "left_present": left_present, "right_present": right_present, } with open(results_path, "w") as f: json.dump(results, f) print(f"robot: saved to {results_path}", flush=True) print("robot: done", flush=True) os._exit(0) """ def start_robot(host: str) -> subprocess.Popen: with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: f.write(ROBOT_PLAY_SCRIPT) local_script = Path(f.name) remote_script = "/tmp/move_sync_play.py" try: target = f"{ROBOT_USER}@{host}:{remote_script}" subprocess.run( ["scp", "-o", "ConnectTimeout=5", str(local_script), target], capture_output=True, text=True, timeout=15, check=True, ) finally: local_script.unlink() args_str = f"{ROBOT_PYTHON} {remote_script} {REMOTE_DIR} {REMOTE_RESULTS}" proc = subprocess.Popen( ["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{host}", args_str], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) return proc def plot_combined( mic_audio, mic_sr, mic_start, mark_start, robot_data, detected_beeps, detected_collisions, pairs, output_path, ): import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt mic_t = np.arange(len(mic_audio)) / mic_sr robot_offset = mark_start - mic_start fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(18, 10), sharex=True) # Mic waveform ax1.plot(mic_t, mic_audio, "k-", linewidth=0.3, alpha=0.5) ax1.set_ylabel("Mic amplitude") ax1.set_title("Move Sync Test (SDK play_move) — Laptop Mic Recording") ax1.grid(True, alpha=0.3) for i, bt in enumerate(detected_beeps): ax1.axvline(bt, color="blue", linestyle="-", linewidth=1.2, alpha=0.7, label="Detected beep" if i == 0 else None) for i, ct in enumerate(detected_collisions): ax1.axvline(ct, color="red", linestyle="-", linewidth=1.2, alpha=0.7, label="Detected collision" if i == 0 else None) for p in pairs: mid = (p["beep_mic_t"] + p["collision_mic_t"]) / 2 ax1.annotate(f'{p["interval_ms"]:.0f}ms', xy=(mid, 0), ha="center", fontsize=9, color="purple", fontweight="bold", bbox=dict(boxstyle="round,pad=0.2", facecolor="lightyellow", alpha=0.8)) ax1.legend(loc="upper right", fontsize=9) # Robot trajectory if robot_data.get("timestamps"): robot_ts = np.array(robot_data["timestamps"]) left_pos = np.array(robot_data["left_present"]) right_pos = np.array(robot_data["right_present"]) # Note: robot timestamps start from MARK_START, which includes goto_duration ax2.plot(robot_ts + robot_offset, left_pos, "b-", linewidth=1.5, label="Left antenna (present)") ax2.plot(robot_ts + robot_offset, right_pos, "r-", linewidth=1.5, label="Right antenna (present)") # Command times (approximate, via MARK_START) for i, bt in enumerate(BEEP_TIMES): # Beep times are relative to move start, not MARK_START # play_move does a 1.0s goto first, so beeps start ~1s after MARK_START mic_bt = robot_offset + 1.0 + bt # +1.0 for initial_goto_duration ax2.axvline(mic_bt, color="blue", linestyle="--", linewidth=0.8, alpha=0.4, label="Beep cmd (+goto)" if i == 0 else None) for i, ct in enumerate(COLLISION_TIMES): mic_ct = robot_offset + 1.0 + ct ax2.axvline(mic_ct, color="red", linestyle="--", linewidth=0.8, alpha=0.4, label="Collision cmd (+goto)" if i == 0 else None) for bt in detected_beeps: ax2.axvline(bt, color="blue", linestyle="-", linewidth=0.8, alpha=0.4) for ct in detected_collisions: ax2.axvline(ct, color="red", linestyle="-", linewidth=0.8, alpha=0.4) ax2.set_xlabel("Time since mic start (s)") ax2.set_ylabel("Position (rad)") ax2.set_title("Robot Antenna Trajectory (aligned to mic clock)") ax2.legend(loc="upper right", fontsize=9) ax2.grid(True, alpha=0.3) # Auto-zoom to active region all_events = detected_beeps + detected_collisions if all_events: ax1.set_xlim(min(all_events) - 1.0, max(all_events) + 1.0) fig.tight_layout() fig.savefig(str(output_path), dpi=150) plt.close(fig) print(f" Plot saved to {output_path}") def main(): parser = argparse.ArgumentParser(description="Move sync test via SDK play_move") parser.add_argument("--host", default="reachy-mini.local") args = parser.parse_args() print(f"\n{'='*60}") print("Move Sync Test (SDK play_move)") print(f"{'='*60}") print(f" Beep times: {BEEP_TIMES}") print(f" Collision times: {COLLISION_TIMES}") print(f" Expected interval: {BEEP_COLLISION_OFFSET:.1f}s\n") # Step 1: Stop running apps print("[1/6] Stopping any running app...") subprocess.run( ["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{args.host}", "curl -sf -X POST http://127.0.0.1:8000/api/apps/stop-current-app >/dev/null 2>&1 || true"], capture_output=True, timeout=10, ) time.sleep(1) # Step 2: Generate move files print("[2/6] Generating Marionette-format move...") with tempfile.TemporaryDirectory() as tmpdir: tmpdir = Path(tmpdir) json_path, wav_path = generate_move_files(tmpdir) # Embed beep/collision times in JSON for the robot script to pass back move_data = json.loads(json_path.read_text()) move_data["_beep_times"] = BEEP_TIMES move_data["_collision_times"] = COLLISION_TIMES json_path.write_text(json.dumps(move_data)) # Step 3: SCP to robot print("[3/6] Copying move files to robot...") subprocess.run( ["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{args.host}", f"mkdir -p {REMOTE_DIR}"], capture_output=True, timeout=10, ) scp_to_robot(json_path, f"{REMOTE_DIR}/sync-test.json", args.host) scp_to_robot(wav_path, f"{REMOTE_DIR}/sync-test.wav", args.host) print(" Done") # Step 4: Start mic recording print(f"[4/6] Starting mic recording ({MIC_DURATION}s)...") mic_start = time.monotonic() mic_data = sd.rec( int(MIC_DURATION * LAPTOP_SR), samplerate=LAPTOP_SR, channels=1, dtype="float32", ) # Step 5: Start robot playback time.sleep(0.3) print("[5/6] Starting SDK play_move on robot...") proc = start_robot(args.host) mark_start = None print("\n--- Robot output ---") for line in iter(proc.stdout.readline, ""): line = line.rstrip() if not line: continue laptop_time = time.monotonic() print(f" {line}") if "MARK_START" in line: mark_start = laptop_time proc.wait() print("--- End robot output ---") sd.wait() captured = mic_data.flatten() print(f"\n Mic recording done") if mark_start is None: print("\nFAILED: Never received MARK_START") return 1 robot_offset = mark_start - mic_start print(f" MARK_START at mic_t={robot_offset:.3f}s") mic_path = Path("tests/move_sync_mic.wav") sf.write(str(mic_path), captured, LAPTOP_SR) print(f" Saved mic to {mic_path}") # Fetch robot position data print("\n[6/6] Fetching robot data + analyzing...") local_results = Path("tests/move_sync_positions.json") scp_from_robot(REMOTE_RESULTS, local_results, args.host) with open(local_results) as f: robot_data = json.load(f) # Detect beeps and collisions detected_beeps = detect_beep_onsets( captured, LAPTOP_SR, freq=BEEP_FREQ, bandwidth=150.0, threshold_db=-12.0, min_separation=1.0, ) detected_collisions = detect_transient_onsets( captured, LAPTOP_SR, highpass_freq=3000.0, ) print(f" Detected {len(detected_beeps)} beeps at: " f"{[f'{t:.3f}' for t in detected_beeps]}") print(f" Detected {len(detected_collisions)} collisions at: " f"{[f'{t:.3f}' for t in detected_collisions]}") # Match pairs print(f"\n{'='*60}") print("Beep → Collision Interval Analysis (SDK play_move)") print(f" (Expected interval: {BEEP_COLLISION_OFFSET*1000:.0f}ms)") print(f"{'='*60}") pairs = [] for i, bt in enumerate(detected_beeps): candidates = [ct for ct in detected_collisions if 0.3 < (ct - bt) < 2.0] if not candidates: print(f" Beep {i+1} at {bt:.3f}s: NO COLLISION FOUND") continue nearest = min(candidates, key=lambda ct: abs((ct - bt) - BEEP_COLLISION_OFFSET)) interval_ms = (nearest - bt) * 1000 error_ms = interval_ms - BEEP_COLLISION_OFFSET * 1000 pairs.append({ "beep_mic_t": bt, "collision_mic_t": nearest, "interval_ms": interval_ms, "error_ms": error_ms, }) print(f" Pair {len(pairs)}: beep {bt:.3f}s → collision {nearest:.3f}s = " f"{interval_ms:.0f}ms (error {error_ms:+.0f}ms)") if pairs: errors = [p["error_ms"] for p in pairs] intervals = [p["interval_ms"] for p in pairs] print(f"\n Pairs matched: {len(pairs)}/{len(BEEP_TIMES)}") print(f" Mean interval: {np.mean(intervals):.0f}ms (expected {BEEP_COLLISION_OFFSET*1000:.0f}ms)") print(f" Mean error: {np.mean(errors):+.0f}ms") print(f" Std error: {np.std(errors):.0f}ms") print(f" Min/Max error: {min(errors):+.0f}ms / {max(errors):+.0f}ms") plot_path = Path("tests/move_sync_plot.png") plot_combined( captured, LAPTOP_SR, mic_start, mark_start, robot_data, detected_beeps, detected_collisions, pairs, plot_path, ) success = len(pairs) >= len(BEEP_TIMES) - 1 print(f"\n{'='*60}") if success: print("RESULT: PASS — Beep-collision pairs detected via SDK play_move") else: print("RESULT: FAIL — Could not reliably detect pairs") print(f"{'='*60}\n") return 0 if success else 1 if __name__ == "__main__": sys.exit(main())