# License: Apache-2.0
# Copyright (c) 2026-present
# Author: LKouadio <etanoyau@gmail.com>
r"""Utility helpers for subsidence data, units, and coordinates."""
from __future__ import annotations
import json
import math
import os
import warnings
from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass
from typing import (
Any,
Literal,
)
import numpy as np
import pandas as pd
ShiftMode = Literal["none", "min", "mean", "value"]
Mode = Literal["add", "overwrite"]
DEFAULT_SUBS_UNIT_TO_SI = 1e-3
def _is_nonempty_str(x) -> bool:
return isinstance(x, str) and x.strip() != ""
def _looks_like_zscore_name(col: str) -> bool:
if not isinstance(col, str):
return False
c = col.strip().lower()
return c.endswith("_z") or c.endswith("_zscore")
def _first_present(
cols: set,
candidates: Sequence[str],
) -> str | None:
for c in candidates:
if c and c in cols:
return c
return None
[docs]
def normalize_gwl_alias(
df: pd.DataFrame,
gwl_col_user: str | None,
*,
prefer_depth_bgs: bool = True,
verbose: bool = True,
) -> tuple[pd.DataFrame, str | None]:
"""Normalize common GWL naming aliases.
Naming-only: unit conversion happens later.
"""
if not _is_nonempty_str(gwl_col_user):
return df, gwl_col_user
cols = set(df.columns)
user = gwl_col_user.strip()
depth_m = ["GWL_depth_bgs_m"]
depth = ["GWL_depth_bgs"]
generic = ["GWL"]
aliases_depth_m = {"gwl_depth_bgs_m", "gwl_depth_m"}
aliases_depth = {"gwl_depth_bgs", "gwl_depth"}
aliases_generic = {"gwl"}
u = user.lower()
if u in aliases_generic:
order = depth_m + depth + generic
if not prefer_depth_bgs:
order = generic + depth + depth_m
pick = _first_present(cols, order)
if pick is None:
return df, user
if verbose and pick != user:
print(
" [GWL] Config uses "
f"{user!r}; using {pick!r}."
)
return df, pick
if u in aliases_depth_m and "GWL_depth_bgs_m" in cols:
if verbose and user != "GWL_depth_bgs_m":
print(
" [GWL] Alias "
f"{user!r} -> 'GWL_depth_bgs_m'."
)
return df, "GWL_depth_bgs_m"
if u in aliases_depth and "GWL_depth_bgs" in cols:
if verbose and user != "GWL_depth_bgs":
print(
f" [GWL] Alias {user!r} -> 'GWL_depth_bgs'."
)
return df, "GWL_depth_bgs"
if user == "GWL_depth_bgs_m" and user not in cols:
alt = _first_present(cols, depth)
if alt is not None:
if verbose:
print(
" [GWL] 'GWL_depth_bgs_m' "
"missing; using 'GWL_depth_bgs'."
)
return df, alt
if user == "GWL_depth_bgs" and user not in cols:
alt = _first_present(cols, depth_m)
if alt is not None:
if verbose:
print(
" [GWL] 'GWL_depth_bgs' "
"missing; using 'GWL_depth_bgs_m'."
)
return df, alt
if (
user == "GWL"
and ("GWL" in cols)
and _first_present(cols, depth_m + depth) is None
):
if "GWL_depth_bgs" not in cols:
if verbose:
print(
" [GWL] Renaming "
"'GWL' -> 'GWL_depth_bgs'."
)
df.rename(
columns={"GWL": "GWL_depth_bgs"},
inplace=True,
)
return df, "GWL_depth_bgs"
return df, user
[docs]
def resolve_gwl_for_physics(
df: pd.DataFrame,
gwl_col_user: str | None,
*,
prefer_depth_bgs: bool = True,
allow_keep_zscore_as_ml: bool = True,
verbose: bool = True,
) -> tuple[str, str | None]:
"""Pick meters GWL for physics; keep z-score ML-only."""
df, gwl_col_user = normalize_gwl_alias(
df,
gwl_col_user,
prefer_depth_bgs=prefer_depth_bgs,
verbose=verbose,
)
cols = set(df.columns)
meters_pref = [
"GWL_depth_bgs_m",
"GWL_depth_bgs",
"GWL",
]
if not prefer_depth_bgs:
meters_pref = [
"GWL",
"GWL_depth_bgs",
"GWL_depth_bgs_m",
]
zscore_cands = [
"GWL_depth_bgs_m_z",
"GWL_depth_bgs_z",
"GWL_z",
]
meters_avail = _first_present(cols, meters_pref)
zscore_avail = _first_present(cols, zscore_cands)
def _is_z(col: str) -> bool:
if _looks_like_zscore_name(col):
return True
return col in zscore_cands
if _is_nonempty_str(gwl_col_user):
if gwl_col_user not in cols:
raise ValueError(
f"GWL_COL={gwl_col_user!r} not in dataset."
)
if _is_z(gwl_col_user):
if meters_avail is None:
raise ValueError(
"GWL_COL is z-scored but no meters "
"GWL exists. Physics needs meters."
)
gwl_m = meters_avail
if allow_keep_zscore_as_ml:
gwl_z = gwl_col_user
else:
gwl_z = None
if verbose:
print(
" [GWL] Using "
f"{gwl_m!r} for physics; "
f"keeping {gwl_z!r} ML-only."
)
return gwl_m, gwl_z
if ("GWL_depth_bgs_m" in cols) and (
gwl_col_user != "GWL_depth_bgs_m"
):
if verbose:
print(
" [GWL] Using "
"'GWL_depth_bgs_m' for physics "
f"(over {gwl_col_user!r})."
)
gwl_m = "GWL_depth_bgs_m"
else:
gwl_m = gwl_col_user
if allow_keep_zscore_as_ml and zscore_avail:
gwl_z = zscore_avail
else:
gwl_z = None
return gwl_m, gwl_z
if meters_avail is None:
if zscore_avail is not None:
raise ValueError(
"No meters GWL found, but a z-score "
"column exists. Physics needs meters."
)
raise ValueError(
"No groundwater column found. Expected "
"'GWL_depth_bgs_m', 'GWL_depth_bgs', or 'GWL'."
)
if allow_keep_zscore_as_ml and zscore_avail:
gwl_z = zscore_avail
else:
gwl_z = None
if verbose:
if gwl_z is not None:
print(
" [GWL] Auto-selected "
f"{meters_avail!r} for physics; "
"z-score kept ML-only."
)
else:
print(
" [GWL] Auto-selected "
f"{meters_avail!r} for physics."
)
return meters_avail, gwl_z
[docs]
def resolve_head_column(
df: pd.DataFrame,
*,
depth_col: str,
head_col: str = "head_m",
z_surf_col: str | None = None,
use_head_proxy: bool = True,
) -> tuple[str, str | None]:
"""Resolve a head column, creating one if needed."""
cols = set(df.columns)
if head_col and head_col in cols:
z_used = None
if z_surf_col and z_surf_col in cols:
z_used = z_surf_col
elif "z_surf_m" in cols:
z_used = "z_surf_m"
elif "z_surf" in cols:
z_used = "z_surf"
return head_col, z_used
z_cands = []
if z_surf_col:
z_cands.append(z_surf_col)
z_cands += ["z_surf_m", "z_surf_corr", "z_surf"]
z_used = _first_present(cols, z_cands)
if z_used is not None:
new_head = "head_m"
df[new_head] = pd.to_numeric(
df[z_used], errors="coerce"
) - pd.to_numeric(df[depth_col], errors="coerce")
return new_head, z_used
if use_head_proxy:
new_head = "head_m"
df[new_head] = -pd.to_numeric(
df[depth_col],
errors="coerce",
)
return new_head, None
raise ValueError(
"Cannot resolve head. Provide 'head_m' "
"or z_surf, or enable USE_HEAD_PROXY."
)
def _norm_unit(unit: str | None) -> str:
u = (unit or "").strip().lower()
if u in ("m", "meter", "meters"):
return "m"
if u in ("mm", "millimeter", "millimeters"):
return "mm"
return u
def _unit_factor(
from_unit: str | None,
to_unit: str | None,
) -> float:
fu = _norm_unit(from_unit)
tu = _norm_unit(to_unit)
if fu == tu:
return 1.0
if fu == "m" and tu == "mm":
return 1000.0
if fu == "mm" and tu == "m":
return 1e-3
raise ValueError(
"Unsupported unit conversion: "
f"{from_unit!r} -> {to_unit!r}."
)
def _as_float(x: object) -> float:
try:
v = float(x) # type: ignore[arg-type]
except Exception as e:
raise ValueError(
f"Cannot convert to float: {x!r}."
) from e
if not math.isfinite(v):
raise ValueError(f"Non-finite value: {v!r}.")
return float(v)
def _as_pos_float(
x: object,
*,
default: float,
) -> float:
v = _as_float(x)
if v <= 0.0:
return float(default)
return float(v)
def _select_target_cols(
df: pd.DataFrame,
*,
base: str,
columns: Iterable[str] | None = None,
) -> list[str]:
if columns is not None:
return [c for c in columns if c in df.columns]
b = (base or "").strip()
if not b:
return []
pref = b + "_"
cols: list[str] = []
for c in df.columns:
cs = str(c)
if cs == b or cs.startswith(pref):
cols.append(cs)
return cols
[docs]
def convert_target_units_df(
df: pd.DataFrame | None,
*,
base: str,
from_unit: str = "m",
to_unit: str = "mm",
mode: Mode = "overwrite",
suffix: str = "_mm",
columns: Iterable[str] | None = None,
unit_col: str | None = None,
copy_df: bool = True,
overwrite_cols: bool = False,
strict: bool = False,
) -> pd.DataFrame | None:
"""
Convert target-like columns between "m" and "mm".
Selected columns:
- `base`
- `base + "_*"` (quantiles, intervals, ...)
If mode="add", new columns use `suffix`.
"""
if df is None or getattr(df, "empty", True):
return df
cols = _select_target_cols(
df,
base=base,
columns=columns,
)
if not cols:
return df
m = (mode or "overwrite").strip().lower()
if m not in ("add", "overwrite"):
raise ValueError(
"mode must be 'add' or 'overwrite'. "
f"Got {mode!r}."
)
# If user asked to "add" but suffix is empty,
# this is effectively overwrite.
if m == "add" and (suffix == ""):
m = "overwrite"
factor = _unit_factor(from_unit, to_unit)
out = df.copy() if copy_df else df
for c in cols:
dst = c if m == "overwrite" else (c + suffix)
if (dst in out.columns) and (m == "add"):
if not overwrite_cols:
continue
ser = out[c]
vals = pd.to_numeric(ser, errors="coerce")
if strict and (vals.notna().sum() == 0):
raise ValueError(
f"Cannot convert column to numeric: {c!r}."
)
out[dst] = vals.astype(float) * float(factor)
ucol = unit_col or (str(base).strip() + "_unit")
out[ucol] = _norm_unit(to_unit)
return out
[docs]
def subs_unit_to_si(
cfg: Mapping[str, object] | None = None,
*,
default: float = DEFAULT_SUBS_UNIT_TO_SI,
units_prov_key: str = "units_provenance",
stage1_key: str = "subs_unit_to_si_applied_stage1",
cfg_key: str = "SUBS_UNIT_TO_SI",
) -> float:
"""
Subsidence unit->SI factor (to meters).
Priority:
1) cfg[units_prov_key][stage1_key]
2) cfg[cfg_key]
3) default
"""
if cfg is None:
return float(default)
prov = cfg.get(units_prov_key)
if isinstance(prov, Mapping):
v = prov.get(stage1_key)
if v is not None:
return _as_pos_float(v, default=default)
v2 = cfg.get(cfg_key)
if v2 is not None:
return _as_pos_float(v2, default=default)
return float(default)
[docs]
def subs_native_unit(
cfg: Mapping[str, object] | None = None,
*,
default: str = "mm",
) -> str:
"""
Infer the "native" subsidence unit from cfg.
- unit_to_si ~= 1e-3 -> "mm"
- unit_to_si ~= 1.0 -> "m"
"""
if cfg is None:
return _norm_unit(default) or "mm"
u2si = subs_unit_to_si(cfg)
if abs(u2si - 1e-3) <= 1e-10:
return "mm"
if abs(u2si - 1.0) <= 1e-10:
return "m"
return _norm_unit(default) or "mm"
[docs]
def add_subsidence_mm_columns(
df: pd.DataFrame | None,
cfg: Mapping[str, object] | None = None,
*,
base: str = "subsidence",
columns: Iterable[str] | None = None,
suffix: str = "_mm",
unit_col: str | None = None,
copy_df: bool = True,
overwrite: bool = False,
) -> pd.DataFrame | None:
"""
Add (or overwrite) subsidence columns in millimeters.
Always assumes the current values are in meters.
"""
return convert_target_units_df(
df,
base=base,
from_unit="m",
to_unit="mm",
mode="add",
suffix=suffix,
columns=columns,
unit_col=unit_col,
copy_df=copy_df,
overwrite_cols=overwrite,
strict=False,
)
[docs]
def add_subsidence_native_unit_columns(
df: pd.DataFrame | None,
cfg: Mapping[str, object] | None = None,
*,
base: str = "subsidence",
columns: Iterable[str] | None = None,
suffix: str = "_native",
unit_col: str | None = None,
copy_df: bool = True,
overwrite: bool = False,
) -> pd.DataFrame | None:
"""
Add columns in the cfg-inferred native unit.
If cfg says the native unit was meters, this becomes
a no-op aside from optional unit_col.
"""
to_unit = subs_native_unit(cfg)
return convert_target_units_df(
df,
base=base,
from_unit="m",
to_unit=to_unit,
mode="add",
suffix=suffix,
columns=columns,
unit_col=unit_col,
copy_df=copy_df,
overwrite_cols=overwrite,
strict=False,
)
[docs]
def finalize_si_scaling_kwargs(
scaling_kwargs: dict[str, Any],
*,
subs_in_si: bool,
head_in_si: bool,
thickness_in_si: bool,
force_identity_affine_if_si: bool = True,
warn: bool = True,
) -> dict[str, Any]:
"""
Prevent double SI conversion in GeoPrior scaling_kwargs.
"""
kw = dict(scaling_kwargs)
def _f(name: str, default: float) -> float:
v = kw.get(name, default)
try:
return float(v)
except Exception:
return float(default)
kw["subs_unit_to_si"] = _f("subs_unit_to_si", 1.0)
kw["head_unit_to_si"] = _f("head_unit_to_si", 1.0)
kw["thickness_unit_to_si"] = _f(
"thickness_unit_to_si", 1.0
)
kw.setdefault("subs_scale_si", None)
kw.setdefault("subs_bias_si", None)
kw.setdefault("head_scale_si", None)
kw.setdefault("head_bias_si", None)
kw.setdefault("H_scale_si", None)
kw.setdefault("H_bias_si", None)
def _fix(
*,
already_si: bool,
unit_key: str,
scale_key: str,
bias_key: str,
name: str,
) -> None:
if not already_si:
if kw.get(scale_key) is None:
kw[scale_key] = 1.0
if kw.get(bias_key) is None:
kw[bias_key] = 0.0
return
cur = _f(unit_key, 1.0)
if warn and (cur != 1.0):
warnings.warn(
"[GeoPrior SI] "
f"{name} already SI but {unit_key}={cur}. "
f"Setting {unit_key}=1.0.",
RuntimeWarning,
stacklevel=2,
)
kw[unit_key] = 1.0
if force_identity_affine_if_si:
kw[scale_key] = 1.0
kw[bias_key] = 0.0
else:
if kw.get(scale_key) is None:
kw[scale_key] = 1.0
if kw.get(bias_key) is None:
kw[bias_key] = 0.0
_fix(
already_si=subs_in_si,
unit_key="subs_unit_to_si",
scale_key="subs_scale_si",
bias_key="subs_bias_si",
name="subsidence",
)
_fix(
already_si=head_in_si,
unit_key="head_unit_to_si",
scale_key="head_scale_si",
bias_key="head_bias_si",
name="head/GWL",
)
_fix(
already_si=thickness_in_si,
unit_key="thickness_unit_to_si",
scale_key="H_scale_si",
bias_key="H_bias_si",
name="thickness/H_field",
)
return kw
# Backward-compat alias (no duplicate body)
finalize_si_affines_and_units = finalize_si_scaling_kwargs
[docs]
def infer_utm_epsg_from_lonlat(
lon_deg: np.ndarray,
lat_deg: np.ndarray,
) -> int:
"""
Infer a UTM EPSG code from lon/lat (EPSG:4326).
Uses standard UTM zoning:
zone = floor((lon + 180)/6) + 1
EPSG = 32600 + zone (north), 32700 + zone (south)
"""
lon = float(np.nanmean(np.asarray(lon_deg, dtype=float)))
lat = float(np.nanmean(np.asarray(lat_deg, dtype=float)))
zone = int(np.floor((lon + 180.0) / 6.0) + 1.0)
zone = max(1, min(zone, 60))
return (32600 + zone) if (lat >= 0.0) else (32700 + zone)
[docs]
def lonlat_to_utm_m(
lon_deg: np.ndarray,
lat_deg: np.ndarray,
*,
src_epsg: int = 4326,
target_epsg: int | None = None,
) -> tuple[np.ndarray, np.ndarray, int]:
"""
Convert lon/lat degrees to UTM meters using pyproj.
Returns
-------
x_m, y_m, target_epsg
"""
lon = np.asarray(lon_deg, dtype=float)
lat = np.asarray(lat_deg, dtype=float)
if target_epsg is None:
target_epsg = infer_utm_epsg_from_lonlat(lon, lat)
try:
from pyproj import Transformer
except Exception as e:
raise ImportError(
"pyproj is required for lon/lat -> UTM conversion. "
"Install with: pip install pyproj"
) from e
tr = Transformer.from_crs(
f"EPSG:{src_epsg}",
f"EPSG:{target_epsg}",
always_xy=True,
)
x_m, y_m = tr.transform(lon, lat)
return (
np.asarray(x_m, float),
np.asarray(y_m, float),
int(target_epsg),
)
def _shift_1d(
a: np.ndarray,
mode: ShiftMode,
value: float | None = None,
) -> tuple[np.ndarray, float]:
"""
Shift array by min/mean/custom value (translation only, not scaling).
"""
a = np.asarray(a, dtype=float)
if mode == "none":
return a, 0.0
if mode == "min":
s = float(np.nanmin(a))
return a - s, s
if mode == "mean":
s = float(np.nanmean(a))
return a - s, s
if mode == "value":
if value is None:
raise ValueError(
"shift mode 'value' requires shift_value."
)
s = float(value)
return a - s, s
raise ValueError(f"Unknown shift mode: {mode!r}")
[docs]
@dataclass
class CoordsPack:
coords: np.ndarray # (B, H, 3) float32
coord_mins: dict[
str, float
] # {"t":..., "x":..., "y":...} (original mins/means used)
coord_ranges: dict[
str, float
] # {"t":..., "x":..., "y":...} (after shifting; ranges)
meta: dict[str, Any] # epsg, shift modes, etc.
[docs]
def make_txy_coords(
t: np.ndarray,
x: np.ndarray,
y: np.ndarray,
*,
time_shift: ShiftMode = "min",
xy_shift: ShiftMode = "min",
time_shift_value: float | None = None,
x_shift_value: float | None = None,
y_shift_value: float | None = None,
dtype: str = "float32",
) -> CoordsPack:
"""
Build coords tensor (t, x, y) with OPTIONAL shifting (translation only).
This is designed for your "not normalized" workflow:
- You keep SI units (years and meters),
- but you avoid feeding huge UTM magnitudes (e.g. 3e5, 2.5e6)
into coord MLPs by shifting x,y (and optionally t).
Notes
-----
- This does NOT min-max scale to [0,1]. It only translates.
- Returning coord_mins/coord_ranges is still useful for logging/debug.
"""
t = np.asarray(t, dtype=float)
x = np.asarray(x, dtype=float)
y = np.asarray(y, dtype=float)
if t.shape != x.shape or t.shape != y.shape:
raise ValueError(
f"t, x, y must have same shape. Got {t.shape}, {x.shape}, {y.shape}"
)
t2, t0 = _shift_1d(t, time_shift, time_shift_value)
x2, x0 = _shift_1d(x, xy_shift, x_shift_value)
y2, y0 = _shift_1d(y, xy_shift, y_shift_value)
# ranges after shifting (safe even if you don't "use" them in model)
t_rng = float(np.nanmax(t2) - np.nanmin(t2))
x_rng = float(np.nanmax(x2) - np.nanmin(x2))
y_rng = float(np.nanmax(y2) - np.nanmin(y2))
coords = np.concatenate(
[t2[..., None], x2[..., None], y2[..., None]], axis=-1
).astype(dtype)
return CoordsPack(
coords=coords,
coord_mins={"t": t0, "x": x0, "y": y0},
coord_ranges={"t": t_rng, "x": x_rng, "y": y_rng},
meta={
"time_shift": time_shift,
"xy_shift": xy_shift,
},
)
[docs]
def detect_subsidence_mode(
df: pd.DataFrame,
*,
rate_col: str = "subsidence",
cum_col: str = "subsidence_cum",
time_col: str = "year",
group_cols: Iterable[str] = (
"longitude",
"latitude",
"city",
),
tol_rel: float = 0.05,
min_points: int = 3,
max_groups: int = 200,
random_state: int = 42,
) -> dict:
"""
Infer whether subsidence columns represent 'rate', 'cumulative',
or an inconsistent pair.
Strategy
--------
If both columns exist, check whether:
Δcum(t_i) ≈ rate(t_i) * Δt_i
per group (lon/lat[/city]) over i>=1. This is baseline-invariant.
Returns
-------
dict with keys:
mode: 'cumulative', 'rate', 'pair-consistent', 'unknown'
details: diagnostics (errors, counts, etc.)
"""
cols = set(df.columns)
has_rate = rate_col in cols
has_cum = cum_col in cols
if has_cum and not has_rate:
return {
"mode": "cumulative",
"details": {"reason": "cum_col present only"},
}
if has_rate and not has_cum:
return {
"mode": "rate",
"details": {"reason": "rate_col present only"},
}
if not (has_rate and has_cum):
return {
"mode": "unknown",
"details": {"reason": "neither column present"},
}
gcols = [c for c in group_cols if c in cols]
if not gcols:
return {
"mode": "unknown",
"details": {
"reason": "no valid group_cols found in df"
},
}
# sample groups for speed
groups = df.groupby(gcols, sort=False)
keys = list(groups.groups.keys())
if not keys:
return {
"mode": "unknown",
"details": {"reason": "no groups"},
}
rng = np.random.default_rng(random_state)
if len(keys) > max_groups:
keys = [
keys[i]
for i in rng.choice(
len(keys), size=max_groups, replace=False
)
]
rel_errs = []
used_groups = 0
used_points = 0
for k in keys:
g = groups.get_group(k).sort_values(time_col)
if len(g) < min_points:
continue
t = pd.to_numeric(
g[time_col], errors="coerce"
).to_numpy(float)
r = pd.to_numeric(
g[rate_col], errors="coerce"
).to_numpy(float)
c = pd.to_numeric(
g[cum_col], errors="coerce"
).to_numpy(float)
m = np.isfinite(t) & np.isfinite(r) & np.isfinite(c)
t, r, c = t[m], r[m], c[m]
if t.size < min_points:
continue
dt = np.diff(t)
dc = np.diff(c)
r_next = r[1:]
m2 = (
np.isfinite(dt)
& np.isfinite(dc)
& np.isfinite(r_next)
& (dt != 0)
)
if m2.sum() < (min_points - 1):
continue
dt, dc, r_next = dt[m2], dc[m2], r_next[m2]
pred_dc = r_next * dt
denom = np.maximum(np.mean(np.abs(pred_dc)), 1e-12)
rel = np.mean(np.abs(dc - pred_dc)) / denom
rel_errs.append(rel)
used_groups += 1
used_points += int(m2.sum())
if used_groups == 0:
return {
"mode": "unknown",
"details": {
"reason": "insufficient data per group"
},
}
med = float(np.median(rel_errs))
out = {
"median_rel_error": med,
"mean_rel_error": float(np.mean(rel_errs)),
"n_groups": used_groups,
"n_points": used_points,
"tol_rel": tol_rel,
}
if med <= tol_rel:
return {"mode": "pair-consistent", "details": out}
return {"mode": "unknown", "details": out}
[docs]
def rate_to_cumulative(
df: pd.DataFrame,
*,
rate_col: str = "subsidence",
cum_col: str = "subsidence_cum",
time_col: str = "year",
group_cols: Iterable[str] = (
"longitude",
"latitude",
"city",
),
initial: str = "first_equals_rate_dt",
inplace: bool = False,
) -> pd.DataFrame:
"""
Build cumulative displacement from a rate series.
Assumption
----------
rate(t_i) represents the rate over the interval (t_{i-1}, t_i].
Then:
cum(t_i) = cum(t_{i-1}) + rate(t_i) * dt_i
Parameters
----------
initial:
- 'zero': cum at first time is 0
- 'first_equals_rate_dt': cum(t0) = rate(t0) * dt_ref
where dt_ref is median dt in that group (fallback 1).
Returns
-------
DataFrame with cum_col added/overwritten.
"""
out = df if inplace else df.copy()
cols = set(out.columns)
gcols = [c for c in group_cols if c in cols]
if not gcols:
raise ValueError("No valid group_cols found in df.")
out[cum_col] = np.nan
for _, gidx in out.groupby(
gcols, sort=False
).groups.items():
g = out.loc[gidx].sort_values(time_col)
t = pd.to_numeric(
g[time_col], errors="coerce"
).to_numpy(float)
r = pd.to_numeric(
g[rate_col], errors="coerce"
).to_numpy(float)
m = np.isfinite(t) & np.isfinite(r)
t, r = t[m], r[m]
if t.size == 0:
continue
dt = np.diff(t)
dt_ref = (
float(np.median(dt[np.isfinite(dt) & (dt != 0)]))
if t.size > 1
else 1.0
)
if not np.isfinite(dt_ref) or dt_ref == 0:
dt_ref = 1.0
c = np.zeros_like(r, dtype=float)
if initial == "zero":
c[0] = 0.0
elif initial == "first_equals_rate_dt":
c[0] = r[0] * dt_ref
else:
raise ValueError(
"initial must be 'zero' or 'first_equals_rate_dt'."
)
if r.size > 1:
dt_i = np.diff(t)
dt_i = np.where(
(~np.isfinite(dt_i)) | (dt_i == 0),
dt_ref,
dt_i,
)
c[1:] = c[0] + np.cumsum(r[1:] * dt_i)
# write back aligned to original rows in this group
out.loc[g.index[m], cum_col] = c
return out
[docs]
def cumulative_to_rate(
df: pd.DataFrame,
*,
cum_col: str = "subsidence_cum",
rate_col: str = "subsidence",
time_col: str = "year",
group_cols: Iterable[str] = (
"longitude",
"latitude",
"city",
),
first: str = "cum_over_dtref",
inplace: bool = False,
) -> pd.DataFrame:
"""
Recover a rate series from cumulative displacement.
rate(t_i) = (cum(t_i) - cum(t_{i-1})) / dt_i for i>=1
first:
- 'nan': first rate is NaN
- 'cum_over_dtref': rate(t0) = cum(t0)/dt_ref (dt_ref median dt)
Returns
-------
DataFrame with rate_col added/overwritten.
"""
out = df if inplace else df.copy()
cols = set(out.columns)
gcols = [c for c in group_cols if c in cols]
if not gcols:
raise ValueError("No valid group_cols found in df.")
out[rate_col] = np.nan
for _, gidx in out.groupby(
gcols, sort=False
).groups.items():
g = out.loc[gidx].sort_values(time_col)
t = pd.to_numeric(
g[time_col], errors="coerce"
).to_numpy(float)
c = pd.to_numeric(
g[cum_col], errors="coerce"
).to_numpy(float)
m = np.isfinite(t) & np.isfinite(c)
t, c = t[m], c[m]
if t.size == 0:
continue
dt = np.diff(t)
dt_ref = (
float(np.median(dt[np.isfinite(dt) & (dt != 0)]))
if t.size > 1
else 1.0
)
if not np.isfinite(dt_ref) or dt_ref == 0:
dt_ref = 1.0
r = np.zeros_like(c, dtype=float)
if first == "nan":
r[0] = np.nan
elif first == "cum_over_dtref":
r[0] = c[0] / dt_ref
else:
raise ValueError(
"first must be 'nan' or 'cum_over_dtref'."
)
if c.size > 1:
dt_i = np.diff(t)
dt_i = np.where(
(~np.isfinite(dt_i)) | (dt_i == 0),
dt_ref,
dt_i,
)
r[1:] = np.diff(c) / dt_i
out.loc[g.index[m], rate_col] = r
return out
# ---------------------------------------------------------------------
# Evaluation payload unit conversion
# ---------------------------------------------------------------------
_TIME_UNIT_TO_SECONDS: dict[str, float] = {
"unitless": 1.0,
"step": 1.0,
"index": 1.0,
"s": 1.0,
"sec": 1.0,
"second": 1.0,
"seconds": 1.0,
"min": 60.0,
"minute": 60.0,
"minutes": 60.0,
"h": 3600.0,
"hr": 3600.0,
"hour": 3600.0,
"hours": 3600.0,
"day": 86400.0,
"days": 86400.0,
"week": 7.0 * 86400.0,
"weeks": 7.0 * 86400.0,
# Mean tropical year (365.2425 d)
"year": 31556952.0,
"years": 31556952.0,
"yr": 31556952.0,
"month": 31556952.0 / 12.0,
"months": 31556952.0 / 12.0,
}
def _cfg_to_mapping(
cfg: object | None,
) -> Mapping[str, object]:
if cfg is None:
return {}
if isinstance(cfg, Mapping):
return cfg
try:
return dict(vars(cfg))
except Exception:
return {}
def _norm_time_units(time_units: str | None) -> str:
s = (time_units or "").strip().lower()
if s in ("", "none", "null"):
return "unitless"
if s in ("t", "time", "times"):
return "unitless"
if s in ("yrs",):
return "yr"
return s
def _seconds_per_time_unit(time_units: str | None) -> float:
key = _norm_time_units(time_units)
if key not in _TIME_UNIT_TO_SECONDS:
keys = sorted(_TIME_UNIT_TO_SECONDS.keys())
raise ValueError(
f"Unsupported time_units={time_units!r}. "
f"Supported: {keys}"
)
return float(_TIME_UNIT_TO_SECONDS[key])
def _is_number(x: object) -> bool:
if isinstance(x, bool):
return False
return isinstance(x, int | float | np.number)
def _scale_obj(obj: object, factor: float) -> object:
"""
Multiply numbers inside common JSON-like containers.
Supports scalar numbers, dicts of numbers, and lists/tuples of
numbers. Non-numeric entries are left unchanged.
"""
if _is_number(obj):
return float(obj) * float(factor)
if isinstance(obj, dict):
return {
k: _scale_obj(v, factor) for k, v in obj.items()
}
if isinstance(obj, list | tuple):
return [_scale_obj(v, factor) for v in obj]
return obj
def _set_scaled(
d: dict[str, Any], key: str, factor: float
) -> None:
if key not in d:
return
d[key] = _scale_obj(d[key], factor)
[docs]
def convert_eval_payload_units(
payload: Mapping[str, Any],
cfg: Mapping[str, object] | object | None = None,
*,
mode: Literal["si", "interpretable"] = "si",
scope: Literal["all", "subsidence", "physics"] = "all",
savefile: str | None = None,
fmt: Literal["json"] = "json",
indent: int = 2,
copy_payload: bool = True,
) -> dict[str, Any]:
"""
Convert GeoPriorSubsNet evaluation-payload units for reporting.
This is a **post-processing** helper meant for stage-2 evaluation
JSON payloads (e.g. ``geoprior_eval_phys_<timestamp>.json``).
Parameters
----------
payload : Mapping[str, Any]
The evaluation payload dict. It is expected to contain sections
like ``metrics_evaluate``, ``point_metrics``, ``per_horizon``,
``interval_calibration`` and ``censor_stratified``.
cfg : mapping or module, optional
The experiment config (e.g. ``config`` module or ``globals()``).
The helper reads ``SUBS_UNIT_TO_SI`` (or stage-1 provenance) and
``TIME_UNITS`` from this object when available.
mode : {"si", "interpretable"}, default="si"
``"si"`` leaves values untouched. ``"interpretable"`` converts
selected subsidence and physics metrics from SI into the native
units implied by ``SUBS_UNIT_TO_SI``.
scope : {"all", "subsidence", "physics"}, default="all"
Which parts to convert when ``mode="interpretable"``.
``"subsidence"`` converts only subsidence metrics such as
MAE, MSE, and sharpness to the native unit. ``"physics"``
converts only unambiguous physics residual rates, currently
``epsilon_cons_raw`` and ``epsilon_gw_raw``. ``"all"``
applies both conversions.
savefile : str, optional
If provided, write the converted payload to this path.
fmt : {"json"}, default="json"
Output format when ``savefile`` is provided.
indent : int, default=2
JSON indentation.
copy_payload : bool, default=True
If True, operate on a deep copy of ``payload``. If False,
convert in-place (dangerous).
Returns
-------
dict
Converted payload as a plain ``dict``.
Notes
-----
For subsidence metrics, linear quantities such as MAE and sharpness
scale by ``1 / SUBS_UNIT_TO_SI``, while squared quantities such as
MSE scale by ``(1 / SUBS_UNIT_TO_SI) ** 2``.
When physics conversion is enabled, ``epsilon_cons_raw`` is treated
as a rate in ``m/s`` and converted to
``<subs_native_unit>/<TIME_UNITS>`` (for example ``mm/yr``), while
``epsilon_gw_raw`` is treated as a rate in ``1/s`` and converted to
``1/<TIME_UNITS>``.
The helper records unit provenance under ``payload["units"]``.
"""
if payload is None:
raise ValueError(
"payload must be a mapping, got None."
)
out: dict[str, Any]
if copy_payload:
import copy as _copy
out = _copy.deepcopy(dict(payload))
else:
out = dict(payload) # shallow
cfg_m = _cfg_to_mapping(cfg)
# -----------------------------------------------------------------
# Resolve unit factors
# -----------------------------------------------------------------
u2si = subs_unit_to_si(cfg_m)
native_u = subs_native_unit(cfg_m)
subs_from_si = 1.0 / float(u2si)
time_units = (
cfg_m.get("TIME_UNITS")
or out.get("time_units")
or out.get("TIME_UNITS")
or "year"
)
sec_per = _seconds_per_time_unit(str(time_units))
# -----------------------------------------------------------------
# Always attach provenance (even if mode="si")
# -----------------------------------------------------------------
out.setdefault("units", {})
if not isinstance(out["units"], dict):
out["units"] = {}
out["units"].update(
{
# Core provenance the user asked for
"subs_unit_to_si": float(u2si),
"subs_factor_si_to_real": float(subs_from_si),
"subs_metrics_unit": (
native_u if mode == "interpretable" else "m"
),
# Extra helpful metadata
"time_units": str(time_units),
"seconds_per_time_unit": float(sec_per),
}
)
if mode != "interpretable":
if savefile:
_write_payload(
out, savefile, fmt=fmt, indent=indent
)
return out
# -----------------------------------------------------------------
# 1) Subsidence metrics (SI m -> native unit, e.g. mm)
# -----------------------------------------------------------------
do_subs = scope in ("all", "subsidence")
if do_subs:
# metrics_evaluate: subs_pred_* metrics
me = out.get("metrics_evaluate")
if isinstance(me, dict):
for k, v in list(me.items()):
if not _is_number(v):
continue
if not k.startswith("subs_pred_"):
continue
lk = k.lower()
if "coverage" in lk or "r2" in lk:
continue
if "mse" in lk:
me[k] = float(v) * (subs_from_si**2)
elif (
("mae" in lk)
or ("rmse" in lk)
or ("sharpness" in lk)
):
me[k] = float(v) * subs_from_si
# point_metrics: (mae, mse, r2)
pm = out.get("point_metrics")
if isinstance(pm, dict):
if "mae" in pm:
_set_scaled(pm, "mae", subs_from_si)
if "mse" in pm:
_set_scaled(pm, "mse", subs_from_si**2)
if "rmse" in pm:
_set_scaled(pm, "rmse", subs_from_si)
# per_horizon: mae/mse/rmse dicts with keys H1/H2/...
ph = out.get("per_horizon")
if isinstance(ph, dict):
for metric_name, series in ph.items():
if not isinstance(series, dict):
continue
lk = str(metric_name).lower()
if lk in ("mae", "rmse", "sharpness"):
ph[metric_name] = _scale_obj(
series, subs_from_si
)
elif lk == "mse":
ph[metric_name] = _scale_obj(
series, subs_from_si**2
)
# interval_calibration: only *_phys sharpness is physical
ic = out.get("interval_calibration")
if isinstance(ic, dict):
for k, v in list(ic.items()):
if not _is_number(v):
continue
lk = k.lower()
if ("sharpness" in lk) and lk.endswith(
"_phys"
):
ic[k] = float(v) * subs_from_si
# censor_stratified: mae_* values
cs = out.get("censor_stratified")
if isinstance(cs, dict):
for k, v in list(cs.items()):
if not _is_number(v):
continue
lk = k.lower()
if lk.startswith("mae_") or ("mae" == lk):
cs[k] = float(v) * subs_from_si
elif lk.startswith("mse_") or ("mse" == lk):
cs[k] = float(v) * (subs_from_si**2)
# -----------------------------------------------------------------
# 2) Physics residual rates (SI -> interpretable)
# -----------------------------------------------------------------
do_phys = scope in ("all", "physics")
if do_phys:
unit_rate = (
f"{native_u}/{_norm_time_units(str(time_units))}"
)
# Prefer converting *_raw variants (scaled eps are often dimensionless)
me = out.get("metrics_evaluate")
if isinstance(me, dict):
if "epsilon_cons_raw" in me and _is_number(
me["epsilon_cons_raw"]
):
me["epsilon_cons_raw"] = (
float(me["epsilon_cons_raw"])
* subs_from_si
* sec_per
)
out["units"]["epsilon_cons_raw_unit"] = (
unit_rate
)
if "epsilon_gw_raw" in me and _is_number(
me["epsilon_gw_raw"]
):
me["epsilon_gw_raw"] = (
float(me["epsilon_gw_raw"]) * sec_per
)
out["units"]["epsilon_gw_raw_unit"] = (
f"1/{_norm_time_units(str(time_units))}"
)
pdg = out.get("physics_diagnostics")
if isinstance(pdg, dict):
if "epsilon_cons_raw" in pdg and _is_number(
pdg["epsilon_cons_raw"]
):
pdg["epsilon_cons_raw"] = (
float(pdg["epsilon_cons_raw"])
* subs_from_si
* sec_per
)
out["units"]["epsilon_cons_raw_unit"] = (
unit_rate
)
if "epsilon_gw_raw" in pdg and _is_number(
pdg["epsilon_gw_raw"]
):
pdg["epsilon_gw_raw"] = (
float(pdg["epsilon_gw_raw"]) * sec_per
)
out["units"]["epsilon_gw_raw_unit"] = (
f"1/{_norm_time_units(str(time_units))}"
)
if savefile:
_write_payload(out, savefile, fmt=fmt, indent=indent)
return out
def _json_default(x: object) -> object:
# numpy scalars -> python scalars
if isinstance(x, np.generic):
return x.item()
# numpy arrays -> lists
if isinstance(x, np.ndarray):
return x.tolist()
# pandas scalars
try:
import pandas as _pd # local
if isinstance(x, _pd.Timestamp):
return x.isoformat()
except Exception:
pass
# fallback: string
return str(x)
def _write_payload(
payload: Mapping[str, Any],
savefile: str,
*,
fmt: str = "json",
indent: int = 2,
) -> None:
"""
Write a payload to disk.
Parameters
----------
payload : mapping
JSON-serializable mapping.
savefile : str
Output file path.
fmt : {"json"}, default="json"
Output format.
indent : int, default=2
JSON indentation.
"""
fmt0 = (fmt or "").strip().lower()
if not fmt0:
ext = (
os.path.splitext(savefile)[1].lstrip(".").lower()
)
fmt0 = ext or "json"
if fmt0 != "json":
raise ValueError(
f"Unsupported fmt={fmt!r}. Only 'json' is supported."
)
with open(savefile, "w", encoding="utf-8") as f:
json.dump(
dict(payload),
f,
indent=int(indent),
ensure_ascii=False,
default=_json_default,
)
def _lonlat_to_local_xy_m(
lon: np.ndarray | pd.Series,
lat: np.ndarray | pd.Series,
) -> tuple[np.ndarray, np.ndarray]:
"""Local XY meters from lon/lat (fast, city-scale)."""
lon = np.asarray(lon, dtype=float)
lat = np.asarray(lat, dtype=float)
lat0 = np.nanmedian(lat)
lon0 = np.nanmedian(lon)
# meters per degree approx
m_per_deg_lat = 110_540.0
m_per_deg_lon = 111_320.0 * np.cos(np.deg2rad(lat0))
x = (lon - lon0) * m_per_deg_lon
y = (lat - lat0) * m_per_deg_lat
return x, y
def _knn_impute(
values: np.ndarray | pd.Series,
x: np.ndarray | pd.Series,
y: np.ndarray | pd.Series,
good_mask: np.ndarray,
k: int = 20,
) -> np.ndarray:
"""Median kNN impute for values where good_mask is False."""
values = np.asarray(values, dtype=float)
x = np.asarray(x, dtype=float)
y = np.asarray(y, dtype=float)
bad_mask = ~good_mask
if int(bad_mask.sum()) == 0:
return values.copy()
good_idx = np.where(good_mask)[0]
bad_idx = np.where(bad_mask)[0]
if good_idx.size == 0:
filled = values.copy()
filled[bad_idx] = np.nanmedian(values)
return filled
# If too few good points, fallback
need = max(5, min(int(k), 20))
if good_idx.size < need:
filled = values.copy()
filled[bad_idx] = np.nanmedian(values[good_idx])
return filled
p_good = np.c_[x[good_idx], y[good_idx]]
p_bad = np.c_[x[bad_idx], y[bad_idx]]
filled = values.copy()
n_nei = int(min(int(k), good_idx.size))
try: # sklearn
from sklearn.neighbors import NearestNeighbors
nn = NearestNeighbors(
n_neighbors=n_nei,
algorithm="kd_tree",
)
nn.fit(p_good)
neigh = nn.kneighbors(p_bad, return_distance=False)
for j, nbrs in enumerate(neigh):
src = good_idx[nbrs]
filled[bad_idx[j]] = np.nanmedian(values[src])
return filled
except:
pass
try: # scipy
from scipy.spatial import cKDTree
tree = cKDTree(p_good)
_, ii = tree.query(p_bad, k=n_nei)
if ii.ndim == 1:
ii = ii[:, None]
for j, nbrs in enumerate(ii):
src = good_idx[nbrs]
filled[bad_idx[j]] = np.nanmedian(values[src])
return filled
except:
pass
# last resort
filled[bad_idx] = np.nanmedian(values[good_idx])
return filled
def _infer_gwl_scale_auto(
g: np.ndarray,
) -> float:
"""Infer GWL scale from distribution only."""
g = np.asarray(g, dtype=float)
g = g[np.isfinite(g)]
if g.size == 0:
return 1.0
gmax = float(np.max(g))
p95 = float(np.percentile(g, 95))
p99 = float(np.percentile(g, 99))
# If it reaches ~100, it is almost surely cm.
if (gmax >= 60.0) or (p95 >= 50.0) or (p99 >= 60.0):
return 0.01
# If the whole city is <= ~35, it is
# more consistent with meters (e.g. 0-28 m).
if gmax <= 35.0:
return 1.0
# Ambiguous middle zone: default to meters.
return 1.0
def _resolve_save_path(
savefile: str | None,
) -> str | None:
"""Return a CSV path or None."""
if not savefile:
return None
path = os.fspath(savefile)
if path.endswith(os.sep):
return os.path.join(path, "cleaned_gwl_zsurf.csv")
if os.path.isdir(path):
return os.path.join(path, "cleaned_gwl_zsurf.csv")
if not path.lower().endswith(".csv"):
path = f"{path}.csv"
return path
[docs]
def clean_gwl_zsurf(
df: pd.DataFrame,
*,
lon_col: str = "longitude",
lat_col: str = "latitude",
z_col: str = "z_surf",
gwl_col: str = "GWL_depth_bgs",
gwl_scale: str | float = "auto",
max_depth_m: float = 50.0,
z_outlier_iqr_mult: float = 3.0,
knn_k: int = 25,
return_report: bool = True,
savefile: str | None = None,
) -> tuple[pd.DataFrame, dict[str, Any] | None, str | None]:
"""Fix GWL units + robust z_surf, then recompute head_m."""
df = df.copy()
# --- 0) Basic checks
required = (lon_col, lat_col, z_col, gwl_col)
for c in required:
if c not in df.columns:
raise KeyError(f"Missing column: {c!r}")
gwl_raw = f"{gwl_col}_raw"
z_raw = f"{z_col}_raw"
# Keep raw
if gwl_raw not in df.columns:
df[gwl_raw] = df[gwl_col]
if z_raw not in df.columns:
df[z_raw] = df[z_col]
# --- 1) Infer GWL scale
if gwl_scale == "auto":
g = df[gwl_col].to_numpy(dtype=float)
gwl_scale_val = _infer_gwl_scale_auto(g)
# gmax = float(np.nanmax(g))
# gmed = float(np.nanmedian(g))
# # Your case: always <=100 and city is low-lying -> cm is the safest default
# # If discover the collector used decimeters, set gwl_scale=0.1 manually.
# if (gmax <= 120.0) and (gmed >= 5.0):
# gwl_scale_val = 0.01 # cm -> m
# else:
# gwl_scale_val = 1.0 # already meters (fallback)
else:
gwl_scale_val = float(gwl_scale)
# --- 2) Convert depth-bgs to meters + clip
df["GWL_depth_bgs_m"] = np.clip(
df[gwl_col].astype(float) * gwl_scale_val,
0.0,
float(max_depth_m),
)
# --- 3) Make z_surf static per (lon,lat) using median (robust to noise)
z_loc = (
df.groupby([lon_col, lat_col], sort=False)[z_col]
.median()
.rename("z_surf_loc_med")
.reset_index()
)
# --- 4) Robust outlier detection on location-level z_surf
zvals = z_loc["z_surf_loc_med"].to_numpy(dtype=float)
q1, q3 = np.nanpercentile(zvals, [25, 75])
iqr = float(q3 - q1)
lo = float(q1 - z_outlier_iqr_mult * iqr)
hi = float(q3 + z_outlier_iqr_mult * iqr)
good = np.isfinite(zvals) & (zvals >= lo) & (zvals <= hi)
# --- 5) Spatial kNN impute for outliers
x, y = _lonlat_to_local_xy_m(
z_loc[lon_col].to_numpy(),
z_loc[lat_col].to_numpy(),
)
z_filled = _knn_impute(
zvals,
x,
y,
good_mask=good,
k=int(knn_k),
)
# Extra safety: clip to robust bounds after imputation
z_filled = np.clip(z_filled, lo, hi)
z_loc["z_surf_corr"] = z_filled
z_loc["z_surf_flag_outlier"] = ~good
# --- 6) Merge corrected z_surf back
df = df.merge(
z_loc[
[
lon_col,
lat_col,
"z_surf_corr",
"z_surf_flag_outlier",
]
],
on=[lon_col, lat_col],
how="left",
)
# Prefer corrected z_surf everywhere
df["z_surf_m"] = df["z_surf_corr"].astype(float)
df["head_m"] = df["z_surf_m"] - df["GWL_depth_bgs_m"]
saved_path = _resolve_save_path(savefile)
if saved_path is not None:
df.to_csv(saved_path, index=False)
report: dict[str, Any] | None = None
if return_report:
pct = [0.01, 0.05, 0.5, 0.95, 0.99]
report = {
"gwl_scale_used": float(gwl_scale_val),
"z_loc_bounds_iqr": {
"q1": float(q1),
"q3": float(q3),
"iqr": float(iqr),
"lo": float(lo),
"hi": float(hi),
},
"z_loc_outliers": int((~good).sum()),
"z_loc_total": int(len(z_loc)),
"depth_m_stats": df["GWL_depth_bgs_m"]
.describe(percentiles=pct)
.to_dict(),
"head_stats": df["head_m"]
.describe(percentiles=pct)
.to_dict(),
"saved_file": saved_path,
}
return df, report
[docs]
def postprocess_eval_json(
eval_json: str | Mapping[str, Any],
*,
cfg: Mapping[str, object] | None = None,
scope: Literal["all", "subsidence", "physics"] = "all",
out_path: str | None = None,
overwrite: bool = False,
add_rmse: bool = True,
force: bool = False,
indent: int = 2,
) -> dict[str, Any]:
"""
Post-hoc convert a Stage-2 evaluation JSON from SI to interpretable units.
This is a safe wrapper around `convert_eval_payload_units(...)` that:
- loads a JSON from disk (or accepts a payload dict),
- infers unit factors from `payload["units"]` when cfg is missing,
- avoids double conversion unless `force=True`,
- optionally adds RMSE fields (sqrt(MSE)),
- writes a converted JSON file if `out_path` is provided.
Parameters
----------
eval_json :
Either a file path to the saved JSON, or an in-memory mapping.
cfg :
Optional config mapping/module. If missing (or incomplete), this helper
will synthesize the minimal keys needed for conversion:
- "SUBS_UNIT_TO_SI"
- "TIME_UNITS"
scope :
Forwarded to `convert_eval_payload_units(...)`.
out_path :
If given, write the converted payload there. If a directory is given,
a filename is generated next to the input name (or "geoprior_eval...").
overwrite :
If False and out_path exists, raise.
add_rmse :
If True, add RMSE fields wherever MSE is present (metrics_evaluate,
point_metrics, per_horizon).
force :
If False, skip conversion when payload already declares an interpretable
subsidence unit (e.g., "mm"). If True, always convert.
indent :
JSON indent used when writing.
Returns
-------
dict
The converted payload (always returned as a dict).
"""
# ----------------------------
# 1) Load payload if needed
# ----------------------------
src_path = None
if isinstance(eval_json, str | os.PathLike):
src_path = os.fspath(eval_json)
with open(src_path, encoding="utf-8") as f:
payload = json.load(f)
else:
payload = dict(eval_json)
# ----------------------------
# 2) Build minimal cfg if needed
# ----------------------------
cfg_eff: dict[str, object] = dict(cfg or {})
units = payload.get("units", {})
if isinstance(units, dict):
# Help `subs_unit_to_si(cfg)` by providing SUBS_UNIT_TO_SI when missing.
if (
"SUBS_UNIT_TO_SI" not in cfg_eff
and "subs_unit_to_si" in units
):
try:
cfg_eff["SUBS_UNIT_TO_SI"] = float(
units["subs_unit_to_si"]
)
except Exception:
pass
# Help time-rate conversions (epsilon_*_raw) if present.
if (
"TIME_UNITS" not in cfg_eff
and "time_units" in units
):
cfg_eff["TIME_UNITS"] = str(units["time_units"])
# ----------------------------
# 3) Decide whether to convert
# ----------------------------
already_unit = ""
if isinstance(units, dict):
already_unit = (
str(units.get("subs_metrics_unit", ""))
.strip()
.lower()
)
# If already interpretable (e.g. "mm"), do nothing unless forced.
# Note: in your payloads, SI mode sets subs_metrics_unit="m".
do_convert = force or (
already_unit
not in ("mm", "millimeter", "millimeters")
)
out = payload
if do_convert:
# Use your existing conversion logic (DRY).
out = convert_eval_payload_units( # type: ignore[name-defined]
payload,
cfg_eff,
mode="interpretable",
scope=scope,
savefile=None, # we handle writing ourselves below
fmt="json",
indent=indent,
copy_payload=True,
)
# ----------------------------
# 4) Add RMSE fields (optional)
# ----------------------------
if add_rmse:
def _safe_sqrt(x: Any) -> float | None:
try:
v = float(x)
except Exception:
return None
if not math.isfinite(v) or v < 0.0:
return None
return float(math.sqrt(v))
me = out.get("metrics_evaluate")
if isinstance(me, dict):
# q50 convenience
if (
"subs_pred_mse_q50" in me
and "subs_pred_rmse_q50" not in me
):
r = _safe_sqrt(me.get("subs_pred_mse_q50"))
if r is not None:
me["subs_pred_rmse_q50"] = r
if (
"gwl_pred_mse_q50" in me
and "gwl_pred_rmse_q50" not in me
):
r = _safe_sqrt(me.get("gwl_pred_mse_q50"))
if r is not None:
me["gwl_pred_rmse_q50"] = r
pm = out.get("point_metrics")
if isinstance(pm, dict):
if "mse" in pm and "rmse" not in pm:
r = _safe_sqrt(pm.get("mse"))
if r is not None:
pm["rmse"] = r
ph = out.get("per_horizon")
if (
isinstance(ph, dict)
and "mse" in ph
and "rmse" not in ph
):
mse_map = ph.get("mse")
if isinstance(mse_map, dict):
rmse_map: dict[str, float] = {}
for k, v in mse_map.items():
r = _safe_sqrt(v)
if r is not None:
rmse_map[str(k)] = r
if rmse_map:
ph["rmse"] = rmse_map
# ----------------------------
# 5) Write to disk if requested
# ----------------------------
if out_path:
dst = os.fspath(out_path)
if os.path.isdir(dst):
# If caller gave a directory, build a filename.
base = "geoprior_eval_interpretable.json"
if src_path:
root = os.path.splitext(
os.path.basename(src_path)
)[0]
base = f"{root}_interpretable.json"
dst = os.path.join(dst, base)
if (not overwrite) and os.path.exists(dst):
raise FileExistsError(f"File exists: {dst}")
# Keep UTF-8 friendly (matches your writer style elsewhere).
with open(dst, "w", encoding="utf-8") as f:
json.dump(
out, f, indent=int(indent), ensure_ascii=False
)
return dict(out)
# # -----------------------
# # Recommended usage
# # -----------------------
# from geoprior.utils.subsidence_utils import clean_gwl_zsurf
# # Nansha (your diagnostics show cm -> m):
# nansha_df2, nansha_rep, _ = clean_gwl_zsurf(
# nansha_data,
# gwl_col="GWL_depth_bgs",
# gwl_scale="auto", # should pick 0.01
# )
# # Zhongshan (range 0-28 is consistent with meters):
# zhong_df2, zhong_rep, _ = clean_gwl_zsurf(
# zhongshan_data,
# gwl_col="GWL_depth_bgs",
# gwl_scale="auto", # should pick 1.0
# )