Source code for geoprior.scripts.compute_hotspots

# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# Copyright (c) 2026-present
# Author: LKouadio <https://lkouadio.com>
r"""Script helpers for computing spatial hotspot summaries."""

from __future__ import annotations

import argparse
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pandas as pd

from . import utils as u


[docs] @dataclass(frozen=True) class CityInputs: city: str eval_csv: Path future_csv: Path
[docs] def build_parser( *, prog: str | None = None ) -> argparse.ArgumentParser: p = argparse.ArgumentParser( prog=prog or "summarize-hotspots", description=( "Compute hotspot characteristics table " "(city × year) and export CSV/LaTeX." ), ) # City selection (optional) u.add_city_flags(p, default_both=True) # Preferred "src" mode (auto-discovery) p.add_argument("--ns-src", type=str, default=None) p.add_argument("--zh-src", type=str, default=None) # Direct file mode (overrides src discovery) p.add_argument( "--ns-eval", "--ns-val", type=str, default=None ) p.add_argument( "--zh-eval", "--zh-val", type=str, default=None ) p.add_argument("--ns-future", type=str, default=None) p.add_argument("--zh-future", type=str, default=None) p.add_argument( "--baseline-year", type=int, default=2022, help="Baseline year for anomaly (default: 2022).", ) p.add_argument( "--percentile", type=float, default=90.0, help="Hotspot threshold percentile (default: 90).", ) p.add_argument( "--subsidence-kind", type=str, default="cumulative", choices=("cumulative", "rate", "increment"), help=( "Meaning of subsidence values in CSVs. " "cumulative=yearly cumulative values; " "rate/increment=annual values directly." ), ) p.add_argument( "--baseline-source", type=str, default="actual", choices=("actual", "q50"), help=( "Baseline 2022 source in eval CSV: " "actual (default) or q50." ), ) p.add_argument( "--quantile", type=str, default="q50", choices=("q10", "q50", "q90"), help="Forecast quantile used for hotspots (default q50).", ) p.add_argument( "--years", nargs="*", type=int, default=None, help=( "Explicit years to summarize (e.g., 2025 2026). " "If omitted, uses last --n-years years in future CSV." ), ) p.add_argument( "--n-years", type=int, default=2, help="If --years omitted, use last N future years.", ) p.add_argument( "--format", type=str, default="both", choices=("csv", "tex", "both"), help="Export format (csv/tex/both).", ) p.add_argument( "--out", "-o", type=str, default="tab_hotspots", help="Output stem/path (scripts/out if relative).", ) p.add_argument( "--caption", type=str, default=( "Characteristics of forecast hotspot clusters. " "Hotspots are defined as locations where the absolute " "change in annual subsidence relative to 2022 exceeds " "the 90th percentile threshold $T_{0.9}$ within each " "city and year." ), ) p.add_argument( "--label", type=str, default="tab:hotspots", help="LaTeX label (default: tab:hotspots).", ) return p
def parse_args( argv: list[str] | None = None, *, prog: str | None = None, ) -> argparse.Namespace: return build_parser(prog=prog).parse_args(argv) def _as_path(x: str | None) -> Path | None: if x is None: return None s = str(x).strip() if not s: return None return Path(s).expanduser() def _pick_city_inputs( *, city: str, src: Path | None, eval_csv: Path | None, future_csv: Path | None, ) -> CityInputs: cc = u.canonical_city(city) if eval_csv is not None and future_csv is not None: return CityInputs(cc, eval_csv, future_csv) if src is None: raise SystemExit( f"[{cc}] Need either --{cc[:2].lower()}-src " "or both --*-eval and --*-future." ) art = u.detect_artifacts(src) ev = eval_csv fu = future_csv # eval: prefer TestSet eval, fallback to Validation if ev is None: ev = art.forecast_test_csv or art.forecast_val_csv # future: prefer TestSet future, fallback to generic future if fu is None: fu = ( art.forecast_test_future_csv or art.forecast_future_csv ) if ev is None: raise SystemExit(f"[{cc}] Could not locate eval CSV.") if fu is None: raise SystemExit( f"[{cc}] Could not locate future CSV." ) return CityInputs(cc, Path(ev), Path(fu)) def _infer_unit(df: pd.DataFrame) -> str: for c in ("subsidence_unit", "unit", "units"): if c in df.columns: s = df[c].dropna() if not s.empty: u0 = str(s.iloc[0]).strip().lower() if u0.startswith("mm"): return "mm" if u0 == "m" or u0.startswith("meter"): return "m" return "mm" def _to_mm(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame: u0 = _infer_unit(df) if u0 == "mm": return df out = df.copy() for c in cols: if c in out.columns: out[c] = ( pd.to_numeric(out[c], errors="coerce") * 1000.0 ) if "subsidence_unit" in out.columns: out["subsidence_unit"] = "mm" return out def _load_eval(path: Path) -> pd.DataFrame: df = pd.read_csv(path) need = [ "sample_idx", "coord_t", "subsidence_q50", ] for c in need: if c not in df.columns: raise KeyError(f"Missing {c!r} in {path}") if "subsidence_actual" not in df.columns: # allow, but baseline_source=actual will fallback to q50 df["subsidence_actual"] = np.nan for c in ("sample_idx", "coord_t"): df[c] = pd.to_numeric(df[c], errors="coerce") for c in ( "subsidence_actual", "subsidence_q10", "subsidence_q50", "subsidence_q90", ): if c in df.columns: df[c] = pd.to_numeric(df[c], errors="coerce") df = df.dropna(subset=["sample_idx", "coord_t"]).copy() return df def _load_future(path: Path) -> pd.DataFrame: df = pd.read_csv(path) need = [ "sample_idx", "coord_t", "subsidence_q50", ] for c in need: if c not in df.columns: raise KeyError(f"Missing {c!r} in {path}") for c in ("sample_idx", "coord_t"): df[c] = pd.to_numeric(df[c], errors="coerce") for c in ( "subsidence_q10", "subsidence_q50", "subsidence_q90", ): if c in df.columns: df[c] = pd.to_numeric(df[c], errors="coerce") df = df.dropna(subset=["sample_idx", "coord_t"]).copy() return df def _wide( df: pd.DataFrame, *, value_col: str, ) -> pd.DataFrame: d0 = df[["sample_idx", "coord_t", value_col]].copy() d0["year"] = pd.to_numeric(d0["coord_t"], errors="coerce") d0 = d0.dropna(subset=["sample_idx", "year"]).copy() d0["year"] = d0["year"].astype(int) g = d0.groupby(["sample_idx", "year"], dropna=False)[ value_col ] s = g.mean() w = s.unstack("year") w.index = w.index.astype(int) return w def _annual_series( *, kind: str, eval_w: pd.DataFrame, fut_w: pd.DataFrame, base_year: int, years: list[int], ) -> tuple[pd.Series, dict[int, pd.Series]]: """ Returns: base_2022 (annual), and dict year->annual series. """ k = str(kind).strip().lower() # common sample set idx = eval_w.index.intersection(fut_w.index) ev = eval_w.loc[idx] fu = fut_w.loc[idx] out: dict[int, pd.Series] = {} if k in ("rate", "increment"): base = ev.get(base_year) if base is None: raise SystemExit("Baseline year missing in eval.") base = base.astype(float) for y in years: if y == base_year: out[y] = base else: if y not in fu.columns: out[y] = pd.Series(np.nan, index=idx) else: out[y] = fu[y].astype(float) return base, out # cumulative -> annual = diff if (base_year not in ev.columns) or ( (base_year - 1) not in ev.columns ): raise SystemExit( "Need baseline_year and baseline_year-1 in eval " "for cumulative->annual conversion." ) base_cum = ev[base_year].astype(float) prev_cum = ev[base_year - 1].astype(float) base = base_cum - prev_cum for y in years: if y == base_year: out[y] = base continue if y not in fu.columns: out[y] = pd.Series(np.nan, index=idx) continue if y - 1 in fu.columns: out[y] = fu[y].astype(float) - fu[y - 1].astype( float ) continue # first forecast year: use base cumulative out[y] = fu[y].astype(float) - base_cum return base, out def _summarize_city( *, city: str, eval_df: pd.DataFrame, fut_df: pd.DataFrame, years: list[int], kind: str, base_year: int, pct: float, baseline_source: str, qcol: str, ) -> pd.DataFrame: # unify units to mm for all numeric cols used eval_df = _to_mm( eval_df, [ "subsidence_actual", "subsidence_q10", "subsidence_q50", "subsidence_q90", ], ) fut_df = _to_mm( fut_df, [ "subsidence_q10", "subsidence_q50", "subsidence_q90", ], ) # pick baseline column in eval bs = str(baseline_source).strip().lower() if ( bs == "actual" and eval_df["subsidence_actual"].notna().any() ): base_col = "subsidence_actual" else: base_col = "subsidence_q50" ev_w = _wide(eval_df, value_col=base_col) fu_w = _wide(fut_df, value_col=qcol) base, annual = _annual_series( kind=kind, eval_w=ev_w, fut_w=fu_w, base_year=base_year, years=years, ) mean_2022 = float(np.nanmean(base.to_numpy(float))) rows: list[dict[str, object]] = [] for y in years: s_y = annual.get(y) if s_y is None: continue b = base.reindex(s_y.index).astype(float) s = s_y.astype(float) m = np.isfinite(b.to_numpy()) & np.isfinite( s.to_numpy() ) if not bool(np.any(m)): continue b2 = b.to_numpy()[m] s2 = s.to_numpy()[m] ds = np.abs(s2 - b2) if not np.isfinite(ds).any(): continue thr = float(np.nanpercentile(ds, float(pct))) hot = ds > thr n_hot = int(np.sum(hot)) if n_hot > 0: s_hot = s2[hot] ds_hot = ds[hot] s_min = float(np.nanmin(s_hot)) s_mean = float(np.nanmean(s_hot)) s_max = float(np.nanmax(s_hot)) d_mean = float(np.nanmean(ds_hot)) d_max = float(np.nanmax(ds_hot)) else: s_min = s_mean = s_max = np.nan d_mean = d_max = np.nan rows.append( { "City": u.canonical_city(city), "Year": int(y), "Hotspots_n": n_hot, "s_hot_min": s_min, "s_hot_mean": s_mean, "s_hot_max": s_max, "d_hot_mean": d_mean, "d_hot_max": d_max, "mean_2022": mean_2022, "T_0p9": thr, } ) return pd.DataFrame(rows) def _pick_years_from_future( fut_df: pd.DataFrame, *, base_year: int, years: list[int] | None, n_years: int, ) -> list[int]: if years: return [int(y) for y in years] ys = pd.to_numeric(fut_df["coord_t"], errors="coerce") ys = ys.dropna().astype(int) uniq = sorted( set(int(y) for y in ys if int(y) > base_year) ) if not uniq: return [] n = max(1, int(n_years)) return uniq[-n:] def _fmt_1(x: float) -> str: if x is None or not np.isfinite(float(x)): return "" return f"{float(x):.1f}" def _tex_sidewaystable( df: pd.DataFrame, *, caption: str, label: str, ) -> str: lines: list[str] = [] lines.append(r"\begin{sidewaystable}[p]") lines.append(r"\centering") lines.append(r"\caption{" + caption + r"}") lines.append(r"\label{" + label + r"}") lines.append(r"\begin{tabular}{llrccccccc}") lines.append(r"\toprule") lines.append( r"& & & \multicolumn{3}{c}{Hotspot subsidence $s_{\mathrm{hot}}$}" r" & \multicolumn{2}{c}{Hotspot anomaly $|\Delta s_{\mathrm{hot}}|$ vs.\ 2022}" r" & Mean 2022 subsidence & $T_{0.9}$ \\" ) lines.append( r"City & Year & Hotspots & \multicolumn{3}{c}{(mm yr$^{-1}$)}" r" & \multicolumn{2}{c}{(mm yr$^{-1}$)}" r" & (mm yr$^{-1}$) & (mm yr$^{-1}$) \\" ) lines.append(r"\cmidrule(lr){4-6} \cmidrule(lr){7-8}") lines.append( r"& & $n$ & min & mean & max & mean & max & & \\" ) lines.append(r"\midrule") for _, r in df.iterrows(): city = str(r["City"]) year = int(r["Year"]) n = int(r["Hotspots_n"]) smin = _fmt_1(r["s_hot_min"]) smea = _fmt_1(r["s_hot_mean"]) smax = _fmt_1(r["s_hot_max"]) dmea = _fmt_1(r["d_hot_mean"]) dmax = _fmt_1(r["d_hot_max"]) m22 = _fmt_1(r["mean_2022"]) t09 = _fmt_1(r["T_0p9"]) lines.append( f"{city} & {year} & {n} & " f"{smin} & {smea} & {smax} & " f"{dmea} & {dmax} & {m22} & {t09} \\\\" ) lines.append(r"\bottomrule") lines.append(r"\end{tabular}") lines.append(r"\end{sidewaystable}") lines.append("") lines.append(r"\clearpage") lines.append("") return "\n".join(lines)
[docs] def compute_hotspots_main( argv: list[str] | None = None, *, prog: str | None = None, ) -> None: args = parse_args(argv, prog=prog) # ensure scripts/out exists u.ensure_script_dirs() # scripts/out + scripts/figs cities = u.resolve_cities(args) ns_src = _as_path(args.ns_src) zh_src = _as_path(args.zh_src) ns_eval = _as_path(args.ns_eval) zh_eval = _as_path(args.zh_eval) ns_fut = _as_path(args.ns_future) zh_fut = _as_path(args.zh_future) inputs: list[CityInputs] = [] if "Nansha" in cities: inputs.append( _pick_city_inputs( city="Nansha", src=ns_src, eval_csv=ns_eval, future_csv=ns_fut, ) ) if "Zhongshan" in cities: inputs.append( _pick_city_inputs( city="Zhongshan", src=zh_src, eval_csv=zh_eval, future_csv=zh_fut, ) ) all_rows: list[pd.DataFrame] = [] for ci in inputs: ev = _load_eval(ci.eval_csv) fu = _load_future(ci.future_csv) years = _pick_years_from_future( fu, base_year=int(args.baseline_year), years=args.years, n_years=int(args.n_years), ) if not years: raise SystemExit( f"[{ci.city}] No future years found." ) qcol = ( "subsidence_" + str(args.quantile).strip().lower() ) d = _summarize_city( city=ci.city, eval_df=ev, fut_df=fu, years=years, kind=str(args.subsidence_kind), base_year=int(args.baseline_year), pct=float(args.percentile), baseline_source=str(args.baseline_source), qcol=qcol, ) all_rows.append(d) out = pd.concat(all_rows, ignore_index=True) out = out.sort_values(["City", "Year"]).reset_index( drop=True ) out_path = u.resolve_out_out(str(args.out)) stem = out_path if stem.suffix: stem = stem.with_suffix("") if args.format in ("csv", "both"): csv_p = stem.with_suffix(".csv") csv_p.parent.mkdir(parents=True, exist_ok=True) out.to_csv(csv_p, index=False) if args.format in ("tex", "both"): tex_p = stem.with_suffix(".tex") tex_p.parent.mkdir(parents=True, exist_ok=True) tex = _tex_sidewaystable( out, caption=str(args.caption), label=str(args.label), ) tex_p.write_text(tex, encoding="utf-8")
if __name__ == "__main__": compute_hotspots_main()