Source code for geoprior.scripts.plot_hotspot_analytics

# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# Copyright (c) 2026-present
# Author: LKouadio <https://lkouadio.com>

r"""Script helpers for plotting hotspot analytics."""


from __future__ import annotations

import argparse
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import (
    Any,
)

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from . import config as cfg
from . import utils

_CITY_A = cfg.CITY_CANON.get("ns", "Nansha")
_CITY_B = cfg.CITY_CANON.get("zh", "Zhongshan")


GeoDF = Any


def _slug_city(city: str) -> str:
    return str(city).strip().lower().replace(" ", "_")


# ---------------------------------------------------------------------
# Exceedance probability from quantiles (q10/q50/q90)
# (same logic as geoprior.utils.transfer.xfer_risk)
# ---------------------------------------------------------------------
[docs] def exceed_prob_from_quantiles( q10: np.ndarray, q50: np.ndarray, q90: np.ndarray, *, threshold: float, ) -> np.ndarray: qs = np.stack([q10, q50, q90], axis=1).astype(float) ps0 = np.array([0.1, 0.5, 0.9], dtype=float) ok = np.isfinite(qs).all(axis=1) out = np.full(qs.shape[0], np.nan, dtype=float) if not np.any(ok): return out qs_ok = qs[ok] order = np.argsort(qs_ok, axis=1) qs_s = np.take_along_axis(qs_ok, order, axis=1) ps_s = np.take(ps0, order) T = float(threshold) F = np.empty(qs_s.shape[0], dtype=float) lo = T <= qs_s[:, 0] hi = T >= qs_s[:, 2] mid = ~(lo | hi) F[lo] = ps_s[lo, 0] F[hi] = ps_s[hi, 2] if np.any(mid): q1 = qs_s[mid, 1] seg = (T > q1).astype(int) ii = np.arange(seg.size) x0 = qs_s[mid][ii, seg] x1 = qs_s[mid][ii, seg + 1] p0 = ps_s[mid][ii, seg] p1 = ps_s[mid][ii, seg + 1] den = x1 - x0 same = den == 0.0 Fm = np.empty(seg.size, dtype=float) Fm[same] = p1[same] Fm[~same] = p0[~same] + ( (T - x0[~same]) / den[~same] ) * (p1[~same] - p0[~same]) F[mid] = Fm out[ok] = 1.0 - F return out
# --------------------------------------------------------------------- # IO + canonization # --------------------------------------------------------------------- def _require_cols( df: pd.DataFrame, cols: Sequence[str], *, where: str, ) -> None: miss = [c for c in cols if c not in df.columns] if miss: raise KeyError(f"{where}: missing {miss}") _ALIASES = dict(cfg._BASE_ALIASES) _ALIASES.update( { "subsidence_q10": ("subsidence_q10", "q10", "p10"), "subsidence_q90": ("subsidence_q90", "q90", "p90"), } ) def _canonize( df: pd.DataFrame, *, where: str, required: Sequence[str], ) -> pd.DataFrame: utils.ensure_columns(df, aliases=_ALIASES) _require_cols(df, list(required), where=where) return df def _load_exposure(path: str, *, col: str) -> pd.DataFrame: p = utils.as_path(path) df = pd.read_csv(p) utils.ensure_columns( df, aliases={ "sample_idx": ("sample_idx", "sample_id"), col: (col,), }, ) if "sample_idx" not in df.columns: raise KeyError("exposure: missing sample_idx") if col not in df.columns: raise KeyError(f"exposure: missing {col}") df["sample_idx"] = pd.to_numeric( df["sample_idx"], errors="coerce" ) df[col] = pd.to_numeric(df[col], errors="coerce") df = df.dropna(subset=["sample_idx", col]).copy() df["sample_idx"] = df["sample_idx"].astype(int) # CRITICAL: avoid duplicates that explode rows after merge df = ( df.groupby("sample_idx", as_index=False)[col] .mean() .copy() ) return df[["sample_idx", col]] def _attach_exposure( pts: pd.DataFrame, expo: pd.DataFrame | None, *, col: str, default: float, ) -> pd.DataFrame: out = pts.copy() if expo is None or expo.empty: out["exposure"] = float(default) return out e = expo.rename(columns={col: "exposure"}).copy() out = out.merge(e, on="sample_idx", how="left") out["exposure"] = out["exposure"].fillna(float(default)) return out def _load_eval_df(path: str) -> pd.DataFrame: p = utils.as_path(path) df = pd.read_csv(p) _canonize( df, where="eval_calibrated", required=( "sample_idx", "coord_t", "coord_x", "coord_y", "subsidence_actual", "subsidence_q50", ), ) for c in ( "sample_idx", "coord_t", "coord_x", "coord_y", "subsidence_actual", "subsidence_q50", ): df[c] = pd.to_numeric(df[c], errors="coerce") for c in ("subsidence_q10", "subsidence_q90"): if c in df.columns: df[c] = pd.to_numeric(df[c], errors="coerce") return df.dropna( subset=( "sample_idx", "coord_t", "coord_x", "coord_y", "subsidence_actual", "subsidence_q50", ) ).copy() def _load_future_df(path: str) -> pd.DataFrame: p = utils.as_path(path) df = pd.read_csv(p) _canonize( df, where="future_forecast", required=( "sample_idx", "coord_t", "coord_x", "coord_y", "subsidence_q10", "subsidence_q50", "subsidence_q90", ), ) for c in ( "sample_idx", "coord_t", "coord_x", "coord_y", "subsidence_q10", "subsidence_q50", "subsidence_q90", ): df[c] = pd.to_numeric(df[c], errors="coerce") return df.dropna( subset=( "sample_idx", "coord_t", "coord_x", "coord_y", "subsidence_q10", "subsidence_q50", "subsidence_q90", ) ).copy() def _pick_eval_path( art: utils.Artifacts, split: str, ) -> tuple[Path | None, str]: if split == "val": return (art.forecast_val_csv, "val") if split == "test": return (art.forecast_test_csv, "test") if art.forecast_test_csv is not None: return (art.forecast_test_csv, "test") return (art.forecast_val_csv, "val") def _pick_future_path( art: utils.Artifacts, split: str, ) -> tuple[Path | None, str]: if split == "val": return (art.forecast_future_csv, "val") if split == "test": return (art.forecast_test_future_csv, "test") if art.forecast_test_future_csv is not None: return (art.forecast_test_future_csv, "test") return (art.forecast_future_csv, "val") def _resolve_city( *, city: str, src: str | None, eval_csv: str | None, future_csv: str | None, split: str, ) -> dict[str, Any]: out: dict[str, Any] = {"name": city} out["color"] = cfg.CITY_COLORS.get(city, "#333333") if eval_csv and future_csv: out["eval_df"] = _load_eval_df(eval_csv) out["future_df"] = _load_future_df(future_csv) out["split"] = split out["src_note"] = "manual" return out if not src: k = city[:2].lower() raise ValueError( f"{city}: provide --{k}-src or both " f"--{k}-eval and --{k}-future" ) art = utils.detect_artifacts(src) e_p, e_lab = _pick_eval_path(art, split) f_p, f_lab = _pick_future_path(art, split) if e_p is None: raise FileNotFoundError( f"{city}: no eval calibrated CSV under {src}" ) if f_p is None: raise FileNotFoundError( f"{city}: no future CSV under {src}" ) out["eval_df"] = _load_eval_df(str(e_p)) out["future_df"] = _load_future_df(str(f_p)) out["split"] = e_lab out["src_note"] = ( f"{e_p.name} + {f_p.name} ({e_lab}/{f_lab})" ) return out def _try_load_boundaries(path: str | None): if not path: return None try: import geopandas as gpd except Exception: print("[WARN] geopandas missing; skip boundary.") return None try: gdf = gpd.read_file(utils.as_path(path)) return gdf except Exception as e: print(f"[WARN] boundary load failed: {e}") return None def _filter_boundary_for_city( gdf: GeoDF, *, city: str, name_field: str | None = None, ) -> GeoDF: """ Try to filter a boundary GeoDataFrame to the requested city. Rules: - If name_field is provided and exists, use it. - Else if column 'city' exists, use it. - Else return gdf unchanged. """ if gdf is None: return None try: cols = set(getattr(gdf, "columns", [])) except: return gdf key = None if name_field and name_field in cols: key = str(name_field) elif "city" in cols: key = "city" if not key: return gdf try: s = gdf[key].astype(str).str.strip().str.lower() want0 = str(city).strip().lower() want1 = _slug_city(city) m = (s == want0) | (s.str.replace(" ", "_") == want1) sub = gdf.loc[m].copy() return sub if not sub.empty else gdf except: return gdf def _resolve_boundary_map( *, boundary: str | None, ns_boundary: str | None, zh_boundary: str | None, boundary_name_field: str | None, cities: Sequence[str], ) -> dict[str, GeoDF] | None: """ Return a per-city boundary mapping {city: GeoDataFrame}. """ out: dict[str, GeoDF] = {} g_ns = _try_load_boundaries(ns_boundary) if g_ns is not None: out[_CITY_A] = g_ns g_zh = _try_load_boundaries(zh_boundary) if g_zh is not None: out[_CITY_B] = g_zh g0 = _try_load_boundaries(boundary) if g0 is not None: for c in cities: if c in out: continue out[c] = _filter_boundary_for_city( g0, city=str(c), name_field=boundary_name_field, ) # drop Nones out = {k: v for k, v in out.items() if v is not None} return out if out else None # --------------------------------------------------------------------- # Annual conversion (subsidence-kind aware) # --------------------------------------------------------------------- def _annual_from_cumulative( df: pd.DataFrame, *, value_col: str, year_col: str = "coord_t", id_col: str = "sample_idx", ) -> pd.Series: d = df[[id_col, year_col, value_col]].copy() orig_idx = d.index d[year_col] = pd.to_numeric(d[year_col], errors="coerce") d[value_col] = pd.to_numeric( d[value_col], errors="coerce" ) d = d.dropna(subset=[id_col, year_col, value_col]) d[year_col] = d[year_col].astype(int) d = d.sort_values([id_col, year_col]) prev = d.groupby(id_col)[value_col].shift(1) out = d[value_col] - prev # If first year has no previous, treat previous as 0 (rebased). out = out.fillna(d[value_col]) out.index = d.index return out.reindex(orig_idx) def _annual_series( df: pd.DataFrame, *, value_col: str, kind: str, ) -> pd.Series: k = str(kind).strip().lower() if k in ("rate", "increment"): return pd.to_numeric( df[value_col], errors="coerce" ).astype(float) return _annual_from_cumulative(df, value_col=value_col) # --------------------------------------------------------------------- # Raster + clustering # --------------------------------------------------------------------- def _extent( df: pd.DataFrame, ) -> tuple[float, float, float, float]: x = pd.to_numeric( df["coord_x"], errors="coerce" ).to_numpy(float) y = pd.to_numeric( df["coord_y"], errors="coerce" ).to_numpy(float) x = x[np.isfinite(x)] y = y[np.isfinite(y)] if x.size == 0 or y.size == 0: return (0.0, 1.0, 0.0, 1.0) return ( float(np.nanmin(x)), float(np.nanmax(x)), float(np.nanmin(y)), float(np.nanmax(y)), ) def _grid_mean( x: np.ndarray, y: np.ndarray, v: np.ndarray, *, res: int, extent: tuple[float, float, float, float], ) -> tuple[np.ndarray, tuple[float, float, float, float]]: xmin, xmax, ymin, ymax = extent xb = np.linspace(xmin, xmax, int(res) + 1) yb = np.linspace(ymin, ymax, int(res) + 1) sumv, _, _ = np.histogram2d( x, y, bins=[xb, yb], weights=v ) cnts, _, _ = np.histogram2d(x, y, bins=[xb, yb]) with np.errstate(invalid="ignore", divide="ignore"): z = sumv / cnts z[cnts == 0] = np.nan return (z.T, extent) def _grid_hotspot_mask( z_d: np.ndarray, z_p: np.ndarray, *, thr: float, hotspot_rule: str, hotspot_abs: float | None, risk_min: float | None, ) -> np.ndarray: r = str(hotspot_rule).strip().lower() if r == "abs" and hotspot_abs is not None: return np.isfinite(z_d) & (z_d >= float(hotspot_abs)) if r == "risk" and risk_min is not None: return np.isfinite(z_p) & (z_p >= float(risk_min)) if r == "combo": m1 = ( (np.isfinite(z_d) & (z_d >= float(hotspot_abs))) if hotspot_abs is not None else np.zeros_like(z_d, bool) ) m2 = ( (np.isfinite(z_p) & (z_p >= float(risk_min))) if risk_min is not None else np.zeros_like(z_p, bool) ) m = m1 | m2 if np.any(m): return m # fallback if both thresholds missing # percentile fallback return np.isfinite(z_d) & (z_d >= float(thr)) def _compute_persistence_grid( *, dfp: pd.DataFrame, dfy: pd.DataFrame, years: Sequence[int], extent: tuple[float, float, float, float], grid_res: int, hotspot_rule: str, hotspot_abs: float | None, risk_min: float | None, mode: str, ) -> tuple[ np.ndarray, tuple[float, float, float, float], int ]: """ Compute per-cell hotspot persistence across years. mode: - fraction: count / valid_years per cell (0..1) - count: number of years a cell is hotspot (0..N) """ yrs = sorted({int(y) for y in years}) if not yrs: z = np.full( (int(grid_res), int(grid_res)), np.nan, float ) return z, extent, 0 # shape will be inferred from first grid we compute pers_cnt = None valid_cnt = None ext2 = extent for yy in yrs: sub = dfp.loc[dfp["coord_t"].astype(int).eq(int(yy))] if sub.empty: continue x = pd.to_numeric( sub["coord_x"], errors="coerce" ).to_numpy(float) y = pd.to_numeric( sub["coord_y"], errors="coerce" ).to_numpy(float) d = pd.to_numeric( sub["delta_abs"], errors="coerce" ).to_numpy(float) p = pd.to_numeric( sub["p_exceed"], errors="coerce" ).to_numpy(float) ok = np.isfinite(x) & np.isfinite(y) x = x[ok] y = y[ok] d = d[ok] p = p[ok] z_d, ext2 = _grid_mean( x, y, d, res=grid_res, extent=extent ) z_p, _ = _grid_mean( x, y, p, res=grid_res, extent=extent ) thr = float("nan") if "T_q" in dfy.columns: tsub = dfy.loc[ dfy["year"].astype(int).eq(int(yy)) ] if not tsub.empty: thr = float(tsub["T_q"].iloc[0]) if not np.isfinite(thr): d2 = d[np.isfinite(d)] thr = ( float(np.nanquantile(d2, 0.90)) if d2.size else float("nan") ) mask = _grid_hotspot_mask( z_d, z_p, thr=thr, hotspot_rule=hotspot_rule, hotspot_abs=hotspot_abs, risk_min=risk_min, ) valid = np.isfinite(z_d) | np.isfinite(z_p) if pers_cnt is None: pers_cnt = np.zeros_like(z_d, dtype=float) valid_cnt = np.zeros_like(z_d, dtype=float) pers_cnt += mask.astype(float) valid_cnt += valid.astype(float) if pers_cnt is None or valid_cnt is None: z = np.full( (int(grid_res), int(grid_res)), np.nan, float ) return z, extent, len(yrs) z = pers_cnt.copy() z[valid_cnt == 0] = np.nan mm = str(mode).strip().lower() if mm == "fraction": with np.errstate(invalid="ignore", divide="ignore"): z = z / valid_cnt z[valid_cnt == 0] = np.nan return z, ext2, len(yrs) def _label_components(mask: np.ndarray) -> np.ndarray: m = np.asarray(mask, dtype=bool) lab = np.zeros(m.shape, dtype=int) h, w = m.shape cur = 0 for iy in range(h): for ix in range(w): if not m[iy, ix] or lab[iy, ix] != 0: continue cur += 1 stack = [(iy, ix)] lab[iy, ix] = cur while stack: y0, x0 = stack.pop() for dy, dx in ( (-1, 0), (1, 0), (0, -1), (0, 1), ): y1 = y0 + dy x1 = x0 + dx if y1 < 0 or y1 >= h or x1 < 0 or x1 >= w: continue if not m[y1, x1] or lab[y1, x1] != 0: continue lab[y1, x1] = cur stack.append((y1, x1)) return lab def _cell_centers( extent: tuple[float, float, float, float], shape: tuple[int, int], ) -> tuple[np.ndarray, np.ndarray]: xmin, xmax, ymin, ymax = extent ny, nx = shape dx = (xmax - xmin) / float(nx) dy = (ymax - ymin) / float(ny) xs = xmin + (np.arange(nx) + 0.5) * dx ys = ymin + (np.arange(ny) + 0.5) * dy xx, yy = np.meshgrid(xs, ys) return xx, yy def _cluster_summary( *, lab: np.ndarray, metric: np.ndarray, risk: np.ndarray, extent: tuple[float, float, float, float], city: str, year: int, rank_by: str, ) -> pd.DataFrame: labs = np.unique(lab) labs = labs[labs > 0] if labs.size == 0: cols = [ "city", "year", "cluster_id", "n_cells", "metric_mean", "metric_max", "risk_mean", "risk_max", "centroid_x", "centroid_y", ] return pd.DataFrame(columns=cols) xx, yy = _cell_centers(extent, lab.shape) rows: list[dict[str, Any]] = [] for k in labs: m = lab == int(k) mv = metric[m] rv = risk[m] mv = mv[np.isfinite(mv)] rv = rv[np.isfinite(rv)] if mv.size == 0: continue cx = float(np.nanmean(xx[m])) cy = float(np.nanmean(yy[m])) rows.append( { "city": city, "year": int(year), "cluster_id": int(k), "n_cells": int(np.sum(m)), "metric_mean": float(np.nanmean(mv)), "metric_max": float(np.nanmax(mv)), "risk_mean": ( float(np.nanmean(rv)) if rv.size else float("nan") ), "risk_max": ( float(np.nanmax(rv)) if rv.size else float("nan") ), "centroid_x": cx, "centroid_y": cy, } ) out = pd.DataFrame(rows) if out.empty: return out # out = out.sort_values( # ["metric_mean", "n_cells"], # ascending=[False, False], # ) key = "metric_mean" if str(rank_by).strip().lower() == "risk": key = "risk_mean" out = out.sort_values( [key, "n_cells"], ascending=[False, False], ) out["priority_rank"] = np.arange(1, len(out) + 1) return out # --------------------------------------------------------------------- # Core computation per city # ---------------------------------------------------------------------
[docs] @dataclass(frozen=True) class CityHotspotData: city: str color: str years: list[int] base_year: int df_points: pd.DataFrame df_years: pd.DataFrame extent: tuple[float, float, float, float]
def _choose_hotspot_mask( pts: pd.DataFrame, *, rule: str, ) -> np.ndarray: r = str(rule).strip().lower() m_pct = pts["is_hotspot_pct"].to_numpy(bool) if "is_hotspot_abs" in pts.columns: m_abs = pts["is_hotspot_abs"].to_numpy(bool) else: m_abs = m_pct if "is_hotspot_risk" in pts.columns: m_rsk = pts["is_hotspot_risk"].to_numpy(bool) else: m_rsk = m_pct if r == "abs": return m_abs if r == "risk": return m_rsk if r == "combo": return m_abs | m_rsk return m_pct
[docs] def build_hotspot_tables( *, city: str, color: str, eval_df: pd.DataFrame, fut_df: pd.DataFrame, years: list[int], base_year: int, subsidence_kind: str, hotspot_q: float, risk_threshold: float, use_abs_risk: bool, hotspot_rule: str, hotspot_abs: float | None, risk_min: float | None, exposure_df: pd.DataFrame | None, exposure_col: str, exposure_default: float, ) -> CityHotspotData: """ Build point/year hotspot tables for one city. Robust points: - If ``kind="cumulative"``, annual rates are derived via diffs. For the first requested year, the previous year is included automatically when available, preferring future data and otherwise evaluation data. - The hotspot rule can be percentile, abs, risk, or combo. - Risk can be exposure-weighted as :math:`P * |\Delta s| * exposure`. Returns ``CityHotspotData`` with ``df_points`` for the requested per-point metrics and ``df_years`` for the requested per-year summaries. """ # ---- normalize years years_req = sorted({int(y) for y in years}) if not years_req: raise ValueError("years must be non-empty") knd = str(subsidence_kind).strip().lower() is_cum = knd == "cumulative" y_min = int(min(years_req)) y_prev = int(y_min - 1) # ---- eval: clean year + ids ev = eval_df.copy() ev["coord_t"] = pd.to_numeric( ev["coord_t"], errors="coerce" ) ev = ev.dropna(subset=["coord_t"]).copy() ev["coord_t"] = ev["coord_t"].astype(int) # ---- baseline annual (actual) at base_year if is_cum: a0 = _annual_from_cumulative( ev, value_col="subsidence_actual", year_col="coord_t", id_col="sample_idx", ) m_b = ev["coord_t"].eq(int(base_year)) base_ids = ev.loc[m_b, "sample_idx"].to_numpy(int) base_vals = a0.loc[m_b].to_numpy(float) base_annual = pd.Series(base_vals, index=base_ids) else: base_annual = ( ev.loc[ ev["coord_t"].eq(int(base_year)), ["sample_idx", "subsidence_actual"], ] .dropna() .assign( sample_idx=lambda d: pd.to_numeric( d["sample_idx"], errors="coerce" ) ) .dropna(subset=["sample_idx"]) .assign( sample_idx=lambda d: d["sample_idx"].astype( int ) ) .set_index("sample_idx")["subsidence_actual"] .astype(float) ) # ---- future: coerce year + keep needed years fu = fut_df.copy() fu["coord_t"] = pd.to_numeric( fu["coord_t"], errors="coerce" ) fu = fu.dropna(subset=["coord_t"]).copy() fu["coord_t"] = fu["coord_t"].astype(int) years_use = set(years_req) if is_cum: years_use.add(y_prev) fu = fu.loc[fu["coord_t"].isin(years_use)].copy() # ---- for cumulative: ensure we have y_prev rows per sample # Prefer future rows; fill missing from eval quantiles at y_prev. if is_cum: have_prev = fu.loc[ fu["coord_t"].eq(y_prev), "sample_idx" ] have_prev = pd.to_numeric(have_prev, errors="coerce") have_prev = have_prev.dropna().astype(int) have_prev_ids = set(have_prev.to_list()) need_prev = fu.loc[ fu["coord_t"].eq(y_min), "sample_idx" ] need_prev = pd.to_numeric(need_prev, errors="coerce") need_prev = need_prev.dropna().astype(int) need_prev_ids = set(need_prev.to_list()) miss_ids = sorted(list(need_prev_ids - have_prev_ids)) if miss_ids: # eval may not have q10/q90; fallback to q50 c10 = ( "subsidence_q10" if "subsidence_q10" in ev.columns else "subsidence_q50" ) c90 = ( "subsidence_q90" if "subsidence_q90" in ev.columns else "subsidence_q50" ) anchor = ev.loc[ ev["coord_t"].eq(y_prev), [ "sample_idx", "coord_t", "coord_x", "coord_y", c10, "subsidence_q50", c90, ], ].copy() anchor = anchor.rename( columns={ c10: "subsidence_q10", c90: "subsidence_q90", } ) anchor["sample_idx"] = pd.to_numeric( anchor["sample_idx"], errors="coerce" ) anchor = anchor.dropna( subset=["sample_idx"] ).copy() anchor["sample_idx"] = anchor[ "sample_idx" ].astype(int) anchor = anchor.loc[ anchor["sample_idx"].isin(miss_ids) ] if not anchor.empty: fu = pd.concat( [fu, anchor], ignore_index=True ) # ---- aggregate duplicates safely, then annualize fu = fu.copy() fu["sample_idx"] = pd.to_numeric( fu["sample_idx"], errors="coerce" ) fu = fu.dropna(subset=["sample_idx"]).copy() fu["sample_idx"] = fu["sample_idx"].astype(int) fu = ( fu.groupby(["sample_idx", "coord_t"], as_index=False) .agg( coord_x=("coord_x", "mean"), coord_y=("coord_y", "mean"), subsidence_q10=("subsidence_q10", "mean"), subsidence_q50=("subsidence_q50", "mean"), subsidence_q90=("subsidence_q90", "mean"), ) .sort_values(["sample_idx", "coord_t"]) .reset_index(drop=True) ) q10_a = _annual_series( fu, value_col="subsidence_q10", kind=knd ) q50_a = _annual_series( fu, value_col="subsidence_q50", kind=knd ) q90_a = _annual_series( fu, value_col="subsidence_q90", kind=knd ) fu = fu.copy() fu["s_q10"] = q10_a.to_numpy(float) fu["s_q50"] = q50_a.to_numpy(float) fu["s_q90"] = q90_a.to_numpy(float) # Keep only requested years for outputs pts = fu.loc[ fu["coord_t"].isin(set(years_req)), [ "sample_idx", "coord_t", "coord_x", "coord_y", "s_q10", "s_q50", "s_q90", ], ].copy() # ---- anomaly vs baseline annual actual pts["s_base"] = pts["sample_idx"].map(base_annual) pts["delta_abs"] = np.abs( pts["s_q50"].to_numpy(float) - pts["s_base"].to_numpy(float) ) # ---- exceedance probability from quantiles s10 = pts["s_q10"].to_numpy(float) s50 = pts["s_q50"].to_numpy(float) s90 = pts["s_q90"].to_numpy(float) if bool(use_abs_risk): s10 = np.abs(s10) s50 = np.abs(s50) s90 = np.abs(s90) pts["p_exceed"] = exceed_prob_from_quantiles( s10, s50, s90, threshold=float(risk_threshold), ) # ---- exposure-weighted risk score pts = _attach_exposure( pts, exposure_df, col=str(exposure_col), default=float(exposure_default), ) pts["risk_score"] = ( pts["delta_abs"].to_numpy(float) * pts["p_exceed"].to_numpy(float) * pts["exposure"].to_numpy(float) ) # ---- percentile thresholds per year (T_q) thr_map: dict[int, float] = {} for yy, g in pts.groupby("coord_t", dropna=False): a = pd.to_numeric( g["delta_abs"], errors="coerce" ).to_numpy(float) a = a[np.isfinite(a)] if a.size == 0: thr_map[int(yy)] = float("nan") else: thr_map[int(yy)] = float( np.nanquantile(a, float(hotspot_q)) ) pts["T_q"] = pts["coord_t"].map(thr_map) # ---- hotspot flags (optional) pts["is_hotspot_pct"] = ( np.isfinite(pts["delta_abs"].to_numpy(float)) & np.isfinite(pts["T_q"].to_numpy(float)) & ( pts["delta_abs"].to_numpy(float) >= pts["T_q"].to_numpy(float) ) ) if hotspot_abs is not None: pts["is_hotspot_abs"] = pts["delta_abs"].to_numpy( float ) >= float(hotspot_abs) if risk_min is not None: pts["is_hotspot_risk"] = pts["p_exceed"].to_numpy( float ) >= float(risk_min) pts["is_hotspot"] = _choose_hotspot_mask( pts, rule=str(hotspot_rule), ) pts["city"] = str(city) # ---- per-year summary (requested years only) # rows_y: List[Dict[str, Any]] = [] # for yy in years_req: # g = pts.loc[pts["coord_t"].eq(int(yy))].copy() # if g.empty: # continue # a = pd.to_numeric(g["delta_abs"], errors="coerce").to_numpy(float) # p = pd.to_numeric(g["p_exceed"], errors="coerce").to_numpy(float) # a2 = a[np.isfinite(a)] # p2 = p[np.isfinite(p)] # h_sel = g["is_hotspot"].to_numpy(bool) # h_pct = g["is_hotspot_pct"].to_numpy(bool) # n_abs = -1 # if "is_hotspot_abs" in g.columns: # n_abs = int(np.sum(g["is_hotspot_abs"].to_numpy(bool))) # n_rsk = -1 # if "is_hotspot_risk" in g.columns: # n_rsk = int(np.sum(g["is_hotspot_risk"].to_numpy(bool))) # rows_y.append( # { # "city": str(city), # "year": int(yy), # "T_q": float(thr_map.get(int(yy), np.nan)), # "n_hotspots": int(np.sum(h_sel)), # "n_hotspots_pct": int(np.sum(h_pct)), # "n_hotspots_abs": n_abs, # "n_hotspots_risk": n_rsk, # "delta_mean": float(np.nanmean(a2)) if a2.size else float("nan"), # "delta_max": float(np.nanmax(a2)) if a2.size else float("nan"), # "p_mean": float(np.nanmean(p2)) if p2.size else float("nan"), # "risk_sum": float( # np.nansum( # pd.to_numeric( # g["risk_score"], # errors="coerce", # ).to_numpy(float) # ) # ), # } # ) # ---- per-year summary (requested years only) rows_y: list[dict[str, Any]] = [] ever_set: set[int] = set() for yy in years_req: g = pts.loc[pts["coord_t"].eq(int(yy))].copy() if g.empty: continue # robust unique sets (never sum booleans) sel_ids = set( pd.to_numeric( g.loc[g["is_hotspot"], "sample_idx"], errors="coerce", ) .dropna() .astype(int) .unique() .tolist() ) pct_ids = set( pd.to_numeric( g.loc[g["is_hotspot_pct"], "sample_idx"], errors="coerce", ) .dropna() .astype(int) .unique() .tolist() ) abs_ids = None if "is_hotspot_abs" in g.columns: abs_ids = set( pd.to_numeric( g.loc[g["is_hotspot_abs"], "sample_idx"], errors="coerce", ) .dropna() .astype(int) .unique() .tolist() ) rsk_ids = None if "is_hotspot_risk" in g.columns: rsk_ids = set( pd.to_numeric( g.loc[g["is_hotspot_risk"], "sample_idx"], errors="coerce", ) .dropna() .astype(int) .unique() .tolist() ) # ever/new based on SELECTED rule new_ids = sel_ids - ever_set ever_set |= sel_ids a = pd.to_numeric( g["delta_abs"], errors="coerce" ).to_numpy(float) p = pd.to_numeric( g["p_exceed"], errors="coerce" ).to_numpy(float) a2 = a[np.isfinite(a)] p2 = p[np.isfinite(p)] rows_y.append( { "city": str(city), "year": int(yy), "T_q": float(thr_map.get(int(yy), np.nan)), # timeline variants "n_hotspots": int(len(sel_ids)), # current "n_hotspots_ever": int( len(ever_set) ), # monotone "n_hotspots_new": int( len(new_ids) ), # additions # keep rule-specific counts for transparency "n_hotspots_pct": int(len(pct_ids)), "n_hotspots_abs": ( int(len(abs_ids)) if abs_ids is not None else -1 ), "n_hotspots_risk": ( int(len(rsk_ids)) if rsk_ids is not None else -1 ), "delta_mean": ( float(np.nanmean(a2)) if a2.size else float("nan") ), "delta_max": ( float(np.nanmax(a2)) if a2.size else float("nan") ), "p_mean": ( float(np.nanmean(p2)) if p2.size else float("nan") ), "risk_sum": float( np.nansum( pd.to_numeric( g["risk_score"], errors="coerce", ).to_numpy(float) ) ), } ) df_years = pd.DataFrame(rows_y) ext = _extent(pts) return CityHotspotData( city=str(city), color=str(color), years=years_req, base_year=int(base_year), df_points=pts, df_years=df_years, extent=ext, )
# --------------------------------------------------------------------- # Plot # --------------------------------------------------------------------- def _axes_cleanup(ax: plt.Axes) -> None: ax.set_xticks([]) ax.set_yticks([]) for s in ("top", "right", "left", "bottom"): ax.spines[s].set_visible(False) def _clip_bounds( vals: list[np.ndarray], clip: float ) -> tuple[float, float]: good: list[np.ndarray] = [] for v in vals: a = np.asarray(v) a = a[np.isfinite(a)] if a.size: good.append(a) if not good: return (0.0, 1.0) g = np.concatenate(good) hi = float(np.nanpercentile(g, clip)) lo = float(np.nanpercentile(g, 100.0 - clip)) if not np.isfinite(lo) or not np.isfinite(hi) or lo >= hi: lo = float(np.nanmin(g)) hi = float(np.nanmax(g)) return lo, hi
[docs] def plot_hotspot_analytics( *, cities: list[CityHotspotData], focus_year: int, grid_res: int, clip: float, cmap_metric: str, cmap_prob: str, persistence_mode: str, cluster_top: int, risk_threshold: float, show_legend: bool, show_title: bool, show_panel_titles: bool, title: str | None, out: str, out_points: str | None, out_years: str | None, out_clusters: str | None, timeline_mode: str, timeline_overlay_current: bool, dpi: int, font: int, cluster_rank: str, add_persistence: bool, add_compare: bool, compare_metric: str, hotspot_rule: str, hotspot_abs: float | None, risk_min: float | None, ns_future_b: str | None = None, zh_future_b: str | None = None, gdf: GeoDF | dict[str, GeoDF] | None = None, ) -> None: utils.ensure_script_dirs() utils.set_paper_style(fontsize=int(font), dpi=int(dpi)) cmap1 = plt.get_cmap(str(cmap_metric)) cmap2 = plt.get_cmap(str(cmap_prob)) dvals: list[np.ndarray] = [] for c in cities: dvals.append( pd.to_numeric( c.df_points["delta_abs"], errors="coerce" ).to_numpy(float) ) dmin, dmax = _clip_bounds(dvals, float(clip)) n_rows = len(cities) n_cols = 4 if add_persistence: n_cols += 1 have_b = ( ns_future_b is not None or zh_future_b is not None # or since we can compare for one city only # and if we want to compare both cities obligatory ) if add_compare and have_b: n_cols += 1 fig_w = 2.4 * n_cols + 1.8 fig_h = 2.5 * n_rows + 1.2 fig, axes = plt.subplots( nrows=n_rows, ncols=n_cols, figsize=(fig_w, fig_h), constrained_layout=False, ) ax_arr = np.array(axes, ndmin=2) # plt.subplots_adjust( # left=0.06, # right=0.92 if show_legend else 0.98, # top=0.92, # bottom=0.10, # wspace=0.18, # hspace=0.25, # ) plot_right = 0.88 if show_legend else 0.98 plt.subplots_adjust( left=0.06, right=plot_right, top=0.92, bottom=0.10, wspace=0.26, hspace=0.25, ) all_points: list[pd.DataFrame] = [] all_years: list[pd.DataFrame] = [] all_clusters: list[pd.DataFrame] = [] for r, c in enumerate(cities): dfp = c.df_points.copy() dfy = c.df_years.copy() all_points.append(dfp) all_years.append(dfy) city = c.city col = c.color ext = c.extent # sub = dfp.loc[dfp["coord_t"].astype(int).eq(int(focus_year))] sub = dfp.loc[ dfp["coord_t"].astype(int).eq(int(focus_year)) ] focus_y = int(focus_year) if sub.empty: focus_y = int( np.nanmax(dfp["coord_t"].to_numpy(int)) ) sub = dfp.loc[ dfp["coord_t"].astype(int).eq(int(focus_y)) ] # x = pd.to_numeric(sub["coord_x"], errors="coerce").to_numpy(float) # y = pd.to_numeric(sub["coord_y"], errors="coerce").to_numpy(float) # d = pd.to_numeric(sub["delta_abs"], errors="coerce").to_numpy(float) # p = pd.to_numeric(sub["p_exceed"], errors="coerce").to_numpy(float) # ok = np.isfinite(x) & np.isfinite(y) & np.isfinite(d) # x = x[ok] # y = y[ok] # d = d[ok] # p = p[ok] # p = np.where(np.isfinite(p), p, 0.0) # # z_d, ext2 = _grid_mean(x, y, d, res=grid_res, extent=ext) # z_d, ext2 = _grid_mean(x, y, d, res=grid_res, extent=ext) # z_p, _ = _grid_mean(x, y, p, res=grid_res, extent=ext) # # z_r, _ = _grid_mean(x, y, d * p, res=grid_res, extent=ext) # rs = pd.to_numeric( # sub["risk_score"], errors="coerce" # ).to_numpy(float) # ok = np.isfinite(x) & np.isfinite(y) & np.isfinite(d) # # also require rs for risk grid # rs = rs[ok] # z_r, _ = _grid_mean(x, y, rs, # res=grid_res, extent=ext) x = pd.to_numeric( sub["coord_x"], errors="coerce" ).to_numpy(float) y = pd.to_numeric( sub["coord_y"], errors="coerce" ).to_numpy(float) d = pd.to_numeric( sub["delta_abs"], errors="coerce" ).to_numpy(float) p = pd.to_numeric( sub["p_exceed"], errors="coerce" ).to_numpy(float) rs = pd.to_numeric( sub["risk_score"], errors="coerce" ).to_numpy(float) ok = np.isfinite(x) & np.isfinite(y) & np.isfinite(d) x = x[ok] y = y[ok] d = d[ok] p = p[ok] rs = rs[ok] p = np.where(np.isfinite(p), p, 0.0) rs = np.where(np.isfinite(rs), rs, 0.0) z_d, ext2 = _grid_mean( x, y, d, res=grid_res, extent=ext ) z_p, _ = _grid_mean(x, y, p, res=grid_res, extent=ext) z_r, _ = _grid_mean( x, y, rs, res=grid_res, extent=ext ) thr = float("nan") if "T_q" in dfy.columns: tsub = dfy.loc[dfy["year"].eq(int(focus_y))] if not tsub.empty: thr = float(tsub["T_q"].iloc[0]) if not np.isfinite(thr): thr = float(np.nanquantile(d, 0.90)) mask = _grid_hotspot_mask( z_d, z_p, thr=thr, hotspot_rule=hotspot_rule, hotspot_abs=hotspot_abs, risk_min=risk_min, ) lab = _label_components(mask) cl = _cluster_summary( lab=lab, metric=z_d, risk=z_r, extent=ext, city=city, year=int(focus_y), rank_by=cluster_rank, ) all_clusters.append(cl) # boundaries per city (optional) gdf_here = None if isinstance(gdf, dict): gdf_here = gdf.get(city) if gdf_here is None or getattr( gdf_here, "empty", False ): g_alt = gdf.get(_slug_city(city)) if g_alt is not None and not getattr( g_alt, "empty", False ): gdf_here = g_alt else: gdf_here = gdf # persistence grid (optional) z_pers = None pers_ext = ext2 n_pers_years = 0 if add_persistence: z_pers, pers_ext, n_pers_years = ( _compute_persistence_grid( dfp=dfp, dfy=dfy, years=c.years, extent=ext, grid_res=int(grid_res), hotspot_rule=str(hotspot_rule), hotspot_abs=hotspot_abs, risk_min=risk_min, mode=str(persistence_mode), ) ) # (1) anomaly map ax0 = ax_arr[r, 0] ax0.imshow( z_d, extent=ext2, origin="lower", cmap=cmap1, vmin=dmin, vmax=dmax, interpolation="nearest", aspect="equal", ) if np.any(lab > 0): ax0.contour( (lab > 0).astype(float), levels=[0.5], colors="k", linewidths=0.7, origin="lower", extent=ext2, ) if not cl.empty and int(cluster_top) > 0: topc = cl.head(int(cluster_top)) for _, row in topc.iterrows(): ax0.text( float(row["centroid_x"]), float(row["centroid_y"]), str(int(row["priority_rank"])), color=col, fontsize=max(7, int(font)), fontweight="bold", ha="center", va="center", bbox={ "boxstyle": "circle,pad=0.15", "fc": "white", "ec": col, "lw": 0.8, "alpha": 0.9, }, ) try: gdf_here.boundary.plot( ax=ax0, color="k", linewidth=0.6, alpha=0.7, ) except Exception: pass _axes_cleanup(ax0) # (2) probability map ax1 = ax_arr[r, 1] ax1.imshow( z_p, extent=ext2, origin="lower", cmap=cmap2, vmin=0.0, vmax=1.0, interpolation="nearest", aspect="equal", ) if np.any(lab > 0): ax1.contour( (lab > 0).astype(float), levels=[0.5], colors="k", linewidths=0.7, origin="lower", extent=ext2, ) try: gdf_here.boundary.plot( ax=ax1, color="k", linewidth=0.6, alpha=0.7, ) except Exception: pass _axes_cleanup(ax1) # (3) timeline ax2 = ax_arr[r, 2] t = dfy.sort_values("year") yrs = t["year"].to_numpy(int) mode = str(timeline_mode).strip().lower() col_map = { "current": "n_hotspots", "ever": "n_hotspots_ever", "new": "n_hotspots_new", } k = col_map.get(mode, "n_hotspots") if k not in t.columns: k = "n_hotspots" nh = pd.to_numeric(t[k], errors="coerce").to_numpy( float ) nh_cur = pd.to_numeric( t.get("n_hotspots", np.nan), errors="coerce", ).to_numpy(float) rgba_bar = mpl.colors.to_rgba(col, 0.25) bar_lab = ( mode if mode in ("ever", "new") else "current" ) ax2.bar( yrs, nh, width=0.72, color=rgba_bar, edgecolor=col, linewidth=1.0, zorder=2, label=bar_lab, ) ax2.set_axisbelow(True) ax2.grid(True, axis="y", alpha=0.20, zorder=0) for s in ("top", "right"): ax2.spines[s].set_visible(False) if mode == "ever": yl = "# hotspots (ever)" elif mode == "new": yl = "# hotspots (new)" else: yl = "# hotspots (current)" ax2.set_ylabel(yl) # ---- optional overlay: dashed current line do_overlay = ( bool(timeline_overlay_current) and mode in ("ever", "new") and np.any(np.isfinite(nh_cur)) ) if do_overlay: rgba_line = mpl.colors.to_rgba(col, 0.90) ax2.plot( yrs, nh_cur, linestyle="--", marker="o", linewidth=1.2, markersize=4.0, color=rgba_line, zorder=3, label="current", ) # light, local legend (only for this axis) ax2.legend( loc="upper right", frameon=False, fontsize=max(7, int(font) - 1), handlelength=2.0, ) ax2b = ax2.twinx() tq = pd.to_numeric( t["T_q"], errors="coerce" ).to_numpy(float) dx = pd.to_numeric( t["delta_max"], errors="coerce" ).to_numpy(float) # ax2b.plot(yrs, tq, marker="o", linestyle="--") # ax2b.plot(yrs, dx, marker="s", linestyle="-") ax2b.plot( yrs, tq, marker="o", linestyle="--", linewidth=1.2, ) ax2b.plot( yrs, dx, marker="s", linestyle="-", linewidth=1.2, ) ax2.set_xlabel("Year") # ax2.set_ylabel("# hotspots") # ax2b.set_ylabel("T0.9 / max|Δs|") ax2b.set_ylabel( r"T$_{0.9}$ / max|Δs|", labelpad=1, ) # pull the label *inside* so it doesn't hit ax3 labels ax2b.yaxis.set_label_coords(0.98, 0.5) ax2b.tick_params(pad=1) ax2.grid(True, axis="y", alpha=0.25) # (4) priority clusters (Nature-style) ax3 = ax_arr[r, 3] for s in ("top", "right"): ax3.spines[s].set_visible(False) ax3.set_axisbelow(True) ax3.grid(True, axis="x", alpha=0.18) if cl.empty: ax3.text( 0.5, 0.5, "No hotspots", ha="center", va="center", transform=ax3.transAxes, ) ax3.set_xticks([]) ax3.set_yticks([]) else: topc = cl.head(int(cluster_top)).copy() key = "metric_mean" xlab = r"mean |Δs| (mm yr$^{-1}$)" if str(cluster_rank).strip().lower() == "risk": key = "risk_mean" xlab = "mean risk score" vals = pd.to_numeric( topc[key], errors="coerce" ).to_numpy(float) ylab = [] for _, rr in topc.iterrows(): rk = int(rr["priority_rank"]) cid = int(rr["cluster_id"]) ylab.append(f"{rk}. C{cid}") yk = np.arange(len(ylab)) rgba = mpl.colors.to_rgba(col, 0.75) bars = ax3.barh( yk, vals, color=rgba, edgecolor=col, linewidth=1.0, ) ax3.set_yticks(yk) ax3.set_yticklabels(ylab) ax3.tick_params(axis="y", length=0) ax3.invert_yaxis() ax3.set_xlabel(xlab) vmax = ( float(np.nanmax(vals)) if np.any(np.isfinite(vals)) else 1.0 ) pad = 0.03 * vmax ax3.set_xlim(0.0, vmax + 6.0 * pad) for b, v in zip(bars, vals, strict=False): if not np.isfinite(v): continue ax3.text( float(v) + pad, b.get_y() + 0.5 * b.get_height(), f"{v:.1f}", va="center", ha="left", fontsize=max(7, int(font) - 1), ) # (5) persistence panel (optional) if add_persistence: ax4 = ax_arr[r, 4] mm = str(persistence_mode).strip().lower() if mm == "count": vmin, vmax = 0.0, float(max(1, n_pers_years)) else: vmin, vmax = 0.0, 1.0 ax4.imshow( z_pers, extent=pers_ext, origin="lower", cmap=cmap2, vmin=vmin, vmax=vmax, interpolation="nearest", aspect="equal", ) try: gdf_here.boundary.plot( ax=ax4, color="k", linewidth=0.6, alpha=0.7, ) except Exception: pass _axes_cleanup(ax4) if show_panel_titles: ax0.set_title( f"{city}{focus_y} |Δs| vs {c.base_year}", loc="left", fontweight="bold", pad=4, ) ax1.set_title( f"{city} • P(|s|≥{risk_threshold:g})", loc="left", fontweight="bold", pad=4, ) tag = ( mode if mode in ("current", "ever", "new") else "current" ) ax2.set_title( f"{city} • hotspot evolution ({tag})", loc="left", fontweight="bold", pad=4, ) ax3.set_title( f"{city} • priority clusters", loc="left", fontweight="bold", pad=4, ) if add_persistence: mm = str(persistence_mode).strip().lower() tag = "fraction" if mm != "count" else "count" ax4.set_title( f"{city} • persistence ({tag})", loc="left", fontweight="bold", pad=4, ) if show_legend: # Colorbar strip starts just to the right of the subplot grid cbar_x = plot_right + 0.02 cbar_w = 0.012 if add_persistence: # 3 stacked bars with comfortable gaps cax_d = fig.add_axes([cbar_x, 0.68, cbar_w, 0.22]) cax_s = fig.add_axes([cbar_x, 0.40, cbar_w, 0.20]) cax_p = fig.add_axes([cbar_x, 0.12, cbar_w, 0.22]) else: # 2 stacked bars cax_d = fig.add_axes([cbar_x, 0.56, cbar_w, 0.30]) cax_p = fig.add_axes([cbar_x, 0.16, cbar_w, 0.30]) cax_s = None cb1 = fig.colorbar( mpl.cm.ScalarMappable( norm=mpl.colors.Normalize( vmin=dmin, vmax=dmax ), cmap=cmap1, ), cax=cax_d, ) cb1.set_label(r"|Δs| (mm yr$^{-1}$)") if add_persistence and cax_s is not None: mm = str(persistence_mode).strip().lower() vmaxp = ( float(max(1, len(cities[0].years))) if mm == "count" else 1.0 ) cbp = fig.colorbar( mpl.cm.ScalarMappable( norm=mpl.colors.Normalize( vmin=0.0, vmax=vmaxp ), cmap=cmap2, ), cax=cax_s, ) cbp.set_label( "Hotspot persistence (years)" if mm == "count" else "Hotspot persistence (fraction)" ) cb2 = fig.colorbar( mpl.cm.ScalarMappable( norm=mpl.colors.Normalize(vmin=0.0, vmax=1.0), cmap=cmap2, ), cax=cax_p, ) cb2.set_label(r"P(|s| ≥ T)") if show_title: ttl = utils.resolve_title( default="Hotspot analytics — where to act first", title=title, ) fig.suptitle(ttl, x=0.02, ha="left") utils.save_figure(fig, out, dpi=int(dpi)) p_pts = ( utils.as_path(out_points) if out_points else utils.resolve_out_out("hotspot_points.csv") ) pd.concat(all_points, ignore_index=True).to_csv( p_pts, index=False ) print(f"[OK] wrote {p_pts}") p_yrs = ( utils.as_path(out_years) if out_years else utils.resolve_out_out("hotspot_years.csv") ) pd.concat(all_years, ignore_index=True).to_csv( p_yrs, index=False ) print(f"[OK] wrote {p_yrs}") p_cls = ( utils.as_path(out_clusters) if out_clusters else utils.resolve_out_out("hotspot_clusters.csv") ) pd.concat(all_clusters, ignore_index=True).to_csv( p_cls, index=False ) print(f"[OK] wrote {p_cls}")
# --------------------------------------------------------------------- # CLI # --------------------------------------------------------------------- def _add_args(ap: argparse.ArgumentParser) -> None: utils.add_city_flags(ap, default_both=True) ap.add_argument("--ns-src", type=str, default=None) ap.add_argument("--zh-src", type=str, default=None) ap.add_argument("--ns-eval", type=str, default=None) ap.add_argument("--zh-eval", type=str, default=None) ap.add_argument("--ns-future", type=str, default=None) ap.add_argument("--zh-future", type=str, default=None) ap.add_argument( "--split", type=str, choices=["auto", "val", "test"], default="auto", ) ap.add_argument("--base-year", type=int, default=2022) ap.add_argument( "--years", nargs="+", type=int, default=[2025, 2026], ) ap.add_argument("--focus-year", type=int, default=None) ap.add_argument( "--subsidence-kind", type=str, default="cumulative", choices=["cumulative", "rate", "increment"], ) ap.add_argument( "--hotspot-quantile", type=float, default=0.90 ) ap.add_argument( "--risk-threshold", type=float, default=50.0 ) ap.add_argument("--risk-abs", type=str, default="true") ap.add_argument("--grid-res", type=int, default=300) ap.add_argument("--clip", type=float, default=98.0) ap.add_argument( "--cmap-metric", type=str, default="magma" ) ap.add_argument( "--cmap-prob", type=str, default="viridis" ) ap.add_argument("--cluster-top", type=int, default=6) ap.add_argument("--out-points", type=str, default=None) ap.add_argument("--out-years", type=str, default=None) ap.add_argument("--out-clusters", type=str, default=None) ap.add_argument("--dpi", type=int, default=cfg.PAPER_DPI) ap.add_argument( "--font", type=int, default=cfg.PAPER_FONT ) ap.add_argument( "--hotspot-rule", type=str, default="percentile", choices=[ "percentile", "abs", "risk", "combo", ], help=( "Which hotspot definition is used " "for counts, clusters, persistence." ), ) ap.add_argument( "--hotspot-abs", type=float, default=None, help="Absolute hotspot: |Δs|>=X (mm/yr).", ) ap.add_argument( "--risk-min", type=float, default=None, help="Risk hotspot: P(|s|>=T)>=p.", ) ap.add_argument( "--exposure", type=str, default=None, help=("Optional exposure CSV (sample_idx,exposure)."), ) ap.add_argument( "--exposure-col", type=str, default="exposure", ) ap.add_argument( "--exposure-default", type=float, default=1.0, ) ap.add_argument( "--timeline-mode", type=str, default="current", choices=["current", "ever", "new"], help=( "Timeline bars show: " "current hotspots (per-year), " "ever hotspots (union up to year), " "or new hotspots (yearly additions)." ), ) ap.add_argument( "--timeline-overlay-current", type=str, default="true", help=( "Overlay dashed line for current hotspots " "on top of ever/new bars." ), ) ap.add_argument( "--cluster-rank", type=str, default="delta", choices=["delta", "risk"], help="Rank clusters by |Δs| or risk score.", ) ap.add_argument( "--add-persistence", action="store_true", help="Add a persistence map panel.", ) ap.add_argument( "--persistence-mode", type=str, default="fraction", choices=["fraction", "count"], help=( "Persistence definition: " "fraction (0..1) or count (0..N years)." ), ) ap.add_argument( "--boundary", type=str, default=None, help="Optional boundary GeoJSON/SHP.", ) ap.add_argument( "--ns-boundary", type=str, default=None, help="Optional Nansha boundary GeoJSON/SHP.", ) ap.add_argument( "--zh-boundary", type=str, default=None, help="Optional Zhongshan boundary GeoJSON/SHP.", ) ap.add_argument( "--boundary-name-field", type=str, default=None, ) ap.add_argument( "--ns-future-b", type=str, default=None, ) ap.add_argument( "--zh-future-b", type=str, default=None, ) ap.add_argument( "--compare-metric", type=str, default="risk", choices=["risk", "delta", "prob"], ) ap.add_argument( "--add-compare", action="store_true", help="Add scenario A vs B panel if B exists.", ) utils.add_plot_text_args( ap, default_out="hotspot_analytics", )
[docs] def plot_hotspot_analytics_main( argv: list[str] | None = None, *, prog: str | None = None, ) -> None: ap = argparse.ArgumentParser( prog=prog or "plot-hotspot-analytics", description="Decision-maker hotspot analytics.", ) _add_args(ap) args = ap.parse_args(argv) show_legend = utils.str_to_bool( args.show_legend, default=True ) show_title = utils.str_to_bool( args.show_title, default=True ) show_pt = utils.str_to_bool( args.show_panel_titles, default=True ) overlay_cur = utils.str_to_bool( args.timeline_overlay_current, default=True, ) years = [int(y) for y in (args.years or [])] if not years: raise SystemExit("--years must be non-empty") focus_year = ( int(args.focus_year) if args.focus_year is not None else int(max(years)) ) cities0 = utils.resolve_cities(args) if not cities0: cities0 = [_CITY_A, _CITY_B] want_a = _CITY_A in cities0 want_b = _CITY_B in cities0 if not want_a and not want_b: want_a = True want_b = True expo = None if args.exposure: expo = _load_exposure( args.exposure, col=str(args.exposure_col), ) resolved: list[dict[str, Any]] = [] if want_a: resolved.append( _resolve_city( city=_CITY_A, src=args.ns_src, eval_csv=args.ns_eval, future_csv=args.ns_future, split=str(args.split), ) ) if want_b: resolved.append( _resolve_city( city=_CITY_B, src=args.zh_src, eval_csv=args.zh_eval, future_csv=args.zh_future, split=str(args.split), ) ) cities: list[CityHotspotData] = [] for rc in resolved: cities.append( build_hotspot_tables( city=str(rc["name"]), color=str(rc["color"]), eval_df=rc["eval_df"], fut_df=rc["future_df"], years=years, base_year=int(args.base_year), subsidence_kind=str(args.subsidence_kind), hotspot_q=float(args.hotspot_quantile), risk_threshold=float(args.risk_threshold), use_abs_risk=utils.str_to_bool( args.risk_abs, default=True ), hotspot_rule=str(args.hotspot_rule), hotspot_abs=args.hotspot_abs, risk_min=args.risk_min, exposure_df=expo, exposure_col=str(args.exposure_col), exposure_default=float(args.exposure_default), ) ) # gdf = _try_load_boundaries(args.boundary) gdf_map = _resolve_boundary_map( boundary=args.boundary, ns_boundary=args.ns_boundary, zh_boundary=args.zh_boundary, boundary_name_field=args.boundary_name_field, cities=[c.city for c in cities], ) plot_hotspot_analytics( cities=cities, focus_year=int(focus_year), grid_res=int(args.grid_res), clip=float(args.clip), cmap_metric=str(args.cmap_metric), cmap_prob=str(args.cmap_prob), persistence_mode=str(args.persistence_mode), cluster_top=int(args.cluster_top), risk_threshold=float(args.risk_threshold), show_legend=bool(show_legend), show_title=bool(show_title), show_panel_titles=bool(show_pt), title=args.title, out=str(args.out), out_points=args.out_points, out_years=args.out_years, out_clusters=args.out_clusters, dpi=int(args.dpi), font=int(args.font), cluster_rank=str(args.cluster_rank), add_persistence=bool(args.add_persistence), hotspot_rule=str(args.hotspot_rule), hotspot_abs=args.hotspot_abs, risk_min=args.risk_min, # boundary=args.boundary, timeline_mode=str(args.timeline_mode), timeline_overlay_current=bool(overlay_cur), add_compare=bool(args.add_compare), ns_future_b=args.ns_future_b, zh_future_b=args.zh_future_b, compare_metric=str(args.compare_metric), gdf=gdf_map, )
[docs] def main( argv: list[str] | None = None, *, prog: str | None = None, ) -> None: plot_hotspot_analytics_main(argv, prog=prog)
if __name__ == "__main__": main()