Source code for geoprior.utils.scale_metrics

# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# https://lkouadio.com
# Copyright (c) 2026-present
# Author: LKouadio <etanoyau@gmail.com>

"""
Utilities for computing error metrics in *physical* units given
Stage-1 scaling metadata.

Main entry points include ``inverse_scale_target(...)``,
``point_metrics(...)``, and ``per_horizon_metrics(...)``.

These are designed to work with the NATCOM pipeline where Stage-1
stores a ``scaler_info`` dict in the manifest::

    scaler_info[target_name] = {
        "scaler_path": ".../main_scaler.joblib",
        "all_features": [...],
        "idx": <index in scaler>,
        "scaler": <fitted MinMaxScaler>,  # optional, attached at Stage-2
    }

They can also handle a bare scaler object, a scaler path, or manual
min/max or mean/std parameters.
"""

from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import Any

import joblib
import numpy as np

from .nat_utils import extract_preds
from .shapes import canonicalize_BHQO_quantiles_np

ArrayLike = Any


def _as_np(x: Any) -> np.ndarray:
    """Best-effort: Tensor -> np.ndarray."""
    if x is None:
        return np.asarray(x)
    try:
        return x.numpy()
    except Exception:
        return np.asarray(x)


def _pick_point_pred(
    y: Any,
    *,
    n_q: int = 3,
    quantiles: Sequence[float] | None = None,
    q: float | int | None = None,
    prefer_median: bool = True,
) -> np.ndarray:
    """
    Return point prediction with shape (B, H, O).

    Accepts:
      - (B, H, 1)
      - (B, H, Q, 1)  (after canonicalization)
      - (B, H)        (will be expanded to (B, H, 1))
    """

    arr = _as_np(y)

    if arr.ndim == 4:
        arr = canonicalize_BHQO_quantiles_np(
            arr,
            n_q=n_q,
        )
        q_dim = int(arr.shape[2])

        if q is None and prefer_median:
            if quantiles is not None:
                q = 0.5
            else:
                q = q_dim // 2

        if isinstance(q, int | np.integer):
            idx = int(q)
        else:
            if quantiles is not None and q is not None:
                qs = np.asarray(quantiles, dtype=float)
                idx = int(np.argmin(np.abs(qs - float(q))))
            elif q is not None:
                frac = float(q)
                idx = int(round(frac * (q_dim - 1)))
            else:
                idx = q_dim // 2

        idx = max(0, min(idx, q_dim - 1))
        return arr[:, :, idx, :]

    if arr.ndim == 3:
        return arr

    if arr.ndim == 2:
        return arr[:, :, None]

    return np.asarray(arr)


[docs] def evaluate_point_forecast( model: Any, out: Any, y_true_subs: Any, *, y_true_gwl: Any | None = None, n_q: int = 3, quantiles: Sequence[float] | None = None, q: float | int | None = None, use_physical: bool = False, return_physical: bool | None = None, scaler_info: Mapping[str, Any] | None = None, subs_target_name: str = "subsidence", gwl_target_name: str = "gwl", scaler_entry: Mapping[str, Any] | None = None, scaler: Any | None = None, feature_index: int | None = None, n_features: int | None = None, params: Mapping[str, float] | None = None, strict: bool = True, output_names: Sequence[str] | None = None, ) -> dict[str, Any]: r""" End-to-end helper for point-forecast evaluation. Pipeline: extract predictions from ``model`` and ``out``, canonicalize BHQO quantile outputs when needed, pick a point prediction, optionally inverse-scale it, and compute global and per-horizon metrics. Parameters ---------- model, out : Passed to extract_preds(...). y_true_subs : True subsidence, shape (B, H, 1) or (B, H). y_true_gwl : Optional true gwl/head, same shape conventions. n_q, quantiles, q : Quantile-selection controls for ``(B, H, Q, 1)`` outputs. If ``q`` is ``None``, the median is preferred. If ``q`` is an integer, it is treated as a direct quantile index. If ``q`` is a float and ``quantiles`` is provided, the nearest quantile is selected; otherwise the float is treated as a fraction in ``[0, 1]``. use_physical : If True, compute metrics in physical units. return_physical : If None, defaults to use_physical. If ``True``, return physical-space arrays such as ``subs_pred_phys`` and ``gwl_pred_phys`` when possible. scaler_info, scaler_entry, scaler, feature_index, n_features, params : Passed through to ``inverse_scale_target(...)``. Returns ------- dict Dictionary containing model-space predictions, optional physical-space predictions, and global and per-horizon metrics for subsidence and groundwater outputs. """ if return_physical is None: return_physical = use_physical subs_pred, gwl_pred = extract_preds( model, out, strict=strict, output_names=output_names, ) subs_pt = _pick_point_pred( subs_pred, n_q=n_q, quantiles=quantiles, q=q, prefer_median=True, ) gwl_pt = _pick_point_pred( gwl_pred, n_q=n_q, quantiles=quantiles, q=q, prefer_median=True, ) res: dict[str, Any] = { "subs_pred_model": subs_pt, "gwl_pred_model": gwl_pt, } subs_metrics = point_metrics( y_true_subs, subs_pt, use_physical=use_physical, scaler_info=scaler_info, target_name=subs_target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) subs_mae_h, subs_r2_h = per_horizon_metrics( y_true_subs, subs_pt, use_physical=use_physical, scaler_info=scaler_info, target_name=subs_target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) res["subs_metrics"] = subs_metrics res["subs_mae_h"] = subs_mae_h res["subs_r2_h"] = subs_r2_h if y_true_gwl is not None: gwl_metrics = point_metrics( y_true_gwl, gwl_pt, use_physical=use_physical, scaler_info=scaler_info, target_name=gwl_target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) gwl_mae_h, gwl_r2_h = per_horizon_metrics( y_true_gwl, gwl_pt, use_physical=use_physical, scaler_info=scaler_info, target_name=gwl_target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) res["gwl_metrics"] = gwl_metrics res["gwl_mae_h"] = gwl_mae_h res["gwl_r2_h"] = gwl_r2_h if return_physical: res["subs_pred_phys"] = inverse_scale_target( subs_pt, scaler_info=scaler_info, target_name=subs_target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) res["gwl_pred_phys"] = inverse_scale_target( gwl_pt, scaler_info=scaler_info, target_name=gwl_target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) return res
[docs] def auto_noise_std_from_increments( y_inc: np.ndarray, *, noise_frac: float = 0.10, percentile: float = 95.0, min_std: float = 0.0, max_std: float | None = None, eps: float = 1e-12, ) -> float: """ Compute noise std as a fraction of a robust increment scale. Parameters ---------- y_inc : np.ndarray Deterministic increments (before noise). Any shape; will be flattened. noise_frac : float, default=0.10 Fraction applied to the percentile scale. percentile : float, default=95.0 Percentile of the absolute increments used as the scale. min_std : float, default=0.0 Lower bound for returned std. max_std : float or None, default=None Optional upper bound for returned std. eps : float, default=1e-12 Small positive value for safe fallback. Returns ------- float A finite, non-negative std value. Notes ----- Non-finite values are filtered first. If the percentile-based scale is approximately zero, the function falls back to the maximum absolute increment, then the mean absolute increment, and finally to ``eps``. """ if y_inc is None: return float(max(min_std, 0.0)) a = np.asarray(y_inc, dtype=float).ravel() if a.size == 0: return float(max(min_std, 0.0)) a = a[np.isfinite(a)] if a.size == 0: return float(max(min_std, 0.0)) if not np.isfinite(noise_frac): noise_frac = 0.0 noise_frac = float(max(noise_frac, 0.0)) p = float(percentile) p = float(np.clip(p, 0.0, 100.0)) abs_a = np.abs(a) try: scale = float(np.percentile(abs_a, p)) except Exception: scale = float(np.nan) if (not np.isfinite(scale)) or (scale <= eps): scale = float(np.max(abs_a)) if abs_a.size else 0.0 if (not np.isfinite(scale)) or (scale <= eps): scale = float(np.mean(abs_a)) if abs_a.size else 0.0 if (not np.isfinite(scale)) or (scale <= eps): scale = float(eps) std = float(noise_frac * scale) if not np.isfinite(std): std = float(min_std) std = float(max(std, float(min_std))) if max_std is not None: mx = float(max_std) if np.isfinite(mx): std = float(min(std, mx)) return std
[docs] def resolve_noise_std( y_inc: np.ndarray, *, noise_std: float | None = None, noise_frac: float = 0.10, percentile: float = 95.0, min_std: float = 0.0, max_std: float | None = None, ) -> float: """ Prefer explicit `noise_std` when provided, otherwise auto-compute from increments. """ if noise_std is not None: v = float(noise_std) if np.isfinite(v) and v >= 0.0: return v return auto_noise_std_from_increments( y_inc, noise_frac=noise_frac, percentile=percentile, min_std=min_std, max_std=max_std, )
def _to_np(x: ArrayLike) -> np.ndarray: """Convert Tensor / list-like to a NumPy array.""" if hasattr(x, "numpy"): x = x.numpy() return np.asarray(x) def _resolve_stage1_entry( scaler_info: Mapping[str, Any] | None, target_name: str | None, scaler_entry: Mapping[str, Any] | None, ) -> tuple[Any | None, int | None, int | None]: """ Resolve (scaler, feature_index, n_features) from Stage-1 metadata. Parameters ---------- scaler_info : mapping or None Either the full Stage-1 scaler_info dict mapping ``feature_name -> entry`` or a single entry. target_name : str, optional Name of the target in ``scaler_info``. scaler_entry : mapping, optional Direct entry for the target. Returns ------- scaler : object or None idx : int or None Index of the target in the scaler's feature space. n_features : int or None Number of features expected by the scaler. """ entry: Mapping[str, Any] | None = None # 0) If caller provided a direct entry, trust it. if scaler_entry is not None: entry = scaler_entry # 1) If scaler_info itself looks like a single entry, use it. elif isinstance(scaler_info, Mapping) and ( "idx" in scaler_info or "scaler_path" in scaler_info or "scaler" in scaler_info ): entry = scaler_info # already an entry # 2) Otherwise, scaler_info is expected to be name -> entry mapping. elif isinstance(scaler_info, Mapping) and isinstance( target_name, str ): # direct hit if target_name in scaler_info: entry = scaler_info[target_name] else: # case-insensitive + aliases keymap = { k.lower(): k for k in scaler_info.keys() if isinstance(k, str) } lname = target_name.lower() aliases: list[str] = [] if lname.endswith("_pred"): aliases.append(lname[:-5]) if "subs" in lname: aliases += [ "subsidence", "subs", "subs_pred", "subsidence_cum", "subsidence_cum__si", ] if "gwl" in lname or "head" in lname: aliases += [ "gwl", "gwl_pred", "gwl_depth_bgs", "gwl_depth_bgs__si", "head_m", "head_m__si", ] # de-dup preserve order seen = set() aliases = [ a for a in aliases if not (a in seen or seen.add(a)) ] # try aliases (case-insensitive) for a in aliases: real = keymap.get(a.lower()) if real is not None: entry = scaler_info[real] break if entry is None: return None, None, None # 3) Load scaler if needed scaler = entry.get("scaler") if scaler is None and isinstance( entry.get("scaler_path"), str ): try: scaler = joblib.load(entry["scaler_path"]) except Exception: scaler = None idx = entry.get("idx") n_features = None all_feats = entry.get("all_features") if isinstance(all_feats, list | tuple): n_features = len(all_feats) if hasattr(scaler, "n_features_in_"): try: n_features = int(scaler.n_features_in_) except Exception: pass if n_features is None and idx is not None: n_features = int(idx) + 1 return scaler, idx, n_features
[docs] def scale_target( y_phys: ArrayLike, *, scaler_info: Mapping[str, Any] | None = None, target_name: str | None = None, scaler_entry: Mapping[str, Any] | None = None, scaler: Any | None = None, feature_index: int | None = None, n_features: int | None = None, params: Mapping[str, float] | None = None, ) -> np.ndarray: """ Forward-transform physical values into scaled space. Mirrors inverse_scale_target(...) conventions. """ y = _to_np(y_phys) shp = y.shape y_flat = y.reshape(-1, 1) # ---- 1) Manual params --------------------------------- if params: keys = set(params.keys()) if {"min", "max"} <= keys: _min = float(params["min"]) _max = float(params["max"]) span = (_max - _min) or 1.0 ys = (y_flat - _min) / span return ys.reshape(shp) if {"mean", "std"} <= keys: mean = float(params["mean"]) std = float(params["std"]) or 1.0 ys = (y_flat - mean) / std return ys.reshape(shp) if {"scale", "shift"} <= keys: scale = float(params.get("scale", 1.0)) or 1.0 shift = float(params.get("shift", 0.0)) ys = (y_flat - shift) / scale return ys.reshape(shp) return y # ---- 2) Stage-1 metadata ------------------------------- stg_sc, stg_idx, stg_nf = _resolve_stage1_entry( scaler_info=scaler_info, target_name=target_name, scaler_entry=scaler_entry, ) if stg_sc is not None: scaler = stg_sc if feature_index is None: feature_index = stg_idx if stg_nf is not None: n_features = stg_nf # ---- 3) Bare scaler or path ----------------------------- if isinstance(scaler, str): try: scaler = joblib.load(scaler) except Exception: scaler = None if scaler is None or not hasattr(scaler, "transform"): return y if n_features is None and hasattr( scaler, "n_features_in_" ): try: n_features = int(scaler.n_features_in_) except Exception: n_features = None if n_features is None: n_features = 1 if feature_index is None: feature_index = 0 nf = int(n_features) fi = int(feature_index) X = np.zeros((y_flat.shape[0], nf), dtype=float) X[:, fi : fi + 1] = y_flat try: Xs = scaler.transform(X) ys = Xs[:, fi : fi + 1] return ys.reshape(shp) except Exception: return y
[docs] def inverse_scale_target( y_scaled: ArrayLike, *, scaler_info: Mapping[str, Any] | None = None, target_name: str | None = None, scaler_entry: Mapping[str, Any] | None = None, scaler: Any | None = None, feature_index: int | None = None, n_features: int | None = None, params: Mapping[str, float] | None = None, ) -> np.ndarray: r""" Inverse-transform a scaled target array back to physical units. Supports three patterns: 1. Stage-1 ``scaler_info`` dict (preferred in NATCOM): pass ``scaler_info=scaler_info_dict, target_name="subsidence"``. 2. A bare scaler instance or a path to a joblib dump via ``scaler=...``. If multi-feature, also pass ``feature_index`` and optionally ``n_features``. 3. Manual scaling parameters via ``params`` such as {\"min\", \"max\"}, {\"mean\", \"std\"}, or {\"scale\", \"shift\"}. Parameters ---------- y_scaled : array-like Scaled values, e.g. of shape (N, H, 1) or (N,). scaler_info : mapping, optional target_name : str, optional scaler_entry : mapping, optional scaler : object or str, optional feature_index : int, optional n_features : int, optional params : mapping, optional Manual parameters: - {\"min\", \"max\"} → MinMax scaling - {\"mean\", \"std\"} → standardization - {\"scale\", \"shift\"} → x = scale * x_scaled + shift. Returns ------- np.ndarray Array with same shape as ``y_scaled`` but in physical units when scaling information is available. If nothing usable is found, returns the input as a NumPy array. """ y = _to_np(y_scaled) original_shape = y.shape y_flat = y.reshape(-1, 1) # ---- 1) Manual params ------------------------------------------------ if params: keys = set(params.keys()) if {"min", "max"} <= keys: _min = float(params["min"]) _max = float(params["max"]) span = ( (_max - _min) if (_max - _min) != 0.0 else 1.0 ) y_phys = y_flat * span + _min return y_phys.reshape(original_shape) if {"mean", "std"} <= keys: mean = float(params["mean"]) std = float(params["std"]) if std == 0.0: std = 1.0 y_phys = y_flat * std + mean return y_phys.reshape(original_shape) if {"scale", "shift"} <= keys: scale = float(params.get("scale", 1.0)) shift = float(params.get("shift", 0.0)) y_phys = y_flat * scale + shift return y_phys.reshape(original_shape) # Unknown combination → fall through return y # ---- 2) Stage-1 metadata --------------------------------------------- stg_scaler, stg_idx, stg_nfeat = _resolve_stage1_entry( scaler_info=scaler_info, target_name=target_name, scaler_entry=scaler_entry, ) if stg_scaler is not None: scaler = stg_scaler feature_index = ( stg_idx if feature_index is None else feature_index ) n_features = ( stg_nfeat if stg_nfeat is not None else n_features ) # ---- 3) Bare scaler or path ------------------------------------------ if isinstance(scaler, str): try: scaler = joblib.load(scaler) except Exception: scaler = None if scaler is not None and hasattr( scaler, "inverse_transform" ): if n_features is None and hasattr( scaler, "n_features_in_" ): try: n_features = int(scaler.n_features_in_) except Exception: n_features = None if n_features is None: n_features = 1 if feature_index is None: feature_index = 0 X = np.zeros( (y_flat.shape[0], int(n_features)), dtype=float ) fi = int(feature_index) X[:, fi : fi + 1] = y_flat try: X_inv = scaler.inverse_transform(X) y_phys = X_inv[:, fi : fi + 1] return y_phys.reshape(original_shape) except Exception: return y # ---- 4) No usable scaling info -------------------------------------- return y
[docs] def point_metrics( y_true: ArrayLike, y_pred: ArrayLike, *, use_physical: bool = False, scaler_info: Mapping[str, Any] | None = None, target_name: str | None = None, scaler_entry: Mapping[str, Any] | None = None, scaler: Any | None = None, feature_index: int | None = None, n_features: int | None = None, params: Mapping[str, float] | None = None, ) -> dict: """ Compute MAE/MSE/R², optionally in physical units. If ``use_physical=True``, both y_true and y_pred are inverse- transformed via inverse_scale_target before computing metrics. Parameters ---------- y_true, y_pred : array-like Arbitrary shapes (e.g. (N, H, 1)); flattened internally. use_physical : bool, default=False If ``True``, inverse-transform ``y_true`` and ``y_pred`` to physical units before computing metrics. scaler_info, target_name, scaler_entry, scaler, feature_index, n_features, params : optional Passed through to ``inverse_scale_target``. Returns ------- dict {\"mae\": ..., \"mse\": ..., \"r2\": ...} """ yt = _to_np(y_true) yp = _to_np(y_pred) if use_physical: yt = inverse_scale_target( yt, scaler_info=scaler_info, target_name=target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) yp = inverse_scale_target( yp, scaler_info=scaler_info, target_name=target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) yt_f = yt.reshape(-1) yp_f = yp.reshape(-1) diff = yt_f - yp_f mae = float(np.nanmean(np.abs(diff))) mse = float(np.nanmean(diff**2)) var = float(np.nanvar(yt_f)) if var > 0.0 and np.isfinite(var): r2 = float(1.0 - mse / (var + 1e-12)) else: r2 = float("nan") return {"mae": mae, "mse": mse, "r2": r2}
[docs] def per_horizon_metrics( y_true: ArrayLike, y_pred: ArrayLike, *, use_physical: bool = False, scaler_info: Mapping[str, Any] | None = None, target_name: str | None = None, scaler_entry: Mapping[str, Any] | None = None, scaler: Any | None = None, feature_index: int | None = None, n_features: int | None = None, params: Mapping[str, float] | None = None, ) -> tuple[dict, dict]: """ Per-horizon MAE/R². Parameters ---------- y_true, y_pred : array-like Shape (N, H, 1) or (N, H). use_physical : bool, default=False If True, inverse-transform to physical units before computing. Returns ------- mae_dict : dict {'H1': mae_h1, ...} r2_dict : dict {'H1': r2_h1, ...} """ Y = _to_np(y_true) P = _to_np(y_pred) if use_physical: Y = inverse_scale_target( Y, scaler_info=scaler_info, target_name=target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) P = inverse_scale_target( P, scaler_info=scaler_info, target_name=target_name, scaler_entry=scaler_entry, scaler=scaler, feature_index=feature_index, n_features=n_features, params=params, ) Y = np.asarray(Y).squeeze(-1) # (N, H) P = np.asarray(P).squeeze(-1) H = Y.shape[1] mae_dict: dict[str, float | None] = {} r2_dict: dict[str, float | None] = {} for h in range(H): yy = Y[:, h] pp = P[:, h] mask = np.isfinite(yy) & np.isfinite(pp) if mask.sum() == 0: mae_dict[f"H{h + 1}"] = None r2_dict[f"H{h + 1}"] = None continue d = yy[mask] - pp[mask] mae = float(np.mean(np.abs(d))) mse = float(np.mean(d**2)) var = float(np.var(yy[mask])) r2 = ( float(1.0 - mse / (var + 1e-12)) if var > 0 else float("nan") ) mae_dict[f"H{h + 1}"] = mae r2_dict[f"H{h + 1}"] = r2 return mae_dict, r2_dict