# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# Copyright (c) 2026-present
# Author: LKouadio <https://lkouadio.com>
"""Build a combined SM3 summary table from a suite directory.
This command scans an SM3 suite root, collects per-regime summary CSV
files, and writes combined CSV and JSON outputs.
Examples
--------
Use the versatile root entry point::
geoprior build sm3-collect-summaries \
--suite-root results/sm3_tau_suite_20260303-120000
Use the family-specific entry point::
geoprior-build sm3-summaries \
--results-dir results \
--outdir results/sm3_tau_suite_20260303-120000
"""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
import pandas as pd
from .config import (
add_city_arg,
add_config_args,
add_model_arg,
add_outdir_arg,
add_output_stem_arg,
add_results_dir_arg,
bootstrap_runtime_config,
ensure_outdir,
find_latest_dir,
)
DEFAULT_SUMMARY_NAME = "sm3_synth_summary.csv"
DEFAULT_STEM = "sm3_combined_summary"
DEFAULT_PATTERN = r"sm3_(?:tau|both)_(.+?)_50$"
[docs]
def infer_regime(
folder_name: str,
*,
pattern: str = DEFAULT_PATTERN,
) -> str:
"""Infer regime label from a run directory name."""
match = re.search(pattern, folder_name)
if match:
return str(match.group(1))
return folder_name
[docs]
def discover_suite_root(
results_dir: str | Path,
*,
summary_name: str = DEFAULT_SUMMARY_NAME,
) -> Path | None:
"""Return the newest suite-like directory under results root."""
root = Path(results_dir).expanduser().resolve()
if not root.exists():
return None
direct = []
for child in root.iterdir():
if not child.is_dir():
continue
if any(child.rglob(summary_name)):
direct.append(child)
if direct:
return max(direct, key=lambda p: p.stat().st_mtime)
latest = find_latest_dir(
root,
pattern="*",
must_contain=summary_name,
)
if latest is not None:
return latest
if any(root.rglob(summary_name)):
return root
return None
[docs]
def resolve_suite_root(
args: argparse.Namespace,
cfg: dict,
) -> Path:
"""Resolve suite root from explicit args, config, or results root."""
if args.suite_root:
path = Path(args.suite_root).expanduser().resolve()
if not path.exists():
raise FileNotFoundError(
f"Suite root not found: {path}"
)
return path
for key in ("SM3_SUITE_ROOT", "SUITE_ROOT"):
value = cfg.get(key)
if isinstance(value, str) and value.strip():
path = Path(value).expanduser().resolve()
if path.exists():
return path
results_dir = args.results_dir or cfg.get("RESULTS_DIR")
if results_dir:
guess = discover_suite_root(
results_dir,
summary_name=args.summary_name,
)
if guess is not None:
return guess
raise FileNotFoundError(
"Could not resolve suite root. Pass --suite-root or "
"provide a results root that contains an SM3 suite."
)
[docs]
def collect_rows(
suite_root: Path,
*,
summary_name: str,
pattern: str,
strict: bool,
) -> list[pd.DataFrame]:
"""Collect per-regime summary tables from a suite root."""
rows: list[pd.DataFrame] = []
for path in suite_root.rglob(summary_name):
run_dir = path.parent
regime = infer_regime(run_dir.name, pattern=pattern)
try:
frame = pd.read_csv(path)
except Exception as exc:
msg = f"failed to read {path}: {exc}"
if strict:
raise RuntimeError(msg) from exc
print(f"[skip] {msg}")
continue
if frame.empty or "metric" not in frame.columns:
msg = f"unexpected format: {path}"
if strict:
raise RuntimeError(msg)
print(f"[skip] {msg}")
continue
frame = frame.copy()
frame.insert(0, "regime", regime)
frame.insert(1, "run_dir", str(run_dir))
rows.append(frame)
return rows
[docs]
def resolve_outputs(
args: argparse.Namespace,
suite_root: Path,
) -> tuple[Path, Path]:
"""Resolve output CSV and JSON paths."""
base_dir = (
ensure_outdir(args.outdir)
if args.outdir
else suite_root
)
stem = args.output_stem or DEFAULT_STEM
out_csv = (
Path(args.out_csv).expanduser().resolve()
if args.out_csv
else base_dir / f"{stem}.csv"
)
out_json = (
Path(args.out_json).expanduser().resolve()
if args.out_json
else base_dir / f"{stem}.json"
)
out_csv.parent.mkdir(parents=True, exist_ok=True)
out_json.parent.mkdir(parents=True, exist_ok=True)
return out_csv, out_json
[docs]
def build_sm3_collect_parser() -> argparse.ArgumentParser:
"""Build parser for SM3 summary collection."""
parser = argparse.ArgumentParser(
prog="sm3-collect-summaries",
description=(
"Collect SM3 per-regime summary CSV files into "
"combined CSV and JSON outputs."
),
)
add_config_args(parser)
add_city_arg(parser)
add_model_arg(parser)
add_results_dir_arg(parser)
add_outdir_arg(parser)
add_output_stem_arg(parser, default=DEFAULT_STEM)
parser.add_argument(
"--suite-root",
type=str,
default=None,
help=(
"SM3 suite root. If omitted, the command tries to "
"discover the newest suite under --results-dir."
),
)
parser.add_argument(
"--summary-name",
type=str,
default=DEFAULT_SUMMARY_NAME,
help="Summary CSV filename to collect.",
)
parser.add_argument(
"--regime-pattern",
type=str,
default=DEFAULT_PATTERN,
help=(
"Regular expression used to infer the regime from "
"each run directory name."
),
)
parser.add_argument(
"--out-csv",
type=str,
default=None,
help="Explicit combined CSV output path.",
)
parser.add_argument(
"--out-json",
type=str,
default=None,
help="Explicit combined JSON output path.",
)
parser.add_argument(
"--strict",
action="store_true",
help=(
"Fail on unreadable or malformed summary files "
"instead of skipping them."
),
)
return parser
[docs]
def run_sm3_collect_summaries(
args: argparse.Namespace,
) -> None:
"""Execute the SM3 collection workflow."""
cfg = bootstrap_runtime_config(
args,
field_map={
"city": "CITY_NAME",
"model": "MODEL_NAME",
"results_dir": "RESULTS_DIR",
},
)
suite_root = resolve_suite_root(args, cfg)
rows = collect_rows(
suite_root,
summary_name=args.summary_name,
pattern=args.regime_pattern,
strict=bool(args.strict),
)
if not rows:
raise RuntimeError(
"No summary CSV files were found under the suite "
"root."
)
frame = pd.concat(rows, ignore_index=True)
sort_cols = [
col
for col in ["metric", "regime"]
if col in frame.columns
]
if sort_cols:
frame = frame.sort_values(sort_cols).reset_index(
drop=True
)
out_csv, out_json = resolve_outputs(args, suite_root)
frame.to_csv(out_csv, index=False)
with open(out_json, "w", encoding="utf-8") as handle:
json.dump(frame.to_dict("records"), handle, indent=2)
print("[OK] suite_root:", str(suite_root))
print("[OK] wrote:", str(out_csv))
print("[OK] wrote:", str(out_json))
print("[OK] rows:", len(frame))
[docs]
def build_sm3_collect_main(
argv: list[str] | None = None,
) -> None:
"""CLI entry point for SM3 summary collection."""
parser = build_sm3_collect_parser()
args = parser.parse_args(argv)
run_sm3_collect_summaries(args)
[docs]
def main(argv: list[str] | None = None) -> None:
"""Alias for the command entry point."""
build_sm3_collect_main(argv)
if __name__ == "__main__": # pragma: no cover
main()