Source code for geoprior.scripts.extend_utils

# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 - https://github.com/earthai-tech/geoprior-v3
# Copyright (c) 2026-present
# Author: LKouadio <https://lkouadio.com>
r"""Utility helpers for forecast extension scripts."""

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import (
    Any,
    Literal,
)

import numpy as np
import pandas as pd

from . import utils

SubsKind = Literal["cumulative", "rate", "increment"]
OutKind = Literal["same", "cumulative", "rate"]
Method = Literal["linear_last", "linear_fit"]
UncGrowth = Literal["hold", "sqrt", "linear"]


_ALIASES: dict[str, tuple[str, ...]] = {
    "sample_idx": ("sample_idx", "sample_id", "sample"),
    "forecast_step": ("forecast_step", "forecast_s", "h"),
    "coord_t": ("coord_t", "year", "t"),
    "coord_x": ("coord_x", "lon", "longitude", "x"),
    "coord_y": ("coord_y", "lat", "latitude", "y"),
    "subsidence_q10": ("subsidence_q10", "q10", "p10"),
    "subsidence_q50": ("subsidence_q50", "q50", "p50"),
    "subsidence_q90": ("subsidence_q90", "q90", "p90"),
    "subsidence_actual": ("subsidence_actual", "actual"),
    "subsidence_unit": ("subsidence_unit", "unit", "units"),
    "calibration_factor": (
        "calibration_factor",
        "cal_factor",
    ),
    "is_calibrated": ("is_calibrated", "calibrated"),
}


[docs] @dataclass(frozen=True) class ExtendCfg: kind: SubsKind = "cumulative" out_kind: OutKind = "same" method: Method = "linear_fit" window: int = 3 unc_growth: UncGrowth = "sqrt" unc_scale: float = 1.0 group_col: str = "sample_idx" time_col: str = "coord_t"
def _as_path(x: Any) -> Path: return utils.as_path(x) def _canonize( df: pd.DataFrame, *, required: Sequence[str], where: str, ) -> pd.DataFrame: utils.ensure_columns(df, aliases=_ALIASES) miss = [c for c in required if c not in df.columns] if miss: raise KeyError(f"{where}: missing {miss}") return df def _coerce( df: pd.DataFrame, *, need_actual: bool, cc: ExtendCfg, ) -> pd.DataFrame: d = df.copy() base = [ cc.group_col, "forecast_step", cc.time_col, "coord_x", "coord_y", "subsidence_q10", "subsidence_q50", "subsidence_q90", ] if need_actual: base.append("subsidence_actual") for c in base: d[c] = pd.to_numeric(d[c], errors="coerce") d = d.dropna(subset=base).copy() d[cc.group_col] = d[cc.group_col].astype(int) d["forecast_step"] = d["forecast_step"].astype(int) d[cc.time_col] = d[cc.time_col].astype(int) return d
[docs] def load_eval_csv( path: Any, *, cc: ExtendCfg | None = None, ) -> pd.DataFrame: cc2 = cc or ExtendCfg() df = pd.read_csv(_as_path(path)) _canonize( df, required=[ cc2.group_col, "forecast_step", cc2.time_col, "coord_x", "coord_y", "subsidence_q10", "subsidence_q50", "subsidence_q90", "subsidence_actual", ], where="eval-forecast", ) return _coerce(df, need_actual=True, cc=cc2)
[docs] def load_future_csv( path: Any, *, cc: ExtendCfg | None = None, ) -> pd.DataFrame: cc2 = cc or ExtendCfg() df = pd.read_csv(_as_path(path)) _canonize( df, required=[ cc2.group_col, "forecast_step", cc2.time_col, "coord_x", "coord_y", "subsidence_q10", "subsidence_q50", "subsidence_q90", ], where="future-forecast", ) return _coerce(df, need_actual=False, cc=cc2)
def _infer_unit(df: pd.DataFrame) -> str: if "subsidence_unit" in df.columns: s = df["subsidence_unit"].dropna() if not s.empty: return str(s.iloc[0]).strip().lower() return "mm" def _unit_factor(u_from: str, u_to: str) -> float: a = (u_from or "").strip().lower() b = (u_to or "").strip().lower() if a == b: return 1.0 if a == "m" and b == "mm": return 1000.0 if a == "mm" and b == "m": return 1e-3 raise ValueError(f"Unsupported: {u_from}->{u_to}")
[docs] def to_mm(df: pd.DataFrame) -> pd.DataFrame: u0 = _infer_unit(df) if u0 == "mm": return df fac = _unit_factor(u0, "mm") d = df.copy() for c in [ "subsidence_q10", "subsidence_q50", "subsidence_q90", "subsidence_actual", ]: if c in d.columns: d[c] = pd.to_numeric(d[c], errors="coerce") * fac d["subsidence_unit"] = "mm" return d
def _targets( y_max: int, *, add_years: int, years: list[int] | None, ) -> list[int]: if years: ys = sorted(set(int(y) for y in years)) return [y for y in ys if y > int(y_max)] return [int(y_max) + i for i in range(1, add_years + 1)] def _growth(mode: UncGrowth, h: int) -> float: if mode == "hold": return 1.0 if mode == "linear": return float(1 + h) return float(np.sqrt(1.0 + float(h))) def _nanline_fit( t: np.ndarray, y: np.ndarray, ) -> tuple[np.ndarray, np.ndarray]: w = np.isfinite(y).astype(float) ws = np.sum(w, axis=1) ws = np.where(ws <= 0, 1.0, ws) t0 = np.sum(w * t[None, :], axis=1) / ws y0 = np.sum(w * y, axis=1) / ws dt = t[None, :] - t0[:, None] dy = y - y0[:, None] num = np.sum(w * dt * dy, axis=1) den = np.sum(w * dt * dt, axis=1) den = np.where(den <= 0, 1.0, den) a = num / den b = y0 - a * t0 return a, b def _predict( t_hist: np.ndarray, y_hist: np.ndarray, *, t_new: int, method: Method, ) -> np.ndarray: if method == "linear_last": y1 = y_hist[:, -1] y0 = y_hist[:, -2] t1 = float(t_hist[-1]) t0 = float(t_hist[-2]) dt = t1 - t0 if t1 != t0 else 1.0 a = (y1 - y0) / dt b = y1 - a * t1 else: a, b = _nanline_fit(t_hist, y_hist) return a * float(t_new) + b def _wide( df: pd.DataFrame, *, col: str, cc: ExtendCfg, ) -> pd.DataFrame: return df.pivot_table( index=cc.group_col, columns=cc.time_col, values=col, aggfunc="mean", ) def _base_from_eval( ev: pd.DataFrame, *, year: int, col: str, ids: pd.Index, cc: ExtendCfg, ) -> np.ndarray | None: d = ev.loc[ev[cc.time_col].astype(int).eq(int(year))] if d.empty: return None w = _wide(d, col=col, cc=cc) if int(year) not in w.columns: return None return w[int(year)].reindex(ids).to_numpy(float) def _cum_to_rate_all( wc: pd.DataFrame, *, years: np.ndarray, base: np.ndarray | None, ) -> np.ndarray: w = wc.reindex(columns=years).to_numpy(float) r = np.full_like(w, np.nan, dtype=float) for i in range(int(years.size)): if i == 0: if base is None: continue r[:, i] = (w[:, i] - base) / 1.0 continue dt = float(years[i] - years[i - 1]) if dt == 0: dt = 1.0 r[:, i] = (w[:, i] - w[:, i - 1]) / dt return r def _meta_last( fut: pd.DataFrame, *, ids: pd.Index, cc: ExtendCfg, ) -> pd.DataFrame: cols = [ "coord_x", "coord_y", "subsidence_unit", "calibration_factor", "is_calibrated", ] keep = [c for c in cols if c in fut.columns] if not keep: return pd.DataFrame(index=ids) g = fut.sort_values([cc.group_col, cc.time_col]) last = g.groupby(cc.group_col, sort=False).tail(1) m = last.set_index(cc.group_col)[keep] return m.reindex(ids)
[docs] def extend_future_df( future_df: pd.DataFrame, *, eval_df: pd.DataFrame | None = None, add_years: int = 1, years: list[int] | None = None, cc: ExtendCfg | None = None, ) -> pd.DataFrame: """ Extend a future forecast by 1-2+ years. This is extrapolation: it assumes a smooth (linear) trend in the last window of years. """ cc2 = cc or ExtendCfg() fut = to_mm(future_df) ev = to_mm(eval_df) if eval_df is not None else None fut = fut.sort_values( [cc2.group_col, cc2.time_col] ).copy() y_max = int(np.nanmax(fut[cc2.time_col].to_numpy(int))) tgt = _targets( y_max, add_years=add_years, years=years, ) if not tgt: return fut w10 = _wide(fut, col="subsidence_q10", cc=cc2) w50 = _wide(fut, col="subsidence_q50", cc=cc2) w90 = _wide(fut, col="subsidence_q90", cc=cc2) ids = w50.index.astype(int) years_all = np.array(sorted(w50.columns.astype(int))) k = int(max(2, cc2.window)) if years_all.size < k: k = int(years_all.size) t_fit = years_all[-k:] if cc2.kind == "cumulative": b_year = int(years_all[0] - 1) b10 = None b50 = None b90 = None if ev is not None: b10 = _base_from_eval( ev, year=b_year, col="subsidence_q10", ids=ids, cc=cc2, ) b50 = _base_from_eval( ev, year=b_year, col="subsidence_q50", ids=ids, cc=cc2, ) b90 = _base_from_eval( ev, year=b_year, col="subsidence_q90", ids=ids, cc=cc2, ) r10_all = _cum_to_rate_all( w10, years=years_all, base=b10, ) r50_all = _cum_to_rate_all( w50, years=years_all, base=b50, ) r90_all = _cum_to_rate_all( w90, years=years_all, base=b90, ) else: r10_all = w10.reindex( columns=years_all, ).to_numpy(float) r50_all = w50.reindex( columns=years_all, ).to_numpy(float) r90_all = w90.reindex( columns=years_all, ).to_numpy(float) r10 = r10_all[:, -k:] r50 = r50_all[:, -k:] r90 = r90_all[:, -k:] lo_gap = r50[:, -1] - r10[:, -1] hi_gap = r90[:, -1] - r50[:, -1] lo_gap = np.where(np.isfinite(lo_gap), lo_gap, 0.0) hi_gap = np.where(np.isfinite(hi_gap), hi_gap, 0.0) out_kind = cc2.out_kind if out_kind == "same": if cc2.kind == "cumulative": out_kind = "cumulative" else: out_kind = "rate" if out_kind == "cumulative": if cc2.kind == "cumulative": c10 = w10.reindex( columns=years_all, ).to_numpy(float) c50 = w50.reindex( columns=years_all, ).to_numpy(float) c90 = w90.reindex( columns=years_all, ).to_numpy(float) c10 = c10[:, -1] c50 = c50[:, -1] c90 = c90[:, -1] else: # Build a relative cumulative anchor from rates. dtv = np.diff( years_all, prepend=int(years_all[0]) - 1, ).astype(float) dtv = np.where(dtv <= 0.0, 1.0, dtv) rr10 = np.nan_to_num(r10_all, nan=0.0) rr50 = np.nan_to_num(r50_all, nan=0.0) rr90 = np.nan_to_num(r90_all, nan=0.0) c10 = np.cumsum(rr10 * dtv[None, :], axis=1) c50 = np.cumsum(rr50 * dtv[None, :], axis=1) c90 = np.cumsum(rr90 * dtv[None, :], axis=1) c10 = c10[:, -1] c50 = c50[:, -1] c90 = c90[:, -1] last_step = fut.groupby(cc2.group_col)[ "forecast_step" ].max() last_step = last_step.reindex(ids).fillna(0).astype(int) meta = _meta_last(fut, ids=ids, cc=cc2) y_prev = int(years_all[-1]) new_parts: list[pd.DataFrame] = [] for j, y_new in enumerate(tgt, start=1): r50_new = _predict( t_fit, r50, t_new=int(y_new), method=cc2.method, ) fac = _growth(cc2.unc_growth, j) fac = fac * float(cc2.unc_scale) r10_new = r50_new - lo_gap * fac r90_new = r50_new + hi_gap * fac if out_kind == "rate": q10 = r10_new q50 = r50_new q90 = r90_new else: dt = float(int(y_new) - int(y_prev)) if not np.isfinite(dt) or dt <= 0: dt = 1.0 c10 = c10 + r10_new * dt c50 = c50 + r50_new * dt c90 = c90 + r90_new * dt q10 = c10 q50 = c50 q90 = c90 df_new = pd.DataFrame( { cc2.group_col: ids.to_numpy(int), "forecast_step": last_step.to_numpy(int) + j, cc2.time_col: int(y_new), "subsidence_q10": q10.astype(float), "subsidence_q50": q50.astype(float), "subsidence_q90": q90.astype(float), "extended": True, "extend_kind": cc2.kind, "extend_method": cc2.method, "unc_growth": cc2.unc_growth, } ) if not meta.empty: df_new = df_new.join(meta.reset_index(drop=True)) new_parts.append(df_new) t_fit = np.append(t_fit[1:], int(y_new)) r50 = np.column_stack([r50[:, 1:], r50_new]) y_prev = int(y_new) out = pd.concat([fut] + new_parts, ignore_index=True) out = out.sort_values( [cc2.group_col, cc2.time_col] ).copy() out[cc2.group_col] = out[cc2.group_col].astype(int) out[cc2.time_col] = out[cc2.time_col].astype(int) out["forecast_step"] = out["forecast_step"].astype(int) if "subsidence_unit" in out.columns: out["subsidence_unit"] = "mm" return out
[docs] def extend_future_csv_file( future_csv: Any, *, eval_csv: Any | None = None, out_csv: Any | None = None, add_years: int = 1, years: list[int] | None = None, cc: ExtendCfg | None = None, ) -> Path: """ Load -> extend -> write an updated future CSV. If out_csv is relative, it is placed in scripts/out/. """ cc2 = cc or ExtendCfg() fut = load_future_csv(future_csv, cc=cc2) ev = None if eval_csv is not None: ev = load_eval_csv(eval_csv, cc=cc2) out = extend_future_df( fut, eval_df=ev, add_years=add_years, years=years, cc=cc2, ) name = out_csv or "future_extended.csv" p = utils.resolve_out_out(str(name)) p.parent.mkdir(parents=True, exist_ok=True) out.to_csv(p, index=False) return p
# How to use it (quick examples) # from scripts import forecast_extend_utils as fx # cc = fx.ExtendCfg( # kind="cumulative", # out_kind="same", # method="linear_fit", # window=3, # unc_growth="sqrt", # unc_scale=1.15, # ) # out_path = fx.extend_future_csv_file( # future_csv="..._future_calibrated.csv", # eval_csv="..._eval_calibrated.csv", # out_csv="future_plus_2026.csv", # years=[2026], # cc=cc, # ) # print(out_path) # Produce annual rate instead of cumulative # cc = fx.ExtendCfg( # kind="cumulative", # out_kind="rate", # method="linear_last", # unc_growth="sqrt", # ) # df_ext = fx.extend_future_df( # future_df=future_df, # eval_df=eval_df, # years=[2026, 2027], # cc=cc, # )