marionette / tests /test_move_sync.py
RemiFabre
Use new antenna neutral position [-0.1745, 0.1745] to reduce shaking
263b53b
#!/usr/bin/env python3
"""Test beep+collision sync using a Marionette-format move played via SDK.
Intermediate test between the direct-control script and the full Marionette
frontend. Creates a synthetic move (JSON + WAV) matching the beep+collision
test, plays it on the robot via reachy_mini SDK's play_move(), and records
with the laptop mic.
This tests the SDK's built-in audio-motion sync mechanism (play_sound +
motion loop) rather than our manual push_audio_sample approach.
Usage:
python tests/test_move_sync.py [--host reachy-mini.local]
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import tempfile
import time
from pathlib import Path
import numpy as np
import sounddevice as sd
import soundfile as sf
sys.path.insert(0, str(Path(__file__).parent))
from audio_analysis import detect_beep_onsets, detect_transient_onsets
# Timing — same as test_beep_collision_sync.py
BEEP_TIMES = [1.0, 2.3, 4.0, 6.3, 9.4]
BEEP_COLLISION_OFFSET = 1.0
COLLISION_TIMES = [t + BEEP_COLLISION_OFFSET for t in BEEP_TIMES]
# Audio
BEEP_FREQ = 2000.0
BEEP_DURATION = 0.2
BEEP_AMPLITUDE = 0.9
ROBOT_SR = 16000
# Collision
RIGHT_REST = -0.68
LEFT_REST = -0.1745
LEFT_COLLISION = 0.70
HOLD_DURATION = 0.2
MOTION_SR = 100 # 100Hz motion sampling, standard for Marionette
ROBOT_USER = "pollen"
ROBOT_PYTHON = "/venvs/apps_venv/bin/python"
LAPTOP_SR = 48000
MIC_DURATION = 30.0
REMOTE_DIR = "/tmp/sync_test_move"
REMOTE_RESULTS = "/tmp/move_sync_positions.json"
def generate_move_files(tmpdir: Path) -> tuple[Path, Path]:
"""Generate Marionette-format JSON + WAV for the beep+collision test."""
total_duration = max(COLLISION_TIMES) + HOLD_DURATION + 1.0
dt = 1.0 / MOTION_SR
n_frames = int(total_duration * MOTION_SR)
identity_head = np.eye(4).tolist()
# Build collision timeline
left_targets = np.full(n_frames, LEFT_REST, dtype=np.float64)
for ct in COLLISION_TIMES:
start = int(ct * MOTION_SR)
end = int((ct + HOLD_DURATION) * MOTION_SR)
end = min(end, n_frames)
left_targets[start:end] = LEFT_COLLISION
timestamps = []
frames = []
for i in range(n_frames):
t = i * dt
timestamps.append(round(t, 4))
frames.append({
"head": identity_head,
"antennas": [float(left_targets[i]), RIGHT_REST],
"body_yaw": 0.0,
"check_collision": False,
})
move_data = {
"description": "Sync test: beeps + antenna collisions",
"time": timestamps,
"set_target_data": frames,
}
json_path = tmpdir / "sync-test.json"
json_path.write_text(json.dumps(move_data), encoding="utf-8")
# Generate WAV with beeps
n_audio = int(total_duration * ROBOT_SR)
audio = np.zeros(n_audio, dtype=np.float32)
for bt in BEEP_TIMES:
start = int(bt * ROBOT_SR)
n_beep = int(BEEP_DURATION * ROBOT_SR)
if start + n_beep > n_audio:
continue
t_arr = np.arange(n_beep, dtype=np.float32) / ROBOT_SR
beep = BEEP_AMPLITUDE * np.sin(2 * np.pi * BEEP_FREQ * t_arr).astype(np.float32)
fade = int(0.005 * ROBOT_SR)
if fade > 0 and 2 * fade < n_beep:
beep[:fade] *= np.linspace(0, 1, fade, dtype=np.float32)
beep[-fade:] *= np.linspace(1, 0, fade, dtype=np.float32)
audio[start:start + n_beep] += beep
wav_path = tmpdir / "sync-test.wav"
sf.write(str(wav_path), audio, ROBOT_SR)
print(f" Generated move: {n_frames} frames at {MOTION_SR}Hz, {total_duration:.1f}s")
print(f" Generated WAV: {n_audio} samples at {ROBOT_SR}Hz, {len(BEEP_TIMES)} beeps")
return json_path, wav_path
def scp_to_robot(local_path: Path, remote_path: str, host: str) -> None:
target = f"{ROBOT_USER}@{host}:{remote_path}"
result = subprocess.run(
["scp", "-o", "ConnectTimeout=5", str(local_path), target],
capture_output=True, text=True, timeout=15,
)
if result.returncode != 0:
raise RuntimeError(f"SCP failed: {result.stderr}")
def scp_from_robot(remote_path: str, local_path: Path, host: str) -> None:
source = f"{ROBOT_USER}@{host}:{remote_path}"
result = subprocess.run(
["scp", "-o", "ConnectTimeout=5", source, str(local_path)],
capture_output=True, text=True, timeout=15,
)
if result.returncode != 0:
raise RuntimeError(f"SCP failed: {result.stderr}")
# Robot-side playback script: loads the move, plays via SDK, records positions
ROBOT_PLAY_SCRIPT = """\
import json, os, sys, time
import numpy as np
from pathlib import Path
move_dir = sys.argv[1]
results_path = sys.argv[2]
print("robot: connecting to ReachyMini", flush=True)
from reachy_mini import ReachyMini
from reachy_mini.motion.recorded_move import RecordedMove
r = ReachyMini()
# Load move
json_path = Path(move_dir) / "sync-test.json"
wav_path = Path(move_dir) / "sync-test.wav"
move_data = json.loads(json_path.read_text())
sound_path = wav_path if wav_path.exists() else None
move = RecordedMove(move_data, sound_path=sound_path)
print(f"robot: loaded move: {move.duration:.1f}s, sound={sound_path is not None}", flush=True)
# Record present positions during playback at 50Hz
# We do this in a background thread while play_move runs
import threading
timestamps = []
left_present = []
right_present = []
recording = True
def record_positions():
t0 = time.monotonic()
while recording:
pos = r.get_present_antenna_joint_positions()
timestamps.append(time.monotonic() - t0)
left_present.append(pos[0])
right_present.append(pos[1])
time.sleep(0.02) # 50Hz
recorder = threading.Thread(target=record_positions, daemon=True)
print("robot: MARK_START", flush=True)
recorder.start()
r.play_move(move, initial_goto_duration=1.0)
recording = False
recorder.join(timeout=1.0)
print(f"robot: playback done, recorded {len(timestamps)} position samples", flush=True)
# Save results
results = {
"beep_times": move_data.get("_beep_times", []),
"collision_times": move_data.get("_collision_times", []),
"timestamps": timestamps,
"left_present": left_present,
"right_present": right_present,
}
with open(results_path, "w") as f:
json.dump(results, f)
print(f"robot: saved to {results_path}", flush=True)
print("robot: done", flush=True)
os._exit(0)
"""
def start_robot(host: str) -> subprocess.Popen:
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
f.write(ROBOT_PLAY_SCRIPT)
local_script = Path(f.name)
remote_script = "/tmp/move_sync_play.py"
try:
target = f"{ROBOT_USER}@{host}:{remote_script}"
subprocess.run(
["scp", "-o", "ConnectTimeout=5", str(local_script), target],
capture_output=True, text=True, timeout=15, check=True,
)
finally:
local_script.unlink()
args_str = f"{ROBOT_PYTHON} {remote_script} {REMOTE_DIR} {REMOTE_RESULTS}"
proc = subprocess.Popen(
["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{host}", args_str],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
)
return proc
def plot_combined(
mic_audio, mic_sr, mic_start, mark_start,
robot_data, detected_beeps, detected_collisions, pairs,
output_path,
):
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
mic_t = np.arange(len(mic_audio)) / mic_sr
robot_offset = mark_start - mic_start
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(18, 10), sharex=True)
# Mic waveform
ax1.plot(mic_t, mic_audio, "k-", linewidth=0.3, alpha=0.5)
ax1.set_ylabel("Mic amplitude")
ax1.set_title("Move Sync Test (SDK play_move) — Laptop Mic Recording")
ax1.grid(True, alpha=0.3)
for i, bt in enumerate(detected_beeps):
ax1.axvline(bt, color="blue", linestyle="-", linewidth=1.2, alpha=0.7,
label="Detected beep" if i == 0 else None)
for i, ct in enumerate(detected_collisions):
ax1.axvline(ct, color="red", linestyle="-", linewidth=1.2, alpha=0.7,
label="Detected collision" if i == 0 else None)
for p in pairs:
mid = (p["beep_mic_t"] + p["collision_mic_t"]) / 2
ax1.annotate(f'{p["interval_ms"]:.0f}ms', xy=(mid, 0),
ha="center", fontsize=9, color="purple", fontweight="bold",
bbox=dict(boxstyle="round,pad=0.2", facecolor="lightyellow", alpha=0.8))
ax1.legend(loc="upper right", fontsize=9)
# Robot trajectory
if robot_data.get("timestamps"):
robot_ts = np.array(robot_data["timestamps"])
left_pos = np.array(robot_data["left_present"])
right_pos = np.array(robot_data["right_present"])
# Note: robot timestamps start from MARK_START, which includes goto_duration
ax2.plot(robot_ts + robot_offset, left_pos, "b-", linewidth=1.5,
label="Left antenna (present)")
ax2.plot(robot_ts + robot_offset, right_pos, "r-", linewidth=1.5,
label="Right antenna (present)")
# Command times (approximate, via MARK_START)
for i, bt in enumerate(BEEP_TIMES):
# Beep times are relative to move start, not MARK_START
# play_move does a 1.0s goto first, so beeps start ~1s after MARK_START
mic_bt = robot_offset + 1.0 + bt # +1.0 for initial_goto_duration
ax2.axvline(mic_bt, color="blue", linestyle="--", linewidth=0.8, alpha=0.4,
label="Beep cmd (+goto)" if i == 0 else None)
for i, ct in enumerate(COLLISION_TIMES):
mic_ct = robot_offset + 1.0 + ct
ax2.axvline(mic_ct, color="red", linestyle="--", linewidth=0.8, alpha=0.4,
label="Collision cmd (+goto)" if i == 0 else None)
for bt in detected_beeps:
ax2.axvline(bt, color="blue", linestyle="-", linewidth=0.8, alpha=0.4)
for ct in detected_collisions:
ax2.axvline(ct, color="red", linestyle="-", linewidth=0.8, alpha=0.4)
ax2.set_xlabel("Time since mic start (s)")
ax2.set_ylabel("Position (rad)")
ax2.set_title("Robot Antenna Trajectory (aligned to mic clock)")
ax2.legend(loc="upper right", fontsize=9)
ax2.grid(True, alpha=0.3)
# Auto-zoom to active region
all_events = detected_beeps + detected_collisions
if all_events:
ax1.set_xlim(min(all_events) - 1.0, max(all_events) + 1.0)
fig.tight_layout()
fig.savefig(str(output_path), dpi=150)
plt.close(fig)
print(f" Plot saved to {output_path}")
def main():
parser = argparse.ArgumentParser(description="Move sync test via SDK play_move")
parser.add_argument("--host", default="reachy-mini.local")
args = parser.parse_args()
print(f"\n{'='*60}")
print("Move Sync Test (SDK play_move)")
print(f"{'='*60}")
print(f" Beep times: {BEEP_TIMES}")
print(f" Collision times: {COLLISION_TIMES}")
print(f" Expected interval: {BEEP_COLLISION_OFFSET:.1f}s\n")
# Step 1: Stop running apps
print("[1/6] Stopping any running app...")
subprocess.run(
["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{args.host}",
"curl -sf -X POST http://127.0.0.1:8000/api/apps/stop-current-app >/dev/null 2>&1 || true"],
capture_output=True, timeout=10,
)
time.sleep(1)
# Step 2: Generate move files
print("[2/6] Generating Marionette-format move...")
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
json_path, wav_path = generate_move_files(tmpdir)
# Embed beep/collision times in JSON for the robot script to pass back
move_data = json.loads(json_path.read_text())
move_data["_beep_times"] = BEEP_TIMES
move_data["_collision_times"] = COLLISION_TIMES
json_path.write_text(json.dumps(move_data))
# Step 3: SCP to robot
print("[3/6] Copying move files to robot...")
subprocess.run(
["ssh", "-o", "ConnectTimeout=5", f"{ROBOT_USER}@{args.host}",
f"mkdir -p {REMOTE_DIR}"],
capture_output=True, timeout=10,
)
scp_to_robot(json_path, f"{REMOTE_DIR}/sync-test.json", args.host)
scp_to_robot(wav_path, f"{REMOTE_DIR}/sync-test.wav", args.host)
print(" Done")
# Step 4: Start mic recording
print(f"[4/6] Starting mic recording ({MIC_DURATION}s)...")
mic_start = time.monotonic()
mic_data = sd.rec(
int(MIC_DURATION * LAPTOP_SR),
samplerate=LAPTOP_SR, channels=1, dtype="float32",
)
# Step 5: Start robot playback
time.sleep(0.3)
print("[5/6] Starting SDK play_move on robot...")
proc = start_robot(args.host)
mark_start = None
print("\n--- Robot output ---")
for line in iter(proc.stdout.readline, ""):
line = line.rstrip()
if not line:
continue
laptop_time = time.monotonic()
print(f" {line}")
if "MARK_START" in line:
mark_start = laptop_time
proc.wait()
print("--- End robot output ---")
sd.wait()
captured = mic_data.flatten()
print(f"\n Mic recording done")
if mark_start is None:
print("\nFAILED: Never received MARK_START")
return 1
robot_offset = mark_start - mic_start
print(f" MARK_START at mic_t={robot_offset:.3f}s")
mic_path = Path("tests/move_sync_mic.wav")
sf.write(str(mic_path), captured, LAPTOP_SR)
print(f" Saved mic to {mic_path}")
# Fetch robot position data
print("\n[6/6] Fetching robot data + analyzing...")
local_results = Path("tests/move_sync_positions.json")
scp_from_robot(REMOTE_RESULTS, local_results, args.host)
with open(local_results) as f:
robot_data = json.load(f)
# Detect beeps and collisions
detected_beeps = detect_beep_onsets(
captured, LAPTOP_SR, freq=BEEP_FREQ, bandwidth=150.0, threshold_db=-12.0,
min_separation=1.0,
)
detected_collisions = detect_transient_onsets(
captured, LAPTOP_SR, highpass_freq=3000.0,
)
print(f" Detected {len(detected_beeps)} beeps at: "
f"{[f'{t:.3f}' for t in detected_beeps]}")
print(f" Detected {len(detected_collisions)} collisions at: "
f"{[f'{t:.3f}' for t in detected_collisions]}")
# Match pairs
print(f"\n{'='*60}")
print("Beep → Collision Interval Analysis (SDK play_move)")
print(f" (Expected interval: {BEEP_COLLISION_OFFSET*1000:.0f}ms)")
print(f"{'='*60}")
pairs = []
for i, bt in enumerate(detected_beeps):
candidates = [ct for ct in detected_collisions if 0.3 < (ct - bt) < 2.0]
if not candidates:
print(f" Beep {i+1} at {bt:.3f}s: NO COLLISION FOUND")
continue
nearest = min(candidates, key=lambda ct: abs((ct - bt) - BEEP_COLLISION_OFFSET))
interval_ms = (nearest - bt) * 1000
error_ms = interval_ms - BEEP_COLLISION_OFFSET * 1000
pairs.append({
"beep_mic_t": bt,
"collision_mic_t": nearest,
"interval_ms": interval_ms,
"error_ms": error_ms,
})
print(f" Pair {len(pairs)}: beep {bt:.3f}s → collision {nearest:.3f}s = "
f"{interval_ms:.0f}ms (error {error_ms:+.0f}ms)")
if pairs:
errors = [p["error_ms"] for p in pairs]
intervals = [p["interval_ms"] for p in pairs]
print(f"\n Pairs matched: {len(pairs)}/{len(BEEP_TIMES)}")
print(f" Mean interval: {np.mean(intervals):.0f}ms (expected {BEEP_COLLISION_OFFSET*1000:.0f}ms)")
print(f" Mean error: {np.mean(errors):+.0f}ms")
print(f" Std error: {np.std(errors):.0f}ms")
print(f" Min/Max error: {min(errors):+.0f}ms / {max(errors):+.0f}ms")
plot_path = Path("tests/move_sync_plot.png")
plot_combined(
captured, LAPTOP_SR, mic_start, mark_start,
robot_data, detected_beeps, detected_collisions, pairs, plot_path,
)
success = len(pairs) >= len(BEEP_TIMES) - 1
print(f"\n{'='*60}")
if success:
print("RESULT: PASS — Beep-collision pairs detected via SDK play_move")
else:
print("RESULT: FAIL — Could not reliably detect pairs")
print(f"{'='*60}\n")
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())