import math
import random
import json
import os
import numpy as np
import gradio as gr
# ============================================================
# RFT Predator Space — First-Person Observer View (Pseudo-3D)
# FIXES:
# 1) NO flashing progression panel (do NOT update it on timer ticks)
# 2) AutoRun works in first-person POV (autopilot moves current POV agent)
# 3) queue() enabled for reliable timer updates on Spaces
# ============================================================
# -----------------------------
# View config (render)
# -----------------------------
VIEW_W, VIEW_H = 560, 360
RAY_W = 280
FOV_DEG = 78
MAX_DEPTH = 18
MOVE_STEP = 1
AUTO_TICK_HZ = 8
SKY = np.array([14, 16, 26], dtype=np.uint8)
FLOOR_NEAR = np.array([20, 22, 34], dtype=np.uint8)
FLOOR_FAR = np.array([10, 11, 18], dtype=np.uint8)
WALL_BASE = np.array([210, 210, 225], dtype=np.uint8)
WALL_SIDE = np.array([150, 150, 170], dtype=np.uint8)
AGENT_OTHER_COLOR = np.array([255, 140, 90], dtype=np.uint8) # billboard for the "other" observer
RETICLE = np.array([120, 190, 255], dtype=np.uint8)
# 0=E,1=S,2=W,3=N
DIRS = [(1,0),(0,1),(-1,0),(0,-1)]
ORI_DEG = [0, 90, 180, 270]
DIR_TO_ORI = {(1,0):0, (0,1):1, (-1,0):2, (0,-1):3}
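# The grid is indexed [y, x] with y increasing downward, so "S" means +y. ORI_DEG
# holds the matching screen-space angle in degrees: cos/sin of that angle reproduce
# the DIRS vector (e.g. ori=1 -> 90 deg -> (0, 1)).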
# -----------------------------
# Progression / unlocks
# -----------------------------
MAP_UNLOCKS = [
("Training Bay", 0),
("Arena+", 1),
("Corridor Maze", 3),
("Rooms", 6),
("Labyrinth", 10),
("Dense Field", 15),
]
# -----------------------------
# Saves
# -----------------------------
SAVE_DIR = "saves"
os.makedirs(SAVE_DIR, exist_ok=True)
def _slot_path(slot: str) -> str:
    # Sanitize the slot name and keep save files confined to SAVE_DIR.
    slot = os.path.basename((slot or "slot1").strip().replace(" ", "_"))
    if not slot:
        slot = "slot1"
    if not slot.lower().endswith(".json"):
        slot += ".json"
    return os.path.join(SAVE_DIR, slot)
def list_save_slots():
try:
files = [fn for fn in os.listdir(SAVE_DIR) if fn.lower().endswith(".json")]
files.sort()
return files
except Exception:
return []
# -----------------------------
# Utility
# -----------------------------
def clamp(x, lo, hi):
return lo if x < lo else hi if x > hi else x
def angle_diff_rad(a, b):
return (a - b + math.pi) % (2*math.pi) - math.pi
def seeded_rng(seed: int):
return random.Random(int(seed) & 0xFFFFFFFF)
def neighbors4(x, y):
return [(x+1,y),(x-1,y),(x,y+1),(x,y-1)]
def bfs_reachable(grid, start):
H, W = grid.shape
sx, sy = start
if grid[sy, sx] == 1:
return set()
q = [(sx, sy)]
seen = set([(sx, sy)])
while q:
x, y = q.pop(0)
for nx, ny in neighbors4(x, y):
if 0 <= nx < W and 0 <= ny < H and (nx, ny) not in seen and grid[ny, nx] == 0:
seen.add((nx, ny))
q.append((nx, ny))
return seen
def pick_spawn_pair(grid, rng, min_dist=8):
H, W = grid.shape
empties = [(x, y) for y in range(1, H-1) for x in range(1, W-1) if grid[y, x] == 0]
rng.shuffle(empties)
for pred in empties[:800]:
reach = bfs_reachable(grid, pred)
if len(reach) < 30:
continue
candidates = [p for p in reach if (p[0]-pred[0])**2 + (p[1]-pred[1])**2 >= min_dist*min_dist]
if candidates:
prey = rng.choice(candidates)
return pred, prey
pred = empties[0] if empties else (1, 1)
prey = empties[-1] if len(empties) > 1 else (2, 2)
return pred, prey
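# pick_spawn_pair guarantees both spawns lie in the same BFS-connected open region
# and are at least min_dist cells apart (straight-line), falling back to the
# first/last empty cells if no such pair can be found.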
def add_border_walls(grid):
H, W = grid.shape
grid[0, :] = 1
grid[H-1, :] = 1
grid[:, 0] = 1
grid[:, W-1] = 1
return grid
def compute_unlocks(catches: int):
unlocked = set()
for name, need in MAP_UNLOCKS:
if catches >= need:
unlocked.add(name)
return unlocked
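# Example: compute_unlocks(3) -> {"Training Bay", "Arena+", "Corridor Maze"}
# per the MAP_UNLOCKS thresholds above.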
# -----------------------------
# Map generators (deterministic by seed)
# -----------------------------
def map_training(seed, w=23, h=23):
rng = seeded_rng(seed)
grid = np.zeros((h, w), dtype=np.int8)
add_border_walls(grid)
for y in range(2, h-2):
for x in range(2, w-2):
if rng.random() < 0.08:
grid[y, x] = 1
return grid
def map_arena_plus(seed, w=23, h=23):
rng = seeded_rng(seed)
grid = np.zeros((h, w), dtype=np.int8)
add_border_walls(grid)
cx, cy = w//2, h//2
for y in range(1, h-1):
for x in range(1, w-1):
r2 = (x-cx)**2 + (y-cy)**2
if 36 <= r2 <= 44 and rng.random() < 0.85:
grid[y, x] = 1
for _ in range(8):
x = rng.randint(3, w-4)
y = rng.randint(3, h-4)
grid[y, x] = 1
return grid
def map_corridor_maze(seed, w=23, h=23):
rng = seeded_rng(seed)
grid = np.ones((h, w), dtype=np.int8)
add_border_walls(grid)
for y in range(1, h-1):
for x in range(1, w-1):
if x % 2 == 1 and y % 2 == 1:
grid[y, x] = 0
start = (1, 1)
stack = [start]
visited = set([start])
def carve_between(a, b):
ax, ay = a; bx, by = b
mx, my = (ax+bx)//2, (ay+by)//2
grid[my, mx] = 0
while stack:
x, y = stack[-1]
dirs = [(2,0),(-2,0),(0,2),(0,-2)]
rng.shuffle(dirs)
moved = False
for dx, dy in dirs:
nx, ny = x+dx, y+dy
if 1 <= nx < w-1 and 1 <= ny < h-1 and (nx, ny) not in visited:
visited.add((nx, ny))
carve_between((x, y), (nx, ny))
stack.append((nx, ny))
moved = True
break
if not moved:
stack.pop()
grid[1,1] = 0
grid[1,2] = 0
grid[2,1] = 0
return grid
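# map_corridor_maze is an iterative depth-first ("recursive backtracker") carve over
# the odd-coordinate cells, so every open cell ends up connected to (1, 1).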
def map_rooms(seed, w=25, h=25):
rng = seeded_rng(seed)
grid = np.ones((h, w), dtype=np.int8)
add_border_walls(grid)
rooms = []
for _ in range(10):
rw = rng.randint(4, 7)
rh = rng.randint(4, 7)
rx = rng.randint(1, w-rw-2)
ry = rng.randint(1, h-rh-2)
grid[ry:ry+rh, rx:rx+rw] = 0
rooms.append((rx, ry, rw, rh))
for i in range(len(rooms)-1):
x1 = rooms[i][0] + rooms[i][2]//2
y1 = rooms[i][1] + rooms[i][3]//2
x2 = rooms[i+1][0] + rooms[i+1][2]//2
y2 = rooms[i+1][1] + rooms[i+1][3]//2
if rng.random() < 0.5:
grid[y1, min(x1,x2):max(x1,x2)+1] = 0
grid[min(y1,y2):max(y1,y2)+1, x2] = 0
else:
grid[min(y1,y2):max(y1,y2)+1, x1] = 0
grid[y2, min(x1,x2):max(x1,x2)+1] = 0
return grid
def map_labyrinth(seed, w=31, h=23):
rng = seeded_rng(seed)
grid = np.zeros((h, w), dtype=np.int8)
add_border_walls(grid)
for y in range(1, h-1):
for x in range(1, w-1):
if (x % 2 == 0 and rng.random() < 0.85) or (y % 3 == 0 and rng.random() < 0.55):
grid[y, x] = 1
for x in range(1, w-1):
grid[h//2, x] = 0
for x in range(3, w-3, 6):
for y in range(2, h-2):
if rng.random() < 0.75:
grid[y, x] = 0
return grid
def map_dense_field(seed, w=23, h=23):
rng = seeded_rng(seed)
grid = np.zeros((h, w), dtype=np.int8)
add_border_walls(grid)
for y in range(1, h-1):
for x in range(1, w-1):
if rng.random() < 0.22:
grid[y, x] = 1
for _ in range(6):
cx = rng.randint(3, w-4)
cy = rng.randint(3, h-4)
for yy in range(cy-2, cy+3):
for xx in range(cx-2, cx+3):
if 1 <= xx < w-1 and 1 <= yy < h-1:
grid[yy, xx] = 0
return grid
MAP_BUILDERS = {
"Training Bay": map_training,
"Arena+": map_arena_plus,
"Corridor Maze": map_corridor_maze,
"Rooms": map_rooms,
"Labyrinth": map_labyrinth,
"Dense Field": map_dense_field,
}
# -----------------------------
# State construction
# -----------------------------
def build_state(seed, map_name, progress=None, override=None):
rng = seeded_rng(seed)
grid = MAP_BUILDERS[map_name](seed)
pred, prey = pick_spawn_pair(grid, rng, min_dist=8)
pred_ori = rng.randint(0, 3)
prey_ori = (pred_ori + 2) % 4
if progress is None:
progress = {"catches": 0, "unlocked": compute_unlocks(0)}
st = {
"seed": int(seed),
"grid": grid,
"pred": pred,
"prey": prey,
"ori": pred_ori,
"prey_ori": prey_ori,
"control": "pred", # "pred" or "prey" (view + manual inputs)
"overlay": False, # coherence overlay
"disturbance": 0.0,
"last_impulse": 0.0,
"step": 0,
"caught": False,
"auto_chase": False,
"auto_run": False,
"log": [f"Reset into map: {map_name}"],
"map_name": map_name,
"progress": progress,
}
if override:
for k, v in override.items():
if k == "grid":
continue
st[k] = v
return st
# -----------------------------
# Save / Load helpers
# -----------------------------
def serialize_state(st):
catches = int(st["progress"]["catches"])
payload = {
"version": 2,
"seed": int(st["seed"]),
"map_name": str(st["map_name"]),
"step": int(st["step"]),
"pred": [int(st["pred"][0]), int(st["pred"][1])],
"prey": [int(st["prey"][0]), int(st["prey"][1])],
"ori": int(st["ori"]),
"prey_ori": int(st.get("prey_ori", 0)),
"control": str(st.get("control", "pred")),
"overlay": bool(st.get("overlay", False)),
"disturbance": float(st.get("disturbance", 0.0)),
"caught": bool(st["caught"]),
"auto_chase": bool(st["auto_chase"]),
"auto_run": bool(st["auto_run"]),
"catches": catches,
"log_tail": st["log"][-20:],
}
return payload
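# The grid itself is never serialized; deserialize_state rebuilds it
# deterministically from (seed, map_name), so save files stay small.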
def deserialize_state(payload):
seed = int(payload.get("seed", 1))
map_name = str(payload.get("map_name", "Training Bay"))
if map_name not in MAP_BUILDERS:
map_name = "Training Bay"
catches = int(payload.get("catches", 0))
progress = {"catches": catches, "unlocked": compute_unlocks(catches)}
    control = str(payload.get("control", "pred"))
    override = {
        "step": int(payload.get("step", 0)),
        "pred": tuple(payload.get("pred", [1, 1])),
        "prey": tuple(payload.get("prey", [2, 2])),
        "ori": int(payload.get("ori", 0)) % 4,
        "prey_ori": int(payload.get("prey_ori", 0)) % 4,
        "control": control if control in ("pred", "prey") else "pred",
"overlay": bool(payload.get("overlay", False)),
"disturbance": float(payload.get("disturbance", 0.0)),
"last_impulse": 0.0,
"caught": bool(payload.get("caught", False)),
"auto_chase": bool(payload.get("auto_chase", False)),
"auto_run": bool(payload.get("auto_run", False)),
"log": (payload.get("log_tail", []) or [])[:],
}
st = build_state(seed, map_name, progress=progress, override=override)
grid = st["grid"]
H, W = grid.shape
px, py = st["pred"]
qx, qy = st["prey"]
ok = (
0 <= px < W and 0 <= py < H and 0 <= qx < W and 0 <= qy < H
and grid[py, px] == 0 and grid[qy, qx] == 0
)
if not ok:
rng = seeded_rng(seed + 777)
st["pred"], st["prey"] = pick_spawn_pair(grid, rng, min_dist=8)
st["log"].append("Loaded save had invalid positions; respawned safely.")
st["log"].append("Loaded save.")
return st
def save_to_path(st, path):
payload = serialize_state(st)
with open(path, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2)
st["log"].append(f"Saved to: {path}")
def load_from_path(path):
with open(path, "r", encoding="utf-8") as f:
payload = json.load(f)
return deserialize_state(payload)
# -----------------------------
# Perception + rendering
# -----------------------------
def los_clear(grid, a, b):
ax, ay = a[0] + 0.5, a[1] + 0.5
bx, by = b[0] + 0.5, b[1] + 0.5
dx, dy = bx - ax, by - ay
dist = math.hypot(dx, dy)
if dist < 1e-6:
return True
dx /= dist
dy /= dist
x, y = ax, ay
    steps = max(1, int(dist * 20))  # at least one sample, even for sub-cell distances
H, W = grid.shape
for _ in range(steps):
x += dx * (dist / steps)
y += dy * (dist / steps)
cx, cy = int(x), int(y)
cx = clamp(cx, 0, W-1)
cy = clamp(cy, 0, H-1)
if grid[cy, cx] == 1:
return False
return True
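# los_clear samples the segment between cell centres at ~20 points per cell and
# reports the line of sight blocked as soon as any sample lands inside a wall.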
def dda_raycast(grid, px, py, ray_dx, ray_dy, max_depth=MAX_DEPTH):
H, W = grid.shape
map_x = int(px)
map_y = int(py)
delta_dist_x = abs(1.0 / ray_dx) if abs(ray_dx) > 1e-9 else 1e9
delta_dist_y = abs(1.0 / ray_dy) if abs(ray_dy) > 1e-9 else 1e9
if ray_dx < 0:
step_x = -1
side_dist_x = (px - map_x) * delta_dist_x
else:
step_x = 1
side_dist_x = (map_x + 1.0 - px) * delta_dist_x
if ray_dy < 0:
step_y = -1
side_dist_y = (py - map_y) * delta_dist_y
else:
step_y = 1
side_dist_y = (map_y + 1.0 - py) * delta_dist_y
hit = False
side = 0
for _ in range(max_depth * 10):
if side_dist_x < side_dist_y:
side_dist_x += delta_dist_x
map_x += step_x
side = 0
else:
side_dist_y += delta_dist_y
map_y += step_y
side = 1
if map_x < 0 or map_x >= W or map_y < 0 or map_y >= H:
break
if grid[map_y, map_x] == 1:
hit = True
break
if not hit:
return max_depth, 0, map_x, map_y
if side == 0:
denom = ray_dx if abs(ray_dx) > 1e-9 else 1e-9
perp = (map_x - px + (1 - step_x) / 2) / denom
else:
denom = ray_dy if abs(ray_dy) > 1e-9 else 1e-9
perp = (map_y - py + (1 - step_y) / 2) / denom
perp = abs(perp)
perp = clamp(perp, 0.0005, max_depth)
return perp, side, map_x, map_y
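# dda_raycast is a classic DDA grid traversal: at each step, whichever of
# side_dist_x / side_dist_y is smaller tells us which grid line the ray crosses next.
# On a hit it returns the distance travelled along the (unit-length) ray,
#     t = (map_x - px + (1 - step_x) / 2) / ray_dx      (or the y analogue),
# and render_first_person multiplies that by cos(ang - base) to get camera-plane
# depth, which keeps straight walls straight instead of fisheye-curved.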
def _apply_coherence_overlay(img, disturbance: float):
d = float(disturbance)
if d <= 0.001:
return img
alpha = clamp(d * 0.06, 0.0, 0.22) # subtle
h, w, _ = img.shape
cx, cy = w // 2, h // 2
edge = int(min(w, h) * 0.08)
if edge >= 2:
tint = np.array([22, 8, 18], dtype=np.float32)
img[:edge, :, :] = np.clip(img[:edge, :, :].astype(np.float32) + tint * alpha, 0, 255).astype(np.uint8)
img[h-edge:, :, :] = np.clip(img[h-edge:, :, :].astype(np.float32) + tint * alpha, 0, 255).astype(np.uint8)
img[:, :edge, :] = np.clip(img[:, :edge, :].astype(np.float32) + tint * alpha, 0, 255).astype(np.uint8)
img[:, w-edge:, :] = np.clip(img[:, w-edge:, :].astype(np.float32) + tint * alpha, 0, 255).astype(np.uint8)
line_col = np.array([180, 70, 160], dtype=np.float32)
for i in range(-40, 41):
x = cx + i
y = cy + int(i * 0.35)
if 0 <= x < w and 0 <= y < h:
img[y:y+1, x:x+1, :] = np.clip(img[y:y+1, x:x+1, :].astype(np.float32) * (1-alpha) + line_col * alpha, 0, 255).astype(np.uint8)
y2 = cy - int(i * 0.35)
if 0 <= x < w and 0 <= y2 < h:
img[y2:y2+1, x:x+1, :] = np.clip(img[y2:y2+1, x:x+1, :].astype(np.float32) * (1-alpha) + line_col * alpha, 0, 255).astype(np.uint8)
return img
def render_first_person(st):
grid = st["grid"]
if st["control"] == "prey":
view_cell = st["prey"]
view_ori = st["prey_ori"]
other_cell = st["pred"]
else:
view_cell = st["pred"]
view_ori = st["ori"]
other_cell = st["prey"]
(cx, cy) = view_cell
px = cx + 0.5
py = cy + 0.5
fov = math.radians(FOV_DEG)
base = math.radians(ORI_DEG[view_ori])
img = np.zeros((VIEW_H, VIEW_W, 3), dtype=np.uint8)
img[:VIEW_H//2, :, :] = SKY
for y in range(VIEW_H//2, VIEW_H):
t = (y - VIEW_H//2) / max(1, (VIEW_H//2 - 1))
col = (FLOOR_NEAR * (1 - t) + FLOOR_FAR * t).astype(np.uint8)
img[y, :, :] = col
wall_dists = np.full(RAY_W, MAX_DEPTH, dtype=np.float32)
for x in range(RAY_W):
u = (x / (RAY_W - 1)) if RAY_W > 1 else 0.5
ang = base + (u - 0.5) * fov
ray_dx = math.cos(ang)
ray_dy = math.sin(ang)
dist, side, hitx, hity = dda_raycast(grid, px, py, ray_dx, ray_dy, MAX_DEPTH)
dist *= math.cos(ang - base)
dist = clamp(dist, 0.001, MAX_DEPTH)
wall_dists[x] = dist
slice_h = int((VIEW_H * 0.92) / dist)
slice_h = clamp(slice_h, 1, VIEW_H)
top = (VIEW_H - slice_h) // 2
bot = top + slice_h
shade = 1.0 / (1.0 + dist * 0.12)
shade = clamp(shade, 0.12, 1.0)
base_col = WALL_SIDE if side == 1 else WALL_BASE
checker = ((hitx + hity) & 1)
tex = 0.90 if checker == 0 else 1.05
col = np.clip(base_col.astype(np.float32) * shade * tex, 0, 255).astype(np.uint8)
x0 = int(x * VIEW_W / RAY_W)
x1 = int((x + 1) * VIEW_W / RAY_W)
if x1 <= x0:
x1 = x0 + 1
img[top:bot, x0:x1, :] = col
other_vis = False
if not st["caught"] and los_clear(grid, view_cell, other_cell):
vx = (other_cell[0] + 0.5) - px
vy = (other_cell[1] + 0.5) - py
other_dist = math.hypot(vx, vy)
other_ang = math.atan2(vy, vx)
rel = angle_diff_rad(other_ang, base)
if abs(rel) <= fov * 0.5 and other_dist < MAX_DEPTH:
other_vis = True
u = (rel / fov) + 0.5
sx_ray = int(u * (RAY_W - 1))
sx_ray = clamp(sx_ray, 0, RAY_W - 1)
sprite_h = int((VIEW_H * 0.75) / max(0.2, other_dist))
sprite_w = int(sprite_h * 0.45)
sprite_h = clamp(sprite_h, 8, VIEW_H)
sprite_w = clamp(sprite_w, 6, VIEW_W)
sx = int(sx_ray * VIEW_W / RAY_W)
sy = VIEW_H // 2
x0 = clamp(sx - sprite_w // 2, 0, VIEW_W - 1)
x1 = clamp(sx + sprite_w // 2, 0, VIEW_W - 1)
y0 = clamp(sy - sprite_h // 2, 0, VIEW_H - 1)
y1 = clamp(sy + sprite_h // 2, 0, VIEW_H - 1)
            # Per-column occlusion test against the wall depth buffer; compare
            # perpendicular depths so edge-of-view sprites are not clipped too eagerly.
            other_perp = other_dist * math.cos(rel)
            for vxcol in range(x0, x1):
                rx = int(vxcol * RAY_W / VIEW_W)
                rx = clamp(rx, 0, RAY_W - 1)
                if other_perp < wall_dists[rx]:
                    img[y0:y1, vxcol:vxcol+1, :] = AGENT_OTHER_COLOR
cxh, cyh = VIEW_W // 2, VIEW_H // 2
img[cyh-1:cyh+2, cxh-12:cxh+13, :] = RETICLE
img[cyh-12:cyh+13, cxh-1:cxh+2, :] = RETICLE
hud_h = 26
img[:hud_h, :, :] = np.clip(img[:hud_h, :, :].astype(np.int16) + 20, 0, 255).astype(np.uint8)
def dot(x, y, c):
img[y:y+6, x:x+6, :] = c
dot(8, 10, np.array([90, 255, 140], np.uint8) if st["auto_chase"] else np.array([60, 60, 70], np.uint8))
dot(20, 10, np.array([120, 190, 255], np.uint8) if st["auto_run"] else np.array([60, 60, 70], np.uint8))
dot(32, 10, np.array([255, 140, 90], np.uint8) if other_vis else np.array([60, 60, 70], np.uint8))
if st.get("overlay", False):
img = _apply_coherence_overlay(img, st.get("disturbance", 0.0))
return img
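# wall_dists doubles as a one-column-wide depth buffer: the other agent's billboard
# is painted per screen column only where it is closer than the wall hit by the
# corresponding ray, so nearer walls correctly occlude it.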
def render_minimap(st, scale=14):
grid = st["grid"]
H, W = grid.shape
img = np.zeros((H*scale, W*scale, 3), dtype=np.uint8)
img[:, :, :] = np.array([18, 20, 32], dtype=np.uint8)
wall = np.array([220, 220, 235], dtype=np.uint8)
for y in range(H):
for x in range(W):
if grid[y, x] == 1:
img[y*scale:(y+1)*scale, x*scale:(x+1)*scale, :] = wall
px, py = st["pred"]
qx, qy = st["prey"]
pred_col = np.array([120, 190, 255], np.uint8)
prey_col = np.array([255, 140, 90], np.uint8)
img[py*scale:(py+1)*scale, px*scale:(px+1)*scale, :] = pred_col
img[qy*scale:(qy+1)*scale, qx*scale:(qx+1)*scale, :] = prey_col
dx, dy = DIRS[st["ori"]]
hx, hy = px + dx, py + dy
if 0 <= hx < W and 0 <= hy < H:
img[hy*scale:(hy+1)*scale, hx*scale:(hx+1)*scale, :] = np.array([80, 255, 160], np.uint8)
dx2, dy2 = DIRS[st["prey_ori"]]
hx2, hy2 = qx + dx2, qy + dy2
if 0 <= hx2 < W and 0 <= hy2 < H:
img[hy2*scale:(hy2+1)*scale, hx2*scale:(hx2+1)*scale, :] = np.array([255, 220, 120], np.uint8)
if st["control"] == "pred":
x0, y0 = px*scale, py*scale
else:
x0, y0 = qx*scale, qy*scale
ring = np.array([240, 240, 140], np.uint8)
img[y0:y0+scale, x0:x0+2, :] = ring
img[y0:y0+scale, x0+scale-2:x0+scale, :] = ring
img[y0:y0+2, x0:x0+scale, :] = ring
img[y0+scale-2:y0+scale, x0:x0+scale, :] = ring
return img
def unlock_summary(st):
catches = st["progress"]["catches"]
unlocked = st["progress"]["unlocked"]
lines = []
for name, need in MAP_UNLOCKS:
if name in unlocked:
lines.append(f"✅ {name} (unlocked)")
else:
lines.append(f"🔒 {name} (needs {need} catches)")
return "### Map progression\n" + "\n".join(lines) + f"\n\n**Total catches:** {catches}"
def status(st):
pred_ori_txt = ["E", "S", "W", "N"][st["ori"]]
prey_ori_txt = ["E", "S", "W", "N"][st["prey_ori"]]
tail = st["log"][-10:]
catches = st["progress"]["catches"]
current = st["map_name"]
mode = "Manual"
if st["auto_run"] and st["auto_chase"]:
mode = "AutoRun+AutoChase"
elif st["auto_run"] and not st["auto_chase"]:
mode = "Hybrid AutoRun (wander)"
ctrl = "Predator" if st["control"] == "pred" else "Prey"
coh = st.get("disturbance", 0.0)
return (
f"Map: {current} | Catches: {catches} | Step: {st['step']} | Mode: {mode} | Control: {ctrl} | Overlay: {st.get('overlay', False)}\n"
f"Predator: {st['pred']} {pred_ori_txt} | Prey: {st['prey']} {prey_ori_txt} | "
f"AutoChase: {st['auto_chase']} | AutoRun: {st['auto_run']} | Caught: {st['caught']} | Coherence: {coh:.2f}\n\n"
+ "\n".join(tail)
)
# -----------------------------
# Actions (manual + autonomous)
# -----------------------------
def _add_impulse(st, x):
st["last_impulse"] = float(st.get("last_impulse", 0.0)) + float(x)
def _step_disturbance(st):
d = float(st.get("disturbance", 0.0))
imp = float(st.get("last_impulse", 0.0))
st["disturbance"] = 0.92 * d + imp
st["last_impulse"] = 0.0
def _agent_pos_ori(st, who):
if who == "prey":
return st["prey"], st["prey_ori"]
return st["pred"], st["ori"]
def _set_agent_pos_ori(st, who, pos=None, ori=None):
if who == "prey":
if pos is not None: st["prey"] = pos
if ori is not None: st["prey_ori"] = int(ori) % 4
else:
if pos is not None: st["pred"] = pos
if ori is not None: st["ori"] = int(ori) % 4
def _turn(st, who, direction):
if st["caught"]:
return
_, ori = _agent_pos_ori(st, who)
ori = (ori + direction) % 4
_set_agent_pos_ori(st, who, ori=ori)
_add_impulse(st, 0.9)
def _forward(st, who):
if st["caught"]:
return
(x, y), ori = _agent_pos_ori(st, who)
dx, dy = DIRS[ori]
nx, ny = x + dx * MOVE_STEP, y + dy * MOVE_STEP
if st["grid"][ny, nx] == 0:
_set_agent_pos_ori(st, who, pos=(nx, ny))
_add_impulse(st, 0.25)
else:
st["log"].append(f"{'Prey' if who=='prey' else 'Predator'} bumped wall.")
_add_impulse(st, 0.7)
def _check_catch_and_unlock(st):
if st["pred"] == st["prey"] and not st["caught"]:
st["caught"] = True
st["log"].append("CAUGHT the prey.")
st["progress"]["catches"] += 1
st["progress"]["unlocked"] = compute_unlocks(st["progress"]["catches"])
st["log"].append(f"Catches now {st['progress']['catches']}. Unlocks updated.")
_add_impulse(st, 1.2)
return True
return False
def prey_flee_step(st):
if st["caught"]:
return
rng = seeded_rng(st["seed"] + 1337 + st["step"] * 19)
px, py = st["prey"]
ax, ay = st["pred"]
options = [(0,0),(1,0),(-1,0),(0,1),(0,-1)]
scored = []
for dx, dy in options:
nx, ny = px + dx, py + dy
if st["grid"][ny, nx] == 1:
continue
dist = (nx-ax)**2 + (ny-ay)**2
scored.append((dist + rng.random()*0.1, (nx, ny), (dx, dy)))
if scored:
scored.sort(reverse=True)
pick = scored[0] if rng.random() < 0.78 else rng.choice(scored)
_, (nx, ny), (dx, dy) = pick
st["prey"] = (nx, ny)
if (dx, dy) in DIR_TO_ORI and (dx, dy) != (0,0):
st["prey_ori"] = DIR_TO_ORI[(dx, dy)]
def predator_wander_step(st):
if st["caught"]:
return
rng = seeded_rng(st["seed"] + 4242 + st["step"] * 23)
(x, y) = st["pred"]
ori = st["ori"]
dx, dy = DIRS[ori]
front_blocked = (st["grid"][y+dy, x+dx] == 1)
r = rng.random()
if front_blocked:
if r < 0.5:
_turn(st, "pred", -1); st["log"].append("AutoWander: avoid left.")
else:
_turn(st, "pred", +1); st["log"].append("AutoWander: avoid right.")
else:
if r < 0.72:
_forward(st, "pred"); st["log"].append("AutoWander: forward.")
elif r < 0.86:
_turn(st, "pred", -1); st["log"].append("AutoWander: turn left.")
else:
_turn(st, "pred", +1); st["log"].append("AutoWander: turn right.")
def predator_chase_step(st):
if st["caught"]:
return
grid = st["grid"]
px = st["pred"][0] + 0.5
py = st["pred"][1] + 0.5
base = math.radians(ORI_DEG[st["ori"]])
fov = math.radians(FOV_DEG)
prey = st["prey"]
if los_clear(grid, st["pred"], prey):
vx = (prey[0] + 0.5) - px
vy = (prey[1] + 0.5) - py
ang = math.atan2(vy, vx)
rel = angle_diff_rad(ang, base)
if abs(rel) <= fov * 0.5:
if rel < -0.10:
_turn(st, "pred", -1); st["log"].append("AutoChase: turn left.")
elif rel > 0.10:
_turn(st, "pred", +1); st["log"].append("AutoChase: turn right.")
else:
_forward(st, "pred"); st["log"].append("AutoChase: forward.")
return
predator_wander_step(st)
def prey_autopilot_step(st):
# if user is viewing prey and AutoRun is on, prey should still behave like prey (flee)
prey_flee_step(st)
st["log"].append("AutoPrey: flee.")
def tick(st):
if st["caught"]:
return False # unlock did not change
st["step"] += 1
unlock_changed = False
# AutoRun = autopilot for the currently viewed observer
if st["auto_run"]:
if st["control"] == "pred":
if st["auto_chase"]:
predator_chase_step(st)
else:
predator_wander_step(st)
else:
prey_autopilot_step(st)
# The non-controlled agent still runs its own policy each step
if st["control"] != "pred":
if st["auto_chase"]:
predator_chase_step(st)
else:
predator_wander_step(st)
if st["control"] != "prey":
prey_flee_step(st)
# capture + unlock
unlock_changed = _check_catch_and_unlock(st)
_step_disturbance(st)
if st["step"] >= 600:
st["caught"] = True
st["log"].append("Max steps reached (freeze).")
return unlock_changed
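# tick() order of operations: increment the step counter; if AutoRun is on, the
# currently viewed observer runs its autopilot (predator chase/wander or prey flee);
# whichever agent is NOT under player control always runs its own policy; then the
# catch/unlock check, disturbance decay, and a hard freeze once 600 steps are reached.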
# -----------------------------
# Gradio handlers
# -----------------------------
def ui_refresh_slots(current_value=None):
choices = list_save_slots()
if current_value and current_value in choices:
value = current_value
else:
value = choices[0] if choices else "slot1.json"
return gr.Dropdown(choices=choices if choices else ["slot1.json"], value=value)
def ui_reset(seed, map_choice, st=None):
seed = int(seed)
progress = st["progress"] if st else {"catches": 0, "unlocked": compute_unlocks(0)}
if map_choice not in progress["unlocked"]:
map_choice = "Training Bay"
new_st = build_state(seed, map_choice, progress=progress)
if st:
new_st["control"] = st.get("control", "pred")
new_st["overlay"] = st.get("overlay", False)
return new_st, render_first_person(new_st), render_minimap(new_st), status(new_st), unlock_summary(new_st)
def ui_toggle_control(st):
st["control"] = "prey" if st["control"] == "pred" else "pred"
st["log"].append(f"Control switched to: {'Prey' if st['control']=='prey' else 'Predator'}.")
_add_impulse(st, 0.15)
return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st)
def ui_turn_left(st):
who = st["control"]
_turn(st, who, -1)
st["log"].append(f"{'Prey' if who=='prey' else 'Predator'} turn left.")
unlock_changed = tick(st)
if unlock_changed:
return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st)
return st, render_first_person(st), render_minimap(st), status(st), gr.update() # avoid re-render if unchanged
def ui_turn_right(st):
who = st["control"]
_turn(st, who, +1)
st["log"].append(f"{'Prey' if who=='prey' else 'Predator'} turn right.")
unlock_changed = tick(st)
if unlock_changed:
return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st)
return st, render_first_person(st), render_minimap(st), status(st), gr.update()
def ui_forward(st):
who = st["control"]
_forward(st, who)
st["log"].append(f"{'Prey' if who=='prey' else 'Predator'} forward.")
unlock_changed = tick(st)
if unlock_changed:
return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st)
return st, render_first_person(st), render_minimap(st), status(st), gr.update()
def ui_toggle_chase(st):
st["auto_chase"] = not st["auto_chase"]
st["log"].append(f"AutoChase set to {st['auto_chase']}.")
_add_impulse(st, 0.10)
return st, render_first_person(st), render_minimap(st), status(st), gr.update()
def ui_toggle_run(st):
st["auto_run"] = not st["auto_run"]
st["log"].append(f"AutoRun set to {st['auto_run']}.")
_add_impulse(st, 0.10)
return st, render_first_person(st), render_minimap(st), status(st), gr.update()
def ui_toggle_overlay(st):
st["overlay"] = not st.get("overlay", False)
st["log"].append(f"Overlay set to {st['overlay']}.")
return st, render_first_person(st), render_minimap(st), status(st), gr.update()
def ui_tick(st):
unlock_changed = tick(st)
if unlock_changed:
return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st)
return st, render_first_person(st), render_minimap(st), status(st), gr.update()
def ui_timer(st):
# IMPORTANT: do NOT update unlock markdown here (prevents flashing)
if st["auto_run"] and not st["caught"]:
_ = tick(st)
return st, render_first_person(st), render_minimap(st), status(st)
def ui_swap_roles(st):
if st["caught"]:
return st, render_first_person(st), render_minimap(st), status(st), gr.update()
st["pred"], st["prey"] = st["prey"], st["pred"]
st["ori"], st["prey_ori"] = st["prey_ori"], st["ori"]
st["log"].append("Swapped roles (Predator ⇄ Prey).")
_add_impulse(st, 0.35)
changed = _check_catch_and_unlock(st)
if changed:
return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st)
return st, render_first_person(st), render_minimap(st), status(st), gr.update()
# ---- Save/load UI handlers ----
def ui_save_slot(st, slot_name):
try:
path = _slot_path(slot_name)
save_to_path(st, path)
export_path = path
except Exception as e:
st["log"].append(f"Save failed: {e}")
export_path = None
dd = ui_refresh_slots(os.path.basename(export_path) if export_path else None)
return st, render_first_person(st), render_minimap(st), status(st), unlock_summary(st), export_path, dd
def ui_load_slot(st, selected_slot):
path = os.path.join(SAVE_DIR, selected_slot) if selected_slot else _slot_path("slot1")
try:
if not os.path.exists(path):
st["log"].append(f"No save found at {path}")
dd = ui_refresh_slots(selected_slot)
return st, render_first_person(st), render_minimap(st), status(st), gr.update(), None, dd
loaded = load_from_path(path)
dd = ui_refresh_slots(os.path.basename(path))
return loaded, render_first_person(loaded), render_minimap(loaded), status(loaded), unlock_summary(loaded), None, dd
except Exception as e:
st["log"].append(f"Load failed: {e}")
dd = ui_refresh_slots(selected_slot)
return st, render_first_person(st), render_minimap(st), status(st), gr.update(), None, dd
def ui_import_save(st, uploaded_file):
try:
if uploaded_file is None:
st["log"].append("Import: no file provided.")
dd = ui_refresh_slots()
return st, render_first_person(st), render_minimap(st), status(st), gr.update(), None, dd
        upload_path = uploaded_file if isinstance(uploaded_file, str) else getattr(uploaded_file, "name", uploaded_file)
        loaded = load_from_path(upload_path)
dd = ui_refresh_slots()
return loaded, render_first_person(loaded), render_minimap(loaded), status(loaded), unlock_summary(loaded), None, dd
except Exception as e:
st["log"].append(f"Import failed: {e}")
dd = ui_refresh_slots()
return st, render_first_person(st), render_minimap(st), status(st), gr.update(), None, dd
# -----------------------------
# App
# -----------------------------
all_map_names = [name for name, _ in MAP_UNLOCKS]
initial_progress = {"catches": 0, "unlocked": compute_unlocks(0)}
initial_state = build_state(seed=1, map_name="Training Bay", progress=initial_progress)
initial_slots = list_save_slots()
initial_slot_value = initial_slots[0] if initial_slots else "slot1.json"
with gr.Blocks(title="RFT Predator Space — Symmetric Observers") as demo:
gr.Markdown(
"## Experience reality through an RFT observer agent’s perspective\n"
"**Accessibility note:** the progression panel is now event-driven (no flashing).\n\n"
"**AutoRun:** moves the current POV observer (first-person autopilot)."
)
st = gr.State(initial_state)
with gr.Row():
seed = gr.Number(label="Seed", value=1, precision=0)
map_choice = gr.Dropdown(label="Map (locked unless unlocked)", choices=all_map_names, value="Training Bay")
btn_reset = gr.Button("Reset")
btn_control = gr.Button("Toggle Control (Pred ↔ Prey)")
btn_tick = gr.Button("Tick")
with gr.Row():
btn_left = gr.Button("Turn Left")
btn_fwd = gr.Button("Forward")
btn_right = gr.Button("Turn Right")
with gr.Row():
btn_chase = gr.Button("Toggle AutoChase")
btn_run = gr.Button("Toggle AutoRun")
btn_overlay = gr.Button("Toggle Overlay (optional)")
btn_swap = gr.Button("Swap Roles (Pred ⇄ Prey)")
with gr.Row():
view = gr.Image(label="First-person observer view", type="numpy")
mini = gr.Image(label="Minimap (debug)", type="numpy")
with gr.Row():
info = gr.Textbox(label="Run log", lines=12)
unlocks = gr.Markdown(value=unlock_summary(initial_state))
gr.Markdown("### Save / Load")
with gr.Row():
slot_pick = gr.Dropdown(label="Existing saves", choices=initial_slots if initial_slots else ["slot1.json"], value=initial_slot_value)
slot_name = gr.Textbox(label="New save name (optional)", value="slot1")
with gr.Row():
btn_refresh = gr.Button("Refresh Saves List")
btn_save = gr.Button("Save (to name)")
btn_load = gr.Button("Load (selected)")
with gr.Row():
export_file = gr.File(label="Exported Save File (download this)", interactive=False)
import_file = gr.File(label="Import Save File (upload)", interactive=True)
btn_import = gr.Button("Import (Load Uploaded File)")
    def _on_load():
        # Build a fresh per-session state so concurrent sessions never mutate a shared dict.
        s = build_state(seed=1, map_name="Training Bay", progress={"catches": 0, "unlocked": compute_unlocks(0)})
        return s, render_first_person(s), render_minimap(s), status(s), unlock_summary(s), None, ui_refresh_slots(initial_slot_value)
    demo.load(
        _on_load,
        outputs=[st, view, mini, info, unlocks, export_file, slot_pick]
    )
btn_reset.click(ui_reset, inputs=[seed, map_choice, st], outputs=[st, view, mini, info, unlocks])
btn_control.click(ui_toggle_control, inputs=[st], outputs=[st, view, mini, info, unlocks])
btn_left.click(ui_turn_left, inputs=[st], outputs=[st, view, mini, info, unlocks])
btn_right.click(ui_turn_right, inputs=[st], outputs=[st, view, mini, info, unlocks])
btn_fwd.click(ui_forward, inputs=[st], outputs=[st, view, mini, info, unlocks])
btn_chase.click(ui_toggle_chase, inputs=[st], outputs=[st, view, mini, info, unlocks])
btn_run.click(ui_toggle_run, inputs=[st], outputs=[st, view, mini, info, unlocks])
btn_overlay.click(ui_toggle_overlay, inputs=[st], outputs=[st, view, mini, info, unlocks])
btn_swap.click(ui_swap_roles, inputs=[st], outputs=[st, view, mini, info, unlocks])
btn_tick.click(ui_tick, inputs=[st], outputs=[st, view, mini, info, unlocks])
btn_refresh.click(lambda cur: ui_refresh_slots(cur), inputs=[slot_pick], outputs=[slot_pick])
btn_save.click(
lambda st_, name_, pick_: ui_save_slot(st_, name_ if (name_ and name_.strip()) else pick_),
inputs=[st, slot_name, slot_pick],
outputs=[st, view, mini, info, unlocks, export_file, slot_pick]
)
btn_load.click(
ui_load_slot,
inputs=[st, slot_pick],
outputs=[st, view, mini, info, unlocks, export_file, slot_pick]
)
btn_import.click(
ui_import_save,
inputs=[st, import_file],
outputs=[st, view, mini, info, unlocks, export_file, slot_pick]
)
# Timer outputs DO NOT include unlocks (prevents flashing)
if hasattr(gr, "Timer"):
gr.Timer(1.0 / AUTO_TICK_HZ).tick(
ui_timer,
inputs=[st],
outputs=[st, view, mini, info]
)
# queue() helps Timer behave reliably in Spaces
demo.queue().launch()