# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# https://lkouadio.com
# Copyright (c) 2026-present
# Author: LKouadio <etanoyau@gmail.com>
"""
Utilities for computing error metrics in *physical* units given
Stage-1 scaling metadata.
Main entry points include ``inverse_scale_target(...)``,
``point_metrics(...)``, and ``per_horizon_metrics(...)``.
These are designed to work with the NATCOM pipeline where Stage-1
stores a ``scaler_info`` dict in the manifest::
scaler_info[target_name] = {
"scaler_path": ".../main_scaler.joblib",
"all_features": [...],
"idx": <index in scaler>,
"scaler": <fitted MinMaxScaler>, # optional, attached at Stage-2
}
They can also handle a bare scaler object, a scaler path, or manual
min/max or mean/std parameters.
"""
from __future__ import annotations
from collections.abc import Mapping, Sequence
from typing import Any
import joblib
import numpy as np
from .nat_utils import extract_preds
from .shapes import canonicalize_BHQO_quantiles_np
ArrayLike = Any
def _as_np(x: Any) -> np.ndarray:
"""Best-effort: Tensor -> np.ndarray."""
if x is None:
return np.asarray(x)
try:
return x.numpy()
except Exception:
return np.asarray(x)
def _pick_point_pred(
y: Any,
*,
n_q: int = 3,
quantiles: Sequence[float] | None = None,
q: float | int | None = None,
prefer_median: bool = True,
) -> np.ndarray:
"""
Return point prediction with shape (B, H, O).
Accepts:
- (B, H, 1)
- (B, H, Q, 1) (after canonicalization)
- (B, H) (will be expanded to (B, H, 1))
"""
arr = _as_np(y)
if arr.ndim == 4:
arr = canonicalize_BHQO_quantiles_np(
arr,
n_q=n_q,
)
q_dim = int(arr.shape[2])
if q is None and prefer_median:
if quantiles is not None:
q = 0.5
else:
q = q_dim // 2
if isinstance(q, int | np.integer):
idx = int(q)
else:
if quantiles is not None and q is not None:
qs = np.asarray(quantiles, dtype=float)
idx = int(np.argmin(np.abs(qs - float(q))))
elif q is not None:
frac = float(q)
idx = int(round(frac * (q_dim - 1)))
else:
idx = q_dim // 2
idx = max(0, min(idx, q_dim - 1))
return arr[:, :, idx, :]
if arr.ndim == 3:
return arr
if arr.ndim == 2:
return arr[:, :, None]
return np.asarray(arr)
[docs]
def evaluate_point_forecast(
model: Any,
out: Any,
y_true_subs: Any,
*,
y_true_gwl: Any | None = None,
n_q: int = 3,
quantiles: Sequence[float] | None = None,
q: float | int | None = None,
use_physical: bool = False,
return_physical: bool | None = None,
scaler_info: Mapping[str, Any] | None = None,
subs_target_name: str = "subsidence",
gwl_target_name: str = "gwl",
scaler_entry: Mapping[str, Any] | None = None,
scaler: Any | None = None,
feature_index: int | None = None,
n_features: int | None = None,
params: Mapping[str, float] | None = None,
strict: bool = True,
output_names: Sequence[str] | None = None,
) -> dict[str, Any]:
r"""
End-to-end helper for point-forecast evaluation.
Pipeline: extract predictions from ``model`` and ``out``,
canonicalize BHQO quantile outputs when needed, pick a point
prediction, optionally inverse-scale it, and compute global and
per-horizon metrics.
Parameters
----------
model, out :
Passed to extract_preds(...).
y_true_subs :
True subsidence, shape (B, H, 1) or (B, H).
y_true_gwl :
Optional true gwl/head, same shape conventions.
n_q, quantiles, q :
Quantile-selection controls for ``(B, H, Q, 1)`` outputs. If
``q`` is ``None``, the median is preferred. If ``q`` is an
integer, it is treated as a direct quantile index. If ``q`` is a
float and ``quantiles`` is provided, the nearest quantile is
selected; otherwise the float is treated as a fraction in
``[0, 1]``.
use_physical :
If True, compute metrics in physical units.
return_physical :
If None, defaults to use_physical.
If ``True``, return physical-space arrays such as
``subs_pred_phys`` and ``gwl_pred_phys`` when possible.
scaler_info, scaler_entry, scaler, feature_index, n_features, params :
Passed through to ``inverse_scale_target(...)``.
Returns
-------
dict
Dictionary containing model-space predictions, optional
physical-space predictions, and global and per-horizon metrics for
subsidence and groundwater outputs.
"""
if return_physical is None:
return_physical = use_physical
subs_pred, gwl_pred = extract_preds(
model,
out,
strict=strict,
output_names=output_names,
)
subs_pt = _pick_point_pred(
subs_pred,
n_q=n_q,
quantiles=quantiles,
q=q,
prefer_median=True,
)
gwl_pt = _pick_point_pred(
gwl_pred,
n_q=n_q,
quantiles=quantiles,
q=q,
prefer_median=True,
)
res: dict[str, Any] = {
"subs_pred_model": subs_pt,
"gwl_pred_model": gwl_pt,
}
subs_metrics = point_metrics(
y_true_subs,
subs_pt,
use_physical=use_physical,
scaler_info=scaler_info,
target_name=subs_target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
subs_mae_h, subs_r2_h = per_horizon_metrics(
y_true_subs,
subs_pt,
use_physical=use_physical,
scaler_info=scaler_info,
target_name=subs_target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
res["subs_metrics"] = subs_metrics
res["subs_mae_h"] = subs_mae_h
res["subs_r2_h"] = subs_r2_h
if y_true_gwl is not None:
gwl_metrics = point_metrics(
y_true_gwl,
gwl_pt,
use_physical=use_physical,
scaler_info=scaler_info,
target_name=gwl_target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
gwl_mae_h, gwl_r2_h = per_horizon_metrics(
y_true_gwl,
gwl_pt,
use_physical=use_physical,
scaler_info=scaler_info,
target_name=gwl_target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
res["gwl_metrics"] = gwl_metrics
res["gwl_mae_h"] = gwl_mae_h
res["gwl_r2_h"] = gwl_r2_h
if return_physical:
res["subs_pred_phys"] = inverse_scale_target(
subs_pt,
scaler_info=scaler_info,
target_name=subs_target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
res["gwl_pred_phys"] = inverse_scale_target(
gwl_pt,
scaler_info=scaler_info,
target_name=gwl_target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
return res
[docs]
def auto_noise_std_from_increments(
y_inc: np.ndarray,
*,
noise_frac: float = 0.10,
percentile: float = 95.0,
min_std: float = 0.0,
max_std: float | None = None,
eps: float = 1e-12,
) -> float:
"""
Compute noise std as a fraction of a robust
increment scale.
Parameters
----------
y_inc : np.ndarray
Deterministic increments (before noise).
Any shape; will be flattened.
noise_frac : float, default=0.10
Fraction applied to the percentile scale.
percentile : float, default=95.0
Percentile of the absolute increments used as the scale.
min_std : float, default=0.0
Lower bound for returned std.
max_std : float or None, default=None
Optional upper bound for returned std.
eps : float, default=1e-12
Small positive value for safe fallback.
Returns
-------
float
A finite, non-negative std value.
Notes
-----
Non-finite values are filtered first. If the percentile-based scale
is approximately zero, the function falls back to the maximum
absolute increment, then the mean absolute increment, and finally
to ``eps``.
"""
if y_inc is None:
return float(max(min_std, 0.0))
a = np.asarray(y_inc, dtype=float).ravel()
if a.size == 0:
return float(max(min_std, 0.0))
a = a[np.isfinite(a)]
if a.size == 0:
return float(max(min_std, 0.0))
if not np.isfinite(noise_frac):
noise_frac = 0.0
noise_frac = float(max(noise_frac, 0.0))
p = float(percentile)
p = float(np.clip(p, 0.0, 100.0))
abs_a = np.abs(a)
try:
scale = float(np.percentile(abs_a, p))
except Exception:
scale = float(np.nan)
if (not np.isfinite(scale)) or (scale <= eps):
scale = float(np.max(abs_a)) if abs_a.size else 0.0
if (not np.isfinite(scale)) or (scale <= eps):
scale = float(np.mean(abs_a)) if abs_a.size else 0.0
if (not np.isfinite(scale)) or (scale <= eps):
scale = float(eps)
std = float(noise_frac * scale)
if not np.isfinite(std):
std = float(min_std)
std = float(max(std, float(min_std)))
if max_std is not None:
mx = float(max_std)
if np.isfinite(mx):
std = float(min(std, mx))
return std
[docs]
def resolve_noise_std(
y_inc: np.ndarray,
*,
noise_std: float | None = None,
noise_frac: float = 0.10,
percentile: float = 95.0,
min_std: float = 0.0,
max_std: float | None = None,
) -> float:
"""
Prefer explicit `noise_std` when provided,
otherwise auto-compute from increments.
"""
if noise_std is not None:
v = float(noise_std)
if np.isfinite(v) and v >= 0.0:
return v
return auto_noise_std_from_increments(
y_inc,
noise_frac=noise_frac,
percentile=percentile,
min_std=min_std,
max_std=max_std,
)
def _to_np(x: ArrayLike) -> np.ndarray:
"""Convert Tensor / list-like to a NumPy array."""
if hasattr(x, "numpy"):
x = x.numpy()
return np.asarray(x)
def _resolve_stage1_entry(
scaler_info: Mapping[str, Any] | None,
target_name: str | None,
scaler_entry: Mapping[str, Any] | None,
) -> tuple[Any | None, int | None, int | None]:
"""
Resolve (scaler, feature_index, n_features) from Stage-1 metadata.
Parameters
----------
scaler_info : mapping or None
Either the full Stage-1 scaler_info dict mapping
``feature_name -> entry`` or a single entry.
target_name : str, optional
Name of the target in ``scaler_info``.
scaler_entry : mapping, optional
Direct entry for the target.
Returns
-------
scaler : object or None
idx : int or None
Index of the target in the scaler's feature space.
n_features : int or None
Number of features expected by the scaler.
"""
entry: Mapping[str, Any] | None = None
# 0) If caller provided a direct entry, trust it.
if scaler_entry is not None:
entry = scaler_entry
# 1) If scaler_info itself looks like a single entry, use it.
elif isinstance(scaler_info, Mapping) and (
"idx" in scaler_info
or "scaler_path" in scaler_info
or "scaler" in scaler_info
):
entry = scaler_info # already an entry
# 2) Otherwise, scaler_info is expected to be name -> entry mapping.
elif isinstance(scaler_info, Mapping) and isinstance(
target_name, str
):
# direct hit
if target_name in scaler_info:
entry = scaler_info[target_name]
else:
# case-insensitive + aliases
keymap = {
k.lower(): k
for k in scaler_info.keys()
if isinstance(k, str)
}
lname = target_name.lower()
aliases: list[str] = []
if lname.endswith("_pred"):
aliases.append(lname[:-5])
if "subs" in lname:
aliases += [
"subsidence",
"subs",
"subs_pred",
"subsidence_cum",
"subsidence_cum__si",
]
if "gwl" in lname or "head" in lname:
aliases += [
"gwl",
"gwl_pred",
"gwl_depth_bgs",
"gwl_depth_bgs__si",
"head_m",
"head_m__si",
]
# de-dup preserve order
seen = set()
aliases = [
a
for a in aliases
if not (a in seen or seen.add(a))
]
# try aliases (case-insensitive)
for a in aliases:
real = keymap.get(a.lower())
if real is not None:
entry = scaler_info[real]
break
if entry is None:
return None, None, None
# 3) Load scaler if needed
scaler = entry.get("scaler")
if scaler is None and isinstance(
entry.get("scaler_path"), str
):
try:
scaler = joblib.load(entry["scaler_path"])
except Exception:
scaler = None
idx = entry.get("idx")
n_features = None
all_feats = entry.get("all_features")
if isinstance(all_feats, list | tuple):
n_features = len(all_feats)
if hasattr(scaler, "n_features_in_"):
try:
n_features = int(scaler.n_features_in_)
except Exception:
pass
if n_features is None and idx is not None:
n_features = int(idx) + 1
return scaler, idx, n_features
[docs]
def scale_target(
y_phys: ArrayLike,
*,
scaler_info: Mapping[str, Any] | None = None,
target_name: str | None = None,
scaler_entry: Mapping[str, Any] | None = None,
scaler: Any | None = None,
feature_index: int | None = None,
n_features: int | None = None,
params: Mapping[str, float] | None = None,
) -> np.ndarray:
"""
Forward-transform physical values into scaled space.
Mirrors inverse_scale_target(...) conventions.
"""
y = _to_np(y_phys)
shp = y.shape
y_flat = y.reshape(-1, 1)
# ---- 1) Manual params ---------------------------------
if params:
keys = set(params.keys())
if {"min", "max"} <= keys:
_min = float(params["min"])
_max = float(params["max"])
span = (_max - _min) or 1.0
ys = (y_flat - _min) / span
return ys.reshape(shp)
if {"mean", "std"} <= keys:
mean = float(params["mean"])
std = float(params["std"]) or 1.0
ys = (y_flat - mean) / std
return ys.reshape(shp)
if {"scale", "shift"} <= keys:
scale = float(params.get("scale", 1.0)) or 1.0
shift = float(params.get("shift", 0.0))
ys = (y_flat - shift) / scale
return ys.reshape(shp)
return y
# ---- 2) Stage-1 metadata -------------------------------
stg_sc, stg_idx, stg_nf = _resolve_stage1_entry(
scaler_info=scaler_info,
target_name=target_name,
scaler_entry=scaler_entry,
)
if stg_sc is not None:
scaler = stg_sc
if feature_index is None:
feature_index = stg_idx
if stg_nf is not None:
n_features = stg_nf
# ---- 3) Bare scaler or path -----------------------------
if isinstance(scaler, str):
try:
scaler = joblib.load(scaler)
except Exception:
scaler = None
if scaler is None or not hasattr(scaler, "transform"):
return y
if n_features is None and hasattr(
scaler, "n_features_in_"
):
try:
n_features = int(scaler.n_features_in_)
except Exception:
n_features = None
if n_features is None:
n_features = 1
if feature_index is None:
feature_index = 0
nf = int(n_features)
fi = int(feature_index)
X = np.zeros((y_flat.shape[0], nf), dtype=float)
X[:, fi : fi + 1] = y_flat
try:
Xs = scaler.transform(X)
ys = Xs[:, fi : fi + 1]
return ys.reshape(shp)
except Exception:
return y
[docs]
def inverse_scale_target(
y_scaled: ArrayLike,
*,
scaler_info: Mapping[str, Any] | None = None,
target_name: str | None = None,
scaler_entry: Mapping[str, Any] | None = None,
scaler: Any | None = None,
feature_index: int | None = None,
n_features: int | None = None,
params: Mapping[str, float] | None = None,
) -> np.ndarray:
r"""
Inverse-transform a scaled target array back to physical units.
Supports three patterns:
1. Stage-1 ``scaler_info`` dict (preferred in NATCOM):
pass ``scaler_info=scaler_info_dict, target_name="subsidence"``.
2. A bare scaler instance or a path to a joblib dump via ``scaler=...``.
If multi-feature, also pass ``feature_index`` and optionally
``n_features``.
3. Manual scaling parameters via ``params`` such as
{\"min\", \"max\"}, {\"mean\", \"std\"}, or {\"scale\", \"shift\"}.
Parameters
----------
y_scaled : array-like
Scaled values, e.g. of shape (N, H, 1) or (N,).
scaler_info : mapping, optional
target_name : str, optional
scaler_entry : mapping, optional
scaler : object or str, optional
feature_index : int, optional
n_features : int, optional
params : mapping, optional
Manual parameters:
- {\"min\", \"max\"} → MinMax scaling
- {\"mean\", \"std\"} → standardization
- {\"scale\", \"shift\"} → x = scale * x_scaled + shift.
Returns
-------
np.ndarray
Array with same shape as ``y_scaled`` but in physical units when
scaling information is available. If nothing usable is found,
returns the input as a NumPy array.
"""
y = _to_np(y_scaled)
original_shape = y.shape
y_flat = y.reshape(-1, 1)
# ---- 1) Manual params ------------------------------------------------
if params:
keys = set(params.keys())
if {"min", "max"} <= keys:
_min = float(params["min"])
_max = float(params["max"])
span = (
(_max - _min) if (_max - _min) != 0.0 else 1.0
)
y_phys = y_flat * span + _min
return y_phys.reshape(original_shape)
if {"mean", "std"} <= keys:
mean = float(params["mean"])
std = float(params["std"])
if std == 0.0:
std = 1.0
y_phys = y_flat * std + mean
return y_phys.reshape(original_shape)
if {"scale", "shift"} <= keys:
scale = float(params.get("scale", 1.0))
shift = float(params.get("shift", 0.0))
y_phys = y_flat * scale + shift
return y_phys.reshape(original_shape)
# Unknown combination → fall through
return y
# ---- 2) Stage-1 metadata ---------------------------------------------
stg_scaler, stg_idx, stg_nfeat = _resolve_stage1_entry(
scaler_info=scaler_info,
target_name=target_name,
scaler_entry=scaler_entry,
)
if stg_scaler is not None:
scaler = stg_scaler
feature_index = (
stg_idx
if feature_index is None
else feature_index
)
n_features = (
stg_nfeat if stg_nfeat is not None else n_features
)
# ---- 3) Bare scaler or path ------------------------------------------
if isinstance(scaler, str):
try:
scaler = joblib.load(scaler)
except Exception:
scaler = None
if scaler is not None and hasattr(
scaler, "inverse_transform"
):
if n_features is None and hasattr(
scaler, "n_features_in_"
):
try:
n_features = int(scaler.n_features_in_)
except Exception:
n_features = None
if n_features is None:
n_features = 1
if feature_index is None:
feature_index = 0
X = np.zeros(
(y_flat.shape[0], int(n_features)), dtype=float
)
fi = int(feature_index)
X[:, fi : fi + 1] = y_flat
try:
X_inv = scaler.inverse_transform(X)
y_phys = X_inv[:, fi : fi + 1]
return y_phys.reshape(original_shape)
except Exception:
return y
# ---- 4) No usable scaling info --------------------------------------
return y
[docs]
def point_metrics(
y_true: ArrayLike,
y_pred: ArrayLike,
*,
use_physical: bool = False,
scaler_info: Mapping[str, Any] | None = None,
target_name: str | None = None,
scaler_entry: Mapping[str, Any] | None = None,
scaler: Any | None = None,
feature_index: int | None = None,
n_features: int | None = None,
params: Mapping[str, float] | None = None,
) -> dict:
"""
Compute MAE/MSE/R², optionally in physical units.
If ``use_physical=True``, both y_true and y_pred are inverse-
transformed via inverse_scale_target before computing metrics.
Parameters
----------
y_true, y_pred : array-like
Arbitrary shapes (e.g. (N, H, 1)); flattened internally.
use_physical : bool, default=False
If ``True``, inverse-transform ``y_true`` and ``y_pred`` to
physical units before computing metrics.
scaler_info, target_name, scaler_entry, scaler, feature_index, n_features, params : optional
Passed through to ``inverse_scale_target``.
Returns
-------
dict
{\"mae\": ..., \"mse\": ..., \"r2\": ...}
"""
yt = _to_np(y_true)
yp = _to_np(y_pred)
if use_physical:
yt = inverse_scale_target(
yt,
scaler_info=scaler_info,
target_name=target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
yp = inverse_scale_target(
yp,
scaler_info=scaler_info,
target_name=target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
yt_f = yt.reshape(-1)
yp_f = yp.reshape(-1)
diff = yt_f - yp_f
mae = float(np.nanmean(np.abs(diff)))
mse = float(np.nanmean(diff**2))
var = float(np.nanvar(yt_f))
if var > 0.0 and np.isfinite(var):
r2 = float(1.0 - mse / (var + 1e-12))
else:
r2 = float("nan")
return {"mae": mae, "mse": mse, "r2": r2}
[docs]
def per_horizon_metrics(
y_true: ArrayLike,
y_pred: ArrayLike,
*,
use_physical: bool = False,
scaler_info: Mapping[str, Any] | None = None,
target_name: str | None = None,
scaler_entry: Mapping[str, Any] | None = None,
scaler: Any | None = None,
feature_index: int | None = None,
n_features: int | None = None,
params: Mapping[str, float] | None = None,
) -> tuple[dict, dict]:
"""
Per-horizon MAE/R².
Parameters
----------
y_true, y_pred : array-like
Shape (N, H, 1) or (N, H).
use_physical : bool, default=False
If True, inverse-transform to physical units before computing.
Returns
-------
mae_dict : dict
{'H1': mae_h1, ...}
r2_dict : dict
{'H1': r2_h1, ...}
"""
Y = _to_np(y_true)
P = _to_np(y_pred)
if use_physical:
Y = inverse_scale_target(
Y,
scaler_info=scaler_info,
target_name=target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
P = inverse_scale_target(
P,
scaler_info=scaler_info,
target_name=target_name,
scaler_entry=scaler_entry,
scaler=scaler,
feature_index=feature_index,
n_features=n_features,
params=params,
)
Y = np.asarray(Y).squeeze(-1) # (N, H)
P = np.asarray(P).squeeze(-1)
H = Y.shape[1]
mae_dict: dict[str, float | None] = {}
r2_dict: dict[str, float | None] = {}
for h in range(H):
yy = Y[:, h]
pp = P[:, h]
mask = np.isfinite(yy) & np.isfinite(pp)
if mask.sum() == 0:
mae_dict[f"H{h + 1}"] = None
r2_dict[f"H{h + 1}"] = None
continue
d = yy[mask] - pp[mask]
mae = float(np.mean(np.abs(d)))
mse = float(np.mean(d**2))
var = float(np.var(yy[mask]))
r2 = (
float(1.0 - mse / (var + 1e-12))
if var > 0
else float("nan")
)
mae_dict[f"H{h + 1}"] = mae
r2_dict[f"H{h + 1}"] = r2
return mae_dict, r2_dict