Source code for geoprior.scripts.plot_external_validation

# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 - https://github.com/earthai-tech/geoprior-v3
# Copyright (c) 2026-present
# Author: LKouadio <https://lkouadio.com>

"""
Plot point-support external validation of inferred
effective fields.

Panels
------
a) Zhongshan H_eff field with borehole / pumping-test
   locations (SW1--SW5) and matched pixels.
b) Borehole-derived compressible thickness vs model H_eff.
c) Late-step specific capacity vs model effective K.

Inputs
------
- site-level validation table (.csv or .xlsx)
- full_inputs.npz
- physics_payload_fullcity.npz
- optional coord scaler (.joblib)

Outputs
-------
- <out>.png
- <out>.svg
- optional summary JSON
"""

from __future__ import annotations

import argparse
import json
import math
from pathlib import Path

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.ticker import ScalarFormatter

from . import config as cfg
from . import utils

ArrayDict = dict[str, np.ndarray]


[docs] def load_npz_dict(path: str) -> ArrayDict: with np.load(path, allow_pickle=False) as z: return {k: z[k] for k in z.files}
[docs] def read_table(path: str) -> pd.DataFrame: p = Path(path).expanduser() suf = p.suffix.lower() if suf in {".xlsx", ".xls"}: return pd.read_excel(p) return pd.read_csv(p)
[docs] def load_boundary_xy( path: str | None, ) -> tuple[np.ndarray, np.ndarray] | None: """ Load an optional boundary polyline/polygon. Preferred behaviour: - if nat.com.utils exposes load_boundary_xy(), use it - otherwise fall back to CSV/XLSX with x/y-like columns Accepted fallback column names: x, x_m, coord_x y, y_m, coord_y NaN rows are preserved so multi-part boundaries can be drawn with line breaks. """ if not path: return None if hasattr(utils, "load_boundary_xy"): try: out = utils.load_boundary_xy(path) if isinstance(out, tuple) and len(out) == 2: bx = np.asarray(out[0], dtype=float) by = np.asarray(out[1], dtype=float) if ( bx.ndim == 1 and by.ndim == 1 and bx.size == by.size ): return bx, by except Exception: pass df = read_table(path) x_col = next( ( c for c in ("x", "x_m", "coord_x") if c in df.columns ), None, ) y_col = next( ( c for c in ("y", "y_m", "coord_y") if c in df.columns ), None, ) if x_col is None or y_col is None: raise KeyError( "Boundary file must contain one of " "{'x','x_m','coord_x'} and one of " "{'y','y_m','coord_y'}." ) bx = pd.to_numeric(df[x_col], errors="coerce").to_numpy( dtype=float ) by = pd.to_numeric(df[y_col], errors="coerce").to_numpy( dtype=float ) ok = np.isfinite(bx) & np.isfinite(by) if not np.any(ok): raise ValueError( "Boundary file does not contain any finite x/y pairs." ) return bx, by
[docs] def apply_no_offset( ax, *, x: bool = True, y: bool = True, ) -> None: """ Disable Matplotlib offset/scientific-offset text on linear axes. """ if x and ax.get_xscale() == "linear": fmt_x = ScalarFormatter(useOffset=False) fmt_x.set_scientific(False) ax.xaxis.set_major_formatter(fmt_x) if y and ax.get_yscale() == "linear": fmt_y = ScalarFormatter(useOffset=False) fmt_y.set_scientific(False) ax.yaxis.set_major_formatter(fmt_y)
[docs] def inverse_txy( coords_bh3: np.ndarray, coord_scaler_path: str | None, ) -> np.ndarray: first = np.asarray(coords_bh3[:, 0, :], dtype=float) if not coord_scaler_path: return first scaler = joblib.load(coord_scaler_path) return scaler.inverse_transform(first)
[docs] def reduce_horizon( arr: np.ndarray, n_seq: int, horizon: int, how: str, ) -> np.ndarray: x = np.asarray(arr, dtype=float) if x.size == n_seq: x = x.reshape(n_seq, 1) elif x.size == n_seq * horizon: x = x.reshape(n_seq, horizon) else: raise ValueError( "Cannot align array with sequences. " f"size={x.size}, n_seq={n_seq}, horizon={horizon}" ) how = str(how).strip().lower() if how == "first": return x[:, 0] if how == "mean": return np.mean(x, axis=1) if how == "median": return np.median(x, axis=1) raise ValueError( "horizon reducer must be one of " "{'first','mean','median'}" )
[docs] def build_pixel_table( inputs_npz: str, payload_npz: str, coord_scaler_path: str | None, horizon_reducer: str, site_reducer: str, ) -> pd.DataFrame: x_np = load_npz_dict(inputs_npz) payload = load_npz_dict(payload_npz) coords = np.asarray(x_np["coords"], dtype=float) n_seq, horizon, _ = coords.shape txy = inverse_txy(coords, coord_scaler_path) x = txy[:, 1] y = txy[:, 2] h_eff = reduce_horizon( x_np["H_field"], n_seq=n_seq, horizon=horizon, how=horizon_reducer, ) k_vals = reduce_horizon( payload["K"], n_seq=n_seq, horizon=horizon, how=horizon_reducer, ) hd_vals = reduce_horizon( payload["Hd"], n_seq=n_seq, horizon=horizon, how=horizon_reducer, ) df = pd.DataFrame( { "seq_idx": np.arange(n_seq, dtype=int), "x": x, "y": y, "H_eff_input_m": h_eff, "K_mps": k_vals, "Hd_m": hd_vals, } ) agg_fun = ( "mean" if str(site_reducer).lower() == "mean" else "median" ) pix = ( df.groupby(["x", "y"], as_index=False)[ ["H_eff_input_m", "K_mps", "Hd_m"] ] .agg(agg_fun) .reset_index(drop=True) ) pix["pixel_idx"] = np.arange(len(pix), dtype=int) return pix
[docs] def grid_mean( x: np.ndarray, y: np.ndarray, v: np.ndarray, *, res: int, extent: tuple[float, float, float, float], ) -> tuple[np.ndarray, tuple[float, float, float, float]]: xmin, xmax, ymin, ymax = extent xb = np.linspace(xmin, xmax, int(res) + 1) yb = np.linspace(ymin, ymax, int(res) + 1) sumv, _, _ = np.histogram2d( x, y, bins=[xb, yb], weights=v, ) cnts, _, _ = np.histogram2d( x, y, bins=[xb, yb], ) with np.errstate(invalid="ignore", divide="ignore"): z = sumv / cnts z[cnts == 0] = np.nan return (z.T, extent)
[docs] def field_extent( pixels: pd.DataFrame, pad_frac: float = 0.02, ) -> tuple[float, float, float, float]: x = pixels["x"].to_numpy(dtype=float) y = pixels["y"].to_numpy(dtype=float) xmin = float(np.nanmin(x)) xmax = float(np.nanmax(x)) ymin = float(np.nanmin(y)) ymax = float(np.nanmax(y)) dx = max((xmax - xmin) * float(pad_frac), 1.0) dy = max((ymax - ymin) * float(pad_frac), 1.0) return ( xmin - dx, xmax + dx, ymin - dy, ymax + dy, )
[docs] def spearman_rho(x: np.ndarray, y: np.ndarray) -> float: x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) mask = np.isfinite(x) & np.isfinite(y) x = x[mask] y = y[mask] if x.size != y.size or x.size < 2: return float("nan") # Only return NaN if one vector is truly constant if np.unique(x).size < 2 or np.unique(y).size < 2: return float("nan") xr = pd.Series(x).rank(method="average").to_numpy() yr = pd.Series(y).rank(method="average").to_numpy() xs = xr - xr.mean() ys = yr - yr.mean() den = math.sqrt(float(np.sum(xs * xs) * np.sum(ys * ys))) if den <= 0.0: return float("nan") return float(np.sum(xs * ys) / den)
[docs] def mae(x: np.ndarray, y: np.ndarray) -> float: x = np.asarray(x, dtype=float) y = np.asarray(y, dtype=float) mask = np.isfinite(x) & np.isfinite(y) return float(np.mean(np.abs(x[mask] - y[mask])))
[docs] def median_bias(obs: np.ndarray, pred: np.ndarray) -> float: obs = np.asarray(obs, dtype=float) pred = np.asarray(pred, dtype=float) mask = np.isfinite(obs) & np.isfinite(pred) return float(np.median(pred[mask] - obs[mask]))
[docs] def annotate_sites( ax, df: pd.DataFrame, *, x_col: str, y_col: str, label_col: str = "well_id", fontsize: int = 8, ) -> None: for _, row in df.iterrows(): ax.text( float(row[x_col]), float(row[y_col]), str(row[label_col]), ha="left", va="bottom", fontsize=fontsize, fontweight="bold", )
[docs] def plot_external_validation( *, site_csv: str, full_inputs_npz: str, full_payload_npz: str, coord_scaler: str | None, out: str, out_json: str | None, horizon_reducer: str, site_reducer: str, grid_res: int, dpi: int, font: int, city: str, show_legend: bool, show_labels: bool, show_ticklabels: bool, show_title: bool, show_panel_titles: bool, title: str | None, boundary: str | None, paper_format: bool, paper_no_offset: bool, ) -> None: utils.ensure_script_dirs() utils.set_paper_style( fontsize=int(font), dpi=int(dpi), ) site_df = read_table(site_csv).copy() pix = build_pixel_table( full_inputs_npz, full_payload_npz, coord_scaler, horizon_reducer, site_reducer, ) boundary_xy = load_boundary_xy(boundary) if paper_format and title is None: show_title = False map_site_size = 22 if paper_format else 28 map_match_size = 38 if paper_format else 45 map_linewidth = 0.55 if paper_format else 0.6 map_edgewidth = 0.7 if paper_format else 0.8 label_font = max(7, int(font) - 1) fig_w = 8.8 if paper_format else 9.2 fig_h = 3.0 if paper_format else 3.3 needed = { "well_id", "x", "y", "matched_pixel_x", "matched_pixel_y", "model_H_eff_m", "model_K_mps", "approx_compressible_thickness_m", "step3_specific_capacity_Lps_per_m", "match_distance_m", } miss = [c for c in needed if c not in site_df.columns] if miss: raise KeyError(f"Site table missing columns: {miss}") ext = field_extent(pix) z_h, ext2 = grid_mean( pix["x"].to_numpy(float), pix["y"].to_numpy(float), pix["H_eff_input_m"].to_numpy(float), res=int(grid_res), extent=ext, ) obs_h = site_df[ "approx_compressible_thickness_m" ].to_numpy(dtype=float) mod_h = site_df["model_H_eff_m"].to_numpy(dtype=float) prod = site_df[ "step3_specific_capacity_Lps_per_m" ].to_numpy(dtype=float) mod_k = site_df["model_K_mps"].to_numpy(dtype=float) rho_h = spearman_rho(obs_h, mod_h) mae_h = mae(obs_h, mod_h) bias_h = median_bias(obs_h, mod_h) rho_k = spearman_rho(prod, mod_k) fig, axes = plt.subplots( nrows=1, ncols=3, figsize=(fig_w, fig_h), constrained_layout=False, ) plt.subplots_adjust( left=0.06, right=0.93 if show_legend else 0.98, top=0.84 if paper_format else 0.86, bottom=0.16, wspace=0.28 if paper_format else 0.33, ) city_name = utils.canonical_city(city) city_color = cfg.CITY_COLORS.get(city_name, "#333333") # ------------------------------------------------- # a) map # ------------------------------------------------- ax = axes[0] im = ax.imshow( z_h, extent=ext2, origin="lower", cmap="viridis", interpolation="nearest", aspect="equal", ) if boundary_xy is not None: bx, by = boundary_xy ax.plot( bx, by, color="black", linewidth=0.7 if paper_format else 0.8, alpha=0.9, zorder=2, ) ax.scatter( site_df["x"].to_numpy(float), site_df["y"].to_numpy(float), marker="o", s=map_site_size, facecolors="white", edgecolors="black", linewidths=map_edgewidth, zorder=3, ) ax.scatter( site_df["matched_pixel_x"].to_numpy(float), site_df["matched_pixel_y"].to_numpy(float), marker="+", s=map_match_size, color="black", linewidths=0.9, zorder=3, ) for _, row in site_df.iterrows(): ax.plot( [ float(row["x"]), float(row["matched_pixel_x"]), ], [ float(row["y"]), float(row["matched_pixel_y"]), ], color="#666666", linewidth=map_linewidth, alpha=0.8, zorder=2, ) annotate_sites( ax, site_df, x_col="x", y_col="y", fontsize=label_font, ) if show_panel_titles: ax.set_title( r"(a) $H_{\mathrm{eff}}$ field and sites", loc="left", fontweight="bold", pad=2.0 if paper_format else None, ) if show_labels: ax.set_xlabel("x (m)") ax.set_ylabel("y (m)") if not show_ticklabels: ax.set_xticks([]) ax.set_yticks([]) if paper_no_offset: apply_no_offset(ax, x=True, y=True) if show_legend: cb = fig.colorbar( im, ax=ax, fraction=0.046, pad=0.04, ) if show_labels: cb.set_label(r"$H_{\mathrm{eff}}$ (m)") # ------------------------------------------------- # b) borehole thickness vs model H_eff # ------------------------------------------------- ax = axes[1] ax.scatter( obs_h, mod_h, s=36, marker="o", color=city_color, edgecolors="black", linewidths=0.6, zorder=3, ) for _, row in site_df.iterrows(): ax.text( float(row["approx_compressible_thickness_m"]), float(row["model_H_eff_m"]), str(row["well_id"]), ha="left", va="bottom", fontsize=label_font, # fontsize=max(7, int(font) - 1), ) vmin = float(min(np.nanmin(obs_h), np.nanmin(mod_h), 0.0)) vmax = float(max(np.nanmax(obs_h), np.nanmax(mod_h))) pad = 0.05 * max(vmax - vmin, 1.0) lo = vmin - pad hi = vmax + pad ax.plot( [lo, hi], [lo, hi], linestyle="--", color="#444444", linewidth=0.8, zorder=1, ) ax.axhline( 30.0, linestyle=":", color="#444444", linewidth=0.9, zorder=1, ) ax.set_xlim(lo, hi) ax.set_ylim(lo, hi) txt = ( rf"$\rho={rho_h:.2f}$" "\n" rf"MAE$={mae_h:.2f}$ m" "\n" rf"bias$={bias_h:.2f}$ m" ) ax.text( 0.97, 0.03, txt, transform=ax.transAxes, ha="right", va="bottom", bbox=dict( boxstyle="round,pad=0.2", facecolor="white", alpha=0.9, linewidth=0.0, ), ) if show_panel_titles: ax.set_title( r"(b) Borehole thickness vs model " r"$H_{\mathrm{eff}}$", loc="left", fontweight="bold", pad=2.0 if paper_format else None, ) if show_labels: ax.set_xlabel("Borehole thickness (m)") ax.set_ylabel(r"Model $H_{\mathrm{eff}}$ (m)") if paper_no_offset: apply_no_offset(ax, x=True, y=True) # ------------------------------------------------- # c) late-step specific capacity vs model K # ------------------------------------------------- ax = axes[2] ax.scatter( prod, mod_k, s=36, marker="o", color=city_color, edgecolors="black", linewidths=0.6, zorder=3, ) for _, row in site_df.iterrows(): ax.text( float(row["step3_specific_capacity_Lps_per_m"]), float(row["model_K_mps"]), str(row["well_id"]), ha="left", va="bottom", # fontsize=max(7, int(font) - 1), fontsize=label_font, ) ax.set_yscale("log") txt = rf"$\rho={rho_k:.2f}$" ax.text( 0.97, 0.03, txt, transform=ax.transAxes, ha="right", va="bottom", bbox=dict( boxstyle="round,pad=0.2", facecolor="white", alpha=0.9, linewidth=0.0, ), ) if show_panel_titles: ax.set_title( r"(c) Specific capacity vs model $K$", loc="left", fontweight="bold", pad=2.0 if paper_format else None, ) if show_labels: ax.set_xlabel( r"Late-step specific capacity " r"(L s$^{-1}$ m$^{-1}$)" ) ax.set_ylabel(r"Model $K$ (m s$^{-1}$)") if paper_no_offset: apply_no_offset(ax, x=True, y=False) for ax in axes: for side in ("top", "right"): ax.spines[side].set_visible(False) if show_title: ttl = utils.resolve_title( default=( "Zhongshan point-support external " "validation of inferred effective fields" ), title=title, ) fig.suptitle(ttl, x=0.02, ha="left") utils.save_figure( fig, out, dpi=int(dpi), ) if out_json: p = utils.resolve_out_out(out_json) if p.suffix.lower() != ".json": p = p.with_suffix(".json") p.parent.mkdir(parents=True, exist_ok=True) rec = { "city": city_name, "n_sites": int(len(site_df)), "borehole_vs_H_eff": { "spearman_rho": rho_h, "mae_m": mae_h, "median_bias_m": bias_h, }, "pumping_vs_K": { "spearman_rho": rho_k, }, "match_distance_m": { "min": float( site_df["match_distance_m"].min() ), "median": float( site_df["match_distance_m"].median() ), "max": float( site_df["match_distance_m"].max() ), }, "plot_options": { "boundary": ( str(Path(boundary).resolve()) if boundary else None ), "paper_format": bool(paper_format), "paper_no_offset": bool(paper_no_offset), }, "files": { "site_csv": str(Path(site_csv).resolve()), "full_inputs_npz": str( Path(full_inputs_npz).resolve() ), "full_payload_npz": str( Path(full_payload_npz).resolve() ), "coord_scaler": ( str(Path(coord_scaler).resolve()) if coord_scaler else None ), }, } p.write_text( json.dumps(rec, indent=2), encoding="utf-8", ) print(f"[OK] wrote {p}")
def _add_args(ap: argparse.ArgumentParser) -> None: ap.add_argument( "--site-csv", type=str, required=True, help="Site-level validation CSV/XLSX.", ) ap.add_argument( "--full-inputs-npz", type=str, required=True, ) ap.add_argument( "--full-payload-npz", type=str, required=True, ) ap.add_argument( "--coord-scaler", type=str, default=None, ) ap.add_argument( "--city", type=str, default="Zhongshan", ) ap.add_argument( "--horizon-reducer", type=str, default="mean", choices=["first", "mean", "median"], ) ap.add_argument( "--boundary", type=str, default=None, help=( "Optional boundary file for panel (a). " "Uses utils.load_boundary_xy() if available; " "otherwise expects CSV/XLSX with x/y-like columns." ), ) ap.add_argument( "--paper-format", action="store_true", help="Use a tighter paper-friendly layout.", ) ap.add_argument( "--paper-no-offset", action="store_true", help=( "Disable Matplotlib axis offset / scientific-offset text " "on linear axes." ), ) ap.add_argument( "--site-reducer", type=str, default="median", choices=["mean", "median"], ) ap.add_argument( "--grid-res", type=int, default=260, ) ap.add_argument( "--dpi", type=int, default=cfg.PAPER_DPI, ) ap.add_argument( "--font", type=int, default=cfg.PAPER_FONT, ) utils.add_plot_text_args( ap, default_out="supp_external_validation", ) ap.add_argument( "--out-json", type=str, default=None, )
[docs] def plot_external_validation_main( argv: list[str] | None = None, *, prog: str | None = None, ) -> None: ap = argparse.ArgumentParser( prog=prog or "plot-external-validation", description=( "Supplementary figure for Zhongshan " "point-support external validation." ), ) _add_args(ap) args = ap.parse_args(argv) show_legend = utils.str_to_bool( args.show_legend, default=True, ) show_labels = utils.str_to_bool( args.show_labels, default=True, ) show_ticklabels = utils.str_to_bool( args.show_ticklabels, default=True, ) show_title = utils.str_to_bool( args.show_title, default=True, ) show_panel_titles = utils.str_to_bool( args.show_panel_titles, default=True, ) plot_external_validation( site_csv=str(args.site_csv), full_inputs_npz=str(args.full_inputs_npz), full_payload_npz=str(args.full_payload_npz), coord_scaler=args.coord_scaler, out=str(args.out), out_json=args.out_json, horizon_reducer=str(args.horizon_reducer), site_reducer=str(args.site_reducer), grid_res=int(args.grid_res), dpi=int(args.dpi), font=int(args.font), city=str(args.city), show_legend=show_legend, show_labels=show_labels, show_ticklabels=show_ticklabels, show_title=show_title, show_panel_titles=show_panel_titles, title=args.title, boundary=args.boundary, paper_format=bool(args.paper_format), paper_no_offset=bool(args.paper_no_offset), )
[docs] def main( argv: list[str] | None = None, *, prog: str | None = None, ) -> None: plot_external_validation_main(argv, prog=prog)
if __name__ == "__main__": main() # Daniel@daniel03 MINGW64 /d/projects/geoprior-v3 (develop) # $ python -m scripts plot-external-validation --site-csv D:/projects/geoprior-v3/nat.com/boreholes_zhongshan_with_model.csv --full-inputs-npz E:/nature/results/zhongshan_GeoPriorSubsNet_stage1/external_validation_fullcity/full_inputs.npz --full-payload-npz E:/nature/results/zhongshan_GeoPriorSubsNet_stage1/external_validation_fullcity/physics_payload_fullcity.npz --coord-scaler E:/nature/results/zhongshan_GeoPriorSubsNet_stage1/artifacts/zhongshan_coord_scaler.joblib --city Zhongshan --out-json supp_zhongshan_external_validation.json --paper-format --paper-no-offset --out "scripts/out/FigS06"