Spaces:
Running
Running
| import math | |
| import random | |
| import json | |
| import os | |
| import numpy as np | |
| import gradio as gr | |
| # ============================================================ | |
| # RFT Predator Space — First-Person Observer View (Pseudo-3D) | |
| # FIXES: | |
| # 1) NO flashing progression panel (do NOT update it on timer ticks) | |
| # 2) AutoRun works in first-person POV (autopilot moves current POV agent) | |
| # 3) queue() enabled for reliable timer updates on Spaces | |
| # ============================================================ | |
| # ----------------------------- | |
| # View config (render) | |
| # ----------------------------- | |
# Output frame size in pixels (width, height).
VIEW_W, VIEW_H = 560, 360
# Number of rays cast per frame; each ray covers VIEW_W / RAY_W pixel columns.
RAY_W = 280
# Horizontal field of view in degrees.
FOV_DEG = 78
# Distance cap (grid cells) used by the raycaster and the sprite pass.
MAX_DEPTH = 18
# Cells advanced by a single forward move.
MOVE_STEP = 1
# NOTE(review): presumably the auto-run timer rate; not referenced in this part of the file.
AUTO_TICK_HZ = 8

# RGB palette (uint8) used by the first-person renderer.
SKY = np.array((14, 16, 26), dtype=np.uint8)
FLOOR_NEAR = np.array((20, 22, 34), dtype=np.uint8)
FLOOR_FAR = np.array((10, 11, 18), dtype=np.uint8)
WALL_BASE = np.array((210, 210, 225), dtype=np.uint8)
WALL_SIDE = np.array((150, 150, 170), dtype=np.uint8)
AGENT_OTHER_COLOR = np.array((255, 140, 90), dtype=np.uint8)  # billboard for the "other" observer
RETICLE = np.array((120, 190, 255), dtype=np.uint8)

# Orientation index -> unit grid step: 0=E, 1=S, 2=W, 3=N (y grows downwards).
DIRS = [(1, 0), (0, 1), (-1, 0), (0, -1)]
# Orientation index -> heading angle in degrees.
ORI_DEG = [90 * index for index in range(len(DIRS))]
# Reverse lookup: unit grid step -> orientation index.
DIR_TO_ORI = {step: index for index, step in enumerate(DIRS)}
| # ----------------------------- | |
| # Progression / unlocks | |
| # ----------------------------- | |
# (map name, total catches required to unlock it), in progression order.
MAP_UNLOCKS = [
    ("Training Bay", 0),
    ("Arena+", 1),
    ("Corridor Maze", 3),
    ("Rooms", 6),
    ("Labyrinth", 10),
    ("Dense Field", 15),
]
| # ----------------------------- | |
| # Saves | |
| # ----------------------------- | |
SAVE_DIR = "saves"
os.makedirs(SAVE_DIR, exist_ok=True)


def _slot_path(slot: str) -> str:
    """Map a user-supplied slot name to a ``.json`` path inside ``SAVE_DIR``.

    Surrounding whitespace is stripped and inner spaces become underscores.
    Only the final path component is kept, so names such as ``../x`` or
    ``/abs/x`` cannot point outside ``SAVE_DIR``. Empty names (and ``.`` /
    ``..``) fall back to ``slot1``. ``.json`` is appended unless the name
    already ends with it (case-insensitive).
    """
    name = (slot or "slot1").strip().replace(" ", "_")
    # Treat backslashes as separators too, so Windows-style input is also reduced
    # to its last component.
    name = os.path.basename(name.replace("\\", "/"))
    if name in ("", ".", ".."):
        name = "slot1"
    if not name.lower().endswith(".json"):
        name += ".json"
    return os.path.join(SAVE_DIR, name)
def list_save_slots():
    """Return the sorted ``.json`` file names in ``SAVE_DIR`` (empty list on any error)."""
    try:
        return sorted(
            entry for entry in os.listdir(SAVE_DIR) if entry.lower().endswith(".json")
        )
    except Exception:
        return []
| # ----------------------------- | |
| # Utility | |
| # ----------------------------- | |
def clamp(x, lo, hi):
    """Return ``x`` limited to the closed range ``[lo, hi]``."""
    if x < lo:
        return lo
    if x > hi:
        return hi
    return x
def angle_diff_rad(a, b):
    """Return ``a - b`` wrapped into ``[-pi, pi)`` (radians)."""
    full_turn = 2 * math.pi
    shifted = a - b + math.pi
    return shifted % full_turn - math.pi
def seeded_rng(seed: int):
    """Return a ``random.Random`` seeded with the low 32 bits of ``seed``."""
    masked = int(seed) & 0xFFFFFFFF
    return random.Random(masked)
def neighbors4(x, y):
    """Return the four axis-aligned neighbours of ``(x, y)`` in E, W, S, N order."""
    return [(x + dx, y + dy) for dx, dy in ((1, 0), (-1, 0), (0, 1), (0, -1))]
def bfs_reachable(grid, start):
    """Return the set of open cells reachable from ``start`` by 4-connected moves.

    ``grid`` is a 2-D array indexed ``grid[y, x]`` where ``1`` marks a wall and
    ``0`` an open cell. ``start`` is an ``(x, y)`` pair. An empty set is
    returned when ``start`` itself is a wall.
    """
    # Local import: deque gives O(1) pops from the left, unlike list.pop(0).
    from collections import deque

    H, W = grid.shape
    sx, sy = start
    if grid[sy, sx] == 1:
        return set()

    seen = {(sx, sy)}
    frontier = deque([(sx, sy)])
    while frontier:
        x, y = frontier.popleft()
        # Same neighbour order as neighbors4(): E, W, S, N.
        for nx, ny in ((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)):
            if 0 <= nx < W and 0 <= ny < H and (nx, ny) not in seen and grid[ny, nx] == 0:
                seen.add((nx, ny))
                frontier.append((nx, ny))
    return seen
def pick_spawn_pair(grid, rng, min_dist=8):
    """Choose ``(pred, prey)`` spawn cells that are mutually reachable.

    Interior open cells are shuffled with ``rng``; the first (of at most 800)
    whose reachable region has at least 30 cells and contains a cell at least
    ``min_dist`` away (Euclidean) becomes the predator, and the prey is drawn
    from those far cells. If none qualifies, the first and last shuffled open
    cells are used, with ``(1, 1)`` / ``(2, 2)`` as last-resort defaults.
    """
    H, W = grid.shape
    open_cells = [
        (x, y)
        for y in range(1, H - 1)
        for x in range(1, W - 1)
        if grid[y, x] == 0
    ]
    rng.shuffle(open_cells)

    min_dist_sq = min_dist * min_dist
    for pred in open_cells[:800]:
        region = bfs_reachable(grid, pred)
        if len(region) >= 30:
            far_cells = [
                cell
                for cell in region
                if (cell[0] - pred[0]) ** 2 + (cell[1] - pred[1]) ** 2 >= min_dist_sq
            ]
            if far_cells:
                return pred, rng.choice(far_cells)

    fallback_pred = open_cells[0] if open_cells else (1, 1)
    fallback_prey = open_cells[-1] if len(open_cells) > 1 else (2, 2)
    return fallback_pred, fallback_prey
def add_border_walls(grid):
    """Set the outermost rows and columns of ``grid`` to 1 in place and return it."""
    grid[[0, -1], :] = 1
    grid[:, [0, -1]] = 1
    return grid
def compute_unlocks(catches: int):
    """Return the set of map names whose catch requirement is met by ``catches``."""
    return {name for name, need in MAP_UNLOCKS if catches >= need}
| # ----------------------------- | |
| # Map generators (deterministic by seed) | |
| # ----------------------------- | |
def map_training(seed, w=23, h=23):
    """Build the bordered "Training Bay" grid with sparse (8%) random pillars.

    Pillars are only placed at least two cells in from the edge. The result is
    deterministic for a given ``seed``.
    """
    rng = seeded_rng(seed)
    grid = add_border_walls(np.zeros((h, w), dtype=np.int8))
    for row in range(2, h - 2):
        for col in range(2, w - 2):
            if rng.random() < 0.08:
                grid[row, col] = 1
    return grid
def map_arena_plus(seed, w=23, h=23):
    """Build the "Arena+" grid: a broken ring of walls around the centre plus 8 pillars.

    Cells whose squared distance from the centre lies in ``[36, 44]`` become
    walls with probability 0.85. Deterministic for a given ``seed``.
    """
    rng = seeded_rng(seed)
    grid = add_border_walls(np.zeros((h, w), dtype=np.int8))
    mid_x, mid_y = w // 2, h // 2

    for row in range(1, h - 1):
        for col in range(1, w - 1):
            dist_sq = (col - mid_x) ** 2 + (row - mid_y) ** 2
            if 36 <= dist_sq <= 44:
                # The RNG is only consulted for cells on the ring.
                if rng.random() < 0.85:
                    grid[row, col] = 1

    for _ in range(8):
        col = rng.randint(3, w - 4)
        row = rng.randint(3, h - 4)
        grid[row, col] = 1
    return grid
def map_corridor_maze(seed, w=23, h=23):
    """Build the "Corridor Maze" grid with a randomized depth-first carve.

    Odd/odd interior cells are rooms; the walk starts at ``(1, 1)`` and knocks
    out the wall between the current room and a randomly chosen unvisited room
    two cells away, backtracking when stuck. Deterministic for a given ``seed``.
    """
    rng = seeded_rng(seed)
    grid = np.ones((h, w), dtype=np.int8)
    add_border_walls(grid)

    # Open every interior cell with odd x and odd y.
    grid[1:h - 1:2, 1:w - 1:2] = 0

    origin = (1, 1)
    trail = [origin]
    seen = {origin}
    while trail:
        cx, cy = trail[-1]
        jumps = [(2, 0), (-2, 0), (0, 2), (0, -2)]
        rng.shuffle(jumps)
        for jx, jy in jumps:
            nx, ny = cx + jx, cy + jy
            if 1 <= nx < w - 1 and 1 <= ny < h - 1 and (nx, ny) not in seen:
                seen.add((nx, ny))
                # Remove the wall cell midway between the two rooms.
                grid[(cy + ny) // 2, (cx + nx) // 2] = 0
                trail.append((nx, ny))
                break
        else:
            # No unvisited neighbour: backtrack.
            trail.pop()

    # Always keep the start corner and its two adjacent cells open.
    grid[1, 1:3] = 0
    grid[2, 1] = 0
    return grid
def map_rooms(seed, w=25, h=25):
    """Build the "Rooms" grid: 10 random rectangular rooms joined by L-shaped corridors.

    Each room is connected to the next one in creation order via their centres;
    which leg of the "L" is horizontal first is chosen at random.
    Deterministic for a given ``seed``.
    """
    rng = seeded_rng(seed)
    grid = np.ones((h, w), dtype=np.int8)
    add_border_walls(grid)

    centres = []
    for _ in range(10):
        room_w = rng.randint(4, 7)
        room_h = rng.randint(4, 7)
        left = rng.randint(1, w - room_w - 2)
        top = rng.randint(1, h - room_h - 2)
        grid[top:top + room_h, left:left + room_w] = 0
        centres.append((left + room_w // 2, top + room_h // 2))

    for (x1, y1), (x2, y2) in zip(centres, centres[1:]):
        x_lo, x_hi = min(x1, x2), max(x1, x2)
        y_lo, y_hi = min(y1, y2), max(y1, y2)
        if rng.random() < 0.5:
            grid[y1, x_lo:x_hi + 1] = 0
            grid[y_lo:y_hi + 1, x2] = 0
        else:
            grid[y_lo:y_hi + 1, x1] = 0
            grid[y2, x_lo:x_hi + 1] = 0
    return grid
def map_labyrinth(seed, w=31, h=23):
    """Build the "Labyrinth" grid: dense random wall lines with guaranteed corridors.

    Even columns and every third row are mostly walled, then the middle row is
    cleared end to end and every sixth column (from x=3) is mostly reopened.
    Deterministic for a given ``seed``.
    """
    rng = seeded_rng(seed)
    grid = add_border_walls(np.zeros((h, w), dtype=np.int8))

    for row in range(1, h - 1):
        for col in range(1, w - 1):
            # Short-circuiting matters: the RNG is only advanced when a test needs it.
            column_wall = col % 2 == 0 and rng.random() < 0.85
            if column_wall or (row % 3 == 0 and rng.random() < 0.55):
                grid[row, col] = 1

    # Horizontal spine through the middle row.
    grid[h // 2, 1:w - 1] = 0

    for col in range(3, w - 3, 6):
        for row in range(2, h - 2):
            if rng.random() < 0.75:
                grid[row, col] = 0
    return grid
def map_dense_field(seed, w=23, h=23):
    """Build the "Dense Field" grid: 22% random walls with six 5x5 clearings.

    Clearings are clipped to the interior so the border wall is never removed.
    Deterministic for a given ``seed``.
    """
    rng = seeded_rng(seed)
    grid = add_border_walls(np.zeros((h, w), dtype=np.int8))

    for row in range(1, h - 1):
        for col in range(1, w - 1):
            if rng.random() < 0.22:
                grid[row, col] = 1

    for _ in range(6):
        mid_x = rng.randint(3, w - 4)
        mid_y = rng.randint(3, h - 4)
        top, bottom = max(1, mid_y - 2), min(h - 1, mid_y + 3)
        left, right = max(1, mid_x - 2), min(w - 1, mid_x + 3)
        grid[top:bottom, left:right] = 0
    return grid
# Map name -> generator function; every generator is called as ``builder(seed)``.
MAP_BUILDERS = dict(
    [
        ("Training Bay", map_training),
        ("Arena+", map_arena_plus),
        ("Corridor Maze", map_corridor_maze),
        ("Rooms", map_rooms),
        ("Labyrinth", map_labyrinth),
        ("Dense Field", map_dense_field),
    ]
)
| # ----------------------------- | |
| # State construction | |
| # ----------------------------- | |
def build_state(seed, map_name, progress=None, override=None):
    """Create a fresh game-state dict for ``map_name`` generated from ``seed``.

    ``progress`` is the catches/unlocked dict carried between rounds; a new one
    with zero catches is created when omitted. ``override`` may replace any
    state key except ``"grid"``, which always comes from the map builder.
    """
    rng = seeded_rng(seed)
    grid = MAP_BUILDERS[map_name](seed)
    pred, prey = pick_spawn_pair(grid, rng, min_dist=8)
    pred_ori = rng.randint(0, 3)

    if progress is None:
        progress = {"catches": 0, "unlocked": compute_unlocks(0)}

    st = {
        "seed": int(seed),
        "grid": grid,
        "pred": pred,
        "prey": prey,
        "ori": pred_ori,
        # The prey starts facing the opposite way to the predator.
        "prey_ori": (pred_ori + 2) % 4,
        "control": "pred",  # "pred" or "prey" (view + manual inputs)
        "overlay": False,  # coherence overlay
        "disturbance": 0.0,
        "last_impulse": 0.0,
        "step": 0,
        "caught": False,
        "auto_chase": False,
        "auto_run": False,
        "log": [f"Reset into map: {map_name}"],
        "map_name": map_name,
        "progress": progress,
    }
    st.update(
        (key, value) for key, value in (override or {}).items() if key != "grid"
    )
    return st
| # ----------------------------- | |
| # Save / Load helpers | |
| # ----------------------------- | |
def serialize_state(st):
    """Return a JSON-serialisable snapshot of ``st`` (save format version 2).

    The grid is not stored; it is rebuilt from ``seed`` and ``map_name`` on
    load. Only the last 20 log lines are kept.
    """
    pred_x, pred_y = st["pred"][0], st["pred"][1]
    prey_x, prey_y = st["prey"][0], st["prey"][1]
    recent_log = st["log"][-20:]
    return {
        "version": 2,
        "seed": int(st["seed"]),
        "map_name": str(st["map_name"]),
        "step": int(st["step"]),
        "pred": [int(pred_x), int(pred_y)],
        "prey": [int(prey_x), int(prey_y)],
        "ori": int(st["ori"]),
        "prey_ori": int(st.get("prey_ori", 0)),
        "control": str(st.get("control", "pred")),
        "overlay": bool(st.get("overlay", False)),
        "disturbance": float(st.get("disturbance", 0.0)),
        "caught": bool(st["caught"]),
        "auto_chase": bool(st["auto_chase"]),
        "auto_run": bool(st["auto_run"]),
        "catches": int(st["progress"]["catches"]),
        "log_tail": recent_log,
    }
def deserialize_state(payload):
    """Rebuild a game state from a save ``payload`` (see ``serialize_state``).

    The grid is regenerated from the saved seed and map name; unknown map
    names fall back to "Training Bay". Save files may be hand-edited or
    uploaded, so positions are coerced to integer ``(x, y)`` pairs and the log
    to a list of strings. If either position is malformed, out of bounds, or
    on a wall, both agents are respawned and a log line records it.

    Raises:
        ValueError: if ``payload`` is not a dict, or a scalar field such as
            ``seed`` cannot be converted.
    """
    if not isinstance(payload, dict):
        raise ValueError("Save payload must be a JSON object.")

    def _cell(value):
        """Coerce a saved ``[x, y]`` pair to an int tuple, or None if malformed."""
        try:
            x, y = value
            return int(x), int(y)
        except (TypeError, ValueError, OverflowError):
            return None

    seed = int(payload.get("seed", 1))
    map_name = str(payload.get("map_name", "Training Bay"))
    if map_name not in MAP_BUILDERS:
        map_name = "Training Bay"
    catches = int(payload.get("catches", 0))
    progress = {"catches": catches, "unlocked": compute_unlocks(catches)}

    control = str(payload.get("control", "pred"))
    if control not in ("pred", "prey"):
        control = "pred"

    # Anything that is not a list of lines is discarded; entries are stringified
    # because the status panel joins them with "\n".
    raw_log = payload.get("log_tail")
    log = [str(line) for line in raw_log] if isinstance(raw_log, list) else []

    override = {
        "step": int(payload.get("step", 0)),
        "pred": _cell(payload.get("pred", [1, 1])),
        "prey": _cell(payload.get("prey", [2, 2])),
        "ori": int(payload.get("ori", 0)) % 4,
        "prey_ori": int(payload.get("prey_ori", 0)) % 4,
        "control": control,
        "overlay": bool(payload.get("overlay", False)),
        "disturbance": float(payload.get("disturbance", 0.0)),
        "last_impulse": 0.0,
        "caught": bool(payload.get("caught", False)),
        "auto_chase": bool(payload.get("auto_chase", False)),
        "auto_run": bool(payload.get("auto_run", False)),
        "log": log,
    }
    st = build_state(seed, map_name, progress=progress, override=override)

    grid = st["grid"]
    H, W = grid.shape

    def _is_open(cell):
        """True when ``cell`` is a well-formed, in-bounds, non-wall position."""
        if cell is None:
            return False
        x, y = cell
        return 0 <= x < W and 0 <= y < H and grid[y, x] == 0

    if not (_is_open(st["pred"]) and _is_open(st["prey"])):
        # Different seed offset so the respawn is independent of the map RNG.
        rng = seeded_rng(seed + 777)
        st["pred"], st["prey"] = pick_spawn_pair(grid, rng, min_dist=8)
        st["log"].append("Loaded save had invalid positions; respawned safely.")
    st["log"].append("Loaded save.")
    return st
def save_to_path(st, path):
    """Write the serialised state to ``path`` as indented JSON and log the save."""
    snapshot = serialize_state(st)
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(snapshot, handle, indent=2)
    st["log"].append(f"Saved to: {path}")
def load_from_path(path):
    """Read the JSON save at ``path`` and return the rebuilt state."""
    with open(path, "r", encoding="utf-8") as handle:
        return deserialize_state(json.load(handle))
| # ----------------------------- | |
| # Perception + rendering | |
| # ----------------------------- | |
def los_clear(grid, a, b):
    """Return True when no wall cell lies on the straight line between cells ``a`` and ``b``.

    The segment between the two cell centres is sampled at 20 points per cell
    of distance; samples falling outside the grid are clamped to its edge.
    """
    start_x, start_y = a[0] + 0.5, a[1] + 0.5
    end_x, end_y = b[0] + 0.5, b[1] + 0.5
    dx, dy = end_x - start_x, end_y - start_y
    dist = math.hypot(dx, dy)
    if dist < 1e-6:
        return True

    dx /= dist
    dy /= dist
    steps = int(dist * 20)
    if steps == 0:
        # Nothing to sample (the original loop would not run either).
        return True

    stride = dist / steps
    inc_x, inc_y = dx * stride, dy * stride
    H, W = grid.shape
    x, y = start_x, start_y
    for _ in range(steps):
        x += inc_x
        y += inc_y
        col = min(max(int(x), 0), W - 1)
        row = min(max(int(y), 0), H - 1)
        if grid[row, col] == 1:
            return False
    return True
def dda_raycast(grid, px, py, ray_dx, ray_dy, max_depth=MAX_DEPTH):
    """Step a ray through the grid cell by cell (DDA) until it hits a wall.

    ``(px, py)`` is the ray origin in grid units and ``(ray_dx, ray_dy)`` its
    direction. Returns ``(distance, side, cell_x, cell_y)`` where ``side`` is 0
    when the last step crossed a vertical cell boundary (x step) and 1 for a
    horizontal one (y step). When the ray leaves the grid or the step budget
    of ``max_depth * 10`` runs out, ``(max_depth, 0, cell_x, cell_y)`` is
    returned with the last cell visited.
    """
    H, W = grid.shape
    cell_x, cell_y = int(px), int(py)

    # Ray length needed to cross one full cell along each axis.
    delta_x = abs(1.0 / ray_dx) if abs(ray_dx) > 1e-9 else 1e9
    delta_y = abs(1.0 / ray_dy) if abs(ray_dy) > 1e-9 else 1e9

    # Step direction and ray length to the first boundary on each axis.
    if ray_dx < 0:
        step_x, reach_x = -1, (px - cell_x) * delta_x
    else:
        step_x, reach_x = 1, (cell_x + 1.0 - px) * delta_x
    if ray_dy < 0:
        step_y, reach_y = -1, (py - cell_y) * delta_y
    else:
        step_y, reach_y = 1, (cell_y + 1.0 - py) * delta_y

    hit = False
    side = 0
    for _ in range(max_depth * 10):
        if reach_x < reach_y:
            reach_x += delta_x
            cell_x += step_x
            side = 0
        else:
            reach_y += delta_y
            cell_y += step_y
            side = 1
        if not (0 <= cell_x < W and 0 <= cell_y < H):
            break
        if grid[cell_y, cell_x] == 1:
            hit = True
            break

    if not hit:
        return max_depth, 0, cell_x, cell_y

    # Distance to the boundary that was crossed when the wall cell was entered.
    if side == 0:
        denom = ray_dx if abs(ray_dx) > 1e-9 else 1e-9
        perp = (cell_x - px + (1 - step_x) / 2) / denom
    else:
        denom = ray_dy if abs(ray_dy) > 1e-9 else 1e-9
        perp = (cell_y - py + (1 - step_y) / 2) / denom
    perp = clamp(abs(perp), 0.0005, max_depth)
    return perp, side, cell_x, cell_y
def _apply_coherence_overlay(img, disturbance: float):
    """Draw the subtle "coherence" overlay onto ``img`` in place and return it.

    Nothing is drawn when ``disturbance`` is at most 0.001. Otherwise the
    overlay strength is ``disturbance * 0.06`` capped at 0.22; the frame edges
    get an additive tint and two shallow diagonal strokes are blended through
    the image centre.
    """
    level = float(disturbance)
    if level <= 0.001:
        return img

    alpha = level * 0.06  # subtle
    if alpha < 0.0:
        alpha = 0.0
    elif alpha > 0.22:
        alpha = 0.22

    h, w, _ = img.shape
    mid_x, mid_y = w // 2, h // 2

    band = int(min(w, h) * 0.08)
    if band >= 2:
        shift = np.array([22, 8, 18], dtype=np.float32) * alpha
        # Bands are processed one after another, so corners are tinted twice.
        edge_bands = (
            img[:band, :, :],
            img[h - band:, :, :],
            img[:, :band, :],
            img[:, w - band:, :],
        )
        for region in edge_bands:
            region[...] = np.clip(region.astype(np.float32) + shift, 0, 255).astype(np.uint8)

    stroke = np.array([180, 70, 160], dtype=np.float32)

    def blend(px_x, px_y):
        """Alpha-blend the stroke colour into one pixel if it lies inside the image."""
        if 0 <= px_x < w and 0 <= px_y < h:
            pixel = img[px_y:px_y + 1, px_x:px_x + 1, :]
            pixel[...] = np.clip(
                pixel.astype(np.float32) * (1 - alpha) + stroke * alpha, 0, 255
            ).astype(np.uint8)

    for offset in range(-40, 41):
        rise = int(offset * 0.35)
        blend(mid_x + offset, mid_y + rise)
        blend(mid_x + offset, mid_y - rise)
    return img
def render_first_person(st):
    """Render the pseudo-3D frame seen by the currently controlled agent.

    Returns a ``(VIEW_H, VIEW_W, 3)`` uint8 image: sky and floor gradient,
    raycast wall slices, the other agent as a flat billboard (only when it is
    in line of sight, inside the field of view and nearer than the wall behind
    it), a centre reticle, three HUD indicator dots (AutoChase, AutoRun, other
    agent visible) and, if enabled, the coherence overlay.
    """
    grid = st["grid"]
    from_prey = st["control"] == "prey"
    eye_cell = st["prey"] if from_prey else st["pred"]
    eye_ori = st["prey_ori"] if from_prey else st["ori"]
    other_cell = st["pred"] if from_prey else st["prey"]

    (cell_x, cell_y) = eye_cell
    eye_x = cell_x + 0.5
    eye_y = cell_y + 0.5
    fov = math.radians(FOV_DEG)
    heading = math.radians(ORI_DEG[eye_ori])
    horizon = VIEW_H // 2

    # --- background: flat sky, vertical floor gradient ---
    frame = np.zeros((VIEW_H, VIEW_W, 3), dtype=np.uint8)
    frame[:horizon, :, :] = SKY
    fade_span = max(1, (horizon - 1))
    for row in range(horizon, VIEW_H):
        t = (row - horizon) / fade_span
        frame[row, :, :] = (FLOOR_NEAR * (1 - t) + FLOOR_FAR * t).astype(np.uint8)

    # --- walls: one ray per RAY_W column, stretched to VIEW_W pixels ---
    depth = np.full(RAY_W, MAX_DEPTH, dtype=np.float32)
    for ray in range(RAY_W):
        u = (ray / (RAY_W - 1)) if RAY_W > 1 else 0.5
        ang = heading + (u - 0.5) * fov
        dist, side, hit_x, hit_y = dda_raycast(
            grid, eye_x, eye_y, math.cos(ang), math.sin(ang), MAX_DEPTH
        )
        # Project onto the view direction to remove fish-eye distortion.
        dist = clamp(dist * math.cos(ang - heading), 0.001, MAX_DEPTH)
        depth[ray] = dist

        slice_h = clamp(int((VIEW_H * 0.92) / dist), 1, VIEW_H)
        top = (VIEW_H - slice_h) // 2
        shade = clamp(1.0 / (1.0 + dist * 0.12), 0.12, 1.0)
        tone = WALL_SIDE if side == 1 else WALL_BASE
        # Alternate brightness per wall cell for a checker texture.
        tex = 1.05 if ((hit_x + hit_y) & 1) else 0.90
        colour = np.clip(tone.astype(np.float32) * shade * tex, 0, 255).astype(np.uint8)

        left = int(ray * VIEW_W / RAY_W)
        right = max(int((ray + 1) * VIEW_W / RAY_W), left + 1)
        frame[top:top + slice_h, left:right, :] = colour

    # --- the other agent as a billboard, occluded per column by wall depth ---
    other_vis = False
    if not st["caught"] and los_clear(grid, eye_cell, other_cell):
        off_x = (other_cell[0] + 0.5) - eye_x
        off_y = (other_cell[1] + 0.5) - eye_y
        other_dist = math.hypot(off_x, off_y)
        rel = angle_diff_rad(math.atan2(off_y, off_x), heading)
        if abs(rel) <= fov * 0.5 and other_dist < MAX_DEPTH:
            other_vis = True
            ray_idx = clamp(int(((rel / fov) + 0.5) * (RAY_W - 1)), 0, RAY_W - 1)
            raw_h = int((VIEW_H * 0.75) / max(0.2, other_dist))
            # Width derives from the unclamped height, as in the height clamp below.
            sprite_w = clamp(int(raw_h * 0.45), 6, VIEW_W)
            sprite_h = clamp(raw_h, 8, VIEW_H)
            mid_x = int(ray_idx * VIEW_W / RAY_W)
            mid_y = VIEW_H // 2
            col_lo = clamp(mid_x - sprite_w // 2, 0, VIEW_W - 1)
            col_hi = clamp(mid_x + sprite_w // 2, 0, VIEW_W - 1)
            row_lo = clamp(mid_y - sprite_h // 2, 0, VIEW_H - 1)
            row_hi = clamp(mid_y + sprite_h // 2, 0, VIEW_H - 1)
            for col in range(col_lo, col_hi):
                ray_col = clamp(int(col * RAY_W / VIEW_W), 0, RAY_W - 1)
                if other_dist < depth[ray_col]:
                    frame[row_lo:row_hi, col:col + 1, :] = AGENT_OTHER_COLOR

    # --- reticle: 3px-thick cross, 25px long, at the frame centre ---
    centre_x, centre_y = VIEW_W // 2, VIEW_H // 2
    frame[centre_y - 1:centre_y + 2, centre_x - 12:centre_x + 13, :] = RETICLE
    frame[centre_y - 12:centre_y + 13, centre_x - 1:centre_x + 2, :] = RETICLE

    # --- HUD strip: brighten the top rows, then draw the indicator dots ---
    hud_h = 26
    frame[:hud_h, :, :] = np.clip(
        frame[:hud_h, :, :].astype(np.int16) + 20, 0, 255
    ).astype(np.uint8)

    idle = np.array([60, 60, 70], np.uint8)
    indicators = (
        (8, st["auto_chase"], np.array([90, 255, 140], np.uint8)),
        (20, st["auto_run"], np.array([120, 190, 255], np.uint8)),
        (32, other_vis, np.array([255, 140, 90], np.uint8)),
    )
    for dot_x, lit, lit_colour in indicators:
        frame[10:16, dot_x:dot_x + 6, :] = lit_colour if lit else idle

    if st.get("overlay", False):
        frame = _apply_coherence_overlay(frame, st.get("disturbance", 0.0))
    return frame
def render_minimap(st, scale=14):
    """Render a top-down map image with ``scale`` pixels per grid cell.

    Draws walls, the predator and prey cells, the cell each one is facing,
    and a 2px ring around whichever agent is currently controlled.
    """
    grid = st["grid"]
    H, W = grid.shape
    canvas = np.zeros((H * scale, W * scale, 3), dtype=np.uint8)
    canvas[:, :, :] = np.array([18, 20, 32], dtype=np.uint8)

    # Expand the wall mask to pixel resolution and paint all walls at once.
    wall_mask = np.repeat(np.repeat(grid == 1, scale, axis=0), scale, axis=1)
    canvas[wall_mask] = np.array([220, 220, 235], dtype=np.uint8)

    def fill_cell(cell_x, cell_y, colour):
        """Paint one whole grid cell."""
        canvas[cell_y * scale:(cell_y + 1) * scale, cell_x * scale:(cell_x + 1) * scale, :] = colour

    pred_x, pred_y = st["pred"]
    prey_x, prey_y = st["prey"]
    fill_cell(pred_x, pred_y, np.array([120, 190, 255], np.uint8))
    fill_cell(prey_x, prey_y, np.array([255, 140, 90], np.uint8))

    # Heading markers: the cell directly in front of each agent (drawn even over walls).
    step_x, step_y = DIRS[st["ori"]]
    front_x, front_y = pred_x + step_x, pred_y + step_y
    if 0 <= front_x < W and 0 <= front_y < H:
        fill_cell(front_x, front_y, np.array([80, 255, 160], np.uint8))

    step_x, step_y = DIRS[st["prey_ori"]]
    front_x, front_y = prey_x + step_x, prey_y + step_y
    if 0 <= front_x < W and 0 <= front_y < H:
        fill_cell(front_x, front_y, np.array([255, 220, 120], np.uint8))

    # Highlight ring around the controlled agent.
    focus_x, focus_y = (pred_x, pred_y) if st["control"] == "pred" else (prey_x, prey_y)
    x0, y0 = focus_x * scale, focus_y * scale
    ring = np.array([240, 240, 140], np.uint8)
    canvas[y0:y0 + scale, x0:x0 + 2, :] = ring
    canvas[y0:y0 + scale, x0 + scale - 2:x0 + scale, :] = ring
    canvas[y0:y0 + 2, x0:x0 + scale, :] = ring
    canvas[y0 + scale - 2:y0 + scale, x0:x0 + scale, :] = ring
    return canvas
def unlock_summary(st):
    """Return the Markdown progression panel: one line per map plus the catch total."""
    progress = st["progress"]
    catches, unlocked = progress["catches"], progress["unlocked"]
    rows = [
        f"✅ {name} (unlocked)" if name in unlocked else f"🔒 {name} (needs {need} catches)"
        for name, need in MAP_UNLOCKS
    ]
    return "### Map progression\n" + "\n".join(rows) + f"\n\n**Total catches:** {catches}"
def status(st):
    """Return the multi-line status text: two summary lines, a blank line, then the last 10 log lines."""
    compass = ["E", "S", "W", "N"]
    pred_facing = compass[st["ori"]]
    prey_facing = compass[st["prey_ori"]]
    recent = st["log"][-10:]
    catches = st["progress"]["catches"]
    map_label = st["map_name"]

    # AutoChase on its own (without AutoRun) still counts as manual play.
    if not st["auto_run"]:
        mode = "Manual"
    elif st["auto_chase"]:
        mode = "AutoRun+AutoChase"
    else:
        mode = "Hybrid AutoRun (wander)"

    controller = "Predator" if st["control"] == "pred" else "Prey"
    coherence = st.get("disturbance", 0.0)

    header = (
        f"Map: {map_label} | Catches: {catches} | Step: {st['step']} | Mode: {mode} | Control: {controller} | Overlay: {st.get('overlay', False)}\n"
        f"Predator: {st['pred']} {pred_facing} | Prey: {st['prey']} {prey_facing} | "
        f"AutoChase: {st['auto_chase']} | AutoRun: {st['auto_run']} | Caught: {st['caught']} | Coherence: {coherence:.2f}\n\n"
    )
    return header + "\n".join(recent)
| # ----------------------------- | |
| # Actions (manual + autonomous) | |
| # ----------------------------- | |
def _add_impulse(st, x):
    """Accumulate ``x`` into the pending impulse consumed by the next disturbance step."""
    pending = float(st.get("last_impulse", 0.0))
    st["last_impulse"] = pending + float(x)
def _step_disturbance(st):
    """Decay the disturbance by a factor of 0.92, add the pending impulse, then clear it."""
    previous = float(st.get("disturbance", 0.0))
    impulse = float(st.get("last_impulse", 0.0))
    st["last_impulse"] = 0.0
    st["disturbance"] = 0.92 * previous + impulse
def _agent_pos_ori(st, who):
    """Return ``(position, orientation)`` for the prey when ``who == "prey"``, else the predator."""
    pos_key, ori_key = ("prey", "prey_ori") if who == "prey" else ("pred", "ori")
    return st[pos_key], st[ori_key]
def _set_agent_pos_ori(st, who, pos=None, ori=None):
    """Update the position and/or orientation of the prey (``who == "prey"``) or predator.

    Arguments left as None are not touched; the orientation is stored modulo 4.
    """
    pos_key, ori_key = ("prey", "prey_ori") if who == "prey" else ("pred", "ori")
    if pos is not None:
        st[pos_key] = pos
    if ori is not None:
        st[ori_key] = int(ori) % 4
def _turn(st, who, direction):
    """Rotate agent ``who`` by ``direction`` quarter turns (-1 left, +1 right); no-op once caught."""
    if st["caught"]:
        return
    current = _agent_pos_ori(st, who)[1]
    _set_agent_pos_ori(st, who, ori=(current + direction) % 4)
    _add_impulse(st, 0.9)
def _forward(st, who):
    """Move agent ``who`` ``MOVE_STEP`` cells along its heading; no-op once caught.

    If the target cell is not open the agent stays put, a "bumped wall" line
    is logged, and a larger impulse is added.
    """
    if st["caught"]:
        return
    (x, y), ori = _agent_pos_ori(st, who)
    step_x, step_y = DIRS[ori]
    target_x, target_y = x + step_x * MOVE_STEP, y + step_y * MOVE_STEP

    if st["grid"][target_y, target_x] != 0:
        st["log"].append(f"{'Prey' if who=='prey' else 'Predator'} bumped wall.")
        _add_impulse(st, 0.7)
        return

    _set_agent_pos_ori(st, who, pos=(target_x, target_y))
    _add_impulse(st, 0.25)
def _check_catch_and_unlock(st):
    """Register a catch when predator and prey share a cell.

    On a new catch the round is marked caught, the catch counter and unlocked
    set are updated, and True is returned; otherwise returns False.
    """
    if st["caught"] or st["pred"] != st["prey"]:
        return False

    st["caught"] = True
    st["log"].append("CAUGHT the prey.")
    progress = st["progress"]
    progress["catches"] += 1
    progress["unlocked"] = compute_unlocks(progress["catches"])
    st["log"].append(f"Catches now {progress['catches']}. Unlocks updated.")
    _add_impulse(st, 1.2)
    return True
def prey_flee_step(st):
    """Move the prey one cell (or let it stay) so as to get away from the predator.

    Each non-wall option is scored by squared distance to the predator plus a
    small random jitter. The best option is taken 78% of the time, otherwise a
    random option. The RNG is derived from the seed and step, so a given step
    is reproducible. The prey's orientation follows the move direction.
    """
    if st["caught"]:
        return
    rng = seeded_rng(st["seed"] + 1337 + st["step"] * 19)
    here_x, here_y = st["prey"]
    hunter_x, hunter_y = st["pred"]

    ranked = []
    for move in ((0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)):
        nx, ny = here_x + move[0], here_y + move[1]
        if st["grid"][ny, nx] != 1:
            gap_sq = (nx - hunter_x) ** 2 + (ny - hunter_y) ** 2
            ranked.append((gap_sq + rng.random() * 0.1, (nx, ny), move))
    if not ranked:
        return

    ranked.sort(reverse=True)
    choice = ranked[0] if rng.random() < 0.78 else rng.choice(ranked)
    _, destination, move = choice
    st["prey"] = destination
    # (0, 0) is not a key of DIR_TO_ORI, so staying put keeps the old heading.
    if move in DIR_TO_ORI:
        st["prey_ori"] = DIR_TO_ORI[move]
def predator_wander_step(st):
    """Take one random-walk action for the predator and log it.

    Facing a wall: turn left or right with equal probability. Otherwise: move
    forward (72%), turn left (14%) or turn right (14%). The RNG is derived
    from the seed and step.
    """
    if st["caught"]:
        return
    rng = seeded_rng(st["seed"] + 4242 + st["step"] * 23)
    x, y = st["pred"]
    step_x, step_y = DIRS[st["ori"]]
    wall_ahead = st["grid"][y + step_y, x + step_x] == 1
    roll = rng.random()

    if wall_ahead:
        turn, note = (-1, "AutoWander: avoid left.") if roll < 0.5 else (+1, "AutoWander: avoid right.")
    elif roll < 0.72:
        _forward(st, "pred")
        st["log"].append("AutoWander: forward.")
        return
    elif roll < 0.86:
        turn, note = -1, "AutoWander: turn left."
    else:
        turn, note = +1, "AutoWander: turn right."

    _turn(st, "pred", turn)
    st["log"].append(note)
def predator_chase_step(st):
    """Take one pursuit action for the predator, or wander if the prey is not in view.

    The prey counts as in view when line of sight is clear and its bearing is
    within half the field of view. The predator then turns towards it when the
    bearing is off by more than 0.10 rad, and otherwise steps forward.
    """
    if st["caught"]:
        return
    hunter, quarry = st["pred"], st["prey"]

    if los_clear(st["grid"], hunter, quarry):
        heading = math.radians(ORI_DEG[st["ori"]])
        off_x = (quarry[0] + 0.5) - (hunter[0] + 0.5)
        off_y = (quarry[1] + 0.5) - (hunter[1] + 0.5)
        rel = angle_diff_rad(math.atan2(off_y, off_x), heading)
        if abs(rel) <= math.radians(FOV_DEG) * 0.5:
            if rel < -0.10:
                _turn(st, "pred", -1)
                st["log"].append("AutoChase: turn left.")
            elif rel > 0.10:
                _turn(st, "pred", +1)
                st["log"].append("AutoChase: turn right.")
            else:
                _forward(st, "pred")
                st["log"].append("AutoChase: forward.")
            return

    predator_wander_step(st)
def prey_autopilot_step(st):
    """Autopilot for a viewed prey: it keeps behaving like prey (flees) and the step is logged."""
    prey_flee_step(st)
    st["log"].append("AutoPrey: flee.")
def tick(st):
    """Advance the simulation by one step.

    Returns True only when this step produced a new catch (so the unlock panel
    needs refreshing). Does nothing and returns False once the round is over.
    After 600 steps the round is frozen by setting ``caught``.
    """
    if st["caught"]:
        return False  # unlock did not change

    st["step"] += 1
    control = st["control"]

    def run_predator_policy():
        """Chase when AutoChase is on, otherwise wander."""
        if st["auto_chase"]:
            predator_chase_step(st)
        else:
            predator_wander_step(st)

    # AutoRun = autopilot for the currently viewed observer
    if st["auto_run"]:
        if control == "pred":
            run_predator_policy()
        else:
            prey_autopilot_step(st)

    # The non-controlled agent still runs its own policy each step
    if control != "pred":
        run_predator_policy()
    if control != "prey":
        prey_flee_step(st)

    # capture + unlock
    unlock_changed = _check_catch_and_unlock(st)
    _step_disturbance(st)

    if st["step"] >= 600:
        st["caught"] = True
        st["log"].append("Max steps reached (freeze).")
    return unlock_changed
| # ----------------------------- | |
| # Gradio handlers | |
| # ----------------------------- | |
def ui_refresh_slots(current_value=None):
    """Return a Dropdown listing the save slots, keeping ``current_value`` selected if it exists.

    When no save exists yet, the single placeholder choice ``slot1.json`` is offered.
    """
    choices = list_save_slots() or ["slot1.json"]
    selected = current_value if current_value and current_value in choices else choices[0]
    return gr.Dropdown(choices=choices, value=selected)
def ui_reset(seed, map_choice, st=None):
    """Start a new round on ``map_choice`` with ``seed``, keeping progression.

    Catches/unlocks, the controlled agent and the overlay flag carry over from
    ``st`` when given. A locked or unknown map falls back to "Training Bay".
    If ``seed`` is None or not numeric (for example a cleared number field),
    the previous state's seed is reused, or 1 when there is no previous state.

    Returns ``(state, first_person_image, minimap_image, status_text, unlock_markdown)``.
    """
    try:
        seed = int(seed)
    except (TypeError, ValueError, OverflowError):
        seed = int(st.get("seed", 1)) if st else 1

    progress = st["progress"] if st else {"catches": 0, "unlocked": compute_unlocks(0)}
    if map_choice not in progress["unlocked"]:
        map_choice = "Training Bay"

    new_st = build_state(seed, map_choice, progress=progress)
    if st:
        new_st["control"] = st.get("control", "pred")
        new_st["overlay"] = st.get("overlay", False)
    return new_st, render_first_person(new_st), render_minimap(new_st), status(new_st), unlock_summary(new_st)
def ui_toggle_control(st):
    """Switch the controlled (and viewed) agent between predator and prey and re-render everything."""
    now_prey = st["control"] == "pred"
    st["control"] = "prey" if now_prey else "pred"
    st["log"].append(f"Control switched to: {'Prey' if st['control']=='prey' else 'Predator'}.")
    _add_impulse(st, 0.15)
    return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st)
def ui_turn_left(st):
    """Turn the controlled agent left, advance one tick, and return the refreshed outputs.

    The progression panel is only re-rendered when the tick produced a catch.
    """
    actor = st["control"]
    _turn(st, actor, -1)
    st["log"].append(f"{'Prey' if actor=='prey' else 'Predator'} turn left.")
    # gr.update() leaves the panel untouched (avoid re-render if unchanged).
    progression = unlock_summary(st) if tick(st) else gr.update()
    return st, render_first_person(st), render_minimap(st), status(st), progression
def ui_turn_right(st):
    """Turn the controlled agent right, advance one tick, and return the refreshed outputs.

    The progression panel is only re-rendered when the tick produced a catch.
    """
    actor = st["control"]
    _turn(st, actor, +1)
    st["log"].append(f"{'Prey' if actor=='prey' else 'Predator'} turn right.")
    progression = unlock_summary(st) if tick(st) else gr.update()
    return st, render_first_person(st), render_minimap(st), status(st), progression
def ui_forward(st):
    """Move the controlled agent forward, advance one tick, and return the refreshed outputs.

    The progression panel is only re-rendered when the tick produced a catch.
    """
    actor = st["control"]
    _forward(st, actor)
    st["log"].append(f"{'Prey' if actor=='prey' else 'Predator'} forward.")
    progression = unlock_summary(st) if tick(st) else gr.update()
    return st, render_first_person(st), render_minimap(st), status(st), progression
def ui_toggle_chase(st):
    """Flip the AutoChase flag, log it, and re-render (progression panel untouched)."""
    enabled = not st["auto_chase"]
    st["auto_chase"] = enabled
    st["log"].append(f"AutoChase set to {st['auto_chase']}.")
    _add_impulse(st, 0.10)
    return st, render_first_person(st), render_minimap(st), status(st), gr.update()
def ui_toggle_run(st):
    """Flip the AutoRun flag, log it, and re-render (progression panel untouched)."""
    enabled = not st["auto_run"]
    st["auto_run"] = enabled
    st["log"].append(f"AutoRun set to {st['auto_run']}.")
    _add_impulse(st, 0.10)
    return st, render_first_person(st), render_minimap(st), status(st), gr.update()
def ui_toggle_overlay(st):
    """Flip the coherence-overlay flag, log it, and re-render (progression panel untouched)."""
    enabled = not st.get("overlay", False)
    st["overlay"] = enabled
    st["log"].append(f"Overlay set to {st['overlay']}.")
    return st, render_first_person(st), render_minimap(st), status(st), gr.update()
def ui_tick(st):
    """Advance one tick on demand; refresh the progression panel only after a catch."""
    progression = unlock_summary(st) if tick(st) else gr.update()
    return st, render_first_person(st), render_minimap(st), status(st), progression
def ui_timer(st):
    """Timer callback: advance one tick while AutoRun is on and the round is live.

    Returns only four outputs on purpose.
    """
    # IMPORTANT: do NOT update unlock markdown here (prevents flashing)
    running = st["auto_run"] and not st["caught"]
    if running:
        tick(st)
    return st, render_first_person(st), render_minimap(st), status(st)
def ui_swap_roles(st):
    """Exchange predator and prey positions and headings (ignored once the round is over)."""
    progression = gr.update()
    if not st["caught"]:
        st["pred"], st["prey"] = st["prey"], st["pred"]
        st["ori"], st["prey_ori"] = st["prey_ori"], st["ori"]
        st["log"].append("Swapped roles (Predator ⇄ Prey).")
        _add_impulse(st, 0.35)
        if _check_catch_and_unlock(st):
            progression = unlock_summary(st)
    return st, render_first_person(st), render_minimap(st), status(st), progression
| # ---- Save/load UI handlers ---- | |
def ui_save_slot(st, slot_name):
    """Save the state to the named slot and return the refreshed outputs.

    The sixth output is the saved file's path (None on failure) and the
    seventh is the refreshed slot dropdown. Failures are logged, not raised.
    """
    export_path = None
    try:
        target = _slot_path(slot_name)
        save_to_path(st, target)
    except Exception as e:
        st["log"].append(f"Save failed: {e}")
    else:
        export_path = target

    selected = os.path.basename(export_path) if export_path else None
    dropdown = ui_refresh_slots(selected)
    return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st), export_path, dropdown
def ui_load_slot(st, selected_slot):
    """Load the selected save slot and return the refreshed outputs.

    Only the final path component of ``selected_slot`` is used, so the value
    cannot point outside ``SAVE_DIR``. With no selection, ``slot1`` is tried.
    On a missing file or any load error the current state is kept, the
    problem is logged, and the progression panel is left untouched.

    Returns ``(state, first_person_image, minimap_image, status_text,
    unlock_markdown_or_update, None, slot_dropdown)``.
    """
    if selected_slot:
        # basename() strips directories and absolute prefixes from the dropdown value.
        path = os.path.join(SAVE_DIR, os.path.basename(str(selected_slot)))
    else:
        path = _slot_path("slot1")

    def _outputs(state, progression, slot_value):
        """Assemble the seven handler outputs for ``state``."""
        dropdown = ui_refresh_slots(slot_value)
        return (
            state,
            render_first_person(state),
            render_minimap(state),
            status(state),
            progression,
            None,
            dropdown,
        )

    try:
        if not os.path.exists(path):
            st["log"].append(f"No save found at {path}")
            return _outputs(st, gr.update(), selected_slot)
        loaded = load_from_path(path)
        return _outputs(loaded, unlock_summary(loaded), os.path.basename(path))
    except Exception as e:
        # Best effort: a bad save must not take the UI down.
        st["log"].append(f"Load failed: {e}")
        return _outputs(st, gr.update(), selected_slot)
def ui_import_save(st, uploaded_file):
    """Load a save from the uploaded file and make it the active state.

    ``uploaded_file`` is whatever the upload component delivers: ``None``
    when nothing was uploaded, a path string, or (on Gradio versions that
    hand over a tempfile-like object) an object whose ``.name`` attribute is
    the path. Both non-``None`` forms are reduced to a path before calling
    ``load_from_path``.

    On a missing upload or any load error the current state is kept, the
    problem is appended to the run log, and the progression panel is left
    untouched.

    Returns seven values: state, first-person view, minimap, run log,
    progression panel, ``None`` for the export file, and the refreshed slot
    dropdown.
    """

    def _keep_current(message):
        # Log the problem and re-render the state that is already running.
        st["log"].append(message)
        dd = ui_refresh_slots()
        return st, render_first_person(st), render_minimap(st), status(st), gr.update(), None, dd

    try:
        if uploaded_file is None:
            return _keep_current("Import: no file provided.")

        # A plain str has no ``name`` attribute, so it passes through as-is.
        source_path = getattr(uploaded_file, "name", uploaded_file)

        loaded = load_from_path(source_path)
        dd = ui_refresh_slots()
        return (
            loaded,
            render_first_person(loaded),
            render_minimap(loaded),
            status(loaded),
            unlock_summary(loaded),
            None,
            dd,
        )
    except Exception as e:
        # UI boundary: surface the error in the run log instead of crashing.
        return _keep_current(f"Import failed: {e}")
| # ----------------------------- | |
| # App | |
| # ----------------------------- | |
# Map names in MAP_UNLOCKS order; used as the choices of the map dropdown.
all_map_names = [entry[0] for entry in MAP_UNLOCKS]

# Fresh progression (zero catches) and the state every new session starts from.
initial_progress = dict(catches=0, unlocked=compute_unlocks(0))
initial_state = build_state(seed=1, map_name="Training Bay", progress=initial_progress)

# Saves present at startup; fall back to a placeholder entry when there are none.
initial_slots = list_save_slots()
initial_slot_value = (initial_slots or ["slot1.json"])[0]
# Build the UI, wire every handler, and launch the app.
# NOTE(review): the grouping of components into rows is reconstructed from
# statement order — confirm against the intended layout.
with gr.Blocks(title="RFT Predator Space — Symmetric Observers") as demo:
    gr.Markdown(
        "## Experience reality through an RFT observer agent’s perspective\n"
        "**Accessibility note:** the progression panel is now event-driven (no flashing).\n\n"
        "**AutoRun:** moves the current POV observer (first-person autopilot)."
    )

    st = gr.State(initial_state)

    with gr.Row():
        seed = gr.Number(label="Seed", value=1, precision=0)
        map_choice = gr.Dropdown(label="Map (locked unless unlocked)", choices=all_map_names, value="Training Bay")
        btn_reset = gr.Button("Reset")
        btn_control = gr.Button("Toggle Control (Pred ↔ Prey)")
        btn_tick = gr.Button("Tick")

    with gr.Row():
        btn_left = gr.Button("Turn Left")
        btn_fwd = gr.Button("Forward")
        btn_right = gr.Button("Turn Right")

    with gr.Row():
        btn_chase = gr.Button("Toggle AutoChase")
        btn_run = gr.Button("Toggle AutoRun")
        btn_overlay = gr.Button("Toggle Overlay (optional)")
        btn_swap = gr.Button("Swap Roles (Pred ⇄ Prey)")

    with gr.Row():
        view = gr.Image(label="First-person observer view", type="numpy")
        mini = gr.Image(label="Minimap (debug)", type="numpy")

    with gr.Row():
        info = gr.Textbox(label="Run log", lines=12)
        unlocks = gr.Markdown(value=unlock_summary(initial_state))

    gr.Markdown("### Save / Load")

    with gr.Row():
        slot_pick = gr.Dropdown(label="Existing saves", choices=initial_slots if initial_slots else ["slot1.json"], value=initial_slot_value)
        slot_name = gr.Textbox(label="New save name (optional)", value="slot1")

    with gr.Row():
        btn_refresh = gr.Button("Refresh Saves List")
        btn_save = gr.Button("Save (to name)")
        btn_load = gr.Button("Load (selected)")

    with gr.Row():
        export_file = gr.File(label="Exported Save File (download this)", interactive=False)
        import_file = gr.File(label="Import Save File (upload)", interactive=True)
        btn_import = gr.Button("Import (Load Uploaded File)")

    # Output groups shared by the handlers below.
    _core_outputs = [st, view, mini, info, unlocks]
    _save_outputs = _core_outputs + [export_file, slot_pick]

    def _render_on_load(state):
        """Render the initial views from this session's own state.

        The state is taken as an input and NOT written back: returning the
        component's shared ``st.value`` into the State output would make
        every session hold the same dict, so the handlers' in-place
        mutations would leak between users and into new sessions.
        """
        return (
            render_first_person(state),
            render_minimap(state),
            status(state),
            unlock_summary(state),
            None,
            ui_refresh_slots(initial_slot_value),
        )

    demo.load(
        _render_on_load,
        inputs=[st],
        outputs=[view, mini, info, unlocks, export_file, slot_pick],
    )

    btn_reset.click(ui_reset, inputs=[seed, map_choice, st], outputs=_core_outputs)
    btn_control.click(ui_toggle_control, inputs=[st], outputs=_core_outputs)
    btn_left.click(ui_turn_left, inputs=[st], outputs=_core_outputs)
    btn_right.click(ui_turn_right, inputs=[st], outputs=_core_outputs)
    btn_fwd.click(ui_forward, inputs=[st], outputs=_core_outputs)
    btn_chase.click(ui_toggle_chase, inputs=[st], outputs=_core_outputs)
    btn_run.click(ui_toggle_run, inputs=[st], outputs=_core_outputs)
    btn_overlay.click(ui_toggle_overlay, inputs=[st], outputs=_core_outputs)
    btn_swap.click(ui_swap_roles, inputs=[st], outputs=_core_outputs)
    btn_tick.click(ui_tick, inputs=[st], outputs=_core_outputs)

    btn_refresh.click(lambda cur: ui_refresh_slots(cur), inputs=[slot_pick], outputs=[slot_pick])

    # Save under the typed name; fall back to the selected slot when the
    # name box is empty or whitespace-only.
    btn_save.click(
        lambda st_, name_, pick_: ui_save_slot(st_, name_ if (name_ and name_.strip()) else pick_),
        inputs=[st, slot_name, slot_pick],
        outputs=_save_outputs,
    )
    btn_load.click(
        ui_load_slot,
        inputs=[st, slot_pick],
        outputs=_save_outputs,
    )
    btn_import.click(
        ui_import_save,
        inputs=[st, import_file],
        outputs=_save_outputs,
    )

    # Timer outputs DO NOT include unlocks (prevents flashing)
    # Guarded because gr.Timer is not available in every Gradio version.
    if hasattr(gr, "Timer"):
        gr.Timer(1.0 / AUTO_TICK_HZ).tick(
            ui_timer,
            inputs=[st],
            outputs=[st, view, mini, info],
        )

# queue() helps Timer behave reliably in Spaces
demo.queue().launch()