Source code for geoprior.utils.inspect.eval_physics

# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3
# Copyright (c) 2026-present
# Author: LKouadio <https://lkouadio.com>

"""
Evaluation-physics generation and inspection helpers.

This module focuses on the richer Stage-2 interpretable
physics-evaluation artifact usually written as something like
``geoprior_eval_phys_<stamp>_interpretable.json``.

Unlike the compact ``eval_diagnostics`` artifact, this payload
bridges multiple inspection concerns:

- point and interval forecast metrics,
- physics loss and epsilon diagnostics,
- calibration factors and before/after interval stats,
- optional censor-stratified summaries,
- per-horizon point metrics,
- unit metadata for interpretable reporting.

The functions are designed for two common uses:

1. Sphinx-Gallery examples that need a realistic
   physics-evaluation payload without rerunning Stage-2.
2. Real workflow inspection when a user wants to review
   forecast quality, physics residual diagnostics, interval
   calibration, and reporting units in one place.
"""

from __future__ import annotations

from collections.abc import Mapping
from pathlib import Path
from typing import Any

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from .utils import (
    ArtifactRecord,
    clone_artifact,
    deep_update,
    empty_plot,
    filter_plot_kwargs,
    finalize_plot,
    flatten_dict,
    load_artifact,
    metrics_frame,
    plot_boolean_checks,
    plot_metric_bars,
    plot_series_map,
    prepare_plot,
    read_json,
    write_json,
)

PathLike = str | Path
EvalPhysicsLike = (
    ArtifactRecord | Mapping[str, Any] | str | Path
)

__all__ = [
    "default_eval_physics_payload",
    "eval_physics_calibration_frame",
    "eval_physics_calibration_per_horizon_frame",
    "eval_physics_censor_frame",
    "eval_physics_metrics_frame",
    "eval_physics_per_horizon_frame",
    "eval_physics_point_metrics_frame",
    "eval_physics_units_frame",
    "generate_eval_physics",
    "inspect_eval_physics",
    "load_eval_physics",
    "plot_eval_physics_boolean_summary",
    "plot_eval_physics_calibration_factors",
    "plot_eval_physics_epsilons",
    "plot_eval_physics_metrics",
    "plot_eval_physics_per_horizon_metrics",
    "plot_eval_physics_point_metrics",
    "summarize_eval_physics",
]

_METRICS_EVALUATE_KEYS = [
    "subs_pred_mae_q50",
    "subs_pred_mse_q50",
    "subs_pred_rmse_q50",
    "subs_pred_coverage80",
    "subs_pred_sharpness80",
    "gwl_pred_mae_q50",
    "gwl_pred_mse_q50",
    "gwl_pred_rmse_q50",
    "loss",
    "data_loss",
    "physics_loss",
    "physics_loss_scaled",
    "physics_mult",
    "lambda_offset",
    "consolidation_loss",
    "gw_flow_loss",
    "prior_loss",
    "smooth_loss",
    "mv_prior_loss",
    "bounds_loss",
    "epsilon_prior",
    "epsilon_cons",
    "epsilon_gw",
    "epsilon_cons_raw",
    "epsilon_gw_raw",
    "q_reg_loss",
    "q_rms",
    "q_gate",
    "subs_resid_gate",
]

_EPSILON_KEYS = [
    "epsilon_prior",
    "epsilon_cons",
    "epsilon_gw",
    "epsilon_cons_raw",
    "epsilon_gw_raw",
]

_POINT_KEYS = ["mae", "mse", "rmse", "r2"]

_CAL_KEYS = [
    "coverage80_uncalibrated",
    "coverage80_calibrated",
    "sharpness80_uncalibrated",
    "sharpness80_calibrated",
    "coverage80_uncalibrated_phys",
    "coverage80_calibrated_phys",
    "sharpness80_uncalibrated_phys",
    "sharpness80_calibrated_phys",
]


def _as_payload(
    payload: EvalPhysicsLike,
) -> dict[str, Any]:
    """Return a plain eval-physics payload."""
    if isinstance(payload, ArtifactRecord):
        return dict(payload.payload)

    if isinstance(payload, Mapping):
        return dict(payload)

    data = read_json(payload)
    return dict(data)


def _try_float(value: Any) -> float | None:
    """Return ``value`` as float when possible."""
    try:
        return float(value)
    except Exception:
        return None


def _numeric_subset(
    mapping: dict[str, Any] | None,
    *,
    keys: list[str] | tuple[str, ...] | None = None,
) -> dict[str, float]:
    """Return selected numeric scalar items."""
    src = mapping or {}
    keep = None if keys is None else {str(k) for k in keys}

    out: dict[str, float] = {}
    for key, value in src.items():
        if isinstance(value, bool):
            continue
        num = _try_float(value)
        if num is None:
            continue
        if keep is not None and str(key) not in keep:
            continue
        out[str(key)] = float(num)
    return out


def _horizon_sort_key(label: Any) -> tuple[int, str]:
    """Sort horizon labels such as ``H1`` or ``1``."""
    text = str(label)
    digits = "".join(ch for ch in text if ch.isdigit())
    if digits:
        return (0, f"{int(digits):06d}")
    return (1, text)


def _per_horizon_map(
    payload: dict[str, Any],
    metric: str,
) -> dict[str, float]:
    """Extract one per-horizon metric mapping."""
    src = (payload.get("per_horizon", {}) or {}).get(
        metric, {}
    )
    out: dict[str, float] = {}
    for key, value in (src or {}).items():
        num = _try_float(value)
        if num is not None:
            out[str(key)] = float(num)
    return {
        key: out[key]
        for key in sorted(out, key=_horizon_sort_key)
    }


def _calibration_nested_block(
    payload: dict[str, Any],
) -> dict[str, Any]:
    """Return nested calibration stats if present."""
    block = payload.get("interval_calibration", {}) or {}
    nested = block.get(
        "factors_per_horizon_from_cal_stats", {}
    )
    return nested if isinstance(nested, dict) else {}


def _calibration_per_horizon_rows(
    payload: dict[str, Any],
) -> list[dict[str, Any]]:
    """Build tidy per-horizon calibration rows."""
    nested = _calibration_nested_block(payload)
    fac = nested.get("factors", {}) or {}
    before = (nested.get("eval_before") or {}).get(
        "per_horizon", {}
    ) or {}
    after = (nested.get("eval_after") or {}).get(
        "per_horizon", {}
    ) or {}

    keys = sorted(
        set(fac) | set(before) | set(after),
        key=_horizon_sort_key,
    )

    rows: list[dict[str, Any]] = []
    for key in keys:
        row = {
            "horizon": str(key),
            "factor": _try_float(fac.get(key)),
            "coverage_before": _try_float(
                (before.get(key) or {}).get("coverage")
            ),
            "coverage_after": _try_float(
                (after.get(key) or {}).get("coverage")
            ),
            "sharpness_before": _try_float(
                (before.get(key) or {}).get("sharpness")
            ),
            "sharpness_after": _try_float(
                (after.get(key) or {}).get("sharpness")
            ),
        }
        rows.append(row)
    return rows


[docs] def default_eval_physics_payload( *, timestamp: str = "20260222-215049", city: str = "demo_city", model: str = "GeoPriorSubsNet", quantiles: list[float] | None = None, horizon: int = 3, batch_size: int = 32, subs_unit: str = "mm", time_units: str = "year", ) -> dict[str, Any]: """ Build a realistic default eval-physics payload. The payload is template-based. It is not meant to reproduce the Stage-2 evaluation path. Instead, it creates a stable and inspectable artifact with the same broad structure as the interpretable physics evaluation JSON. """ q = list(quantiles or [0.1, 0.5, 0.9]) metrics_evaluate = { "subs_pred_mae_q50": 30.37, "subs_pred_mse_q50": 4346.39, "subs_pred_coverage80": 0.570, "subs_pred_sharpness80": 29.94, "gwl_pred_mae_q50": 0.239, "gwl_pred_mse_q50": 0.0768, "loss": 0.1577, "total_loss": 0.1577, "data_loss": 0.1577, "physics_loss": 4.14e-9, "physics_mult": 1.0, "physics_loss_scaled": 4.14e-9, "lambda_offset": 1.0, "consolidation_loss": 1.58e-13, "gw_flow_loss": 1.56e-14, "prior_loss": 2.06e-8, "smooth_loss": 0.0, "mv_prior_loss": 0.0, "bounds_loss": 2.11e-10, "epsilon_prior": 3.54e-4, "epsilon_cons": 2.11e-6, "epsilon_gw": 1.18e-7, "epsilon_cons_raw": 0.0125, "epsilon_gw_raw": 3.94e-6, "q_reg_loss": 0.0, "q_rms": 0.0, "q_gate": 0.0, "subs_resid_gate": 0.0, "subs_pred_rmse_q50": 65.93, "gwl_pred_rmse_q50": 0.277, } payload = { "timestamp": str(timestamp), "city": city, "model": model, "tf_version": "2.20.0", "numpy_version": "2.0.2", "quantiles": q, "horizon": int(horizon), "batch_size": int(batch_size), "metrics_evaluate": metrics_evaluate, "physics_diagnostics": { "epsilon_prior": metrics_evaluate[ "epsilon_prior" ], "epsilon_cons": metrics_evaluate["epsilon_cons"], "epsilon_gw": metrics_evaluate["epsilon_gw"], }, "interval_calibration": { "target": 0.80, "factors_per_horizon": [1.0, 1.0, 1.48], "factors_per_horizon_from_cal_stats": { "target": 0.80, "interval": [0.1, 0.9], "f_max": 5.0, "tol": 0.02, "overall_key": "__overall__", "factors_source": "fit", "factors": { "1": 1.0, "2": 1.0, "3": 1.018, }, "eval_before": { "coverage": 0.865, "sharpness": 33.08, "per_horizon": { "1": { "coverage": 0.979, "sharpness": 23.24, }, "2": { "coverage": 0.822, "sharpness": 27.59, }, "3": { "coverage": 0.794, "sharpness": 48.41, }, }, }, "eval_after": { "coverage": 0.867, "sharpness": 33.38, "per_horizon": { "1": { "coverage": 0.979, "sharpness": 23.24, }, "2": { "coverage": 0.822, "sharpness": 27.59, }, "3": { "coverage": 0.800, "sharpness": 49.30, }, }, }, }, "coverage80_uncalibrated": 0.813, "coverage80_calibrated": 0.865, "sharpness80_uncalibrated": 0.0278, "sharpness80_calibrated": 0.0331, "coverage80_uncalibrated_phys": 0.813, "coverage80_calibrated_phys": 0.865, "sharpness80_uncalibrated_phys": 27.82, "sharpness80_calibrated_phys": 33.08, }, "censor_stratified": { "flag_name": "soil_thickness_censored", "threshold": 0.5, "mae_censored": 0.0, "mae_uncensored": 9.27, }, "point_metrics": { "mae": 9.27, "mse": 262.47, "r2": 0.883, "rmse": 16.20, }, "per_horizon": { "mae": { "H1": 3.52, "H2": 8.31, "H3": 15.98, }, "r2": { "H1": 0.896, "H2": 0.888, "H3": 0.874, }, }, "units": { "subs_unit_to_si": 0.001, "subs_factor_si_to_real": 1000.0, "subs_metrics_unit": str(subs_unit), "time_units": str(time_units), "seconds_per_time_unit": 31556952.0, "epsilon_cons_raw_unit": f"{subs_unit}/{time_units}", "epsilon_gw_raw_unit": f"1/{time_units}", }, } return payload
[docs] def generate_eval_physics( *, output_path: PathLike | None = None, template: EvalPhysicsLike | None = None, overrides: dict[str, Any] | None = None, **kwargs, ) -> dict[str, Any] | Path: """ Generate an eval-physics payload or file. Parameters ---------- output_path : path-like, optional Destination JSON path. If omitted, the payload is returned instead of written. template : mapping, ArtifactRecord, or path, optional Real or synthetic eval-physics template used as the generation base. overrides : dict, optional Nested overrides applied after template/default payload creation. **kwargs : dict Parameters forwarded to ``default_eval_physics_payload`` when no template is given. """ if template is None: payload = default_eval_physics_payload(**kwargs) else: payload = clone_artifact( _as_payload(template), overrides=None, ) if overrides: payload = deep_update(payload, overrides) if output_path is None: return payload return write_json(payload, output_path)
[docs] def load_eval_physics( path: PathLike, ) -> ArtifactRecord: """ Load an eval-physics artifact. Raises ------ ValueError If the artifact does not look like an eval-physics payload. """ record = load_artifact(path, kind="eval_physics") needed = { "metrics_evaluate", "physics_diagnostics", } if not needed.issubset(record.payload): raise ValueError( "The file does not contain the expected " "eval-physics sections." ) return record
[docs] def eval_physics_metrics_frame( payload: EvalPhysicsLike, ) -> pd.DataFrame: """Return a tidy frame for ``metrics_evaluate``.""" data = _as_payload(payload) return metrics_frame( data.get("metrics_evaluate", {}), section="metrics_evaluate", )
[docs] def eval_physics_point_metrics_frame( payload: EvalPhysicsLike, ) -> pd.DataFrame: """Return a tidy frame for point metrics.""" data = _as_payload(payload) return metrics_frame( data.get("point_metrics", {}), section="point_metrics", )
[docs] def eval_physics_units_frame( payload: EvalPhysicsLike, ) -> pd.DataFrame: """Return a tidy frame for units metadata.""" data = _as_payload(payload) units = data.get("units", {}) or {} rows = [] for key, value in units.items(): rows.append( { "key": str(key), "value": value, "is_numeric": isinstance(value, (int, float)) and not isinstance(value, bool), } ) return pd.DataFrame(rows)
[docs] def eval_physics_censor_frame( payload: EvalPhysicsLike, ) -> pd.DataFrame: """Return a tidy frame for censor-aware metrics.""" data = _as_payload(payload) censor = data.get("censor_stratified", {}) or {} rows = [] for key, value in censor.items(): rows.append( { "key": str(key), "value": value, "is_numeric": isinstance(value, (int, float)) and not isinstance(value, bool), } ) return pd.DataFrame(rows)
[docs] def eval_physics_per_horizon_frame( payload: EvalPhysicsLike, ) -> pd.DataFrame: """ Return a tidy frame for exported per-horizon metrics. """ data = _as_payload(payload) per_h = data.get("per_horizon", {}) or {} rows: list[dict[str, Any]] = [] for metric, mapping in per_h.items(): if not isinstance(mapping, dict): continue for horizon, value in mapping.items(): num = _try_float(value) if num is None: continue rows.append( { "metric": str(metric), "horizon": str(horizon), "value": float(num), } ) frame = pd.DataFrame(rows) if not frame.empty: frame["_sort"] = frame["horizon"].map( _horizon_sort_key ) frame = frame.sort_values(["metric", "_sort"]) frame = frame.drop(columns="_sort") return frame.reset_index(drop=True)
[docs] def eval_physics_calibration_frame( payload: EvalPhysicsLike, ) -> pd.DataFrame: """ Return a tidy frame for top-level calibration scalars. """ data = _as_payload(payload) cal = data.get("interval_calibration", {}) or {} base = _numeric_subset(cal, keys=_CAL_KEYS) base.update( _numeric_subset( { "target": cal.get("target"), } ) ) nested = _calibration_nested_block(data) for key in ["target", "f_max", "tol"]: num = _try_float(nested.get(key)) if num is not None: base[f"cal_stats.{key}"] = float(num) before = ( (nested.get("eval_before") or {}) if nested else {} ) after = (nested.get("eval_after") or {}) if nested else {} for key in ["coverage", "sharpness"]: num_b = _try_float(before.get(key)) num_a = _try_float(after.get(key)) if num_b is not None: base[f"eval_before.{key}"] = float(num_b) if num_a is not None: base[f"eval_after.{key}"] = float(num_a) return metrics_frame(base, section="interval_calibration")
[docs] def eval_physics_calibration_per_horizon_frame( payload: EvalPhysicsLike, ) -> pd.DataFrame: """ Return a tidy per-horizon calibration frame. """ data = _as_payload(payload) rows = _calibration_per_horizon_rows(data) frame = pd.DataFrame(rows) if not frame.empty: frame["_sort"] = frame["horizon"].map( _horizon_sort_key ) frame = frame.sort_values("_sort") frame = frame.drop(columns="_sort") return frame.reset_index(drop=True)
[docs] def summarize_eval_physics( payload: EvalPhysicsLike, ) -> dict[str, Any]: """ Build a compact semantic summary for inspection. """ data = _as_payload(payload) eval_metrics = data.get("metrics_evaluate", {}) or {} phys = data.get("physics_diagnostics", {}) or {} cal = data.get("interval_calibration", {}) or {} point = data.get("point_metrics", {}) or {} units = data.get("units", {}) or {} nested = _calibration_nested_block(data) before = ( (nested.get("eval_before") or {}) if nested else {} ) after = (nested.get("eval_after") or {}) if nested else {} factor_rows = eval_physics_calibration_per_horizon_frame( data ) if factor_rows.empty: factor_max = None factor_min = None else: factor_max = float(factor_rows["factor"].max()) factor_min = float(factor_rows["factor"].min()) summary_map = { "brief": { "kind": "eval_physics", "timestamp": data.get("timestamp"), "city": data.get("city"), "model": data.get("model"), "horizon": data.get("horizon"), "batch_size": data.get("batch_size"), "quantiles": list( data.get("quantiles", []) or [] ), }, "core_metrics": { "subs_mae_q50": _try_float( eval_metrics.get("subs_pred_mae_q50") ), "subs_rmse_q50": _try_float( eval_metrics.get("subs_pred_rmse_q50") ), "gwl_mae_q50": _try_float( eval_metrics.get("gwl_pred_mae_q50") ), "gwl_rmse_q50": _try_float( eval_metrics.get("gwl_pred_rmse_q50") ), "loss": _try_float(eval_metrics.get("loss")), "data_loss": _try_float( eval_metrics.get("data_loss") ), "physics_loss": _try_float( eval_metrics.get("physics_loss") ), "physics_loss_scaled": _try_float( eval_metrics.get("physics_loss_scaled") ), "point_mae": _try_float(point.get("mae")), "point_rmse": _try_float(point.get("rmse")), "point_r2": _try_float(point.get("r2")), }, "physics": { "epsilon_prior": _try_float( phys.get("epsilon_prior") ), "epsilon_cons": _try_float( phys.get("epsilon_cons") ), "epsilon_gw": _try_float(phys.get("epsilon_gw")), "epsilon_cons_raw": _try_float( eval_metrics.get("epsilon_cons_raw") ), "epsilon_gw_raw": _try_float( eval_metrics.get("epsilon_gw_raw") ), "lambda_offset": _try_float( eval_metrics.get("lambda_offset") ), "physics_mult": _try_float( eval_metrics.get("physics_mult") ), }, "calibration": { "target": _try_float(cal.get("target")), "coverage80_uncalibrated": _try_float( cal.get("coverage80_uncalibrated") ), "coverage80_calibrated": _try_float( cal.get("coverage80_calibrated") ), "sharpness80_uncalibrated": _try_float( cal.get("sharpness80_uncalibrated") ), "sharpness80_calibrated": _try_float( cal.get("sharpness80_calibrated") ), "coverage_before": _try_float( before.get("coverage") ), "coverage_after": _try_float( after.get("coverage") ), "sharpness_before": _try_float( before.get("sharpness") ), "sharpness_after": _try_float( after.get("sharpness") ), "factor_min": factor_min, "factor_max": factor_max, }, "units": { "subs_metrics_unit": units.get( "subs_metrics_unit" ), "time_units": units.get("time_units"), "epsilon_cons_raw_unit": units.get( "epsilon_cons_raw_unit" ), "epsilon_gw_raw_unit": units.get( "epsilon_gw_raw_unit" ), }, "checks": { "has_metrics_evaluate": bool(eval_metrics), "has_physics_diagnostics": bool(phys), "has_interval_calibration": bool(cal), "has_point_metrics": bool(point), "has_units": bool(units), "has_per_horizon": bool( data.get("per_horizon", {}) ), "has_quantiles": bool(data.get("quantiles", [])), "physics_loss_nonnegative": ( ( _try_float( eval_metrics.get("physics_loss") ) or 0.0 ) >= 0.0 ), "epsilons_present": all( key in phys for key in [ "epsilon_prior", "epsilon_cons", "epsilon_gw", ] ), "calibration_target_in_01": ( _try_float(cal.get("target")) is not None and 0.0 <= float(cal.get("target")) <= 1.0 ), "coverage_improves_or_matches": ( _try_float(cal.get("coverage80_calibrated")) is not None and _try_float( cal.get("coverage80_uncalibrated") ) is not None and float(cal.get("coverage80_calibrated")) >= float(cal.get("coverage80_uncalibrated")) ), "reported_unit_present": bool( units.get("subs_metrics_unit") ), }, } return summary_map
[docs] def plot_eval_physics_metrics( payload: EvalPhysicsLike, *, keys: list[str] | tuple[str, ...] | None = None, ax: plt.Axes | None = None, title: str | None = None, error: str = "ignore", **plot_kws: Any, ) -> plt.Axes: """Plot selected ``metrics_evaluate`` values.""" fig, ax, _ = prepare_plot( ax=ax, figsize=(8.2, 4.8) if ax is None else None ) data = _as_payload(payload) metrics = _numeric_subset( data.get("metrics_evaluate", {}), keys=keys or _METRICS_EVALUATE_KEYS, ) return plot_metric_bars( ax, metrics, title=title or "Eval physics: metrics_evaluate", sort_by_value=True, top_n=14, absolute=True, error=error, **plot_kws, )
[docs] def plot_eval_physics_epsilons( payload: EvalPhysicsLike, *, ax: plt.Axes | None = None, title: str = "Eval physics: epsilon diagnostics", error: str = "ignore", **plot_kws: Any, ) -> plt.Axes: """Plot epsilon-related diagnostics.""" fig, ax, _ = prepare_plot( ax=ax, figsize=(8.0, 4.6) if ax is None else None ) data = _as_payload(payload) metrics = { **_numeric_subset( data.get("physics_diagnostics", {}), keys=[ "epsilon_prior", "epsilon_cons", "epsilon_gw", ], ), **_numeric_subset( data.get("metrics_evaluate", {}), keys=["epsilon_cons_raw", "epsilon_gw_raw"], ), } return plot_metric_bars( ax, metrics, title=title, sort_by_value=True, top_n=None, absolute=True, error=error, **plot_kws, )
[docs] def plot_eval_physics_calibration_factors( payload: EvalPhysicsLike, *, source: str = "top", ax: plt.Axes | None = None, title: str | None = None, error: str = "ignore", **plot_kws: Any, ) -> plt.Axes: """ Plot per-horizon calibration factors. Parameters ---------- source : {'top', 'nested'}, default='top' ``'top'`` uses ``factors_per_horizon``. ``'nested'`` uses ``factors_per_horizon_from_cal_stats``. """ fig, ax, _ = prepare_plot( ax=ax, figsize=(7.8, 4.6) if ax is None else None ) data = _as_payload(payload) cal = data.get("interval_calibration", {}) or {} if str(source).strip().lower() == "nested": nested = _calibration_nested_block(data) src = nested.get("factors", {}) if nested else {} series = { str(k): float(v) for k, v in (src or {}).items() if _try_float(v) is not None } else: vals = cal.get("factors_per_horizon", []) or [] series = { f"H{i}": float(v) for i, v in enumerate(vals, start=1) if _try_float(v) is not None } return plot_series_map( ax, series, title=title or "Calibration factors by horizon", xlabel="horizon", ylabel="factor", error=error, **plot_kws, )
[docs] def plot_eval_physics_point_metrics( payload: EvalPhysicsLike, *, ax: plt.Axes | None = None, title: str = "Eval physics: point metrics", error: str = "ignore", **plot_kws: Any, ) -> plt.Axes: """Plot point-metric summary.""" fig, ax, _ = prepare_plot( ax=ax, figsize=(7.8, 4.6) if ax is None else None ) data = _as_payload(payload) metrics = _numeric_subset( data.get("point_metrics", {}), keys=_POINT_KEYS, ) return plot_metric_bars( ax, metrics, title=title, sort_by_value=True, top_n=None, absolute=True, error=error, **plot_kws, )
[docs] def plot_eval_physics_per_horizon_metrics( payload: EvalPhysicsLike, *, metric: str = "mae", ax: plt.Axes | None = None, title: str | None = None, error: str = "ignore", **plot_kws: Any, ) -> plt.Axes: """Plot one exported per-horizon metric map.""" fig, ax, _ = prepare_plot( ax=ax, figsize=(7.8, 4.6) if ax is None else None ) data = _as_payload(payload) series = _per_horizon_map(data, metric) return plot_series_map( ax, series, title=title or f"Per-horizon {metric}", xlabel="horizon", ylabel=metric, error=error, **plot_kws, )
[docs] def plot_eval_physics_boolean_summary( payload: EvalPhysicsLike, *, ax: plt.Axes | None = None, title: str = "Eval physics checks", error: str = "ignore", **plot_kws: Any, ) -> plt.Axes: """Plot semantic pass/fail checks.""" fig, ax, _ = prepare_plot( ax=ax, figsize=(8.0, 4.6) if ax is None else None ) checks = summarize_eval_physics(payload)["checks"] return plot_boolean_checks( ax, checks, title=title, error=error, **plot_kws, )
[docs] def inspect_eval_physics( payload: EvalPhysicsLike, *, output_dir: PathLike | None = None, stem: str = "eval_physics", save_figures: bool = True, ) -> dict[str, Any]: """ Inspect an eval-physics artifact and optionally save figures. Returns ------- dict Bundle containing summary, tabular frames, and optionally written figure paths. """ data = _as_payload(payload) summary_map = summarize_eval_physics(data) bundle: dict[str, Any] = { "summary": summary_map, "frames": { "metrics_evaluate": eval_physics_metrics_frame( data ), "point_metrics": eval_physics_point_metrics_frame( data ), "units": eval_physics_units_frame(data), "censor_stratified": eval_physics_censor_frame( data ), "per_horizon": eval_physics_per_horizon_frame( data ), "interval_calibration": ( eval_physics_calibration_frame(data) ), "interval_calibration_per_horizon": ( eval_physics_calibration_per_horizon_frame( data ) ), }, "figures": {}, } if not save_figures or output_dir is None: return bundle outdir = Path(output_dir).expanduser().resolve() outdir.mkdir(parents=True, exist_ok=True) fig_specs = { f"{stem}_metrics.png": plot_eval_physics_metrics, f"{stem}_epsilons.png": plot_eval_physics_epsilons, f"{stem}_cal_factors.png": ( plot_eval_physics_calibration_factors ), f"{stem}_point_metrics.png": ( plot_eval_physics_point_metrics ), f"{stem}_per_h_mae.png": ( lambda p, ax=None: ( plot_eval_physics_per_horizon_metrics( p, metric="mae", ax=ax, ) ) ), f"{stem}_checks.png": plot_eval_physics_boolean_summary, } for name, fn in fig_specs.items(): fig, ax = plt.subplots(figsize=(8.2, 4.8)) fn(data, ax=ax) fig.tight_layout() path = outdir / name fig.savefig(path, dpi=160, bbox_inches="tight") plt.close(fig) bundle["figures"][name] = str(path) return bundle