# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# Copyright (c) 2026-present
# Author: LKouadio <https://lkouadio.com>
"""
Plot cumulative subsidence maps on satellite basemap + optional hotspots.
Layout
------
The figure uses two rows by N columns, with Nansha on top and
Zhongshan on the bottom. The first columns show 2022 observed and
predicted cumulative maps, followed by forecast cumulative maps for
later years.
Background and overlays
-----------------------
- Background: ``Esri.WorldImagery`` satellite tiles via
``contextily``.
- Foreground: PS points colored by cumulative subsidence (mm) since
a baseline year, default 2020.
- Optional overlay: hotspot points from the Fig. 6 hotspot CSV.
Cumulative definition
---------------------
This script always plots *cumulative since start_year*, but the
input CSV can be:
1. ``--subsidence-kind=cumulative`` (default): values are already
cumulative, so the script rebases them at the first year greater
than or equal to ``start_year`` using
``cum(t) = value(t) - value(first)``.
2. ``--subsidence-kind=increment`` or ``rate``: values are yearly
increments, so the script accumulates them using
``cum(t) = sum_y increment(y)``.
CRS handling (lat/lon vs UTM)
-----------------------------
coord_x/coord_y may be:
- lon/lat (degrees), treated as EPSG:4326
- UTM (meters), treated as ``EPSG:<utm_epsg>`` with default 32649
Auto-detection:
- if ``abs(x) <= 180`` and ``abs(y) <= 90``, treat the coordinates
as lon/lat
- otherwise, treat them as UTM
You can override using --coords-mode and --utm-epsg.
Outputs
-------
Saves both PNG and PDF using --out (resolved under scripts/figs
if relative).
Notes
-----
- Code style targets black+ruff with line length 62.
"""
from __future__ import annotations
import argparse
from collections.abc import Iterable
import contextily as cx
import geopandas as gpd
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from . import config as cfg
from . import utils
# ---------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------
[docs]
def build_parser(
*, prog: str | None = None
) -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog=prog or "plot-geo-cumulative",
description=(
"Cumulative subsidence maps on a satellite "
"basemap (GeoPrior)."
),
)
p.add_argument(
"--ns-val",
required=True,
help="Nansha validation calibrated CSV.",
)
p.add_argument(
"--zh-val",
required=True,
help="Zhongshan validation calibrated CSV.",
)
p.add_argument(
"--ns-future",
required=True,
help="Nansha future forecast CSV (H*_future).",
)
p.add_argument(
"--zh-future",
required=True,
help="Zhongshan future forecast CSV (H*_future).",
)
p.add_argument(
"--start-year",
type=int,
default=2020,
help="Baseline year for cumulative sum.",
)
p.add_argument(
"--year-val",
type=int,
default=2022,
help="Observed vs predicted panel year.",
)
p.add_argument(
"--years-forecast",
nargs="*",
type=int,
default=[2024, 2025],
help="Forecast years (cumulative q50).",
)
p.add_argument(
"--subsidence-kind",
default="cumulative",
choices=("cumulative", "increment", "rate"),
help=(
"Meaning of subsidence values in CSVs. "
"cumulative=already cumulative (rebase at start_year); "
"increment/rate=per-year increments (cumsum)."
),
)
p.add_argument(
"--clip",
type=float,
default=99.0,
help="Percentile clip for color scale.",
)
p.add_argument(
"--cmap",
default="viridis",
help="Colormap for cumulative subsidence.",
)
p.add_argument(
"--point-size",
type=float,
default=1.5,
help="Marker size for PS points.",
)
p.add_argument(
"--point-alpha",
type=float,
default=0.90,
help="Alpha for PS points.",
)
p.add_argument(
"--hotspot-csv",
default=None,
help="Optional hotspot points CSV (Fig.6).",
)
p.add_argument(
"--hotspot-field",
default=None,
help="Optional hotspot color-by column.",
)
p.add_argument(
"--hotspot-color",
default="black",
help="Hotspot color if no hotspot-field.",
)
p.add_argument(
"--hotspot-size",
type=float,
default=8.0,
help="Hotspot marker size.",
)
p.add_argument(
"--hotspot-alpha",
type=float,
default=0.95,
help="Hotspot alpha.",
)
p.add_argument(
"--coords-mode",
choices=("auto", "lonlat", "utm"),
default="auto",
help="Coordinate mode for coord_x/coord_y.",
)
p.add_argument(
"--utm-epsg",
type=int,
default=32649,
help="EPSG for UTM coords (default 32649).",
)
p.add_argument(
"--hide-attribution",
action="store_true",
help="Hide contextily attribution.",
)
p.add_argument("--dpi", type=int, default=cfg.PAPER_DPI)
p.add_argument("--font", type=int, default=9)
utils.add_plot_text_args(
p,
default_out="spatial_satellite_cumulative",
)
return p
[docs]
def parse_args(
argv: list[str] | None = None,
*,
prog: str | None = None,
) -> argparse.Namespace:
p = build_parser(prog=prog)
return p.parse_args(argv)
# ---------------------------------------------------------------------
# Loading / schema
# ---------------------------------------------------------------------
def _load_csv_base(
path: str,
*,
needed: Iterable[str],
) -> pd.DataFrame:
p = utils.as_path(path)
if not p.exists():
raise FileNotFoundError(str(p))
df = pd.read_csv(p)
miss = [c for c in needed if c not in df.columns]
if miss:
raise KeyError(f"Missing columns in {p}: {miss}")
for c in ["coord_x", "coord_y", "coord_t"]:
df[c] = pd.to_numeric(df[c], errors="coerce")
if "forecast_step" in df.columns:
df["forecast_step"] = pd.to_numeric(
df["forecast_step"],
errors="coerce",
)
df = df.dropna(subset=["coord_x", "coord_y", "coord_t"])
return df.copy()
[docs]
def load_val_csv(path: str) -> pd.DataFrame:
df = _load_csv_base(
path,
needed=(
"sample_idx",
"coord_t",
"coord_x",
"coord_y",
"subsidence_actual",
"subsidence_q50",
),
)
for c in ["subsidence_actual", "subsidence_q50"]:
df[c] = pd.to_numeric(df[c], errors="coerce")
return df.dropna(
subset=["subsidence_actual", "subsidence_q50"]
)
[docs]
def load_future_csv(path: str) -> pd.DataFrame:
df = _load_csv_base(
path,
needed=(
"sample_idx",
"coord_t",
"coord_x",
"coord_y",
"subsidence_q50",
),
)
df["subsidence_q50"] = pd.to_numeric(
df["subsidence_q50"],
errors="coerce",
)
return df.dropna(subset=["subsidence_q50"])
# ---------------------------------------------------------------------
# CRS inference
# ---------------------------------------------------------------------
def _looks_like_lonlat(x: float, y: float) -> bool:
return (abs(float(x)) <= 180.0) and (
abs(float(y)) <= 90.0
)
[docs]
def infer_xy_crs(
df: pd.DataFrame,
*,
mode: str,
utm_epsg: int,
) -> str:
if mode == "lonlat":
return "EPSG:4326"
if mode == "utm":
return f"EPSG:{int(utm_epsg)}"
xs = pd.to_numeric(df["coord_x"], errors="coerce")
ys = pd.to_numeric(df["coord_y"], errors="coerce")
if xs.notna().any() and ys.notna().any():
x0 = float(xs.dropna().iloc[0])
y0 = float(ys.dropna().iloc[0])
if _looks_like_lonlat(x0, y0):
return "EPSG:4326"
return f"EPSG:{int(utm_epsg)}"
[docs]
def to_web_mercator(
df: pd.DataFrame,
*,
src_crs: str,
) -> gpd.GeoDataFrame:
gdf = gpd.GeoDataFrame(
df,
geometry=gpd.points_from_xy(
df["coord_x"],
df["coord_y"],
),
crs=src_crs,
)
return gdf.to_crs(epsg=3857)
# ---------------------------------------------------------------------
# Cumulative panels
# ---------------------------------------------------------------------
def _panel_df(
df: pd.DataFrame,
value_col: str,
) -> pd.DataFrame:
out = df[["coord_x", "coord_y", value_col]].copy()
return out.rename(columns={value_col: "value"})
def _cum_from_input(
df: pd.DataFrame,
*,
value_col: str,
kind: str,
) -> pd.Series:
kind = str(kind).strip().lower()
g = df.groupby("sample_idx")[value_col]
if kind in ("increment", "rate"):
return g.cumsum()
# kind == "cumulative" (default): rebase at first available year
return df[value_col] - g.transform("first")
def _cum_title_tag(kind: str) -> str:
k = str(kind).strip().lower()
if k in ("increment", "rate"):
return "cum (from increments)"
return "cum (rebased)"
def _cum_note(kind: str) -> str:
k = str(kind).strip().lower()
if k in ("increment", "rate"):
return "from increments"
return "rebased"
[docs]
def build_city_panels(
val_df: pd.DataFrame,
fut_df: pd.DataFrame,
*,
start_year: int,
year_val: int,
years_forecast: list[int],
subs_kind: str = "cumulative",
) -> dict[str, pd.DataFrame]:
val = val_df.copy()
fut = fut_df.copy()
val["coord_t"] = val["coord_t"].astype(int)
fut["coord_t"] = fut["coord_t"].astype(int)
val = val[val["coord_t"] >= int(start_year)]
fut = fut[fut["coord_t"] >= int(start_year)]
# ---- observed cumulative (validation only)
v0 = val.sort_values(["sample_idx", "coord_t"])
v0 = v0.drop_duplicates(
subset=["sample_idx", "coord_t"],
keep="last",
)
# currently our data collected are cumlative so no need
# since the prediction should be made on cumulative
v0["cum_actual"] = _cum_from_input(
v0,
value_col="subsidence_actual",
kind=subs_kind,
)
obs = v0[v0["coord_t"] == int(year_val)]
obs_panel = (
_panel_df(obs, "cum_actual")
if not obs.empty
else (pd.DataFrame())
)
# ---- predicted cumulative (val + future)
cols = [
"sample_idx",
"coord_t",
"coord_x",
"coord_y",
"subsidence_q50",
]
comb = pd.concat(
[val[cols].copy(), fut[cols].copy()],
ignore_index=True,
)
comb = comb.sort_values(["sample_idx", "coord_t"])
comb = comb.drop_duplicates(
subset=["sample_idx", "coord_t"],
keep="last",
)
comb["cum_pred"] = _cum_from_input(
comb,
value_col="subsidence_q50",
kind=subs_kind,
)
pred = comb[comb["coord_t"] == int(year_val)]
pred_panel = (
_panel_df(pred, "cum_pred")
if not pred.empty
else (pd.DataFrame())
)
out: dict[str, pd.DataFrame] = {}
if not obs_panel.empty:
out["obs"] = obs_panel
if not pred_panel.empty:
out["pred"] = pred_panel
for yy in years_forecast:
sub = comb[comb["coord_t"] == int(yy)]
if sub.empty:
continue
out[f"Y{int(yy)}"] = _panel_df(sub, "cum_pred")
return out
[docs]
def clip_bounds(
arrays: list[np.ndarray],
*,
clip: float,
) -> tuple[float, float]:
if not arrays:
return (0.0, 1.0)
good: list[np.ndarray] = []
for a in arrays:
if a.size == 0:
continue
aa = a[np.isfinite(a)]
if aa.size:
good.append(aa)
if not good:
return (0.0, 1.0)
x = np.concatenate(good)
p = float(clip)
p = max(50.0, min(100.0, p))
lo = float(np.percentile(x, 100.0 - p))
hi = float(np.percentile(x, p))
if lo >= hi:
lo = float(np.min(x))
hi = float(np.max(x))
return (lo, hi)
# ---------------------------------------------------------------------
# Plotting
# ---------------------------------------------------------------------
[docs]
def draw_city_row(
*,
axes: np.ndarray,
row_idx: int,
city: str,
panels_3857: dict[str, gpd.GeoDataFrame],
col_keys: list[str],
cmap: mpl.colors.Colormap,
cum_tag: str,
vmin: float,
vmax: float,
args: argparse.Namespace,
show_panel_titles: bool,
hotspots: pd.DataFrame | None,
) -> None:
for j, key in enumerate(col_keys):
ax = axes[row_idx, j]
if key not in panels_3857:
ax.set_axis_off()
continue
gdf = panels_3857[key]
gdf.plot(
ax=ax,
column="value",
cmap=cmap,
markersize=args.point_size,
alpha=args.point_alpha,
vmin=vmin,
vmax=vmax,
linewidth=0.0,
)
attrib = None if not args.hide_attribution else False
cx.add_basemap(
ax,
source=cx.providers.Esri.WorldImagery,
alpha=0.95,
attribution=attrib,
)
if hotspots is not None and key.startswith("Y"):
yy = int(key[1:])
sub = hotspots[
(hotspots["city"] == city)
& (hotspots["year"].astype(int) == yy)
& (hotspots["kind"] == "forecast")
]
if not sub.empty:
src = infer_xy_crs(
sub,
mode=args.coords_mode,
utm_epsg=args.utm_epsg,
)
hot = to_web_mercator(sub, src_crs=src)
if args.hotspot_field and (
args.hotspot_field in hot.columns
):
hot.plot(
ax=ax,
column=args.hotspot_field,
cmap="hot_r",
markersize=args.hotspot_size,
alpha=args.hotspot_alpha,
linewidth=0.0,
legend=False,
)
else:
hot.plot(
ax=ax,
color=args.hotspot_color,
markersize=args.hotspot_size,
alpha=args.hotspot_alpha,
linewidth=0.0,
)
ax.set_axis_off()
if not show_panel_titles:
continue
if key == "obs":
ttl = f"{city} • {args.year_val} obs {cum_tag}"
elif key == "pred":
ttl = f"{city} • {args.year_val} pred {cum_tag}"
else:
ttl = f"{city} • {key[1:]} fcst {cum_tag}"
ax.set_title(
ttl,
loc="left",
fontweight="bold",
pad=4,
)
# ---------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------
[docs]
def plot_geo_cumulative_main(
argv: list[str] | None = None,
*,
prog: str | None = None,
) -> None:
args = parse_args(argv, prog=prog)
utils.set_paper_style(fontsize=args.font, dpi=args.dpi)
cum_tag = _cum_title_tag(args.subsidence_kind)
note = _cum_note(args.subsidence_kind)
show_legend = utils.str_to_bool(
args.show_legend, default=True
)
show_title = utils.str_to_bool(
args.show_title, default=True
)
show_pan_t = utils.str_to_bool(
args.show_panel_titles,
default=True,
)
show_labels = utils.str_to_bool(
args.show_labels, default=True
)
ns_val = load_val_csv(args.ns_val)
zh_val = load_val_csv(args.zh_val)
ns_fut = load_future_csv(args.ns_future)
zh_fut = load_future_csv(args.zh_future)
ns_pan = build_city_panels(
ns_val,
ns_fut,
start_year=args.start_year,
year_val=args.year_val,
years_forecast=list(args.years_forecast),
subs_kind=args.subsidence_kind,
)
zh_pan = build_city_panels(
zh_val,
zh_fut,
start_year=args.start_year,
year_val=args.year_val,
years_forecast=list(args.years_forecast),
subs_kind=args.subsidence_kind,
)
col_keys: list[str] = []
if ("obs" in ns_pan) or ("obs" in zh_pan):
col_keys.append("obs")
if ("pred" in ns_pan) or ("pred" in zh_pan):
col_keys.append("pred")
for yy in args.years_forecast:
k = f"Y{int(yy)}"
if (k in ns_pan) or (k in zh_pan):
col_keys.append(k)
if not col_keys:
raise RuntimeError(
"No panels found. Check years/CSVs."
)
# CRS conversion per city (supports lon/lat or UTM)
ns_crs = infer_xy_crs(
ns_val,
mode=args.coords_mode,
utm_epsg=args.utm_epsg,
)
zh_crs = infer_xy_crs(
zh_val,
mode=args.coords_mode,
utm_epsg=args.utm_epsg,
)
ns_3857: dict[str, gpd.GeoDataFrame] = {}
zh_3857: dict[str, gpd.GeoDataFrame] = {}
for k in col_keys:
if k in ns_pan:
ns_3857[k] = to_web_mercator(
ns_pan[k], src_crs=ns_crs
)
if k in zh_pan:
zh_3857[k] = to_web_mercator(
zh_pan[k], src_crs=zh_crs
)
# Global color scale
vals: list[np.ndarray] = []
for dct in (ns_pan, zh_pan):
for k in col_keys:
if k in dct:
vals.append(dct[k]["value"].to_numpy())
vmin, vmax = clip_bounds(vals, clip=args.clip)
cmap = plt.get_cmap(args.cmap)
# Optional hotspots
hot_df: pd.DataFrame | None = None
if args.hotspot_csv:
hp = utils.as_path(args.hotspot_csv)
if hp.exists():
hot_df = pd.read_csv(hp)
utils.ensure_columns(
hot_df,
aliases={
"coord_x": ("coord_x", "x", "lon", "lng"),
"coord_y": ("coord_y", "y", "lat"),
},
)
ncols = len(col_keys)
fig, axes = plt.subplots(
2,
ncols,
figsize=(3.0 * ncols, 6.0),
constrained_layout=False,
)
axes = np.asarray(axes).reshape(2, ncols)
plt.subplots_adjust(
left=0.04,
right=0.86 if show_legend else 0.98,
top=0.93,
bottom=0.06,
wspace=0.05,
hspace=0.12,
)
draw_city_row(
axes=axes,
row_idx=0,
city="Nansha",
panels_3857=ns_3857,
col_keys=col_keys,
cmap=cmap,
cum_tag=cum_tag,
vmin=vmin,
vmax=vmax,
args=args,
show_panel_titles=show_pan_t,
hotspots=hot_df,
)
draw_city_row(
axes=axes,
row_idx=1,
city="Zhongshan",
panels_3857=zh_3857,
col_keys=col_keys,
cum_tag=cum_tag,
cmap=cmap,
vmin=vmin,
vmax=vmax,
args=args,
show_panel_titles=show_pan_t,
hotspots=hot_df,
)
# Shared colorbar
if show_legend:
norm = mpl.colors.Normalize(vmin=vmin, vmax=vmax)
sm = plt.cm.ScalarMappable(norm=norm, cmap=cmap)
sm.set_array([])
cax = fig.add_axes([0.88, 0.20, 0.02, 0.60])
cb = fig.colorbar(sm, cax=cax)
if show_labels:
lab = utils.label("subsidence_cum")
cb.set_label(
f"{lab} since {int(args.start_year)}"
)
if show_legend and show_labels:
lab = utils.label("subsidence_cum")
cb.set_label(
f"{lab} since {int(args.start_year)} ({note})"
)
if show_title:
default = (
f"Cumulative subsidence since "
f"{int(args.start_year)} "
f"({note}) on satellite basemap"
)
# Suptitle
if show_title:
default = (
f"Cumulative subsidence since {int(args.start_year)} "
"on satellite basemap"
)
ttl = utils.resolve_title(
default=default, title=args.title
)
fig.suptitle(
ttl,
y=0.98,
fontsize=11,
fontweight="bold",
)
out = utils.resolve_fig_out(args.out)
utils.save_figure(
fig,
out,
dpi=int(args.dpi),
)
# utils.ensure_dir(out.parent)
# png = out.with_suffix(".png")
# pdf = out.with_suffix(".pdf")
# fig.savefig(png, bbox_inches="tight")
# fig.savefig(pdf, bbox_inches="tight")
# print(f"[OK] Saved: {png}")
# print(f"[OK] Saved: {pdf}")
[docs]
def main(
*,
prog: str | None = None,
) -> None:
plot_geo_cumulative_main(prog=prog)
if __name__ == "__main__":
main()