"""
logs_interpreter.py

Parse log files, call the CBORG model to diagnose root causes of failures (or confirm success), and output its analysis.
"""
import os
import sys
import argparse


# The OpenAI SDK is used as the client for the CBORG chat-completions API.
# Fail fast with an actionable install hint when it is missing.
try:
    from openai import OpenAI
except ImportError:
    print("Please install openai (pip install openai)")
    sys.exit(1)
|
|
|
|
def parse_args():
    """Parse command-line options for the log analyzer.

    Returns:
        argparse.Namespace with ``log_dir``, ``model`` and ``output`` set.
    """
    cli = argparse.ArgumentParser(
        description="Analyze run logs and ask CBORG model for root-cause analysis"
    )
    # (flag, default, help) triples — one row per supported option.
    options = (
        ("--log_dir", ".",
         "Directory containing .txt log files (default: current directory)"),
        ("--model", "lbl/cborg-deepthought",
         "CBORG model to use (default: lbl/cborg-deepthought)"),
        ("--output", None,
         "File to write the model's analysis (default: stdout)"),
    )
    for flag, default, help_text in options:
        cli.add_argument(flag, default=default, help=help_text)
    return cli.parse_args()
|
|
|
|
def gather_logs(log_dir):
    """Collect per-rule .txt logs (and any snakemake .log files) under
    *log_dir* into one annotated text blob.

    Files are grouped by base name (the part before the last ``_``), so
    repeated attempts of the same rule land in a single "log group".

    Args:
        log_dir: Directory to scan. When it contains a ``logs/`` subfolder,
            the .txt files are read from there instead.

    Returns:
        A newline-joined string of labelled log sections; empty string when
        nothing was found.
    """
    # Prefer a logs/ subfolder when one exists.
    if os.path.isdir(os.path.join(log_dir, 'logs')):
        log_base = os.path.join(log_dir, 'logs')
    else:
        log_base = log_dir

    files = [f for f in sorted(os.listdir(log_base)) if f.endswith('.txt')]
    groups = {}
    for fname in files:
        # "preprocess_1.txt" -> group "preprocess"; plain "name.txt" -> "name".
        if '_' in fname:
            base = fname.rsplit('_', 1)[0]
        else:
            base = fname.rsplit('.', 1)[0]
        groups.setdefault(base, []).append(fname)

    segments = []
    for base, flist in groups.items():
        segments.append(f"=== Log group: {base} ===")
        for fname in flist:
            # BUG FIX: open files from log_base (where they were listed),
            # not log_dir — previously, when a logs/ subfolder was in use,
            # every read failed with "<could not read: ...>".
            path = os.path.join(log_base, fname)
            try:
                with open(path, 'r') as f:
                    content = f.read().strip()
            except Exception as e:
                content = f"<could not read: {e}>"
            segments.append(f"-- {fname} --\n{content}")
        segments.append("")

    # Snakemake workflow logs may live in either of these locations.
    candidates = [os.path.join(log_dir, 'snakemake_log'),
                  os.path.join(log_dir, '.snakemake', 'log')]
    for sn_dir in candidates:
        if os.path.isdir(sn_dir):
            for fname in sorted(os.listdir(sn_dir)):
                if fname.endswith('.log'):
                    path = os.path.join(sn_dir, fname)
                    try:
                        with open(path, 'r') as f:
                            content = f.read().strip()
                    except Exception as e:
                        content = f"<could not read: {e}>"
                    segments.append(f"=== Snakemake Log File: {fname} ===")
                    segments.append(content)
                    segments.append("")
    return "\n".join(segments)
|
|
|
|
def call_cborg(prompt, model):
    """Send *prompt* to the CBORG chat-completions endpoint and return the
    model's reply text, stripped. Exits the process when no API key is set.
    """
    key = os.getenv("CBORG_API_KEY") or os.getenv("OPENAI_API_KEY")
    if not key:
        print("Error: CBORG_API_KEY or OPENAI_API_KEY environment variable not set.")
        sys.exit(1)

    endpoint = os.getenv("CBORG_API_URL", "https://api.cborg.lbl.gov")
    reply = OpenAI(api_key=key, base_url=endpoint).chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a log root-cause analyzer. Provide a concise diagnosis."},
            {"role": "user", "content": prompt},
        ],
        temperature=0.2,
    )

    # Be permissive about the response shape: prefer message.content, fall
    # back to a bare .text attribute, and default to the empty string.
    first = reply.choices[0]
    text = None
    if hasattr(first, 'message') and first.message:
        text = getattr(first.message, 'content', None)
    if text is None and hasattr(first, 'text'):
        text = first.text
    if text is None:
        text = ''
    return text.strip()
|
|
|
|
def main():
    """Entry point: collect run logs, optionally build a stats.csv-driven
    filtered view, send the payload to the CBORG model, and write its
    analysis to a file.
    """
    args = parse_args()

    # Sub-directories of log_dir are candidate per-run folders (multi-run
    # layout); the .snakemake bookkeeping directory is excluded.
    runs = [d for d in sorted(os.listdir(args.log_dir))
            if os.path.isdir(os.path.join(args.log_dir, d)) and d != '.snakemake']

    # Single-run layout: prefer a logs/ subfolder when present.
    log_folder = os.path.join(args.log_dir, 'logs') if os.path.isdir(os.path.join(args.log_dir, 'logs')) else args.log_dir
    if runs and os.path.isdir(os.path.join(args.log_dir, runs[0], 'logs')):
        # Multi-run layout: each run directory carries its own logs/ folder.
        combined = []
        for run in runs:
            combined.append(f"=== Run: {run} ===")
            run_log_dir = os.path.join(args.log_dir, run, 'logs')
            combined.append(gather_logs(run_log_dir))
        # Also include workflow-level snakemake logs from the root.
        root_snake = os.path.join(args.log_dir, '.snakemake', 'log')
        if os.path.isdir(root_snake):
            combined.append("=== Root Snakemake Logs ===")
            for fname in sorted(os.listdir(root_snake)):
                if fname.endswith('.log'):
                    path = os.path.join(root_snake, fname)
                    try:
                        # NOTE(review): file handle not explicitly closed;
                        # relies on refcount GC.
                        content = open(path).read().strip()
                    except Exception:
                        content = "<could not read>"
                    combined.append(f"-- {fname} --\n{content}")
        logs = "\n\n".join(combined)
    else:
        # Single-run layout: one gather over the chosen folder.
        logs = gather_logs(log_folder)
        # Prepend a plain file listing so the model can see which rule
        # attempts exist even when their content is empty.
        try:
            entries = sorted(f for f in os.listdir(log_folder) if f.endswith('.txt'))
            listing = "=== Logs directory files (txt) ===\n" + "\n".join(entries) + "\n\n"
        except Exception:
            listing = ""
        logs = listing + logs
    if not logs:
        print(f"No log files found in {args.log_dir}")
        sys.exit(0)

    # When stats.csv exists, rebuild the payload as: stats summary +
    # per-step status + logs for failed steps only.
    stats_file = os.path.join(args.log_dir, 'stats.csv')
    if os.path.isfile(stats_file):
        try:
            with open(stats_file, 'r') as sf:
                stats_content = sf.read().strip()
        except Exception as e:
            stats_content = f"<could not read stats.csv: {e}>"

        # NOTE(review): this assignment DISCARDS the logs gathered above and
        # starts the payload over from the stats summary — presumably to keep
        # the prompt small, but confirm it is intentional (a `+=`/prepend
        # would preserve the raw dump).
        logs = f"=== Stats Summary ===\n{stats_content}\n\n"

        # Re-read stats.csv to extract the per-step success flags. The data
        # row is expected to start with "* " and hold at least 16 comma-
        # separated fields; columns 1/6/11 carry the step success values —
        # TODO confirm column layout against the stats.csv producer.
        try:
            with open(stats_file, 'r') as sf:
                content = sf.read().strip()
            lines = content.split('\n')

            data_line = None
            for line in lines:
                if line.strip().startswith('* '):
                    data_line = line.strip()[2:]
                    break

            if data_line:
                parts = [part.strip() for part in data_line.split(',')]
                if len(parts) >= 16:
                    stats_row = {
                        'step 1 success?': parts[1],
                        'step 2 success?': parts[6],
                        'step 3 success?': parts[11],
                    }
                else:
                    stats_row = {}
            else:
                stats_row = {}
        except Exception as e:
            print(f"Warning: Could not parse CSV: {e}")
            stats_row = {}

        # Pipeline step -> snakemake rule names whose logs belong to it.
        step_rules = {
            '1': ['create_numpy', 'insert_root_summary', 'preprocess', 'summarize_root'],
            '2': ['scores'],
            '3': ['categorization'],
        }

        entries = []
        try:
            entries = sorted(f for f in os.listdir(log_folder) if f.endswith('.txt'))
        except Exception:
            pass

        filtered = []

        # Per-step pass/fail header taken from stats.csv.
        filtered.append("=== STEP STATUS FROM STATS.CSV ===")
        for step, rules in step_rules.items():
            key = f'step {step} success?'
            status = stats_row.get(key, 'Unknown').strip()
            filtered.append(f"Step {step}: {status}")
        filtered.append("")

        # Dump rule logs only for steps that did not report 'true'
        # (missing/unknown status is treated as a failure).
        failed_steps = []
        for step, rules in step_rules.items():
            key = f'step {step} success?'
            if stats_row.get(key, '').lower() != 'true':
                failed_steps.append(step)
                filtered.append(f"=== FAILED STEP {step} LOGS ===")

                for rule in rules:
                    filtered.append(f"--- Rule: {rule} ---")
                    matched = [f for f in entries if f.startswith(rule + '_')]
                    if matched:
                        for fname in matched:
                            path = os.path.join(log_folder, fname)
                            try:
                                content = open(path).read().strip()
                                # Truncate very long logs: keep the head
                                # (first 100 lines) and tail (last 50).
                                if len(content) > 5000:
                                    lines = content.split('\n')
                                    content = '\n'.join(lines[:100]) + "\n...[TRUNCATED]...\n" + '\n'.join(lines[-50:])
                            except Exception as e:
                                content = f"<could not read: {e}>"
                            filtered.append(f"Log file: {fname}")
                            filtered.append(content)
                    else:
                        filtered.append("No log files found for this rule.")
                filtered.append("")

        # Append snakemake workflow logs, reduced to error/warning lines.
        snakemake_dir = os.path.join(args.log_dir, 'snakemake_log')
        if os.path.isdir(snakemake_dir):
            filtered.append("=== SNAKEMAKE EXECUTION LOGS ===")
            for fname in sorted(os.listdir(snakemake_dir)):
                if fname.endswith('.log'):
                    path = os.path.join(snakemake_dir, fname)
                    try:
                        content = open(path).read().strip()

                        lines = content.split('\n')
                        important_lines = []
                        for line in lines:
                            if any(keyword in line.lower() for keyword in ['error', 'exception', 'failed', 'warning', 'killed']):
                                important_lines.append(line)
                        if important_lines:
                            filtered.append(f"Snakemake log: {fname} (errors/warnings only)")
                            # Only the last 20 flagged lines are kept.
                            filtered.append('\n'.join(important_lines[-20:]))
                        else:
                            filtered.append(f"Snakemake log: {fname} - No errors detected")
                    except Exception as e:
                        filtered.append(f"<could not read {fname}: {e}>")
            filtered.append("")

        logs += "\n".join(filtered)

    # Full instruction prompt; {logs} interpolates the payload built above.
    prompt = f"""You are analyzing a machine learning pipeline failure. Your task is to diagnose root causes by examining three sources:

1) stats.csv: Shows pass/fail status for 3 steps:
- Step 1 (Data Preparation): create_numpy, insert_root_summary, preprocess, summarize_root
- Step 2 (Scoring): scores
- Step 3 (Categorization): categorization

2) Individual .txt logs in logs/: Contain detailed execution output for each rule attempt
3) Snakemake logs: Show workflow execution status and any workflow-level errors

ANALYSIS REQUIREMENTS:
Create a diagnostic report using this format for each step:

------
Step X (Category of failure)
------
Rule: [rule_name]
------
Status: [Pass/Fail from stats.csv] | [Snakemake execution status]
------
Root Cause Analysis: [detailed analysis]
------

For each failed step (False in stats.csv):
- Examine ALL relevant .txt log files for that step's rules
- Look for specific error messages, exceptions, or failure indicators
- Identify the probable root cause (e.g., missing files, API failures, memory issues, logic errors, syntax errors)
- If logs show success messages but stats.csv shows failure, investigate this discrepancy
- Categorize the failure type (Data/API/Logic/Infrastructure/Other)

For passed steps (True in stats.csv):
- Simply mark as "OK" in Root Cause Analysis

After the table, provide:
1. Overall Status: SUCCESS or FAILURE using similar format as above.
2. Primary Failure Category (if applicable): Data/API/Logic/Infrastructure/Other
3. Recommended Next Steps

DATA TO ANALYZE:
{logs}
"""

    analysis = call_cborg(prompt, args.model)

    # Fall back to echoing the prompt when the model returned nothing,
    # so the output file is still useful for debugging.
    if not analysis or not analysis.strip():
        analysis = (
            "Warning: CBORG model returned no analysis.\n"
            "Below is the prompt sent to the model for debugging:\n\n" + prompt
        )

    # Default output location lives next to the analyzed logs.
    output_file = args.output or os.path.join(args.log_dir, 'logs_analysis.txt')
    try:
        with open(output_file, 'w') as f:
            f.write(analysis + "\n")
        print(f"Analysis written to {output_file}")
    except Exception as e:
        print(f"Error writing analysis to {output_file}: {e}")
|
|
|
|
# Script entry point.
if __name__ == "__main__":
    main()
|
|