Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
CR Application Tool — Streamlit frontend.
Four-step UI:
1. UPLOAD     → upload Excel contribution list
2. PREVIEW    → review accepted CRs
3. RUNNING    → pipeline subprocess with live log
4. DONE/ERROR → download ZIP of results
| """ | |
| import hashlib | |
| import io | |
| import json | |
| import os | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import uuid | |
| import zipfile | |
| from collections import defaultdict | |
| from datetime import datetime | |
| from pathlib import Path | |
| import streamlit as st | |
# ββ Load .env from the same directory as app.py βββββββββββββββββββββββββββββββ
try:
    from dotenv import load_dotenv
    # Pick up local secrets (e.g. HF_TOKEN) placed next to this file.
    load_dotenv(Path(__file__).parent / ".env")
except ImportError:
    pass  # python-dotenv not installed; rely on environment variables
# ββ EOL credential verification βββββββββββββββββββββββββββββββββββββββββββββββ
def verify_eol_credentials(username: str, password: str) -> bool:
    """Check an ETSI EOL username/password pair against the portal.

    Performs the portal's two-step login: a priming GET (sets cookies),
    then a JSON POST to LoginEOL.ashx.  Returns True unless the portal
    answers "Failed".  Network problems (timeout, DNS, connection reset)
    are reported as a failed login instead of crashing the page.
    """
    import json as _json
    import urllib3
    # verify=False is used below (portal TLS chain not always verifiable
    # from this host); silence the corresponding warning.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    import requests as _req
    try:
        session = _req.Session()
        # Priming GET: obtains the cookies LoginEOL.ashx expects.
        session.get(
            "https://portal.etsi.org/LoginRedirection.aspx",
            verify=False,
            timeout=10,
        )
        resp = session.post(
            "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
            data=_json.dumps({"username": username, "password": password}),
            headers={"Content-Type": "application/json; charset=UTF-8"},
            verify=False,
            allow_redirects=False,
            timeout=10,
        )
    except _req.RequestException:
        # Treat any network failure as "not authenticated" rather than
        # letting the exception blow up the Streamlit script run.
        return False
    return resp.text.strip() != "Failed"
# ββ Scripts dir (same folder as app.py / scripts/) βββββββββββββββββββββββββββ
SCRIPTS_DIR = Path(__file__).parent / "scripts"
# Make the pipeline helpers (fetch_crs, hf_cr_index, ...) importable directly.
sys.path.insert(0, str(SCRIPTS_DIR))
# ββ Session persistence βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _get_session_base() -> Path:
    """Use /data/cr_sessions if writable (HF persistent storage), else /tmp."""
    preferred = Path("/data/cr_sessions")
    try:
        preferred.mkdir(parents=True, exist_ok=True)
        # Probe writability explicitly: mkdir alone can succeed on a
        # read-only overlay where file creation still fails.
        marker = preferred / ".write_test"
        marker.write_text("x")
        marker.unlink()
    except OSError:
        tmp_base = Path("/tmp/cr_sessions")
        tmp_base.mkdir(parents=True, exist_ok=True)
        return tmp_base
    return preferred
SESSION_BASE = _get_session_base()
def session_dir(sid: str) -> Path:
    """Return the working directory for session *sid*, creating it if needed."""
    path = SESSION_BASE / sid
    path.mkdir(parents=True, exist_ok=True)
    return path
def _state_path(sid: str) -> Path:
    """Path of the JSON file that persists this session's state dict."""
    return session_dir(sid) / "state.json"
def load_state(sid: str) -> dict | None:
    """Load the persisted state dict for *sid*, or None if missing/unreadable."""
    p = _state_path(sid)
    if not p.exists():
        return None
    try:
        return json.loads(p.read_text())
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError; a half-written or
        # unreadable state file is treated the same as no state at all.
        return None
def save_state(sid: str, state: dict) -> None:
    """Persist *state* as pretty-printed JSON (non-JSON values go through str)."""
    serialized = json.dumps(state, indent=2, default=str)
    _state_path(sid).write_text(serialized)
def new_state(sid: str) -> dict:
    """Return the initial persisted-state skeleton for a brand-new session."""
    skeleton: dict = {
        "session_id": sid,
        "status": "login",  # state-machine entry point
        "excel_filename": None,
        "person_name": "Ly Thanh PHAN",
        "cr_list": [],
        "pid": None,
        "output_dir": None,
        "log_path": None,
        "started_at": None,
        "completed_at": None,
        "return_code": None,
        # TS mode fields
        "mode": "contributor",  # "contributor" | "ts"
        "excel_hash": "",
        "hf_repo": "OrganizedProgrammers/CR_Index",
        "index_log": "",
        "ts_id": "",
        # Logs for the current pipeline run (main + retry); reset on new run
        "run_log_paths": [],
    }
    return skeleton
# ββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _rc_path(sid: str) -> Path:
    """Path of the file where the pipeline subprocess's return code is stored."""
    return session_dir(sid) / "returncode"
def _run_and_save_rc(proc: subprocess.Popen, rc_path: Path) -> None:
    """Background thread target: block until *proc* exits, persist its code."""
    rc = proc.wait()
    rc_path.write_text(str(rc))
def read_return_code(sid: str) -> int | None:
    """Return the persisted subprocess return code, or None if unknown."""
    rc_file = _rc_path(sid)
    if not rc_file.exists():
        return None
    try:
        return int(rc_file.read_text().strip())
    except ValueError:
        # File exists but holds garbage (e.g. partially written).
        return None
def is_process_alive(pid: int) -> bool:
    """True if a process with *pid* currently exists (signal-0 probe)."""
    try:
        os.kill(pid, 0)
    except (ProcessLookupError, PermissionError):
        return False
    return True
def tail_log(log_path: str, n: int = 100) -> str:
    """Return the last *n* lines of the log file, or a placeholder if absent."""
    log = Path(log_path)
    if not log.exists():
        return "(log not yet availableβ¦)"
    content = log.read_text(errors="replace")
    return "\n".join(content.splitlines()[-n:])
def parse_log_results(log_path: str) -> list[dict]:
    """Extract per-TS result lines and warning messages from the Final/Retry Report."""
    log = Path(log_path)
    if not log.exists():
        return []
    report_started = False
    entries: list[dict] = []
    entry: dict | None = None
    status_tags = ("OK", "WARN", "FAIL", "SKIP")
    for raw in log.read_text(errors="replace").splitlines():
        # Everything before the report header is ignored.
        if "Final Report" in raw or "Retry Summary" in raw:
            report_started = True
            continue
        if not report_started:
            continue
        # A "[TAG]" line opens a new per-TS entry (first matching tag wins).
        tag = next((t for t in status_tags if f"[{t}]" in raw), None)
        if tag is not None:
            if entry is not None:
                entries.append(entry)
            entry = {
                "Status": tag,
                "TS": raw.split(f"[{tag}]", 1)[-1].strip(),
                "warnings": [],
            }
            continue
        # "! ..." lines attach as warnings to the entry being built.
        if entry is not None and raw.strip().startswith("! "):
            entry["warnings"].append(raw.strip()[2:])
    if entry is not None:
        entries.append(entry)
    return entries
def peek_submitted_by(excel_path: Path, max_names: int = 20) -> list[str]:
    """Return unique non-empty SubmittedBy values from the Excel (best-effort).

    Supports legacy .xls (xlrd) and .xlsx (openpyxl).  Any problem — missing
    library, unexpected sheet layout, corrupt file — yields an empty list
    instead of an exception: this is only a diagnostic aid for the UI.
    """
    try:
        ext = excel_path.suffix.lower()
        names: set[str] = set()
        if ext == ".xls":
            import xlrd
            wb = xlrd.open_workbook(str(excel_path))
            # Prefer the conventional "Contributions" sheet, else first sheet.
            try:
                ws = wb.sheet_by_name("Contributions")
            except xlrd.XLRDError:
                ws = wb.sheet_by_index(0)
            headers = [str(ws.cell_value(0, c)).strip() for c in range(ws.ncols)]
            # Header match is case-insensitive and tolerates an internal space.
            by_col = next(
                (i for i, h in enumerate(headers)
                 if h.lower() in ("submittedby", "submitted by")),
                None,
            )
            if by_col is not None:
                for r in range(1, ws.nrows):
                    v = str(ws.cell_value(r, by_col)).strip()
                    if v:
                        names.add(v)
        elif ext == ".xlsx":
            import openpyxl
            wb = openpyxl.load_workbook(str(excel_path), read_only=True, data_only=True)
            ws = wb["Contributions"] if "Contributions" in wb.sheetnames else wb.active
            rows = iter(ws.iter_rows(values_only=True))
            headers = [str(c).strip() if c is not None else "" for c in next(rows, [])]
            by_col = next(
                (i for i, h in enumerate(headers)
                 if h.lower() in ("submittedby", "submitted by")),
                None,
            )
            if by_col is not None:
                for row in rows:
                    # openpyxl yields None for empty cells; the "None" guard
                    # below filters cells whose str() is the literal "None".
                    v = str(row[by_col]).strip() if row[by_col] is not None else ""
                    if v and v != "None":
                        names.add(v)
        return sorted(names)[:max_names]
    except Exception:
        # Deliberate best-effort: diagnostics must never crash the page.
        return []
def make_zip(output_dir: Path) -> bytes:
    """Zip every file under *output_dir* (archive paths rooted at its parent)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for entry in output_dir.rglob("*"):
            if entry.is_file():
                # Keep the output dir name itself inside the archive paths.
                archive.write(entry, entry.relative_to(output_dir.parent))
    return buffer.getvalue()
def load_hf_index_cached(hf_token: str, hf_repo: str) -> list[dict]:
    """Load HF CR index, caching result in session_state to avoid redundant fetches."""
    cache_key = f"hf_index_{hf_repo}"
    if cache_key not in st.session_state:
        from hf_cr_index import load_hf_index
        try:
            fetched = load_hf_index(hf_token, hf_repo)
        except Exception:
            # Best-effort: an unreachable Hub is shown as an empty index.
            fetched = []
        st.session_state[cache_key] = fetched
    return st.session_state[cache_key]
def _launch_proc(cmd, env, log_path, sid, state, extra_state: dict):
    """Open log_path, Popen cmd, start rc-writer thread, update state, rerun.

    cmd/env     : argv list and environment for the subprocess
    log_path    : file receiving the child's stdout+stderr
    sid/state   : current session id and persisted-state dict
    extra_state : keys merged into state before saving (e.g. new status)
    """
    rc_path = _rc_path(sid)
    rc_path.unlink(missing_ok=True)
    # The child inherits the log fd at Popen time; the context manager closes
    # our handle immediately after — and also when Popen itself raises, which
    # the previous open()/close() sequence leaked.
    with open(str(log_path), "w") as log_file:
        proc = subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT, env=env)
    # Daemon thread persists the return code so a reloaded page can read it.
    threading.Thread(target=_run_and_save_rc, args=(proc, rc_path), daemon=True).start()
    st.session_state.proc = proc
    state.update(extra_state)
    state["pid"] = proc.pid
    state["started_at"] = datetime.now().isoformat()
    save_state(sid, state)
    st.rerun()
# ββ Page config βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Global page chrome: title, icon, layout, and the header/caption text.
st.set_page_config(
    page_title="CR Application Tool",
    page_icon="π",
    layout="centered",
)
st.title("π CR Application Tool")
st.caption("Upload an ETSI/3GPP Excel contribution list β preview accepted CRs β apply all β download ZIP.")
# ββ Session init ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
params = st.query_params
if "sid" not in st.session_state:
    # Resume the session named in the URL when its state file still exists;
    # otherwise mint a fresh session id and advertise it in the URL.
    # (Deduplicated: the "new session" path previously appeared twice.)
    existing = load_state(params["sid"]) if "sid" in params else None
    if existing:
        st.session_state.sid = params["sid"]
        st.session_state.state = existing
    else:
        sid = str(uuid.uuid4())
        st.session_state.sid = sid
        st.session_state.state = new_state(sid)
        st.query_params["sid"] = sid
sid: str = st.session_state.sid
state: dict = st.session_state.state
# Credential guard: if credentials are not in memory (e.g. page refresh after
# login), force re-login regardless of the persisted status.
if state.get("status") not in ("login",) and "eol_user" not in st.session_state:
    state["status"] = "login"
# ββ Sidebar βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
with st.sidebar:
    st.header("Session")
    st.caption(f"ID: `{sid[:8]}β¦`")
    st.divider()
    st.subheader("Resume a session")
    resume_sid = st.text_input("Paste a session ID")
    if st.button("Resume") and resume_sid.strip():
        target = resume_sid.strip()
        restored = load_state(target)
        if restored:
            # Swap the live session over to the resumed one.
            st.session_state.sid = target
            st.session_state.state = restored
            st.query_params["sid"] = target
            st.rerun()
        else:
            st.error("Session not found.")
# ββ State machine βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
status: str = state["status"]
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# LOGIN
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
if status == "login":
    st.subheader("Connect with your ETSI EOL account")
    st.info(
        "Your credentials are used only for this session and are never stored on disk.",
        icon="π",
    )
    username = st.text_input("EOL Username")
    password = st.text_input("EOL Password", type="password")
    if st.button("Connect", type="primary"):
        if not username or not password:
            st.error("Please enter both username and password.")
        else:
            with st.spinner("Verifying credentialsβ¦"):
                ok = verify_eol_credentials(username, password)
            if ok:
                # Credentials live only in st.session_state (memory), never
                # in the persisted state.json on disk.
                st.session_state.eol_user = username
                st.session_state.eol_password = password
                state["status"] = "upload"
                save_state(sid, state)
                st.rerun()
            else:
                st.error("Login failed β check your EOL username and password.")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # UPLOAD | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| elif status == "upload": | |
| st.subheader("Step 1 β Upload contribution list") | |
| mode_label = st.radio( | |
| "Pipeline mode", | |
| ["By contributor name", "By TS (all CRs for a spec)"], | |
| key="mode_radio", | |
| ) | |
| pipeline_mode = "contributor" if mode_label.startswith("By contributor") else "ts" | |
| state["mode"] = pipeline_mode | |
| # ββ Resolve active Excel (saved from a previous run, or freshly uploaded) ββ | |
| saved_name = state.get("excel_filename") | |
| saved_path = session_dir(sid) / saved_name if saved_name else None | |
| has_saved = saved_path is not None and saved_path.exists() | |
| if has_saved: | |
| st.success(f"Using: **{saved_name}**") | |
| with st.expander("Replace Excel file"): | |
| uploaded = st.file_uploader( | |
| "Upload a different Excel file (.xlsx or .xls)", | |
| type=["xlsx", "xls"], | |
| key="excel_replace", | |
| ) | |
| else: | |
| uploaded = st.file_uploader( | |
| "Excel contribution list (.xlsx or .xls)", | |
| type=["xlsx", "xls"], | |
| ) | |
| # When a new file is dropped, save it immediately and update state | |
| if uploaded: | |
| active_path = session_dir(sid) / uploaded.name | |
| active_bytes = bytes(uploaded.getbuffer()) | |
| active_path.write_bytes(active_bytes) | |
| state["excel_filename"] = uploaded.name | |
| state["excel_hash"] = hashlib.sha256(active_bytes).hexdigest()[:16] | |
| save_state(sid, state) | |
| elif has_saved: | |
| active_path = saved_path | |
| else: | |
| active_path = None | |
| if pipeline_mode == "contributor": | |
| person_name = st.text_input( | |
| "Contributor name (must match SubmittedBy column)", | |
| value=state.get("person_name", "Ly Thanh PHAN"), | |
| ) | |
| if active_path and st.button("Parse CR list β", type="primary"): | |
| with st.spinner("Parsing Excelβ¦"): | |
| try: | |
| from fetch_crs import parse_excel | |
| cr_list = parse_excel(str(active_path), person_name) | |
| state["status"] = "preview" | |
| state["person_name"] = person_name | |
| state["cr_list"] = [list(row) for row in cr_list] | |
| save_state(sid, state) | |
| st.rerun() | |
| except Exception as exc: | |
| st.error(f"Failed to parse Excel: {exc}") | |
| else: # TS mode | |
| if active_path: | |
| excel_hash = state.get("excel_hash") or hashlib.sha256(active_path.read_bytes()).hexdigest()[:16] | |
| state["excel_hash"] = excel_hash | |
| hf_token = os.environ.get("HF_TOKEN", "") | |
| hf_repo = state.get("hf_repo", "OrganizedProgrammers/CR_Index") | |
| # Check whether this Excel is already indexed | |
| existing = load_hf_index_cached(hf_token, hf_repo) | |
| already_indexed = any(r.get("excel_hash") == excel_hash for r in existing) | |
| if already_indexed: | |
| st.success(f"This Excel (`{excel_hash}`) is already indexed in HF.") | |
| if st.button("Select TS β", type="primary"): | |
| state["status"] = "ts_select" | |
| save_state(sid, state) | |
| st.rerun() | |
| else: | |
| st.info(f"Excel hash: `{excel_hash}` β not yet indexed.") | |
| if st.button("Build CR Index", type="primary"): | |
| # CRs downloaded during indexing go to the session-level cache | |
| cr_cache_dir = session_dir(sid) / "CRs" | |
| cr_cache_dir.mkdir(parents=True, exist_ok=True) | |
| index_log = str(session_dir(sid) / "index.log") | |
| rc_path = _rc_path(sid) | |
| rc_path.unlink(missing_ok=True) | |
| cmd = [ | |
| sys.executable, | |
| str(SCRIPTS_DIR / "build_cr_index.py"), | |
| str(active_path), | |
| "--output-dir", str(session_dir(sid)), | |
| "--hf-repo", hf_repo, | |
| ] | |
| env = os.environ.copy() | |
| env["EOL_USER"] = st.session_state.eol_user | |
| env["EOL_PASSWORD"] = st.session_state.eol_password | |
| # HF_TOKEN is already in env via os.environ | |
| _launch_proc(cmd, env, index_log, sid, state, { | |
| "status": "indexing", | |
| "index_log": index_log, | |
| "output_dir": "", # no pipeline output yet | |
| }) | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# INDEXING (build_cr_index.py running)
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
elif status == "indexing":
    pid = state["pid"]
    index_log = state.get("index_log", "")
    # Liveness check: prefer the in-memory Popen handle; after a page reload
    # fall back to the persisted return-code file, then a kill-0 probe.
    proc = st.session_state.get("proc")
    alive = False
    if proc is not None:
        alive = proc.poll() is None
    else:
        rc = read_return_code(sid)
        if rc is None:
            alive = is_process_alive(pid)
    if alive:
        st.subheader("β³ Building CR Indexβ¦")
        st.info(f"PID {pid} β started {state.get('started_at', '')[:19]}")
        log_text = tail_log(index_log, 50)
        st.text_area("Live log (last 50 lines)", value=log_text, height=400)
        # Poll every 2 s: the rerun re-renders with a fresh log tail.
        time.sleep(2)
        st.rerun()
    else:
        rc = read_return_code(sid)
        if rc is None and proc is not None:
            rc = proc.returncode
        state["return_code"] = rc
        state["completed_at"] = datetime.now().isoformat()
        if rc == 0:
            # Invalidate cached HF index so ts_select gets fresh data
            st.session_state.pop(f"hf_index_{state.get('hf_repo', '')}", None)
            state["status"] = "ts_select"
        else:
            # Expose the index log as log_path so the error state can display it
            state["log_path"] = state.get("index_log", "")
            state["status"] = "error"
        save_state(sid, state)
        st.rerun()
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# TS_SELECT (index ready β pick a spec to process)
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
elif status == "ts_select":
    st.subheader("Step 2 β Select target TS spec")
    hf_token = os.environ.get("HF_TOKEN", "")
    hf_repo = state.get("hf_repo", "OrganizedProgrammers/CR_Index")
    excel_hash = state.get("excel_hash", "")
    with st.spinner("Loading CR index from HuggingFaceβ¦"):
        records = load_hf_index_cached(hf_token, hf_repo)
    # Only index records belonging to the currently uploaded Excel matter.
    records_for_excel = [r for r in records if r.get("excel_hash") == excel_hash]
    if not records_for_excel:
        st.error(f"No records found for excel_hash `{excel_hash}`. Try rebuilding the index.")
        if st.button("β Back to upload"):
            state["status"] = "upload"
            save_state(sid, state)
            st.rerun()
    else:
        # Group CR records per spec number so the user picks a single TS.
        by_spec = defaultdict(list)
        for r in records_for_excel:
            by_spec[r["spec_number"]].append(r)
        spec_options = sorted(by_spec.keys())
        selected_spec = st.selectbox("Select target TS spec", spec_options)
        if selected_spec:
            versions = defaultdict(list)
            for r in by_spec[selected_spec]:
                versions[r["version"]].append(r["uid"])
            st.write("**Versions found:**")
            for ver, uids in sorted(versions.items()):
                st.write(f" v{ver}: {len(uids)} CR(s) β {', '.join(uids)}")
            col1, col2 = st.columns(2)
            with col1:
                if st.button("β Back"):
                    state["status"] = "upload"
                    save_state(sid, state)
                    st.rerun()
            with col2:
                if st.button("βΆ Apply CRs for this TS", type="primary"):
                    # Per-run directory so each pipeline's outputs are isolated
                    run_id = int(time.time())
                    output_dir = session_dir(sid) / f"run_{run_id}"
                    output_dir.mkdir(parents=True, exist_ok=True)
                    cr_cache_dir = session_dir(sid) / "CRs"
                    cr_cache_dir.mkdir(parents=True, exist_ok=True)
                    log_path = session_dir(sid) / f"pipeline_{run_id}.log"
                    rc_path = _rc_path(sid)
                    rc_path.unlink(missing_ok=True)
                    cmd = [
                        sys.executable,
                        str(SCRIPTS_DIR / "orchestrate_cr.py"),
                        "--output-dir", str(output_dir),
                        "--cr-cache-dir", str(cr_cache_dir),
                        "--ts-mode",
                        "--ts-id", selected_spec,
                        "--excel-hash", excel_hash,
                        "--hf-repo", hf_repo,
                    ]
                    env = os.environ.copy()
                    env["EOL_USER"] = st.session_state.eol_user
                    env["EOL_PASSWORD"] = st.session_state.eol_password
                    # HF_TOKEN already in env via os.environ
                    _launch_proc(cmd, env, log_path, sid, state, {
                        "ts_id": selected_spec,
                        "status": "running",
                        "output_dir": str(output_dir),
                        "log_path": str(log_path),
                        "run_log_paths": [str(log_path)],
                    })
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# PREVIEW
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
elif status == "preview":
    cr_list = state["cr_list"]
    st.subheader(f"Step 2 β {len(cr_list)} Accepted CR(s) found")
    if cr_list:
        import pandas as pd
        df = pd.DataFrame(cr_list, columns=["UID", "Title"])
        st.dataframe(df, use_container_width=True)
    else:
        st.warning(
            f"No Accepted CRs found for **{state['person_name']}** in this file."
        )
        # Diagnostic: show what names are in the SubmittedBy column
        excel_path = session_dir(sid) / state["excel_filename"]
        found_names = peek_submitted_by(excel_path)
        if found_names:
            st.info(
                "**Names found in SubmittedBy column** β copy the exact one into the field above and re-upload:\n\n"
                + "\n".join(f"- `{n}`" for n in found_names)
            )
    col1, col2 = st.columns(2)
    with col1:
        if st.button("β Back"):
            state["status"] = "upload"
            state["cr_list"] = []
            save_state(sid, state)
            st.rerun()
    with col2:
        if cr_list and st.button("βΆ Start Pipeline", type="primary"):
            excel_path = session_dir(sid) / state["excel_filename"]
            # Per-run directory so each pipeline's outputs are isolated
            run_id = int(time.time())
            output_dir = session_dir(sid) / f"run_{run_id}"
            output_dir.mkdir(parents=True, exist_ok=True)
            cr_cache_dir = session_dir(sid) / "CRs"
            cr_cache_dir.mkdir(parents=True, exist_ok=True)
            log_path = session_dir(sid) / f"pipeline_{run_id}.log"
            rc_path = _rc_path(sid)
            rc_path.unlink(missing_ok=True)
            cmd = [
                sys.executable,
                str(SCRIPTS_DIR / "orchestrate_cr.py"),
                str(excel_path),
                state["person_name"],
                "--output-dir", str(output_dir),
                "--cr-cache-dir", str(cr_cache_dir),
            ]
            env = os.environ.copy()
            # EOL credentials are passed to the child only via its environment.
            env["EOL_USER"] = st.session_state.eol_user
            env["EOL_PASSWORD"] = st.session_state.eol_password
            _launch_proc(cmd, env, log_path, sid, state, {
                "status": "running",
                "output_dir": str(output_dir),
                "log_path": str(log_path),
                "run_log_paths": [str(log_path)],
            })
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# RUNNING
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
elif status == "running":
    pid = state["pid"]
    log_path = state["log_path"]
    # Determine whether process is still alive: prefer the in-memory Popen
    # handle; after a page reload fall back to the rc file, then a PID probe.
    proc = st.session_state.get("proc")
    alive = False
    if proc is not None:
        alive = proc.poll() is None
    else:
        # Session reloaded β check returncode file, then PID
        rc = read_return_code(sid)
        if rc is None:
            alive = is_process_alive(pid)
    if alive:
        st.subheader("β³ Pipeline runningβ¦")
        st.info(f"PID {pid} β started {state.get('started_at', '')[:19]}")
        log_text = tail_log(log_path, 100)
        st.text_area("Live log (last 100 lines)", value=log_text, height=400)
        # Poll every 2 s: each rerun re-renders with a fresh log tail.
        time.sleep(2)
        st.rerun()
    else:
        # Process finished β determine return code
        rc = read_return_code(sid)
        if rc is None and proc is not None:
            rc = proc.returncode
        state["return_code"] = rc
        state["completed_at"] = datetime.now().isoformat()
        state["status"] = "done" if rc == 0 else "error"
        save_state(sid, state)
        st.rerun()
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# DONE / ERROR
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
elif status in ("done", "error"):
    log_path = state.get("log_path", "")
    # NOTE(review): output_dir may be "" (e.g. error during indexing); Path("")
    # normalizes to '.' so the exists()/rglob checks below would then scan the
    # current working directory β confirm this is intended.
    output_dir = Path(state.get("output_dir", ""))
    rc = state.get("return_code")
    if status == "done":
        st.success("β Pipeline completed successfully!")
    else:
        st.error(f"β Pipeline finished with errors (return code: {rc})")
    # Per-TS results table β merge this run's logs so retry results supersede
    # the original failed entries for the same TS key.
    _merged: dict[str, dict] = {}
    for _lf in state.get("run_log_paths", []):
        for _r in parse_log_results(_lf):
            _merged[_r["TS"]] = _r
    results = list(_merged.values())
    if results:
        st.subheader("Results per TS")
        import pandas as pd
        n_warn = sum(1 for r in results if r["warnings"])
        warn_label = f"Warnings ({n_warn})" if n_warn else "Warnings"
        tab_summary, tab_warnings = st.tabs(["Summary", warn_label])
        def _color_status(val):
            # Cell styling per status tag for the summary dataframe.
            return {
                "OK": "background-color: #d4edda; color: #155724",
                "WARN": "background-color: #fff3cd; color: #856404",
                "FAIL": "background-color: #f8d7da; color: #721c24",
                "SKIP": "background-color: #e2e3e5; color: #383d41",
            }.get(val, "")
        with tab_summary:
            df = pd.DataFrame([{"Status": r["Status"], "TS": r["TS"]} for r in results])
            st.dataframe(
                df.style.map(_color_status, subset=["Status"]),
                use_container_width=True,
            )
        with tab_warnings:
            warned = [r for r in results if r["warnings"]]
            if warned:
                for r in warned:
                    with st.expander(f"β οΈ {r['TS']} β {len(r['warnings'])} warning(s)"):
                        for w in r["warnings"]:
                            st.text(w)
            else:
                st.success("No warnings.")
    # Download ZIP
    if output_dir.exists() and any(output_dir.rglob("*")):
        st.subheader("Download results")
        zip_bytes = make_zip(output_dir)
        st.download_button(
            label="β¬ Download results ZIP",
            data=zip_bytes,
            file_name=f"cr_results_{sid[:8]}.zip",
            mime="application/zip",
            type="primary",
        )
    else:
        st.warning("Output directory is empty β nothing to download.")
    # Full log
    with st.expander("Full pipeline log"):
        if log_path and Path(log_path).exists():
            st.text(Path(log_path).read_text(errors="replace"))
        else:
            st.text("Log not found.")
    # ββ TS Recovery βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
    failed_ts_path = output_dir / "failed_ts.json"
    if failed_ts_path.exists():
        # NOTE(review): a malformed failed_ts.json would raise here β confirm
        # the pipeline always writes valid JSON.
        failed_ts_entries = json.loads(failed_ts_path.read_text())
        if failed_ts_entries:
            st.divider()
            st.subheader("β οΈ Recover failed TS downloads")
            st.info(
                f"{len(failed_ts_entries)} TS(s) could not be downloaded. "
                "Retry or upload each one manually, then apply the CRs."
            )
            for entry in failed_ts_entries:
                spec_key = f"{entry['spec_number']} v{entry['version']}"
                dest_path = Path(entry["spec_dir"]) / entry["expected_filename"]
                ready = dest_path.exists()
                label = f"{'β ' if ready else 'β'} TS {spec_key} β CRs: {', '.join(entry['cr_uids'])}"
                with st.expander(label, expanded=not ready):
                    col1, col2 = st.columns(2)
                    with col1:
                        # Option A: retry the automated download.
                        if st.button("π Retry download",
                                     key=f"retry_{entry['spec_compact']}_{entry['version']}"):
                            from fetch_crs import download_ts as _dl_ts
                            with st.spinner(f"Downloading TS {spec_key}β¦"):
                                fn, note = _dl_ts(
                                    entry["spec_number"], entry["version"],
                                    Path(entry["spec_dir"]),
                                    st.session_state.eol_user,
                                    st.session_state.eol_password,
                                )
                            if fn:
                                st.success(f"Downloaded: {fn}")
                                st.rerun()
                            else:
                                st.error(f"Failed: {note}")
                    with col2:
                        # Option B: let the user supply the DOCX by hand.
                        uploaded_ts = st.file_uploader(
                            f"Or upload `{entry['expected_filename']}`",
                            type=["docx"],
                            key=f"upload_{entry['spec_compact']}_{entry['version']}",
                        )
                        if uploaded_ts is not None:
                            Path(entry["spec_dir"]).mkdir(parents=True, exist_ok=True)
                            dest_path.write_bytes(uploaded_ts.read())
                            st.success("Saved β")
                            st.rerun()
            # Global apply button β enabled when β₯1 TS is now on disk
            ready_entries = [
                e for e in failed_ts_entries
                if (Path(e["spec_dir"]) / e["expected_filename"]).exists()
            ]
            remaining = len(failed_ts_entries) - len(ready_entries)
            if ready_entries:
                if remaining:
                    st.warning(f"{len(ready_entries)} ready, {remaining} will be skipped.")
                else:
                    st.success(f"All {len(ready_entries)} TS(s) ready.")
                if st.button("βΆ Apply CRs to recovered TSs", type="primary"):
                    retry_log = str(session_dir(sid) / f"pipeline_{int(time.time())}_retry.log")
                    cmd = [
                        sys.executable,
                        str(SCRIPTS_DIR / "orchestrate_cr.py"),
                        "--output-dir", state["output_dir"],
                        "--retry-mode",
                    ]
                    env = os.environ.copy()
                    env["EOL_USER"] = st.session_state.eol_user
                    env["EOL_PASSWORD"] = st.session_state.eol_password
                    # Append (not replace) the retry log so results merge above.
                    _launch_proc(cmd, env, retry_log, sid, state, {
                        "status": "running",
                        "log_path": retry_log,
                        "run_log_paths": state.get("run_log_paths", []) + [retry_log],
                    })
            else:
                st.warning("No TSs available yet β retry download or upload DOCX files above.")
    # Navigation
    st.divider()
    col_restart, col_new = st.columns(2)
    with col_restart:
        if st.button("β Run another pipeline", type="primary"):
            # Reset pipeline fields, keep session ID and credentials
            for _k in ("cr_list", "pid", "output_dir", "log_path", "index_log",
                       "started_at", "completed_at", "return_code", "ts_id"):
                state[_k] = None if _k != "cr_list" else []
            # excel_filename, excel_hash and person_name are intentionally kept
            # so the user does not have to re-upload on the next run.
            state["run_log_paths"] = []
            state["status"] = "upload"
            if "proc" in st.session_state:
                del st.session_state.proc
            _rc_path(sid).unlink(missing_ok=True)
            save_state(sid, state)
            st.rerun()
    with col_new:
        if st.button("Start new session"):
            new_sid = str(uuid.uuid4())
            st.session_state.sid = new_sid
            st.session_state.state = new_state(new_sid)
            if "proc" in st.session_state:
                del st.session_state.proc
            st.query_params["sid"] = new_sid
            save_state(new_sid, st.session_state.state)
            st.rerun()