Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| from collections import defaultdict | |
| from unidecode import unidecode | |
| from sklearn.metrics import log_loss, accuracy_score | |
# Raw football-data style column names -> short lowercase names used below.
_COLUMN_RENAMES = {
    "Date": "date", "Time": "time", "HomeTeam": "home", "AwayTeam": "away",
    "FTHG": "fthg", "FTAG": "ftag", "FTR": "ftr",
    "HTHG": "hthg", "HTAG": "htag", "HTR": "htr",
    "Referee": "ref",
    "HS": "hs", "AS": "as", "HST": "hst", "AST": "ast",
    "HF": "hf", "AF": "af", "HC": "hc", "AC": "ac",
    "HY": "hy", "AY": "ay", "HR": "hr", "AR": "ar",
    # odds (Bet365, William Hill, Pinnacle(PS), VC)
    "B365H": "b365h", "B365D": "b365d", "B365A": "b365a",
    "WHH": "whh", "WHD": "whd", "WHA": "wha",
    "PSH": "psh", "PSD": "psd", "PSA": "psa",
    "VCH": "vch", "VCD": "vcd", "VCA": "vca",
}
_DATE_FORMATS = ("%d/%m/%Y", "%d/%m/%y", "%Y-%m-%d")
_BOOKMAKERS = ("b365", "wh", "ps", "vc")
_OUTCOMES = ("H", "D", "A")
# Per-team columns that get a rolling mean; order defines the feature order.
_ROLL_SOURCE_COLS = [
    "gf", "ga", "shots_f", "shots_a", "sot_f", "sot_a",
    "corn_f", "corn_a", "y_f", "r_f", "points",
]
# (for-column, against-column, home source column, away source column)
_OPTIONAL_STAT_PAIRS = (
    ("shots_f", "shots_a", "hs", "as"),
    ("sot_f", "sot_a", "hst", "ast"),
    ("corn_f", "corn_a", "hc", "ac"),
    ("y_f", "y_a", "hy", "ay"),
    ("r_f", "r_a", "hr", "ar"),
)


def _parse_match_date(value):
    """Parse one raw date cell; return ``pd.NaT`` if no known format matches."""
    from datetime import datetime

    if isinstance(value, datetime):
        # Already a datetime/Timestamp (or NaT): str() would append a time
        # part that none of the formats accept, dropping the row.
        return value
    text = str(value).strip()
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue
    return pd.NaT


def _clean_team(name):
    """Transliterate a team name to ASCII and collapse whitespace; keep NaN."""
    if pd.isna(name):
        return name
    return " ".join(unidecode(str(name)).split())


def _add_implied_probs(df):
    """Add per-bookmaker and averaged implied probabilities to ``df`` in place.

    For each bookmaker whose three odds columns are present, adds
    ``p_<bk>_H/D/A`` = inverse odds normalised to sum to 1 (removes the
    overround). Rows with a missing, non-numeric or <= 1.0 price get NaN.
    ``p_odds_H/D/A`` is the row-wise mean over the available bookmakers.
    """
    computed = {outcome: [] for outcome in _OUTCOMES}
    for bk in _BOOKMAKERS:
        sources = [bk + outcome.lower() for outcome in _OUTCOMES]
        if not all(src in df.columns for src in sources):
            continue
        odds = pd.DataFrame(
            {
                outcome: pd.to_numeric(df[src], errors="coerce").astype(float)
                for outcome, src in zip(_OUTCOMES, sources)
            },
            index=df.index,
        )
        usable = odds.notna().all(axis=1) & (odds.min(axis=1) > 1.0)
        odds.loc[~usable, :] = np.nan
        inverse = 1.0 / odds
        probs = inverse.div(inverse.sum(axis=1, skipna=False), axis=0)
        for outcome in _OUTCOMES:
            name = f"p_{bk}_{outcome}"
            df[name] = probs[outcome]
            computed[outcome].append(name)

    for outcome in _OUTCOMES:
        cols = computed[outcome]
        if cols:
            df[f"p_odds_{outcome}"] = df[cols].mean(axis=1)
        else:
            df[f"p_odds_{outcome}"] = pd.Series(np.nan, index=df.index, dtype=float)


def _team_match_rows(df):
    """Return a long frame with two rows per match (home and away view).

    ``match_id`` is the row label of ``df``; ``*_f`` columns are the team's
    own stats and ``*_a`` the opponent's. Optional stat columns missing from
    ``df`` become NaN. Rows are ordered by team, then date.
    """
    def numeric(col):
        if col in df.columns:
            return pd.to_numeric(df[col], errors="coerce").astype(float)
        return pd.Series(np.nan, index=df.index, dtype=float)

    ids = df.index.to_numpy()
    home = {"match_id": ids, "date": df["date"], "team": df["home"],
            "opp": df["away"], "is_home": 1}
    away = {"match_id": ids, "date": df["date"], "team": df["away"],
            "opp": df["home"], "is_home": 0}

    # Full-time goals are required columns (KeyError if absent).
    home_goals = pd.to_numeric(df["fthg"], errors="coerce").astype(float)
    away_goals = pd.to_numeric(df["ftag"], errors="coerce").astype(float)
    home["gf"], home["ga"] = home_goals, away_goals
    away["gf"], away["ga"] = away_goals, home_goals

    for for_col, against_col, home_src, away_src in _OPTIONAL_STAT_PAIRS:
        home_vals, away_vals = numeric(home_src), numeric(away_src)
        home[for_col], home[against_col] = home_vals, away_vals
        away[for_col], away[against_col] = away_vals, home_vals

    # League points: 3 for a win, 1 for a draw, 0 for a loss.
    home["points"] = df["ftr"].map({"H": 3, "D": 1, "A": 0}).astype(float)
    away["points"] = df["ftr"].map({"H": 0, "D": 1, "A": 3}).astype(float)

    long = pd.concat(
        [pd.DataFrame(home, index=df.index), pd.DataFrame(away, index=df.index)],
        ignore_index=True,
    )
    return long.sort_values(["team", "date", "match_id"], ignore_index=True)


def _add_rolling_form(tm, window):
    """Add ``roll_<col>`` = mean of each team's previous ``window`` matches.

    The shift happens inside each team group, so the current match never
    contributes and a team's first match is NaN instead of inheriting a
    value from whichever team precedes it in the frame.
    """
    for col in _ROLL_SOURCE_COLS:
        if tm.empty:
            tm[f"roll_{col}"] = pd.Series(np.nan, index=tm.index, dtype=float)
            continue
        tm[f"roll_{col}"] = tm.groupby("team")[col].transform(
            lambda s: s.shift(1).rolling(window, min_periods=1).mean()
        )


def _add_elo(df, base=1500.0, k=20.0, home_adv=60.0):
    """Add pre-match ``elo_home``, ``elo_away`` and ``elo_diff`` to ``df``.

    ``df`` must already be in chronological order. Ratings start at ``base``;
    the home side gets ``home_adv`` added when computing the expected score,
    and both ratings move by ``k * (actual - expected)`` after each match.
    """
    ratings = defaultdict(lambda: base)
    home_score = {"H": 1.0, "D": 0.5}  # anything else counts as an away win
    pre_home, pre_away = [], []
    for home, away, ftr in zip(df["home"], df["away"], df["ftr"]):
        r_home, r_away = ratings[home], ratings[away]
        pre_home.append(r_home)
        pre_away.append(r_away)
        expected = 1.0 / (1.0 + 10 ** (-((r_home + home_adv) - r_away) / 400))
        delta = k * (home_score.get(ftr, 0.0) - expected)
        ratings[home] = r_home + delta
        ratings[away] = r_away - delta
    df["elo_home"] = pd.Series(pre_home, index=df.index, dtype=float)
    df["elo_away"] = pd.Series(pre_away, index=df.index, dtype=float)
    df["elo_diff"] = df["elo_home"] - df["elo_away"]


def _fmt_day(ts):
    """Format a timestamp as YYYY-MM-DD, or 'n/a' for NaT (empty split)."""
    return "n/a" if pd.isna(ts) else f"{ts:%Y-%m-%d}"


def prepare_features(data_raw: pd.DataFrame, window: int = 7, verbose: bool = True):
    """Prepare model features from raw match data.

    Steps: rename columns, parse dates, clean team names, keep rows with a
    valid full-time result, derive bookmaker implied probabilities, compute
    per-team rolling form over the previous ``window`` matches, compute
    pre-match Elo ratings, and join everything onto one row per match.

    Args:
        data_raw: Raw matches. Needs at least Date, HomeTeam, AwayTeam,
            FTHG, FTAG and FTR columns; match stats and odds are optional.
        window: Number of previous matches in the rolling means (>= 1).
        verbose: When true, print the date ranges of a 70/15/15
            chronological split (the split itself is not returned).

    Returns:
        (feat_df, X_cols, WINDOW, base_df)
        - feat_df: one row per match in (date, home, away) order with a
          0..n-1 index; all X_cols are float with NaNs filled by the column
          median (averaged odds are filled by the column mean first).
        - X_cols: list of feature column names used for modeling.
        - WINDOW: the rolling window used.
        - base_df: cleaned match frame (date, home, away, ftr, y, odds
          probabilities, Elo columns) without the rolling features.

    Raises:
        ValueError: if ``window`` is smaller than 1.
    """
    WINDOW = int(window)
    if WINDOW < 1:
        raise ValueError("window must be a positive integer")

    df = data_raw.rename(columns=_COLUMN_RENAMES).copy()

    # Parse dates; rows whose date cannot be parsed are dropped.
    df["date"] = pd.to_datetime(df["date"].map(_parse_match_date))
    df = df[df["date"].notna()].copy()

    df["home"] = df["home"].map(_clean_team)
    df["away"] = df["away"].map(_clean_team)

    # Keep rows with a valid result and both team names, in a deterministic
    # chronological order. Row labels 0..n-1 double as match ids below.
    keep = df["ftr"].isin(list(_OUTCOMES)) & df["home"].notna() & df["away"].notna()
    df = df[keep].sort_values(["date", "home", "away"], ignore_index=True)

    # Target: H -> 0, D -> 1, A -> 2.
    df["y"] = df["ftr"].map({"H": 0, "D": 1, "A": 2})

    # Odds -> implied probabilities (overround normalised).
    _add_implied_probs(df)

    # Leak-free features: rolling form (prior matches only) + simple Elo.
    tm = _team_match_rows(df)
    _add_rolling_form(tm, WINDOW)
    _add_elo(df)

    # Join rolling features back on the match id. Each match has exactly one
    # home row and one away row in ``tm``, so both joins are one-to-one.
    roll_cols = [f"roll_{col}" for col in _ROLL_SOURCE_COLS]
    home_form = (tm.loc[tm["is_home"] == 1, ["match_id"] + roll_cols]
                 .set_index("match_id").add_prefix("home_"))
    away_form = (tm.loc[tm["is_home"] == 0, ["match_id"] + roll_cols]
                 .set_index("match_id").add_prefix("away_"))
    feat_df = df.join(home_form).join(away_form)

    # Missing averaged odds fall back to the column mean (keeps a baseline).
    odds_feats = [f"p_odds_{outcome}" for outcome in _OUTCOMES]
    for col in odds_feats:
        as_float = feat_df[col].astype(float)
        feat_df[col] = as_float.fillna(as_float.mean())

    role_feats = ([f"home_{col}" for col in roll_cols]
                  + [f"away_{col}" for col in roll_cols])
    elo_feats = ["elo_home", "elo_away", "elo_diff"]
    X_cols = role_feats + elo_feats + odds_feats
    for col in X_cols:
        as_float = feat_df[col].astype(float)
        feat_df[col] = as_float.fillna(as_float.median())

    # Time-based split (kept for compatibility, only reported, not returned).
    n = len(feat_df)
    idx_train = int(n * 0.70)
    idx_valid = int(n * 0.85)
    if verbose and n > 0:
        dates = feat_df["date"]
        valid_part = dates.iloc[idx_train:idx_valid]
        test_part = dates.iloc[idx_valid:]
        print(f"Train up to: {_fmt_day(dates.iloc[:idx_train].max())}")
        print(f"Valid: {_fmt_day(valid_part.min())} .. {_fmt_day(valid_part.max())}")
        print(f"Test : {_fmt_day(test_part.min())} .. {_fmt_day(test_part.max())}")

    return feat_df, X_cols, WINDOW, df