#!/usr/bin/env python3
"""Generate leaderboard, pairwise, and pivot tables from fev benchmark summaries.

Downloads all result CSVs from the ``autogluon/fev`` GitHub repository at a
given commit/branch, then writes aggregated tables under ``./tables/``:

* ``summaries.csv`` — raw concatenated summaries.
* ``pivot_<metric>.csv`` (+ imputation masks) at the root level.
* ``<group>/leaderboard_<metric>.csv`` and ``<group>/pairwise_<metric>.csv``
  for each task group in ``TASK_GROUPS``.
"""
import argparse
import io
import sys
from pathlib import Path

import requests

# Make sibling modules importable when this file is run as a script.
sys.path.append(str(Path(__file__).parent))

import fev
import pandas as pd

from src.task_groups import ALL_TASKS, DOMAIN_GROUPS, FREQUENCY_GROUPS, MINI_TASKS
from src.utils import format_leaderboard

GITHUB_REPO = "autogluon/fev"
RESULTS_PATH = "benchmarks/fev_bench/results"

# Constants from the main app
BASELINE_MODEL = "Seasonal Naive"
LEAKAGE_IMPUTATION_MODEL = "Chronos-Bolt"
SORT_COL = "win_rate"
N_RESAMPLES_FOR_CI = 1000
TOP_K_MODELS_TO_PLOT = 15
AVAILABLE_METRICS = ["SQL", "MASE", "WQL", "WAPE"]

# Timeout (seconds) for every HTTP request so a stalled connection cannot
# hang the script indefinitely.
REQUEST_TIMEOUT = 30

# All task groups to generate tables for
TASK_GROUPS = {
    "full": ALL_TASKS,
    "mini": MINI_TASKS,
    "frequency_sub_hourly": FREQUENCY_GROUPS["Sub-hourly"],
    "frequency_hourly": FREQUENCY_GROUPS["Hourly"],
    "frequency_daily": FREQUENCY_GROUPS["Daily"],
    "frequency_weekly": FREQUENCY_GROUPS["Weekly"],
    "frequency_monthly_plus": FREQUENCY_GROUPS["Monthly+"],
    "domain_energy": DOMAIN_GROUPS["Energy"],
    "domain_nature": DOMAIN_GROUPS["Nature"],
    "domain_cloud": DOMAIN_GROUPS["Cloud"],
    "domain_mobility": DOMAIN_GROUPS["Mobility"],
    "domain_econ": DOMAIN_GROUPS["Econ"],
    "domain_health": DOMAIN_GROUPS["Health"],
    "domain_retail": DOMAIN_GROUPS["Retail"],
}


def get_csv_files_from_github(commit: str) -> list[str]:
    """Get list of CSV file paths from the GitHub repo at a specific commit.

    Uses the GitHub contents API to list ``RESULTS_PATH`` at ``commit``.

    Raises:
        FileNotFoundError: if the directory contains no ``.csv`` files.
        requests.HTTPError: if the API request fails.
    """
    api_url = f"https://api.github.com/repos/{GITHUB_REPO}/contents/{RESULTS_PATH}?ref={commit}"
    response = requests.get(api_url, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    files = response.json()
    csv_files = [f["path"] for f in files if f["name"].endswith(".csv")]
    if not csv_files:
        raise FileNotFoundError(f"No CSV files found in {RESULTS_PATH} at commit {commit}")
    return csv_files


def load_summaries_from_github(commit: str) -> pd.DataFrame:
    """Load and concatenate all CSV summaries from the GitHub repo at a specific commit."""
    csv_files = get_csv_files_from_github(commit)
    print(f"Found {len(csv_files)} CSV files")
    dfs = []
    for file_path in csv_files:
        # Fetch the raw file contents (not the API JSON wrapper).
        raw_url = f"https://raw.githubusercontent.com/{GITHUB_REPO}/{commit}/{file_path}"
        response = requests.get(raw_url, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        df = pd.read_csv(io.StringIO(response.text))
        dfs.append(df)
        print(f" Loaded: {Path(file_path).name}")
    return pd.concat(dfs, ignore_index=True)


def compute_leaderboard(summaries: pd.DataFrame, metric_name: str) -> pd.DataFrame:
    """Compute the model leaderboard for one metric, with rates scaled to percentages."""
    lb = fev.analysis.leaderboard(
        summaries=summaries,
        metric_column=metric_name,
        missing_strategy="impute",
        baseline_model=BASELINE_MODEL,
        leakage_imputation_model=LEAKAGE_IMPUTATION_MODEL,
        normalize_time_per_n_forecasts=100,
    )
    lb = lb.astype("float64").reset_index()
    # Convert fractions to percentages for display.
    lb["skill_score"] = lb["skill_score"] * 100
    lb["win_rate"] = lb["win_rate"] * 100
    # num_failures: raw count -> percentage of tasks that failed.
    lb["num_failures"] = lb["num_failures"] / summaries["task_name"].nunique() * 100
    return lb


def compute_pairwise(summaries: pd.DataFrame, metric_name: str, included_models: list[str]) -> pd.DataFrame:
    """Compute pairwise model comparisons for ``included_models`` on one metric.

    The baseline model is always included so the comparison is anchored.
    """
    if BASELINE_MODEL not in included_models:
        included_models = included_models + [BASELINE_MODEL]
    return (
        fev.analysis.pairwise_comparison(
            summaries,
            included_models=included_models,
            metric_column=metric_name,
            baseline_model=BASELINE_MODEL,
            missing_strategy="impute",
            n_resamples=N_RESAMPLES_FOR_CI,
            leakage_imputation_model=LEAKAGE_IMPUTATION_MODEL,
        )
        .round(3)
        .reset_index()
    )


def compute_pivot_table(summaries: pd.DataFrame, metric_name: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Build a (task x model) error table plus boolean masks marking imputed cells.

    Returns a 3-tuple of DataFrames (all with a ``Task name`` column):
        errors: per-task metric values after imputation, columns sorted by mean rank.
        is_imputed_baseline: True where the original value was missing.
        is_leakage_imputed: True where the model trained on the task's dataset.
    """
    errors = fev.pivot_table(summaries=summaries, metric_column=metric_name, task_columns=["task_name"])
    train_overlap = (
        fev.pivot_table(summaries=summaries, metric_column="trained_on_this_dataset", task_columns=["task_name"])
        .fillna(False)
        .astype(bool)
    )
    # Record mask states BEFORE imputation so the masks reflect original data.
    is_imputed_baseline = errors.isna()
    is_leakage_imputed = train_overlap
    # Handle imputations: leakage cells take the leakage-imputation model's
    # score; remaining missing cells fall back to the baseline model's score.
    errors = errors.mask(train_overlap, errors[LEAKAGE_IMPUTATION_MODEL], axis=0)
    for col in errors.columns:
        if col != BASELINE_MODEL:
            errors[col] = errors[col].fillna(errors[BASELINE_MODEL])
    # Order model columns from best to worst average rank across tasks.
    errors = errors[errors.rank(axis=1).mean().sort_values().index]
    is_imputed_baseline = is_imputed_baseline[errors.columns]
    is_leakage_imputed = is_leakage_imputed[errors.columns]
    errors.index.rename("Task name", inplace=True)
    is_imputed_baseline.index.rename("Task name", inplace=True)
    is_leakage_imputed.index.rename("Task name", inplace=True)
    return errors.reset_index(), is_imputed_baseline.reset_index(), is_leakage_imputed.reset_index()


def main():
    """CLI entry point: download summaries and write all tables under ./tables/."""
    parser = argparse.ArgumentParser(description="Generate leaderboard tables from CSV summaries in the fev repo")
    parser.add_argument(
        "commit",
        nargs="?",
        default="main",
        help=f"Git commit SHA or branch name in the {GITHUB_REPO} repository (default: main)",
    )
    args = parser.parse_args()

    # Create tables directory
    tables_dir = Path("tables")
    tables_dir.mkdir(exist_ok=True)

    print(f"Loading summaries from {GITHUB_REPO} at commit {args.commit}...")
    summaries = load_summaries_from_github(args.commit)

    # Save raw summaries for on-the-fly subset computation
    summaries.to_csv(tables_dir / "summaries.csv", index=False)
    print("Saved: summaries.csv")

    # Generate pivot tables (full version only, at root level)
    for metric in AVAILABLE_METRICS:
        print(f"Processing pivot table for {metric}...")
        pivot_df, baseline_imputed, leakage_imputed = compute_pivot_table(summaries, metric)
        pivot_df.to_csv(tables_dir / f"pivot_{metric}.csv", index=False)
        baseline_imputed.to_csv(tables_dir / f"pivot_{metric}_baseline_imputed.csv", index=False)
        leakage_imputed.to_csv(tables_dir / f"pivot_{metric}_leakage_imputed.csv", index=False)
        print(f" Saved: pivot_{metric}.csv")

    # Generate leaderboard and pairwise tables for each task group
    for group_name, task_list in TASK_GROUPS.items():
        print(f"\nProcessing group: {group_name} ({len(task_list)} tasks)...")

        # Create subdirectory for this group
        group_dir = tables_dir / group_name
        group_dir.mkdir(exist_ok=True)

        # Filter summaries to only include tasks in this group
        group_summaries = summaries[summaries["task_name"].isin(task_list)]
        if group_summaries.empty:
            print(f" WARNING: No matching tasks found for group {group_name}")
            continue
        actual_tasks = group_summaries["task_name"].nunique()
        print(f" Found {actual_tasks} tasks in summaries")

        for metric in AVAILABLE_METRICS:
            # Compute leaderboard for this group
            leaderboard_df = compute_leaderboard(group_summaries, metric)
            leaderboard_df.to_csv(group_dir / f"leaderboard_{metric}.csv", index=False)

            # Get top models for pairwise comparison
            top_k_models = (
                leaderboard_df.sort_values(by=SORT_COL, ascending=False).head(TOP_K_MODELS_TO_PLOT)["model_name"].tolist()
            )

            # Compute pairwise comparison
            pairwise_df = compute_pairwise(group_summaries, metric, top_k_models)
            pairwise_df.to_csv(group_dir / f"pairwise_{metric}.csv", index=False)
            print(f" Saved: {group_name}/leaderboard_{metric}.csv, {group_name}/pairwise_{metric}.csv")

    print(f"\nAll tables saved to {tables_dir}/")


if __name__ == "__main__":
    main()