Source code for geoprior.utils.forecast_utils

# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# https://lkouadio.com
# Copyright (c) 2026-present
# Author: LKouadio <etanoyau@gmail.com>

"""
Forecast utilities.
"""

from __future__ import annotations

import inspect
import json
import logging
import os
import re
import warnings
from collections.abc import (
    Callable,
    Iterable,
    Mapping,
    MutableMapping,
    Sequence,
)
from datetime import date, datetime
from pathlib import Path
from typing import (
    Any,
    Literal,
)

import matplotlib.pyplot as plt
import matplotlib.style as style
import numpy as np
import pandas as pd
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    r2_score,
)
from sklearn.preprocessing import MinMaxScaler

from ..core.checks import check_empty, check_spatial_columns
from ..core.handlers import columns_manager
from ..core.io import is_data_readable
from ..decorators import isdf
from ..logging import get_logger
from ..metrics._registry import get_metric
from .calibrate import calibrate_quantile_forecasts
from .generic_utils import ensure_directory_exists, vlog
from .scale_metrics import _resolve_stage1_entry
from .subsidence_utils import (
    convert_target_units_df,
)
from .validator import is_frame

logger = get_logger(__name__)

__all__ = [
    "detect_forecast_type",
    "format_forecast_dataframe",
    "get_value_prefixes",
    "get_value_prefixes_in",
    "pivot_forecast_dataframe",
    "get_step_names",
    "stack_quantile_predictions",
    "adjust_time_predictions",
    "add_forecast_times",
    "pivot_forecast",
    "plot_reliability_diagram",
    "format_and_forecast",
    "evaluate_forecast",
]

_DIGIT_RE = re.compile(r"\d+")


[docs] def evaluate_forecast( eval_data: str | os.PathLike | pd.DataFrame, *, target_name: str = "subsidence", column_map: Mapping[str, Any] | None = None, quantile_interval: tuple[float, float] = (0.1, 0.9), per_horizon: bool = False, extra_metrics: Sequence[str] | Mapping[str, Callable[..., Any]] | None = None, extra_metric_kwargs: Mapping[str, Mapping[str, Any]] | None = None, overall_key: str | None = "__overall__", savefile: str | os.PathLike | bool | None = None, save_format: str = ".json", time_as_str: bool = True, verbose: int = 1, logger: logging.Logger | None = None, ) -> dict[str, Any] | pd.DataFrame: """ Evaluate forecast diagnostics from an evaluation DataFrame. This helper consumes the ``df_eval`` output from :func:`format_and_forecast` (or a compatible DataFrame) and computes aggregate metrics such as MAE, MSE, :math:`R^2`, coverage, and sharpness. It can also optionally evaluate metrics per forecast horizon and apply additional user-defined metrics. By default it expects the following columns: - ``'sample_idx'`` - ``'forecast_step'`` - ``'coord_t'`` (time) - Quantile or point-prediction columns for the target, e.g.: - Quantile mode: ``f'{target_name}_q10'``, ``f'{target_name}_q50'``, ``f'{target_name}_q90'``, ... - Point mode: ``f'{target_name}_pred'``. - Actual column: ``f'{target_name}_actual'``. A flexible ``column_map`` allows remapping these logical roles to arbitrary column names, e.g.:: column_map = { 'coord_t': 'date', 'actual': 'true_subs', 'pred': 'subs_predicted', } or, for quantile columns:: column_map = { 'coord_t': 'date', 'quantiles': { 0.1: 'subs_q10', 0.5: 'subs_q50', 0.9: 'subs_q90', }, } Parameters ---------- eval_data : str, path-like, or pandas.DataFrame Either a path to a CSV file containing the evaluation DataFrame (as saved by :func:`format_and_forecast`) or an in-memory DataFrame. target_name : str, default='subsidence' Base name for the target columns. Used to infer default column names such as ``f'{target_name}_q10'``, ``f'{target_name}_pred'``, and ``f'{target_name}_actual'``. column_map : dict, optional Optional mapping to override default column names. The following keys are recognized: - ``'sample_idx'`` : sample index column name (default ``'sample_idx'``). - ``'forecast_step'`` : horizon index column name (default ``'forecast_step'``). - ``'coord_t'`` : time coordinate column (default ``'coord_t'``). - ``'actual'`` : name or list of names for the actual target column(s). Currently a single column is supported; default ``f'{target_name}_actual'``. - ``'pred'`` : point prediction column for non-quantile mode, default ``f'{target_name}_pred'``. - ``'quantiles'`` : * If a mapping: ``{q: col_name}`` for quantile levels, where ``q`` is a float in (0, 1). * If a sequence of column names, the quantile value will be inferred from suffix patterns like ``f'{target_name}_q{int(q*100):d}'``. quantile_interval : tuple of float, default=(0.1, 0.9) Interval (lower, upper) used for coverage and sharpness metrics, typically corresponding to an 80% interval between Q10 and Q90. per_horizon : bool, default=False If ``True``, compute per-horizon MAE/MSE/R² grouped by the ``forecast_step`` column. extra_metrics : sequence of str or mapping, optional Optional additional metrics to compute. - If a sequence of strings (e.g. ``['pss', 'pit']``), each name is resolved via :func:`geoprior.metrics._registry.get_metric`. If the name is not present in the registry, an error is raised, prompting the user to pass a callable instead. - If a mapping ``{name: func}``, each ``func`` is called as:: func(y_true, y_pred, **extra_metric_kwargs.get(name, {})) where ``y_pred`` is the median (Q50) or point forecast. For more complex metrics that require full quantile structure or temporal sequences, pass a suitable wrapper function that internally uses the DataFrame as needed. extra_metric_kwargs : mapping, optional Optional mapping of per-metric keyword arguments. Keys must match the names in ``extra_metrics``. Each value is a dict of kwargs forwarded to the corresponding metric function. savefile : str, path-like, or bool, optional If provided, metrics are saved to disk. - If ``True``: a filename is auto-generated near ``eval_data`` (if it is a path) or in the current working directory. - If a string/path without extension: the extension is taken from ``save_format``. - If a string/path with extension: that extension takes precedence over ``save_format``. save_format : {'.json', 'json', '.csv', 'csv'}, default='.json' Output format when ``savefile`` is truthy. JSON preserves nested structure; CSV is flattened into a tall table. - For JSON, the function returns the metrics dictionary. - For CSV, the function returns the metrics DataFrame. time_as_str : bool, default=True If ``True``, time keys in the result dictionary are converted to strings (useful for JSON serialization). If there is only a single time value, the result is flattened and the time key is omitted. verbose : int, default=1 Verbosity level passed to :func:`vlog`. logger : logging.Logger, optional Optional logger instance used by :func:`vlog`. Returns ------- results : dict or pandas.DataFrame If ``save_format`` is JSON (default), returns a dict: - Single time value: .. code-block:: python { "overall_mae": ..., "overall_mse": ..., "overall_r2": ..., "coverage80": ..., "sharpness80": ..., "per_horizon_mae": {1: ..., 2: ..., ...}, ... } - Multiple time values: .. code-block:: python { "2021": { ...metrics... }, "2022": { ...metrics... }, } If ``save_format`` is CSV, returns a DataFrame with flattened rows: - Columns include: ``coord_t``, ``metric``, ``horizon``, and ``value``. Notes ----- - Default metrics in *quantile* mode: - ``overall_mae``, ``overall_mse``, ``overall_r2`` - ``coverage80`` and ``sharpness80`` (using the requested interval, e.g., Q10–Q90) If ``per_horizon=True``, also: - ``per_horizon_mae``, ``per_horizon_mse``, ``per_horizon_r2`` (each a mapping from horizon index to score). - Default metrics in *point* mode (no quantiles): - ``mae``, ``mse``, ``r2`` And optionally, if ``per_horizon=True``: - ``per_horizon_mae``, ``per_horizon_mse``, ``per_horizon_r2``. """ vlog( "[evaluate_forecast] Starting metrics computation.", verbose=verbose, level=1, logger=logger, ) # ------------------------------------------------------------------ # 1. Load DataFrame # ------------------------------------------------------------------ source_path: Path | None = None if isinstance(eval_data, str | os.PathLike): source_path = Path(eval_data) df = pd.read_csv(source_path) vlog( f"[evaluate_forecast] Loaded eval_data from '{source_path}'.", verbose=verbose, level=2, logger=logger, ) elif isinstance(eval_data, pd.DataFrame): df = eval_data.copy() vlog( "[evaluate_forecast] Using in-memory eval DataFrame.", verbose=verbose, level=2, logger=logger, ) else: raise TypeError( "eval_data must be a path or a pandas.DataFrame. " f"Got type={type(eval_data).__name__!r}." ) column_map = dict(column_map or {}) # Logical column names (with defaults) # sample_col = column_map.get("sample_idx", "sample_idx") step_col = column_map.get( "forecast_step", "forecast_step" ) time_col = column_map.get("coord_t", "coord_t") # Actual column (required) actual_spec = column_map.get( "actual", f"{target_name}_actual" ) if isinstance(actual_spec, str): actual_cols = [actual_spec] else: actual_cols = list(actual_spec) if len(actual_cols) != 1: raise ValueError( "evaluate_forecast currently supports exactly one " "actual target column. Got " f"{len(actual_cols)}: {actual_cols!r}." ) actual_col = actual_cols[0] if actual_col not in df.columns: raise KeyError( f"Actual column '{actual_col}' is not present in " f"DataFrame columns={list(df.columns)!r}." ) # ------------------------------------------------------------------ # 2. Detect quantile vs point mode and build quantile map # ------------------------------------------------------------------ quantiles_spec = column_map.get("quantiles", None) quantile_map: dict[float, str] = {} if quantiles_spec is not None: # User explicitly provided quantile columns if isinstance(quantiles_spec, Mapping): for q, col in quantiles_spec.items(): q_float = float(q) if col not in df.columns: raise KeyError( f"Quantile column '{col}' for q={q_float} " "not present in DataFrame." ) quantile_map[q_float] = col else: # Sequence of column names; try to infer q from suffix from re import escape, match for col in quantiles_spec: if col not in df.columns: raise KeyError( f"Quantile column '{col}' not present in " "DataFrame." ) # Pattern: f'{target_name}_q{int(q*100)}' pat = rf"^{escape(target_name)}_q(\d+)$" m = match(pat, col) if not m: raise ValueError( f"Cannot infer quantile level from column " f"name '{col}'. Expected pattern " f"'{target_name}_qXX'." ) q_int = int(m.group(1)) quantile_map[q_int / 100.0] = col else: # Auto-detect quantiles from column names from re import escape, match pat = rf"^{escape(target_name)}_q(\d+)$" for col in df.columns: m = match(pat, col) if m: q_int = int(m.group(1)) quantile_map[q_int / 100.0] = col quantile_mode = len(quantile_map) > 0 # Prediction (median / point) column if quantile_mode: # Choose q closest to 0.5 as "median" median_q = min( quantile_map.keys(), key=lambda q: abs(q - 0.5) ) median_col = quantile_map[median_q] vlog( f"[evaluate_forecast] Detected quantile mode with " f"{len(quantile_map)} quantiles; median q~{median_q:.2f} " f"via column '{median_col}'.", verbose=verbose, level=2, logger=logger, ) else: pred_col = column_map.get( "pred", f"{target_name}_pred" ) if pred_col not in df.columns: raise KeyError( "Point-prediction mode inferred, but prediction " f"column '{pred_col}' is missing." ) median_col = pred_col vlog( f"[evaluate_forecast] Detected point mode using " f"column '{median_col}' for predictions.", verbose=verbose, level=2, logger=logger, ) # ------------------------------------------------------------------ # 3. Group by time (coord_t) if available # ------------------------------------------------------------------ has_time = time_col in df.columns if has_time: groups = df.groupby(time_col) vlog( f"[evaluate_forecast] Grouping by time column " f"'{time_col}'. Found {groups.ngroups} group(s).", verbose=verbose, level=2, logger=logger, ) else: # Single pseudo-group with key None groups = [(None, df)] vlog( "[evaluate_forecast] No time column present; " "treating entire DataFrame as one group.", verbose=verbose, level=2, logger=logger, ) # ------------------------------------------------------------------ # 4. Prepare extra metrics (names -> callables) # ------------------------------------------------------------------ metric_fns: dict[str, Callable[..., Any]] = {} if extra_metrics is not None: if isinstance(extra_metrics, Mapping): metric_fns = dict(extra_metrics) else: # Sequence of names: resolve via get_metric for name in extra_metrics: try: metric_fns[name] = get_metric(name) except KeyError as exc: raise ValueError( f"Metric name '{name}' not found in " "geoprior.metrics registry. " "Please either:\n" " - register it in METRIC_REGISTRY, or\n" " - pass an explicit callable via " "`extra_metrics={'name': fn}`." ) from exc extra_metric_kwargs = dict(extra_metric_kwargs or {}) # ------------------------------------------------------------------ # 5. Core metric computation loop # ------------------------------------------------------------------ results_by_time: MutableMapping[Any, dict[str, Any]] = {} # Pre-resolve coverage & sharpness metrics, if quantile mode if quantile_mode: try: coverage_fn = get_metric("coverage_score") except KeyError: coverage_fn = None try: miw_fn = get_metric("mean_interval_width_score") except KeyError: miw_fn = None else: coverage_fn = None miw_fn = None q_lower_target, q_upper_target = quantile_interval for t_val, g in groups if has_time else groups: # When has_time is False, groups is a list of tuples # (None, df). When True, groups is a pandas GroupBy and # iteration yields (t_val, g) pairs directly. if not has_time: t_key = None else: t_key = t_val y_true = g[actual_col].to_numpy() y_pred = g[median_col].to_numpy() metrics_here: dict[str, Any] = {} # ---- 5a. Default deterministic metrics ------------------------ if quantile_mode: metrics_here["overall_mae"] = float( mean_absolute_error(y_true, y_pred) ) metrics_here["overall_mse"] = float( mean_squared_error(y_true, y_pred) ) metrics_here["overall_rmse"] = float( np.sqrt(metrics_here["overall_mse"]) ) # R² requires at least 2 samples if y_true.size >= 2: metrics_here["overall_r2"] = float( r2_score(y_true, y_pred) ) else: metrics_here["overall_r2"] = np.nan else: metrics_here["mae"] = float( mean_absolute_error(y_true, y_pred) ) metrics_here["mse"] = float( mean_squared_error(y_true, y_pred) ) if y_true.size >= 2: metrics_here["r2"] = float( r2_score(y_true, y_pred) ) else: metrics_here["r2"] = np.nan # ---- 5b. Coverage and sharpness (quantile mode only) ---------- if ( quantile_mode and coverage_fn is not None and miw_fn is not None ): # Choose quantiles closest to requested interval qs_sorted = sorted(quantile_map.keys()) q_low = min( qs_sorted, key=lambda q: abs(q - q_lower_target), ) q_high = min( qs_sorted, key=lambda q: abs(q - q_upper_target), ) y_lower = g[quantile_map[q_low]].to_numpy() y_upper = g[quantile_map[q_high]].to_numpy() # coverage80 try: cov_val = coverage_fn( y_true, y_lower, y_upper, nan_policy="omit", verbose=0, ) except TypeError: cov_val = coverage_fn( y_true, y_lower, y_upper ) metrics_here["coverage80"] = float(cov_val) # sharpness80 (mean interval width) try: miw_val = miw_fn( y_lower, y_upper, nan_policy="omit", verbose=0, ) except TypeError: miw_val = miw_fn(y_lower, y_upper) metrics_here["sharpness80"] = float(miw_val) # ---- 5c. Per-horizon deterministic metrics -------------------- if per_horizon and step_col in g.columns: per_mae: dict[int, float] = {} per_mse: dict[int, float] = {} per_rmse: dict[int, float] = {} per_r2: dict[int, float] = {} for h, g_h in g.groupby(step_col): yt_h = g_h[actual_col].to_numpy() yp_h = g_h[median_col].to_numpy() h_int = int(h) per_mae[h_int] = float( mean_absolute_error(yt_h, yp_h) ) mse_h = float(mean_squared_error(yt_h, yp_h)) per_mse[h_int] = mse_h per_rmse[h_int] = float(np.sqrt(mse_h)) if yt_h.size >= 2: per_r2[h_int] = float( r2_score(yt_h, yp_h) ) else: per_r2[h_int] = np.nan metrics_here["per_horizon_mae"] = per_mae metrics_here["per_horizon_mse"] = per_mse metrics_here["per_horizon_rmse"] = per_rmse metrics_here["per_horizon_r2"] = per_r2 # ---- 5d. Extra metrics ---------------------------------------- apply_extra_metrics( dest=metrics_here, y_true=y_true, y_pred=y_pred, metric_fns=metric_fns, extra_metric_kwargs=extra_metric_kwargs, horizon_steps=( g[step_col].to_numpy() if step_col in g.columns else None ), verbose=verbose, logger=logger, ) results_by_time[t_key] = metrics_here # ------------------------------------------------------------------ # 5bis. Overall totals across *all* rows (all times/horizons) # ------------------------------------------------------------------ # When we group by time (e.g. coord_t) and each time slice corresponds # to a single forecast horizon, per-time "per_horizon_*" metrics will # naturally contain only one entry. Adding a global summary makes it # easier to track the model's overall behavior across the full eval # split (all horizons together). # overall_key: Optional[str] (or str but allow None) _write_overall = (overall_key is not None) and ( str(overall_key).strip() != "" ) if ( has_time and len(results_by_time) > 1 ) and _write_overall: y_true_all = df[actual_col].astype(float).to_numpy() y_pred_all = df[median_col].astype(float).to_numpy() horizon_all = ( df[step_col].to_numpy() if step_col in df.columns else None ) metrics_all: dict[str, Any] = {} metrics_all["overall_mae"] = float( mean_absolute_error(y_true_all, y_pred_all) ) metrics_all["overall_mse"] = float( mean_squared_error(y_true_all, y_pred_all) ) metrics_all["overall_rmse"] = float( np.sqrt(metrics_all["overall_mse"]) ) metrics_all["overall_r2"] = ( float(r2_score(y_true_all, y_pred_all)) if y_true_all.size >= 2 else float("nan") ) # Coverage / sharpness for overall (quantile mode) if quantile_mode: qs_sorted = sorted(quantile_map.keys()) q_low = min( qs_sorted, key=lambda q: abs(q - q_lower_target), ) q_high = min( qs_sorted, key=lambda q: abs(q - q_upper_target), ) y_lower_all = ( df[quantile_map[q_low]] .astype(float) .to_numpy() ) y_upper_all = ( df[quantile_map[q_high]] .astype(float) .to_numpy() ) if coverage_fn is not None and miw_fn is not None: try: cov_val = coverage_fn( y_true_all, y_lower_all, y_upper_all, nan_policy="omit", verbose=0, ) except TypeError: cov_val = coverage_fn( y_true_all, y_lower_all, y_upper_all ) metrics_all["coverage80"] = float(cov_val) try: miw_val = miw_fn( y_lower_all, y_upper_all, nan_policy="omit", verbose=0, ) except TypeError: miw_val = miw_fn(y_lower_all, y_upper_all) metrics_all["sharpness80"] = float(miw_val) # Per-horizon totals across all rows if horizon_all is not None: per_mae: dict[int, float] = {} per_mse: dict[int, float] = {} per_rmse: dict[int, float] = {} per_r2: dict[int, float] = {} for h in sorted(set(horizon_all.tolist())): h_int = int(h) mask_h = horizon_all == h yt_h = y_true_all[mask_h] yp_h = y_pred_all[mask_h] if yt_h.size == 0: continue per_mae[h_int] = float( mean_absolute_error(yt_h, yp_h) ) mse_h = float(mean_squared_error(yt_h, yp_h)) per_mse[h_int] = mse_h per_rmse[h_int] = float(np.sqrt(mse_h)) per_r2[h_int] = ( float(r2_score(yt_h, yp_h)) if yt_h.size >= 2 else float("nan") ) metrics_all["per_horizon_mae"] = per_mae metrics_all["per_horizon_mse"] = per_mse metrics_all["per_horizon_rmse"] = per_rmse metrics_all["per_horizon_r2"] = per_r2 # Extra metrics: reuse exact same calling logic as per-group apply_extra_metrics( dest=metrics_all, y_true=y_true_all, y_pred=y_pred_all, metric_fns=metric_fns, extra_metric_kwargs=extra_metric_kwargs, horizon_steps=horizon_all, verbose=verbose, logger=logger, ) # Put it under a configurable key (default "__overall__") results_by_time[str(overall_key)] = metrics_all # ------------------------------------------------------------------ # 6. Flatten for single vs multi time # ------------------------------------------------------------------ if len(results_by_time) == 1: # Single time or no time column → return metrics only final_result: dict[str, Any] | dict[Any, Any] final_result = next(iter(results_by_time.values())) else: # Multiple time values → dict keyed by time final_result = {} for k, v in results_by_time.items(): if time_as_str and k is not None: final_result[str(k)] = v else: final_result[k] = v # ------------------------------------------------------------------ # 7. Optionally save to disk # ------------------------------------------------------------------ if savefile: # Normalize save_format fmt = save_format.lower() if not fmt.startswith("."): fmt = f".{fmt}" # Determine output path if isinstance(savefile, bool): # Auto-generate filename base_dir = ( source_path.parent if source_path is not None else Path.cwd() ) ts = datetime.now().strftime("%Y%m%d-%H%M%S") if fmt in (".csv",): fname = ( f"eval_{target_name}_diagnostics_{ts}.csv" ) else: fname = f"eval_{target_name}_diagnostics_{ts}.json" out_path = base_dir / fname else: out_path = Path(savefile) if out_path.suffix: # Explicit extension takes precedence fmt = out_path.suffix.lower() else: out_path = out_path.with_suffix(fmt) out_path.parent.mkdir(parents=True, exist_ok=True) if fmt in (".json", "json"): # Ensure JSON-serializable (convert numpy scalars) def _to_jsonable(obj: Any) -> Any: if isinstance(obj, np.generic): return obj.item() if isinstance(obj, np.ndarray): return obj.tolist() return obj json_ready = {} if len(results_by_time) == 1: json_ready = { k: _to_jsonable(v) for k, v in final_result.items() # type: ignore[arg-type] } else: for ( t_key, metrics_here, ) in final_result.items(): # type: ignore[union-attr] json_ready[t_key] = { k: _to_jsonable(v) for k, v in metrics_here.items() } with open(out_path, "w", encoding="utf-8") as f: json.dump( json_ready, f, indent=2, ensure_ascii=False, ) vlog( f"[evaluate_forecast] Metrics saved to JSON at " f"'{out_path}'.", verbose=verbose, level=1, logger=logger, ) return final_result # dict elif fmt in (".csv", "csv"): # Flatten to long-form DataFrame rows = [] for ( t_key, metrics_here, ) in results_by_time.items(): t_val = t_key if time_as_str and t_val is not None: t_val = str(t_val) for m_name, m_val in metrics_here.items(): if isinstance(m_val, Mapping): # e.g. per_horizon metrics: dict[horizon -> value] for h, v in m_val.items(): rows.append( { "coord_t": t_val, "metric": m_name, "horizon": h, "value": v, } ) else: rows.append( { "coord_t": t_val, "metric": m_name, "horizon": None, "value": m_val, } ) df_out = pd.DataFrame(rows) df_out.to_csv(out_path, index=False) vlog( f"[evaluate_forecast] Metrics saved to CSV at " f"'{out_path}'.", verbose=verbose, level=1, logger=logger, ) return df_out else: raise ValueError( f"Unsupported save_format/extension '{fmt}'. " "Use '.json' or '.csv'." ) # No saving requested: just return in-memory result return final_result
def apply_extra_metrics( *, dest: dict[str, Any], y_true: np.ndarray, y_pred: np.ndarray, metric_fns: Mapping[str, Callable[..., Any]], extra_metric_kwargs: Mapping[str, Mapping[str, Any]], horizon_steps: np.ndarray | None = None, verbose: int = 1, logger: logging.Logger | None = None, ) -> None: """ Apply extra metrics into `dest` using robust calling heuristics. Supports: - fn(y_true=..., y_pred=..., **kwargs) - fn(y_true, y_pred, **kwargs) - fn(y_pred=..., **kwargs) or fn(y_pred, **kwargs) (prediction-only) If a metric supports `horizon_steps`, it will be passed when provided. """ for m_name, fn in metric_fns.items(): kwargs = dict( extra_metric_kwargs.get(m_name, {}) or {} ) # Pass horizon_steps if supported and available if horizon_steps is not None: try: sig = inspect.signature(fn) if "horizon_steps" in sig.parameters: kwargs["horizon_steps"] = horizon_steps except Exception: # If signature inspection fails, ignore. pass try: sig = inspect.signature(fn) param_names = list(sig.parameters.keys()) # Case 1: standard supervised metric: metric(y_true, y_pred, ...) if ("y_true" in param_names) and ( "y_pred" in param_names ): try: m_val = fn( y_true=y_true, y_pred=y_pred, **kwargs ) except TypeError: m_val = fn(y_true, y_pred, **kwargs) # Case 2: prediction-only metric: metric(y_pred, ...) elif param_names and param_names[0] in ( "y_pred", "y_forecast", "y_hat", ): # prefer keyword when possible try: m_val = fn(y_pred=y_pred, **kwargs) except TypeError: m_val = fn(y_pred, **kwargs) else: # Fallback heuristic: try (y_true, y_pred) then (y_pred,) try: m_val = fn(y_true, y_pred, **kwargs) except TypeError: m_val = fn(y_pred, **kwargs) if ( isinstance(m_val, np.ndarray) and m_val.size == 1 ): m_val = float(m_val.reshape(())) dest[m_name] = m_val except Exception as e: vlog( f"[Warn] extra metric '{m_name}' failed: {e}", verbose=verbose, level=2, logger=logger, ) def _find_scaler_block( scaler_info, keys=("targets", "target", "y") ): """Best-effort helper to locate a scaler block inside scaler_info.""" if not isinstance(scaler_info, dict): return None for k in keys: if k in scaler_info: block = scaler_info.get(k) if isinstance(block, dict): return block return None def _inverse_with_stage1( values: np.ndarray, scaler_info: dict | None, *, target_name: str | None, scaler_name: str | None, ) -> np.ndarray: """Inverse-scale values using the stage-1 scaler schema.""" if scaler_info is None: return np.asarray(values) scaler, idx, n_features = _resolve_stage1_entry( scaler_info, target_name=target_name, scaler_name=scaler_name, allow_passthrough=True, ) # If scaler_name points to a non-scaler entry, retry by target_name. if scaler is None and target_name: scaler, idx, n_features = _resolve_stage1_entry( scaler_info, target_name=target_name, scaler_name=None, allow_passthrough=True, ) if scaler is None or not hasattr( scaler, "inverse_transform" ): return np.asarray(values) arr = np.asarray(values, dtype=float) shp = arr.shape flat = arr.reshape(-1) idx = int(idx or 0) n_features = int(n_features or 1) tmp = np.zeros((flat.size, n_features), dtype=np.float32) tmp[:, idx] = flat.astype(np.float32) try: inv = scaler.inverse_transform(tmp)[:, idx] except Exception: return np.asarray(values) return inv.reshape(shp) def _ensure_q_order(qmat: np.ndarray) -> np.ndarray: """Ensure q_low <= q_high by flipping Q-axis if needed.""" qmat = np.asarray(qmat) if qmat.ndim != 2 or qmat.shape[1] < 2: return qmat lo = np.nanmedian(qmat[:, 0]) hi = np.nanmedian(qmat[:, -1]) if np.isfinite(lo) and np.isfinite(hi) and lo > hi: return qmat[:, ::-1] return qmat def _inverse_with_block(values, block, col_name): """Inverse-transform a 1D array using a scaler block. Supports both: - old schema : block["columns"] / block["cols"] - new PINN schema: block["all_features"] + block["idx"] """ if block is None or not isinstance(block, dict): return values scaler = block.get("scaler") if scaler is None or not hasattr( scaler, "inverse_transform" ): return values # ---- Old style: columns/cols present --------------------------------- cols = block.get("columns") or block.get("cols") if cols: if col_name not in cols: return values idx = cols.index(col_name) n_features = len(cols) else: # ---- New PINN style: use all_features + idx ----------------------- idx = block.get("idx") if idx is None: return values all_features = block.get("all_features") if all_features: n_features = len(all_features) else: # Fallback: trust the scaler n_features = getattr( scaler, "n_features_in_", idx + 1 ) values = np.asarray(values).reshape(-1) tmp = np.zeros( (values.shape[0], n_features), dtype=np.float32 ) tmp[:, idx] = values try: inv = scaler.inverse_transform(tmp)[:, idx] except Exception: return values return inv def _auto_output_target_name( target_name: str, output_target_name: str | None, *, suffixes: tuple[str, ...] = ("_cum", "_cumulative"), ) -> str: """ If output_target_name is not provided, derive a clean output prefix. Examples -------- target_name='subsidence_cum' -> 'subsidence' target_name='subsidence' -> 'subsidence' target_name='subsidence_cumulative' -> 'subsidence' """ if ( output_target_name is not None and str(output_target_name).strip() ): return str(output_target_name) name = str(target_name) for sfx in suffixes: if name.endswith(sfx): return name[: -len(sfx)] return name
[docs] def format_and_forecast( y_pred: dict, y_true: dict | None, *, coords: np.ndarray | None = None, quantiles: list[float] | None = None, target_name: str = "subsidence", output_target_name: str | None = None, scaler_target_name: str | None = None, target_key_pred: str = "subs_pred", component_index: int = 0, scaler_info: dict | None = None, coord_scaler: object | None = None, coord_columns: tuple[str, str, str] = ( "coord_t", "coord_x", "coord_y", ), # # Temporal config train_end_time: float | int | str | None = None, forecast_start_time: float | int | str | None = None, forecast_horizon: int | None = None, future_time_grid: np.ndarray | None = None, eval_forecast_step: int | None = None, # eval_export: object = "all", value_mode: str = "rate", input_value_mode: str = "rate", # what MODEL produced (NEW) rate_first: str = "cum_over_dtref", # for cumulative->rate (NEW) absolute_baseline: float | Mapping[int, float] | None = None, # Output / I/O sample_index_offset: int = 0, city_name: str | None = None, model_name: str | None = None, dataset_name: str | None = None, csv_eval_path: str | None = None, csv_future_path: str | None = None, # # Time dtype control time_as_datetime: bool = False, time_format: str | None = None, # calibrations calibration: str | bool = False, calibration_kwargs: Mapping[str, Any] | None = None, calibration_save_stats: str | os.PathLike | None = None, # Evaluation options (metrics, optional) eval_metrics: bool = False, metrics_column_map: Mapping[str, Any] | None = None, metrics_quantile_interval: tuple[float, float] = ( 0.1, 0.9, ), metrics_per_horizon: bool = False, metrics_extra: ( Sequence[str] | Mapping[str, Callable[..., Any]] | None ) = None, metrics_extra_kwargs: ( Mapping[str, Mapping[str, Any]] | None ) = None, metrics_savefile: str | os.PathLike | bool | None = None, metrics_save_format: str = ".json", metrics_time_as_str: bool = True, # --- NEW: unit conversion before export/metrics --- output_unit: str | None = None, output_unit_from: str = "m", output_unit_mode: str = "overwrite", output_unit_suffix: str = "_mm", output_unit_col: str | None = None, # Logging verbose: int = 1, logger: logging.Logger | None = None, **kws, ) -> tuple[pd.DataFrame, pd.DataFrame]: """ Format PINN forecasts into evaluation and future DataFrames. This helper takes the raw model outputs (already split into ``y_pred['subs_pred']`` / ``y_pred['gwl_pred']``), the matching ground-truth dictionary (``y_true``), and optional coordinate and scaler information, and returns two DataFrames: - ``df_eval``: predictions + actuals for an evaluation year (typically the last training year, e.g. 2022). - ``df_future``: predictions for the future horizon (e.g. 2023–2025), without actuals. Parameters ---------- y_pred : dict Dictionary of model predictions, as returned by ``GeoPriorSubsNet.predict`` post-processed into ``{'subs_pred': ..., 'gwl_pred': ...}``. For subsidence, the expected shapes are: - Quantile mode: ``(B, H, Q, O)`` where: ``B`` = batch size, ``H`` = horizon steps, ``Q`` = number of quantiles, ``O`` = output dim. - Point mode: ``(B, H, O)``. y_true : dict or None Dictionary of true targets, typically ``{'subsidence': ..., 'gwl': ...}`` or ``{'subs_pred': ..., 'gwl_pred': ...}``. If ``None``, evaluation DataFrame is still created but without the actual-value column. coords : ndarray, optional Optional coordinates array aligned with predictions. Commonly shaped ``(B, H, 3)`` with columns ``[t_scaled, x_scaled, y_scaled]``. Only ``x`` and ``y`` are used when inverse-transforming spatial coordinates; time is overwritten by the provided temporal config if given. quantiles : list of float or None, optional List of quantiles (e.g. ``[0.1, 0.5, 0.9]``) if the model was trained in probabilistic mode. If ``None``, a single prediction column is emitted instead. target_name : str, default='subsidence' Logical target identifier used as the default key for locating the target scaler in ``scaler_info`` and as a fallback for resolving truth arrays in ``y_true``. Column naming is controlled by ``output_target_name`` (or the auto-derived output prefix when it is ``None``). output_target_name : str or None, optional Output prefix used when creating DataFrame columns for predictions and actuals. This controls the *column naming* only (e.g. the function will emit ``f"{output_target_name}_q10"``, ``f"{output_target_name}_pred"``, and ``f"{output_target_name}_actual"``). If ``None`` (default), the function derives the output prefix from ``target_name`` and applies a small convenience rule: if ``target_name`` ends with ``"_cum"`` or ``"_cumulative"``, that suffix is stripped for output naming. This keeps downstream tooling consistent (many plotting and metrics utilities expect names like ``subsidence_q10`` rather than ``subsidence_cum_q10``), while still allowing the scaler lookup to use the true target key. For example, with ``target_name="subsidence_cum"`` and ``output_target_name=None``, output columns become ``subsidence_q10``, ``subsidence_q50``, and ``subsidence_actual``. If ``output_target_name="subsidence_cum"``, the output columns keep the suffix such as ``subsidence_cum_q10``. scaler_target_name : str or None, optional Name used to locate the target scaling block inside ``scaler_info`` and to perform inverse-transform for predictions and actuals. This controls the *scaler key and inverse scaling*, not the output column naming. If ``None`` (default), the scaler key is assumed to be ``target_name``. This is important when you want clean output columns but the scaler was fitted/stored under the original target name. A common pattern is to keep ``target_name="subsidence_cum"`` so the scaler lookup matches the Stage-1 schema, while letting ``output_target_name=None`` produce clean output columns. In that setup, inverse transform still uses the ``subsidence_cum`` scaler key, while output columns use the ``subsidence_`` prefix because of the auto-strip rule. target_key_pred : str, default='subs_pred' Key inside ``y_pred`` that holds the subsidence forecasts. component_index : int, default=0 Index along the output dimension O to use when ``output_subsidence_dim > 1``. For scalar subsidence this is 0. scaler_info : dict, optional Optional Stage-1 ``scaler_info`` mapping containing a target scaler under keys such as ``'targets'`` or ``'target'``. The target block is expected to provide an sklearn-like transformer under ``'scaler'`` together with column names under ``'columns'`` or ``'cols'``. If present and consistent, subsidence values (predicted and actual) are inverse-transformed for ``target_name``. coord_scaler : object, optional Optional scaler used for coordinates. If provided, it is only used to inverse-transform ``coord_x`` and ``coord_y`` when ``coords`` is given and ``coord_columns`` can be matched. Time is *not* taken from the inverse transform; it is controlled by the temporal config. coord_columns : tuple of str, default=('coord_t','coord_x','coord_y') Logical names of the time, x, and y coordinate columns. These are used for DataFrame column naming and for mapping into ``coord_scaler`` if its block carries column names. train_end_time : scalar or str or datetime, optional Physical time associated with the evaluation year (e.g. 2022). If ``eval_forecast_step`` is not given, the last horizon step is assumed to correspond to this time. forecast_start_time : scalar or str or datetime, optional First time in the future forecast horizon (e.g. 2023). forecast_horizon : int, optional Number of forecast steps in the future horizon (e.g. 3). If ``future_time_grid`` is not given, this is used together with ``forecast_start_time`` to build a regular grid. future_time_grid : array-like, optional Explicit physical times for each forecast step, length H. For yearly data this might be ``[2023, 2024, 2025]``. If provided, it overrides any automatic construction from ``forecast_start_time`` and ``forecast_horizon``. eval_forecast_step : int or None, optional Horizon step index (1-based) to use for evaluation. If ``None``, defaults to the last horizon step H. eval_export : {"all", "last"} or str or int or sequence, optional Controls which evaluation rows are exported in ``df_eval`` and written to ``csv_eval_path``. By default (``"all"``), the function exports the multi-horizon evaluation DataFrame (``df_eval_all``), which contains one row per sample and forecast step (e.g. years 2020, 2021, 2022 for ``H=3``). Accepted values are: * ``"all"`` or ``"full"`` or ``"horizons"`` : export all horizons from ``df_eval_all``. * ``"last"`` or ``"single"`` or ``"default"`` : export only the single evaluation step specified by ``eval_forecast_step`` (backwards-compatible behaviour). * Other ``str`` (e.g. ``"2022"``) : interpreted as a time value for ``coord_t``; only rows of ``df_eval_all`` whose time column matches this value are exported. * ``int`` or scalar non-string : interpreted as a single time value (e.g. ``2022``). * sequence of values (e.g. ``[2021, 2022]``) : interpreted as a set of time values; only rows whose ``coord_t`` belongs to this set are exported. If ``time_as_datetime=True``, the selection values are converted with ``pandas.to_datetime`` using ``time_format`` before filtering. If ``df_eval_all`` is not available (e.g. no ground truth was provided), the function falls back to exporting the single-step ``df_eval`` regardless of ``eval_export``. value_mode : {"rate", "cumulative", "absolute_cumulative"}, optional Controls how forecast values are interpreted along the temporal horizon for each sample. The default is ``"rate"``, which treats each forecast step as an incremental rate (e.g. annual subsidence rate) and leaves predictions unchanged. Supported modes are: * ``"rate"`` : keep per-step predictions as provided by the model (current behaviour). * ``"cumulative"`` or ``"cum"`` : convert per-step rates into *relative cumulative* values by applying a cumulative sum over ``forecast_step`` for each ``sample_idx``. For example, for years 2023–2025, the value at 2024 is the sum of the 2023 and 2024 rates. * ``"absolute_cumulative"`` or ``"abs_cum"`` or ``"absolute"`` : same as ``"cumulative"``, then add an absolute baseline provided by ``absolute_baseline`` (e.g. cumulative subsidence at the end of the training period), yielding absolute cumulative trajectories. Cumulative transforms are applied consistently to: * the future forecast DataFrame (``df_future``), * the multi-horizon evaluation DataFrame (``df_eval_all``), * and the single-step evaluation DataFrame (``df_eval``, which is regenerated from ``df_eval_all`` after the transformation). When an unsupported string is given, the function logs a warning and falls back to ``"rate"``. absolute_baseline : float or Mapping[int, float], optional Baseline value to use when ``value_mode`` requests absolute cumulative outputs (``"absolute_cumulative"``, ``"abs_cum"``, ``"absolute"``). This baseline is interpreted as the pre-forecast cumulative level for each sample, for example, cumulative subsidence at ``train_end_time`` (e.g. end of 2022), and is added *after* applying the cumulative sum over the forecast horizon. If a scalar ``float`` is provided, the same baseline value is added to all samples. If a mapping is provided, it must map ``sample_idx`` (integers) to baseline values, allowing per-sample baselines: * ``absolute_baseline = {sample_idx: baseline_value, ...}`` Only prediction columns for ``target_name`` are shifted (e.g. ``"subsidence_q10"``, ``"subsidence_q50"``, ``"subsidence_q90"`` or ``"subsidence_pred"``). When ``df_eval_all`` is present, the corresponding ``"<target_name>_actual"`` column is shifted as well, so evaluation metrics operate on absolute cumulative values. If ``value_mode`` is an absolute cumulative variant but ``absolute_baseline`` is ``None``, the function logs a warning and degrades gracefully to relative cumulative mode (i.e. no baseline shift is applied). sample_index_offset : int, default=0 Offset added to ``sample_idx`` (useful when concatenating multiple tiles). city_name, model_name, dataset_name : str, optional Optional metadata used only for logging. csv_eval_path : str, optional If provided, ``df_eval`` is written to this path (directories are created if needed). csv_future_path : str, optional If provided, ``df_future`` is written to this path. time_as_datetime : bool, default=False If ``True``, time values are converted using :func:`pandas.to_datetime` with the provided ``time_format`` (if any). time_format : str or None, optional Optional format string passed to :func:`pandas.to_datetime` when ``time_as_datetime=True``. eval_metrics : bool, default=False If ``True``, automatically call :func:`evaluate_forecast` on the resulting ``df_eval`` to compute diagnostics. Metrics are not returned by this function; they are either written to disk (if ``metrics_savefile`` is provided) or discarded. For programmatic access to the metrics dictionary, call :func:`evaluate_forecast` directly. metrics_column_map : mapping, optional Optional column mapping forwarded to :func:`evaluate_forecast` (see its documentation for details). If ``None``, default column names such as ``'coord_t'``, ``'forecast_step'``, ``f'{target_name}_q10'``, and ``f'{target_name}_actual'`` are assumed. metrics_quantile_interval : tuple of float, default=(0.1, 0.9) Interval used for coverage and sharpness diagnostics in quantile mode, forwarded to :func:`evaluate_forecast`. metrics_per_horizon : bool, default=False If ``True``, per-horizon MAE/MSE/R² are computed by :func:`evaluate_forecast` and included in the diagnostics. metrics_extra : sequence or mapping, optional Optional additional metrics to compute, forwarded to :func:`evaluate_forecast`. Can be: - A sequence of metric names (resolved via ``geoprior.metrics._registry.get_metric``). - A mapping ``{name: func}`` where ``func`` is a callable taking ``(y_true, y_pred, **kwargs)``. metrics_extra_kwargs : mapping, optional Optional per-metric keyword arguments, forwarded to :func:`evaluate_forecast`. Keys must match metric names in ``metrics_extra``. metrics_savefile : str, path-like, bool, or None If truthy, diagnostics from :func:`evaluate_forecast` are written to disk. Behavior matches the ``savefile`` argument of :func:`evaluate_forecast`. When ``True``, a filename is auto-generated near the evaluation CSV (if any) or in the current working directory. metrics_save_format : {'.json', 'json', '.csv', 'csv'}, default='.json' Output format for diagnostics written by :func:`evaluate_forecast`. JSON preserves the nested metric structure; CSV flattens it into a tall table. metrics_time_as_str : bool, default=True If ``True``, time keys in the diagnostics written by :func:`evaluate_forecast` are converted to strings (useful for JSON serialization). verbose : int, default=1 Verbosity level passed to :func:`vlog`. logger : logging.Logger, optional Logger instance; if ``None``, a module-level ``LOG`` is used. Returns ------- df_eval_to_write : pandas.DataFrame DataFrame containing predictions and actuals for the evaluation time. Columns include: - ``'sample_idx'`` - ``'forecast_step'`` - quantile columns (e.g. ``subsidence_q10``) or ``subsidence_pred`` - ``'subsidence_actual'`` (if y_true given) - ``coord_t``, ``coord_x``, ``coord_y`` (names from ``coord_columns``). df_future : pandas.DataFrame DataFrame containing predictions for the future horizon, without actuals. Same structure as ``df_eval`` but without the actual-value column. Notes ----- This function separates *scaler lookup* (``scaler_target_name``) from *output column naming* (``output_target_name``). This is useful when the stored scaler key contains suffixes like ``"_cum"`` but downstream tools expect canonical names such as columns prefixed with ``subsidence_``. """ t_col, x_col, y_col = coord_columns scale_name = scaler_target_name or target_name out_name = _auto_output_target_name( target_name, output_target_name ) vlog( "[format_and_forecast] Starting formatting of predictions " f"for city={city_name}, model={model_name}, " f"dataset={dataset_name}", verbose=verbose, level=1, logger=logger, ) if target_key_pred not in y_pred: raise KeyError( f"y_pred must contain key '{target_key_pred}'. " f"Got keys={list(y_pred.keys())}" ) subs_pred = np.asarray(y_pred[target_key_pred]) if subs_pred.ndim not in (3, 4): raise ValueError( f"{target_key_pred} must have ndim 3 or 4. " f"Got shape={subs_pred.shape!r}" ) B = subs_pred.shape[0] H = subs_pred.shape[1] # Choose evaluation horizon step (1-based → 0-based index) if eval_forecast_step is None or not ( 1 <= eval_forecast_step <= H ): eval_forecast_step = H # last step by default s_idx = eval_forecast_step - 1 vlog( f"[format_and_forecast] B={B}, H={H}, eval_step={eval_forecast_step}", verbose=verbose, level=2, logger=logger, ) # ------------------------------------------------------------------ # Split predictions into eval (one horizon step) and future (all H) # ------------------------------------------------------------------ if quantiles is not None: # Expected shape: (B, H, Q, O) if subs_pred.ndim != 4: raise ValueError( "Quantile mode requires subs_pred.ndim == 4. " f"Got shape={subs_pred.shape!r}" ) Q = subs_pred.shape[2] if len(quantiles) != Q: vlog( "[format_and_forecast] Warning: len(quantiles) does not " f"match Q={Q}; using first {min(len(quantiles), Q)}.", verbose=verbose, level=1, logger=logger, ) Q = min(len(quantiles), Q) # (B, Q) eval, (B, H, Q) future subs_eval_q = subs_pred[:, s_idx, :Q, component_index] subs_future_q = subs_pred[:, :, :Q, component_index] else: # Point forecasts: expected shape (B, H, O) if subs_pred.ndim != 3: raise ValueError( "Point mode requires subs_pred.ndim == 3. " f"Got shape={subs_pred.shape!r}" ) subs_eval_q = subs_pred[ :, s_idx, component_index ] # (B,) subs_future_q = subs_pred[ :, :, component_index ] # (B, H) # ------------------------------------------------------------------ # Ground truth for evaluation (last step + all horizons) # ------------------------------------------------------------------ subs_true_eval = None # (B,) subs_true_all_flat = None # (B*H,) true_arr = None true_key_candidates = ( out_name, scale_name, target_key_pred, target_name, ) true_arr = None if y_true is not None: for k in true_key_candidates: if k in y_true: true_arr = np.asarray(y_true[k]) break if true_arr is not None: if true_arr.ndim != 3: vlog( "[format_and_forecast] y_true has unexpected ndim; " "skipping actual inverse transform.", verbose=verbose, level=1, logger=logger, ) else: # (B, H, O) → pick component_index subs_true_eval = true_arr[ :, s_idx, component_index ] # (B,) subs_true_all_flat = true_arr[ :, :, component_index ].reshape(-1) # (B*H,) # ------------------------------------------------------------------ # Coords (only used for x/y; time will come from temporal config) # ------------------------------------------------------------------ coord_x_eval = coord_y_eval = None coord_x_future = coord_y_future = None if coords is not None: coords = np.asarray(coords) if coords.ndim == 3 and coords.shape[-1] >= 3: B_coords, H_coords, D_coords = coords.shape # Optional sanity check: coords horizon should match predictions if H_coords != H: vlog( "[format_and_forecast] Warning: coords.shape[1] " f"({H_coords}) != H ({H}); continuing but shapes " "may be misaligned.", verbose=verbose, level=1, logger=logger, ) # ---- NEW: inverse-transform coords if we have a coord_scaler ---- if coord_scaler is not None and hasattr( coord_scaler, "inverse_transform" ): try: flat = coords.reshape( -1, D_coords ) # (B*H, D) flat_inv = coord_scaler.inverse_transform( flat ) coords = flat_inv.reshape( B_coords, H_coords, D_coords ) except Exception: vlog( "[format_and_forecast] coord_scaler inverse_transform " "failed; falling back to scaled coords.", verbose=verbose, level=1, logger=logger, ) # Now coords are in physical units; extract x/y for eval + future coord_x_eval = coords[:, s_idx, 1] coord_y_eval = coords[:, s_idx, 2] coord_x_future = coords[:, :, 1].reshape(-1) coord_y_future = coords[:, :, 2].reshape(-1) else: vlog( "[format_and_forecast] coords has unexpected shape; " "spatial coords will not be included.", verbose=verbose, level=1, logger=logger, ) # ------------------------------------------------------------------ # Inverse-scale subsidence (pred + actual) using Stage-1 resolver # (schema-agnostic; avoids relying on block["scaler"]/["cols"]). # ------------------------------------------------------------------ if quantiles is not None: subs_eval_q = _ensure_q_order(subs_eval_q) subs_eval_q = _inverse_with_stage1( subs_eval_q, scaler_info, target_name=scale_name, scaler_name=scaler_target_name, ) if subs_true_eval is not None: subs_true_eval = _inverse_with_stage1( subs_true_eval, scaler_info, target_name=scale_name, scaler_name=scaler_target_name, ) if subs_true_all_flat is not None: subs_true_all_flat = _inverse_with_stage1( subs_true_all_flat, scaler_info, target_name=scale_name, scaler_name=scaler_target_name, ) # Future if quantiles is not None: Q = int(subs_future_q.shape[2]) flat = subs_future_q.reshape(-1, Q) flat = _ensure_q_order(flat) flat = _inverse_with_stage1( flat, scaler_info, target_name=scale_name, scaler_name=scaler_target_name, ) subs_future_flat = _ensure_q_order(flat) else: subs_future_flat = _inverse_with_stage1( subs_future_q.reshape(-1), scaler_info, target_name=scale_name, scaler_name=scaler_target_name, ).reshape(-1) # ------------------------------------------------------------------ # Build temporal grid for future horizon (if not given) # ------------------------------------------------------------------ if ( future_time_grid is None and forecast_start_time is not None ): if forecast_horizon is None: forecast_horizon = H # For generality, we don't assume "years"; we simply treat # forecast_start_time as the value for step=1 and increment # by 1 for each subsequent step. If the user needs monthly # or arbitrary spacing, they can pass an explicit # ``future_time_grid``. base = forecast_start_time try: # Try numeric base_val = float(base) future_time_grid = np.array( [ base_val + i for i in range(forecast_horizon) ], dtype=float, ) except Exception: # Leave as raw objects (e.g., strings or datetimes) future_time_grid = np.array( [base for _ in range(forecast_horizon)], dtype=object, ) if future_time_grid is None: # Fallback to a simple range [1..H] future_time_grid = np.arange(1, H + 1) future_time_grid = np.asarray(future_time_grid) if future_time_grid.shape[0] != H: raise ValueError( "future_time_grid must have length H. " f"Got len={future_time_grid.shape[0]}, H={H}" ) # ------------------------------------------------------------------ # Convert times to datetime if requested # ------------------------------------------------------------------ def _to_time(values): if not time_as_datetime: return values return pd.to_datetime( values, format=time_format, errors="coerce" ) # ------------------------------------------------------------------ # Build evaluation time grid for ALL horizons first (length H) # ------------------------------------------------------------------ if train_end_time is not None: # Numeric arithmetic: end-(H-1), ..., end try: end_val = float(train_end_time) eval_time_grid_full = np.array( [end_val - (H - 1) + i for i in range(H)], dtype=float, ) except Exception: # Fallback: same label repeated eval_time_grid_full = np.array( [train_end_time for _ in range(H)], dtype=object, ) else: # If no train_end_time provided, use future grid if it matches H, # otherwise fall back to 1..H. if ( future_time_grid is not None and len(future_time_grid) == H ): eval_time_grid_full = np.asarray(future_time_grid) else: eval_time_grid_full = np.arange(1, H + 1) # ------------------------------------------------------------------ # eval time must correspond to the chosen eval_forecast_step (s_idx) # ------------------------------------------------------------------ eval_time_value = eval_time_grid_full[s_idx] eval_time_series = _to_time( np.full(B, eval_time_value, dtype=object) ) # Future times: repeat grid for each sample future_time_series = _to_time( np.tile(np.asarray(future_time_grid), B) ) # Repeat per sample for (B*H) eval_all_time_series = _to_time( np.tile(eval_time_grid_full, B) ) # ------------------------------------------------------------------ # Build DataFrames # ------------------------------------------------------------------ # Evaluation DataFrame (one row per sample) sample_idx_eval = np.arange( sample_index_offset, sample_index_offset + B ).astype(int) forecast_step_eval = np.full( B, eval_forecast_step, dtype=int ) df_eval = pd.DataFrame( { "sample_idx": sample_idx_eval, "forecast_step": forecast_step_eval, } ) if quantiles is not None: for j, q in enumerate( quantiles[: subs_eval_q.shape[1]] ): q_name = int(round(q * 100)) df_eval[f"{out_name}_q{q_name:d}"] = subs_eval_q[ :, j ] else: df_eval[f"{out_name}_pred"] = subs_eval_q if subs_true_eval is not None: df_eval[f"{out_name}_actual"] = subs_true_eval df_eval[t_col] = eval_time_series if coord_x_eval is not None: df_eval[x_col] = coord_x_eval if coord_y_eval is not None: df_eval[y_col] = coord_y_eval # Future DataFrame (one row per sample × horizon) N = B * H sample_idx_future = np.repeat( np.arange( sample_index_offset, sample_index_offset + B ), H, ).astype(int) forecast_step_future = np.tile( np.arange(1, H + 1), B ).astype(int) df_future = pd.DataFrame( { "sample_idx": sample_idx_future, "forecast_step": forecast_step_future, } ) if quantiles is not None: for j, q in enumerate( quantiles[: subs_future_flat.shape[1]] ): q_name = int(round(q * 100)) df_future[f"{out_name}_q{q_name:d}"] = ( subs_future_flat[:, j] ) else: df_future[f"{out_name}_pred"] = subs_future_flat df_future[t_col] = future_time_series if ( coord_x_future is not None and coord_x_future.shape[0] == N ): df_future[x_col] = coord_x_future if ( coord_y_future is not None and coord_y_future.shape[0] == N ): df_future[y_col] = coord_y_future # ------------------------------------------------------------------ # Build "all horizons" evaluation DF (for per-year metrics) # ------------------------------------------------------------------ df_eval_all = None if subs_true_all_flat is not None: # Base index columns: same structure as df_future df_eval_all = pd.DataFrame( { "sample_idx": sample_idx_future, # length B*H "forecast_step": forecast_step_future, t_col: eval_all_time_series, # 2020,2021,2022 f"{out_name}_actual": subs_true_all_flat, } ) # Add predictions if quantiles is not None: for j, q in enumerate( quantiles[: subs_future_flat.shape[1]] ): q_name = int(round(q * 100)) df_eval_all[f"{out_name}_q{q_name:d}"] = ( subs_future_flat[:, j] ) else: df_eval_all[f"{out_name}_pred"] = subs_future_flat # Add coords if available if ( coord_x_future is not None and coord_x_future.shape[0] == N ): df_eval_all[x_col] = coord_x_future if ( coord_y_future is not None and coord_y_future.shape[0] == N ): df_eval_all[y_col] = coord_y_future # ------------------------------------------------------------------ # Optional: transform values between (rate <-> cumulative) # ------------------------------------------------------------------ out_mode = (value_mode or "rate").lower() in_mode = (input_value_mode or "rate").lower() # prediction columns to transform pred_cols = [ c for c in df_future.columns if c.startswith(f"{out_name}_q") or c == f"{out_name}_pred" ] # normalize mode aliases CUM_MODES = { "cum", "cumulative", "relative_cumulative", "absolute_cumulative", "abs_cum", "absolute", } RATE_MODES = {"rate", "increment", "diff", "delta"} if out_mode not in (CUM_MODES | RATE_MODES): vlog( f"[format_and_forecast] Unknown value_mode={value_mode!r}; " "falling back to 'rate'.", verbose=verbose, level=1, logger=logger, ) out_mode = "rate" if in_mode not in (CUM_MODES | RATE_MODES): vlog( f"[format_and_forecast] Unknown input_value_mode={input_value_mode!r}; " "falling back to 'rate'.", verbose=verbose, level=1, logger=logger, ) in_mode = "rate" def _rate_by_sample( df: pd.DataFrame, cols: list[str], *, include_actual: bool, time_col: str, group_col: str = "sample_idx", first: str = "cum_over_dtref", ) -> pd.DataFrame: out = df.copy() use_cols = list(cols) if include_actual: act = f"{out_name}_actual" if act in out.columns: use_cols.append(act) # group per sample (each sample has H rows) for _, gidx in out.groupby( group_col, sort=False ).groups.items(): g = out.loc[gidx].sort_values("forecast_step") t = pd.to_numeric( g[time_col], errors="coerce" ).to_numpy(float) if t.size <= 1: continue dt = np.diff(t) good = np.isfinite(dt) & (dt != 0) dt_ref = ( float(np.median(dt[good])) if np.any(good) else 1.0 ) if not np.isfinite(dt_ref) or dt_ref == 0: dt_ref = 1.0 dt_i = np.diff(t) dt_i = np.where( (~np.isfinite(dt_i)) | (dt_i == 0), dt_ref, dt_i, ) for ccol in use_cols: c = pd.to_numeric( g[ccol], errors="coerce" ).to_numpy(float) r = np.zeros_like(c, dtype=float) if first == "nan": r[0] = np.nan elif first == "cum_over_dtref": r[0] = c[0] / dt_ref else: raise ValueError( "rate_first must be 'nan' or 'cum_over_dtref'." ) r[1:] = np.diff(c) / dt_i out.loc[g.index, ccol] = r return out # --- transform path ------------------------------------------------- want_cum = out_mode in CUM_MODES have_cum = in_mode in CUM_MODES if want_cum and not have_cum: # rate -> cumulative (legacy behavior) df_future = _cum_by_sample( df_future, pred_cols, include_actual=False, target_name=target_name, verbose=verbose, ) if df_eval_all is not None: df_eval_all = _cum_by_sample( df_eval_all, pred_cols, include_actual=True, target_name=target_name, verbose=verbose, ) df_eval = df_eval_all[ df_eval_all["forecast_step"] == eval_forecast_step ].copy() elif (not want_cum) and have_cum: # cumulative -> rate (NEW) df_future = _rate_by_sample( df_future, pred_cols, include_actual=False, time_col=t_col, first=rate_first, ) if df_eval_all is not None: df_eval_all = _rate_by_sample( df_eval_all, pred_cols, include_actual=True, time_col=t_col, first=rate_first, ) df_eval = df_eval_all[ df_eval_all["forecast_step"] == eval_forecast_step ].copy() # absolute baseline shift ONLY makes sense when output is cumulative if want_cum and out_mode in ( "absolute_cumulative", "abs_cum", "absolute", ): if absolute_baseline is None: vlog( "[format_and_forecast] absolute cumulative requested but " "absolute_baseline is None; keeping relative cumulative.", verbose=verbose, level=1, logger=logger, ) else: df_future = _add_baseline( df_future, pred_cols, include_actual=False, target_name=out_name, absolute_baseline=absolute_baseline, ) if df_eval_all is not None: df_eval_all = _add_baseline( df_eval_all, pred_cols, include_actual=True, target_name=out_name, absolute_baseline=absolute_baseline, ) df_eval = df_eval_all[ df_eval_all["forecast_step"] == eval_forecast_step ].copy() # ------------------------------------------------------------------ # Drop actuals from future DF and save CSVs if requested # ------------------------------------------------------------------ actual_col = f"{out_name}_actual" if actual_col in df_future.columns: df_future = df_future.drop(columns=[actual_col]) if output_unit is not None: df_future = convert_target_units_df( df_future, base=out_name, from_unit=output_unit_from, to_unit=output_unit, mode=output_unit_mode, suffix=output_unit_suffix, unit_col=output_unit_col, copy_df=False, ) if df_eval_all is not None: df_eval_all = convert_target_units_df( df_eval_all, base=out_name, from_unit=output_unit_from, to_unit=output_unit, mode=output_unit_mode, suffix=output_unit_suffix, unit_col=output_unit_col, copy_df=False, ) df_eval = df_eval_all[ df_eval_all["forecast_step"] == eval_forecast_step ].copy() else: df_eval = convert_target_units_df( df_eval, base=out_name, from_unit=output_unit_from, to_unit=output_unit, mode=output_unit_mode, suffix=output_unit_suffix, unit_col=output_unit_col, copy_df=False, ) cal_do = calibration if isinstance(cal_do, str): cal_do = cal_do.strip().lower() cal_stats: dict[str, Any] = {} if ( cal_do not in (False, "false", "0", "no", "off") and quantiles is not None and df_eval_all is not None ): ckw = dict(calibration_kwargs or {}) ckw.setdefault("target_name", out_name) ckw.setdefault("step_col", "forecast_step") ckw.setdefault("interval", metrics_quantile_interval) ckw.setdefault("use", cal_do) ckw.setdefault("verbose", verbose) ckw.setdefault("logger", logger) if calibration_save_stats is not None: ckw.setdefault( "save_stats", calibration_save_stats, ) df_eval_cal, df_future_cal, cal_stats = ( calibrate_quantile_forecasts( df_eval=df_eval_all, df_future=df_future, **ckw, ) ) if df_eval_cal is not None: df_eval_all = df_eval_cal df_eval = df_eval_all[ df_eval_all["forecast_step"] == eval_forecast_step ].copy() if df_future_cal is not None: df_future = df_future_cal # Decide which evaluation DataFrame to export df_eval_to_write = df_eval # fallback if eval_export is not None and df_eval_all is not None: # Case 1: string control ("all", "last", "2022", ...) if isinstance(eval_export, str): key = eval_export.lower() if key in ("all", "full", "horizons"): # Use the full multi-horizon DF (2020,2021,2022,...) df_eval_to_write = df_eval_all elif key in ("last", "single", "default"): # Old behaviour: single eval_forecast_step only df_eval_to_write = df_eval else: vals = [eval_export] if time_as_datetime: vals = pd.to_datetime( vals, format=time_format, errors="coerce", ) df_eval_to_write = df_eval_all[ df_eval_all[t_col].isin(vals) ] else: # Non-string: scalar or sequence of time values (2021, [2021, 2022], ...) if isinstance(eval_export, Sequence): vals = list(eval_export) else: vals = [eval_export] if time_as_datetime: vals = pd.to_datetime( vals, format=time_format, errors="coerce", ) df_eval_to_write = df_eval_all[ df_eval_all[t_col].isin(vals) ] if csv_eval_path: ensure_directory_exists( os.path.dirname(csv_eval_path) ) df_eval_to_write.to_csv(csv_eval_path, index=False) vlog( f"[format_and_forecast] df_eval written to {csv_eval_path} " f"(rows={len(df_eval_to_write)})", verbose=verbose, level=1, logger=logger, ) if csv_future_path: ensure_directory_exists( os.path.dirname(csv_future_path) ) df_future.to_csv(csv_future_path, index=False) vlog( f"[format_and_forecast] df_future written to " f"{csv_future_path}", verbose=verbose, level=1, logger=logger, ) # ------------------------------------------------------------------ # Optional metrics evaluation (using df_eval) # ------------------------------------------------------------------ # df_for_metrics = df_eval_all if df_eval_all is not None else df_eval df_for_metrics = df_eval_to_write if df_for_metrics is None or df_for_metrics.empty: df_for_metrics = ( df_eval_all if df_eval_all is not None else df_eval ) if eval_metrics: # Prefer full-horizon eval DF (per-year metrics), # fall back to single-step df_eval if needed. vlog( "[format_and_forecast] Running evaluate_forecast on " f"{'df_eval_all' if df_eval_all is not None else 'df_eval'}.", verbose=verbose, level=1, logger=logger, ) # We pass df_eval directly; if metrics_savefile is True # without a path, evaluate_forecast will auto-generate # a filename in the current working directory. If you # want it near csv_eval_path, pass an explicit path. metrics = evaluate_forecast( df_for_metrics, target_name=out_name, column_map=metrics_column_map, quantile_interval=metrics_quantile_interval, per_horizon=metrics_per_horizon, extra_metrics=metrics_extra, extra_metric_kwargs=metrics_extra_kwargs, savefile=metrics_savefile, save_format=metrics_save_format, time_as_str=metrics_time_as_str, verbose=verbose, logger=logger, ) overall = metrics.get("__overall__") if overall is not None: vlog( "Overall (all horizons): " f"MAE={overall.get('overall_mae'):.4f}, " f"RMSE={overall.get('overall_rmse'):.4f}, " f"R2={overall.get('overall_r2'):.4f}", verbose=verbose, ) vlog( "[format_and_forecast] Done.", verbose=verbose, level=1, logger=logger, ) return df_eval_to_write, df_future
# Helper to cumulative-sum per sample over forecast_step def _cum_by_sample( df: pd.DataFrame, cols: list[str], include_actual: bool = False, target_name="subsidence", verbose: int = 2, ) -> pd.DataFrame: if df is None or df.empty: return df if not {"sample_idx", "forecast_step"}.issubset( df.columns ): vlog( "[format_and_forecast] Cannot apply cumulative mode: " "missing 'sample_idx' or 'forecast_step'.", verbose=verbose, level=1, logger=logger, ) return df df_sorted = df.sort_values( ["sample_idx", "forecast_step"], kind="mergesort" ).copy() g = df_sorted.groupby("sample_idx", sort=False) df_sorted[cols] = g[cols].cumsum() if ( include_actual and f"{target_name}_actual" in df_sorted.columns ): df_sorted[f"{target_name}_actual"] = g[ f"{target_name}_actual" ].cumsum() return df_sorted # Helper to add an absolute baseline per sample def _add_baseline( df: pd.DataFrame, cols: list[str], target_name: str, absolute_baseline: Mapping, include_actual: bool = False, ) -> pd.DataFrame: if df is None or df.empty: return df if absolute_baseline is None: return df if isinstance(absolute_baseline, Mapping): base_series = df["sample_idx"].map(absolute_baseline) else: # Scalar baseline base_series = pd.Series( float(absolute_baseline), index=df.index, dtype=float, ) base_series = base_series.fillna(0.0).astype(float) for c in cols: if c in df.columns: df[c] = ( df[c].astype(float) + base_series.to_numpy() ) if include_actual: a_col = f"{target_name}_actual" if a_col in df.columns: df[a_col] = ( df[a_col].astype(float) + base_series.to_numpy() ) return df
[docs] @check_empty(["models_data"]) def plot_reliability_diagram( models_data: dict, y_true: pd.Series = None, prefix: str = "subsidence", figsize: tuple[int, int] = (8, 8), title: str = "Reliability Diagram", plot_style: str = "seaborn-whitegrid", verbose: int | None = None, _logger=None, ): # Initialize logging vlog( "Starting reliability diagram plot...", verbose, level=3, logger=_logger, ) # Apply plotting style and set figure size style.use(plot_style) plt.figure(figsize=figsize) ax = plt.gca() # Plot perfect calibration baseline ax.plot( [0, 1], [0, 1], "k--", label="Perfect Calibration", lw=2.5, zorder=1, ) # Default colors and markers colors = [ "#E41A1C", "#377EB8", "#4DAF4A", "#984EA3", "#FF7F00", ] markers = ["o", "s", "^", "D", "P"] # Wrap simple inputs into nested structure first = next(iter(models_data.values())) if isinstance(first, pd.DataFrame): models_data = { name: {"forecasts": df} for name, df in models_data.items() } # Define probability intervals intervals = { 0.8: (10, 90), # 80% PI uses q10 and q90 0.6: (20, 80), # 60% PI uses q20 and q80 0.4: (30, 70), # 40% PI uses q30 and q70 0.2: (40, 60), # 20% PI uses q40 and q60 } # Loop through each model for i, (model_name, data) in enumerate( models_data.items() ): vlog( f"Processing model: {model_name}", verbose, level=3, logger=_logger, ) # Allow a mixed mapping where one entry is a DataFrame if isinstance(data, pd.DataFrame): data = {"forecasts": data} # Resolve plotting style for both branches style_line = data.get("style", "-") marker = data.get( "marker", markers[i % len(markers)], ) color = data.get( "color", colors[i % len(colors)], ) # Use precomputed points if available if ( "observed_probs" in data and "nominal_probs" in data ): nominal = list(data["nominal_probs"]) observed = list(data["observed_probs"]) if len(nominal) != len(observed): raise ValueError( "nominal_probs and observed_probs " "must have the same length." ) else: # Require forecasts key if "forecasts" not in data: vlog( f"Skip {model_name}: missing forecasts", verbose, level=2, logger=_logger, ) continue forecasts = data["forecasts"] # Compute reliability points nominal = sorted( intervals.keys(), reverse=True, ) observed = [] for prob in nominal: low_q, up_q = intervals[prob] low_c = f"{prefix}_q{low_q}" up_c = f"{prefix}_q{up_q}" if ( low_c not in forecasts.columns or up_c not in forecasts.columns ): continue covered = (y_true >= forecasts[low_c]) & ( y_true <= forecasts[up_c] ) observed.append(float(np.mean(covered))) # Trim nominal to match observed length nominal = nominal[: len(observed)] # Plot the curve if nominal: ax.plot( nominal, observed, marker=marker, linestyle=style_line, color=color, label=model_name, lw=2, markersize=7, zorder=2, ) # Finalize labels and layout ax.set_xlabel( "Forecast Probability (Nominal)", fontsize=14, ) ax.set_ylabel( "Observed Frequency (Empirical)", fontsize=14, ) ax.set_title( title, fontsize=18, fontweight="bold", ) ax.legend(fontsize=12, loc="upper left") ax.grid( True, which="both", linestyle=":", linewidth=0.7, ) ax.set_aspect("equal", adjustable="box") ax.set_xlim(0, 1) ax.set_ylim(0, 1) plt.tight_layout() plt.show()
plot_reliability_diagram.__doc__ = r""" Plot a reliability diagram for one or multiple models. Parameters ---------- models_data : dict Mapping of model names to forecast data. Each value can be a pandas.DataFrame or a nested dict with keys 'forecasts', 'color', 'marker', and 'style'. y_true : pandas.Series, optional Observed values for empirical coverage calculations. Required when forecasts need processing. prefix : str, default 'subsidence' Column prefix for quantile forecast fields. figsize : tuple of int, default (8, 8) Figure size (width, height) in inches. title : str, default 'Reliability Diagram' Text title displayed at the top of the plot. plot_style : str, default 'seaborn-whitegrid' Matplotlib style sheet name to apply. verbose : int, optional Verbosity level passed to geoprior.utils.generic_utils.vlog. _logger : Logger or callable, optional Function or logger instance for internal messages. Returns ------- None Displays the calibration plot and returns nothing. Notes ----- This function draws a diagonal baseline (perfect calibration) and computes empirical coverage for probabilistic intervals using specified quantiles. It wraps simple DataFrame inputs into the required nested format and uses `vlog` for conditional logging. Examples -------- >>> import pandas as pd >>> import numpy as np >>> from geoprior.utils.forecast_utils import plot_reliability_diagram >>> # Create dummy true time series >>> dates = pd.date_range('2020-01-01', periods=100) >>> y_true = pd.Series( ... np.random.randn(100), index=dates ... ) >>> # Create forecasts for ModelA >>> dfA = pd.DataFrame({ ... 'subsidence_q10': y_true - 0.5, ... 'subsidence_q90': y_true + 0.5 ... }, index=dates) >>> # Simple usage with one model >>> plot_reliability_diagram( ... models_data={'ModelA': dfA}, ... y_true=y_true, ... verbose=2 ... ) >>> # Create forecasts for ModelB ... # with custom styling >>> dfB = pd.DataFrame({ ... 'subsidence_q10': y_true - 1.0, ... 'subsidence_q90': y_true + 1.0 ... }, index=dates) >>> custom_logger = print >>> # Custom styling and logger >>> plot_reliability_diagram( ... models_data={ ... 'ModelA': { ... 'forecasts': dfA, ... 'color': 'C0', ... 'marker': 'x' ... }, ... 'ModelB': { ... 'forecasts': dfB, ... 'color': 'C1', ... 'marker': 'o' ... } ... }, ... y_true=y_true, ... verbose=4, ... _logger=custom_logger ... ) """ def _plot_reliability_diagram( models_data, y_true, figsize=(8, 8), title="Reliability Diagram", show_grid=True, plot_style="seaborn-whitegrid", verbose=0, ): """ Plots a reliability diagram for one or multiple models. """ style.use(plot_style) plt.figure(figsize=figsize) ax = plt.gca() # Plot the perfect calibration line ax.plot( [0, 1], [0, 1], "k--", label="Perfect Calibration", lw=2.5, zorder=1, ) # Default properties for iterating through models colors = [ "#E41A1C", "#377EB8", "#4DAF4A", "#984EA3", "#FF7F00", ] markers = ["o", "s", "^", "D", "P"] for i, (model_name, data) in enumerate( models_data.items() ): forecasts = data["forecasts"] line_style = data.get("style", "-") marker = data.get("marker", markers[i % len(markers)]) color = data.get("color", colors[i % len(colors)]) nominal_probs = np.arange(0.1, 1.0, 0.1) observed_probs = [] for prob in nominal_probs: lower_q = int(50 - (prob * 100 / 2)) upper_q = int(50 + (prob * 100 / 2)) lower_bound_col = f"q{lower_q}" upper_bound_col = f"q{upper_q}" if ( lower_bound_col not in forecasts.columns or upper_bound_col not in forecasts.columns ): if verbose: vlog( f"Warning: Columns for {prob * 100:.0f}% PI" f" not found in '{model_name}'. Skipping.", level=1, verbose=verbose, ) continue lower_bound = forecasts[lower_bound_col] upper_bound = forecasts[upper_bound_col] is_covered = (y_true >= lower_bound) & ( y_true <= upper_bound ) observed_probs.append(np.mean(is_covered)) # Filter nominal_probs to match available data plottable_nominal = nominal_probs[ : len(observed_probs) ] ax.plot( plottable_nominal, observed_probs, marker=marker, linestyle=line_style, color=color, label=model_name, lw=2, markersize=7, zorder=2, ) ax.set_xlabel( "Forecast Probability (Nominal)", fontsize=14 ) ax.set_ylabel( "Observed Frequency (Empirical)", fontsize=14 ) ax.set_title(title, fontsize=18, fontweight="bold") ax.legend(fontsize=12, loc="upper left") if show_grid: ax.grid( True, which="both", linestyle=":", linewidth=0.7 ) ax.set_aspect("equal", adjustable="box") ax.set_xlim(0, 1) ax.set_ylim(0, 1) plt.tight_layout() plt.show() @check_empty(["df"]) @isdf def compute_quantile_coverage( df: pd.DataFrame, quantiles: Sequence[float], q_prefix: str, actual_col: str, ) -> pd.DataFrame: """ For each nominal quantile q in `quantiles`, compute the fraction of samples where actual <= predicted q‑quantile. Returns a DataFrame with columns ['nominal', 'empirical']. """ records = [] for q in quantiles: col = f"{q_prefix}_q{int(q * 100)}" if col not in df: raise KeyError(f"Column '{col}' not found") # fraction of times actual ≤ prediction emp = np.mean(df[actual_col] <= df[col]) records.append({"nominal": q, "empirical": emp}) return pd.DataFrame.from_records(records) @check_empty(["df"]) @isdf def plot_reliability_diagram_in( coverage_df: pd.DataFrame, ax: plt.Axes = None, figsize=(6, 6), title: str = "Reliability Diagram", diag_kwargs: dict = None, pts_kwargs: dict = None, ) -> plt.Axes: """ Plot nominal vs empirical probabilities. """ if ax is None: fig, ax = plt.subplots(figsize=figsize) diag_kwargs = diag_kwargs or { "linestyle": "--", "color": "gray", } pts_kwargs = pts_kwargs or { "marker": "o", "linewidth": 1.5, } # diagonal ax.plot([0, 1], [0, 1], **diag_kwargs) # points ax.plot( coverage_df["nominal"], coverage_df["empirical"], **pts_kwargs, ) ax.set_xlim(0, 1) ax.set_ylim(0, 1) ax.set_xlabel("Nominal probability") ax.set_ylabel("Empirical frequency") ax.set_title(title) ax.grid(True) return ax
[docs] @check_empty(["df"]) @isdf def pivot_forecast( df: pd.DataFrame, *, index_col: str = "sample_idx", pivot_col: str | None = None, step_col: str = "forecast_step", time_col: str = "coord_t", value_cols: str | Sequence[str] | None = None, spatial_cols: Sequence[str] | None = None, aggfunc: str | Callable = "first", fill_value: Any = np.nan, sep: str = "_", time_formatter: Callable[[Any], str] = lambda t: ( pd.to_datetime(t).strftime("%Y-%m-%d") ), inplace: bool = False, savefile: str | None = None, verbose: int = 0, ) -> pd.DataFrame: """ Pivot a long-format forecast DataFrame into a wide one. This will take rows identified by `index_col` + a `step_col` (or datetime `time_col`) and spread each forecast step/time into its own set of columns for each value in `value_cols`, then re-attach the `spatial_cols`. Parameters ---------- df Long-format forecasts. Must include `index_col` and at least one of `step_col` or `time_col`. index_col Column that identifies each sample (e.g. "sample_idx"). pivot_col If provided, pivot on this column instead of auto-detecting. Must be either `step_col` or `time_col`. step_col Name of the integer 1‑based forecast step column. time_col Name of the datetime column (e.g. "coord_t"). value_cols Which forecast columns to pivot (e.g. "subsidence_q50" or ["subsidence_q10","subsidence_q50","subsidence_q90"]). If None, will auto-pick all numeric columns except index/pivot/spatial. spatial_cols List of columns holding static spatial info (e.g. ["longitude","latitude"]) to join back once pivoted. aggfunc Aggregation function for pivot (default "first"). fill_value What to put where a sample/step is missing (default NaN). sep Separator between value name and step/time in the new column names (default "_"). time_formatter How to turn a datetime/timestamp into a string for column names (default "%Y-%m-%d"). inplace If True, modifies `df` instead of copying. savefile If given, writes the resulting wide DataFrame to CSV at this path. verbose Passed to `vlog` for logging. Returns ------- pd.DataFrame Wide-format DataFrame with one row per `index_col` and columns like `<value><sep><step>` or `<value><sep><formatted time>`. Example -------- >>> from geoprior.utils.forecast_utils import pivot_forecast >>> dff = pivot_forecast( ... df_, ... index_col="sample_idx", ... pivot_col="coord_t", # ← force pivot on the datetime ... value_cols=["subsidence_q10","subsidence_q50","subsidence_q90"], ... spatial_cols=["longitude","latitude"], ... sep="_", # you’ll get subsidence_q50_2022 etc. ... time_formatter=lambda t: f"{t.year}", ... verbose=1 ... ) [INFO] Pivoting on 'coord_t' for values ['subsidence_q10', 'subsidence_q50', 'subsidence_q90'] [INFO] Joining back spatial cols ['longitude', 'latitude'] >>> dff.columns Out[37]: Index(['sample_idx', 'subsidence_q10_2022', 'subsidence_q10_2023', 'subsidence_q10_2024', 'subsidence_q50_2022', 'subsidence_q50_2023', 'subsidence_q50_2024', 'subsidence_q90_2022', 'subsidence_q90_2023', 'subsidence_q90_2024', 'longitude', 'latitude'], dtype='object') """ if not inplace: df = df.copy() if ( time_formatter is not (lambda t: t) and pivot_col is None ): pivot_col = time_col # 1) Decide pivot column if pivot_col is None: if step_col in df.columns: pivot_col = step_col elif time_col in df.columns: pivot_col = time_col else: raise KeyError( f"Neither '{step_col}' nor '{time_col}' found; " "cannot auto-detect pivot column." ) elif pivot_col not in df.columns: raise KeyError( f"pivot_col='{pivot_col}' not in DataFrame" ) # 2) Determine value columns if value_cols is None: excluded = { index_col, step_col, time_col, *(spatial_cols or ()), } value_cols = [ c for c in df.select_dtypes( include="number" ).columns if c not in excluded ] elif isinstance(value_cols, str): value_cols = [value_cols] # 3) Pivot vlog( f"Pivoting on '{pivot_col}' for values {value_cols}", level=3, verbose=verbose, ) wide = df.pivot_table( index=index_col, columns=pivot_col, values=value_cols, aggfunc=aggfunc, fill_value=fill_value, ) # 4) Flatten column MultiIndex → single level new_cols = [] for val_name, pivot_val in wide.columns: if pivot_col == time_col: pv_str = time_formatter(pivot_val) else: pv_str = str(pivot_val) new_cols.append(f"{val_name}{sep}{pv_str}") wide.columns = new_cols # 5) Re-attach spatial columns if spatial_cols: vlog( f"Joining back spatial cols {spatial_cols}", level=3, verbose=verbose, ) sp = ( df[[index_col, *spatial_cols]] .drop_duplicates(index_col) .set_index(index_col) ) wide = wide.join(sp, how="left") # 6) Reset index so index_col is a column again result = wide.reset_index() # 7) Optionally save to CSV if savefile: os.makedirs( os.path.dirname(savefile) or ".", exist_ok=True ) result.to_csv(savefile, index=False) vlog( f"Pivoted forecasts saved to: {savefile}", level=1, verbose=verbose, ) return result
[docs] @check_empty(["df"]) @isdf def add_forecast_times( df: pd.DataFrame, *, forecast_times: Sequence[ int | str | date | datetime | pd.Timestamp ] | None = None, start: int | str | date | datetime | pd.Timestamp | None = None, freq: str = "YS", step_col: str = "forecast_step", time_col: str = "coord_t", error: Literal["raise", "warn", "ignore"] = "raise", inplace: bool = False, savefile: str | None = None, verbose: int = 0, ) -> pd.DataFrame: """ Map each 1‑based forecast_step into an explicit calendar time. You may either: 1. Pass `forecast_times` of length H (one per step), **or** 2. Pass a single `start` plus a pandas‐style `freq` to generate H dates. If any entry in `forecast_times` is an integer of exactly 4 digits, it will be interpreted as January 1 of that year. Parameters ---------- df : pd.DataFrame Long‐format forecast table. Must contain an integer column `step_col` with values 1..H. forecast_times : sequence, optional Explicit sequence of length H specifying the target times. Each entry may be: - int (interpreted as January 1 of that year) - str/`pd.Timestamp`/`datetime.date` start : int or str or date or Timestamp, optional Only used if `forecast_times` is None. The first time in the sequence; subsequent times will be generated via `pd.date_range`. If int, treated as a year at Jan 1. freq : str, default "YS" Pandas offset alias for frequency (e.g. "YS"=year start, "MS"=month start, "D"=day, etc.). Only used when `start` is set. step_col : str, default "forecast_step" Name of the 1‑based step index in `df`. time_col : str, default "coord_t" Name of the new column to create with mapped times. error : {'raise','warn','ignore'}, default 'raise' Policy if `df[step_col].max()` > number of provided times: - 'raise': throw ValueError - 'warn': issue warning, then still map what you can (truncate) - 'ignore': silently truncate to available times inplace : bool, default False If True, modify `df` in place; otherwise return a new DataFrame. savefile : str, optional If provided, path to CSV where the resulting DataFrame will be saved. verbose : int, default 0 Passed to `vlog` for debug logging. Returns ------- pd.DataFrame DataFrame with an added column `time_col` of dtype datetime64. Raises ------ ValueError If neither `forecast_times` nor `start` is provided, or if `error='raise'` and there aren’t enough times. Examples -------- >>> from geoprior.utils.forecast_utils import add_forecast_times >>> df = pd.DataFrame({ ... "sample_idx": [0]*3 + [1]*3, ... "forecast_step": [1,2,3]*2 ... }) >>> add_forecast_times(df, ... forecast_times=[2022,2023,2024]) sample_idx forecast_step coord_t 0 0 1 2022-01-01 1 0 2 2023-01-01 2 0 3 2024-01-01 3 1 1 2022-01-01 4 1 2 2023-01-01 5 1 3 2024-01-01 >>> # Or generate from a start + yearly freq: >>> add_forecast_times(df, start="2022-06-15", freq="YS") """ if not inplace: df = df.copy() if step_col not in df.columns: raise KeyError(f"'{step_col}' not found in DataFrame") # 1. Build raw list of time candidates if forecast_times is not None: raw_times = list(forecast_times) elif start is not None: # interpret 4-digit int or parse otherwise if isinstance(start, int) and 1000 <= start <= 9999: start = pd.Timestamp(year=start, month=1, day=1) raw_times = pd.date_range( start=start, periods=int(df[step_col].max()), freq=freq, ).tolist() else: raise ValueError( "Must provide either `forecast_times` or `start`" ) # 2. Normalize each entry to pd.Timestamp times: list[pd.Timestamp] = [] for t in raw_times: if isinstance(t, int) and 1000 <= t <= 9999: # 4‑digit → Jan 1 of that year times.append(pd.Timestamp(year=t, month=1, day=1)) else: # let pandas parse strings/dates/timestamps times.append(pd.to_datetime(t)) H = len(times) max_step = int(df[step_col].max()) if max_step > H: msg = ( f"Found {step_col} up to {max_step}, " f"but only {H} forecast times provided" ) if error == "raise": raise ValueError(msg) elif error == "warn": warnings.warn(msg, UserWarning, stacklevel=2) # truncate H = min(H, max_step) vlog( f"Mapping steps 1..{H} -> times {times[:H]}", level=3, verbose=verbose, ) arr = np.array(times[:H], dtype="datetime64[ns]") # clip to [1,H], subtract 1 for zero-based idx = df[step_col].to_numpy().clip(1, H).astype(int) - 1 df[time_col] = arr[idx] # 3. Optionally save to CSV if savefile: os.makedirs( os.path.dirname(savefile) or ".", exist_ok=True ) df.to_csv(savefile, index=False) vlog( f"Forecast times DataFrame saved to: {savefile}", level=1, verbose=verbose, ) return df
[docs] @check_empty(["df"]) def normalize_for_pinn( df: pd.DataFrame, time_col: str, coord_x: str, coord_y: str, cols_to_scale: list[str] | str | None = "auto", scale_coords: bool = True, exclude_cols: list[str] | None = None, protect_si_suffix: str = "__si", shift_time_by_horizon: bool = False, verbose: int = 1, forecast_horizon: int | None = None, _logger: logging.Logger | Callable[[str], None] | None = None, coord_scaler: MinMaxScaler | None = None, fit_coord_scaler: bool = True, other_scaler: MinMaxScaler | None = None, fit_other_scaler: bool = True, **kws, ) -> tuple[ pd.DataFrame, MinMaxScaler | None, MinMaxScaler | None, ]: """ Apply Min-Max normalization to spatial-temporal coordinates and optionally to other numeric columns. If ``cols_to_scale == "auto"``, automatically select numeric columns excluding categorical and one-hot features. By default, this function scales the time, longitude, and latitude columns (if ``scale_coords=True``). Then, it either scales explicitly provided columns in ``cols_to_scale`` or automatically infers numeric columns (excluding coordinates if ``scale_coords`` is False, and excluding one-hot/boolean columns). The Min-Max scaling for a feature :math:`x` is: .. math:: x' = \\frac{x - \\min(x)}{\\max(x) - \\min(x)} Parameters ---------- df : pd.DataFrame The input DataFrame containing at least ``time_col``, ``coord_x``, and ``coord_y`` columns. The DataFrame should contain temporal and spatial information to be scaled. time_col : str The name of the numeric time column (e.g., year as numeric or datetime). This column will be used to adjust and scale the temporal data. coord_x : str The name of the longitude column in the DataFrame. This column will be scaled along with the latitude and time columns. coord_y : str The name of the latitude column in the DataFrame. This column will be scaled along with the longitude and time columns. cols_to_scale : list of str or "auto" or None, default "auto" If a list of column names, scales exactly those columns. If ``"auto"``, selects all numeric columns, excluding ``time_col``, ``coord_x``, and ``coord_y`` if ``scale_coords=False``, and excluding one-hot encoded columns whose values are only ``{0, 1}``. If None, no extra columns are scaled. scale_coords : bool, default True If True, scales the ``[time_col, coord_x, coord_y]`` columns. If False, these columns remain unchanged. verbose : int, default 1 Verbosity level for logging. Values higher than 1 provide more detailed logging information. forecast_horizon : Optional[int], default None The number of time steps to shift the time column by. This is added to the time values before scaling if provided. _logger : Optional[Union[logging.Logger, Callable[[str], None]]], default None Logger or function to handle logging messages. If None, the default logging mechanism is used. kws : dict, optional These will be passed on to any other internal function used in the data processing or scaling steps. Returns ------- df_scaled : pd.DataFrame A new DataFrame with the specified columns normalized. coord_scaler : MinMaxScaler or None The fitted scaler for the ``[time_col, coord_x, coord_y]`` columns if ``scale_coords=True``, else ``None``. other_scaler : MinMaxScaler or None The fitted scaler for any additional columns that were scaled (either explicitly provided or auto-selected). None if no columns were scaled beyond the coordinates. Raises ------ TypeError If `df` is not a DataFrame, or `cols_to_scale` is neither a list nor "auto" nor None, or if any explicitly provided column is not a string. ValueError If required columns (``time_col``, ``coord_x``, ``coord_y``) or any of ``cols_to_scale`` do not exist in ``df``, or cannot be converted to numeric. Examples -------- >>> import pandas as pd >>> from geoprior.nn.pinn.utils import normalize_for_pinn >>> data = { ... "year_num": [0.0, 1.0, 2.0], ... "lon": [100.0, 101.0, 102.0], ... "lat": [30.0, 31.0, 32.0], ... "feat1": [10.0, 20.0, 30.0], ... "one_hot_A": [0, 1, 0] ... } >>> df = pd.DataFrame(data) >>> df_scaled, coord_scl, feat_scl = normalize_for_pinn( ... df, ... time_col="year_num", ... coord_x="lon", ... coord_y="lat", ... cols_to_scale="auto", ... scale_coords=True, ... verbose=2 ... ) >>> # 'year_num','lon','lat','feat1' get scaled; 'one_hot_A' excluded >>> df_scaled["year_num"].tolist() [0.0, 0.5, 1.0] >>> df_scaled["feat1"].tolist() [0.0, 0.5, 1.0] Notes ----- - When ``cols_to_scale="auto"``, numeric columns with only ``{0, 1}`` values are assumed to be one-hot and excluded from scaling. - If ``scale_coords=False``, coordinate columns remain unchanged, and auto-selection (if used) will exclude them. - Returned ``coord_scaler`` is ``None`` if ``scale_coords=False``. Returned ``other_scaler`` is ``None`` if ``cols_to_scale`` is ``None`` or results in an empty set after filtering. See Also -------- sklearn.preprocessing.MinMaxScaler : Scales features to [0,1]. """ # --- Validate df --- if not isinstance(df, pd.DataFrame): raise TypeError( f"`df` must be a pandas DataFrame, got " f"{type(df).__name__}" ) # --- Validate core column names --- for name in (time_col, coord_x, coord_y): if not isinstance(name, str): raise TypeError( f"Column names must be strings, got {name}" ) if name not in df.columns: raise ValueError( f"Column '{name}' not found in DataFrame" ) # --- Validate cols_to_scale type --- if cols_to_scale is not None and cols_to_scale != "auto": if not isinstance(cols_to_scale, list) or not all( isinstance(c, str) for c in cols_to_scale ): raise TypeError( "`cols_to_scale` must be a list of strings, " "'auto', or None" ) # Make a copy to avoid side effects df_scaled = df.copy(deep=True) # --- 1. Optionally adjust time before scaling (OFF by default) --- if shift_time_by_horizon and forecast_horizon is not None: if pd.api.types.is_numeric_dtype(df_scaled[time_col]): df_scaled[time_col] = df_scaled[time_col] + float( forecast_horizon ) vlog( f"Time column shifted by forecast_horizon={forecast_horizon}", verbose=verbose, level=4, logger=_logger, ) elif pd.api.types.is_datetime64_any_dtype( df_scaled[time_col] ): df_scaled = increment_dates_by_horizon( df_scaled, time_col, forecast_horizon ) vlog( f"Datetime column shifted by forecast_horizon={forecast_horizon}", verbose=verbose, level=4, logger=_logger, ) # --- 2. Scale coordinates if requested --- if scale_coords: vlog( "Scaling time, lon, lat columns...", verbose=verbose, level=2, logger=_logger, ) coord_cols = [time_col, coord_x, coord_y] for col in coord_cols: if not pd.api.types.is_numeric_dtype( df_scaled[col] ): try: df_scaled[col] = pd.to_numeric( df_scaled[col] ) vlog( f"Converted '{col}' to numeric.", verbose=verbose, level=3, logger=_logger, ) except Exception as e: raise ValueError( f"Cannot convert '{col}' to numeric: {e}" ) if coord_scaler is None: if not fit_coord_scaler: raise ValueError( "coord_scaler must be provided when fit_coord_scaler=False" ) coord_scaler = MinMaxScaler() if fit_coord_scaler: df_scaled[coord_cols] = ( coord_scaler.fit_transform( df_scaled[coord_cols] ) ) else: if not hasattr(coord_scaler, "min_"): raise ValueError( "fit_coord_scaler=False but `coord_scaler` is not fitted." ) df_scaled[coord_cols] = coord_scaler.transform( df_scaled[coord_cols] ) # --- 3. Determine `other_cols_to_scale` --- exclude_set = set(exclude_cols or []) if protect_si_suffix: exclude_set |= { c for c in df_scaled.columns if str(c).endswith(protect_si_suffix) } if cols_to_scale == "auto": numeric_cols = df_scaled.select_dtypes( include=[np.number] ).columns.tolist() # always exclude coords from "other" scaling for c in (time_col, coord_x, coord_y): if c in numeric_cols: numeric_cols.remove(c) auto_cols = [] for c in numeric_cols: if c in exclude_set: vlog( f"Excluding locked column '{c}' from auto-scaling.", verbose=verbose, level=3, logger=_logger, ) continue uniq = pd.unique(df_scaled[c]) if set(np.unique(uniq)) <= {0, 1}: vlog( f"Excluding one-hot/boolean column '{c}' from auto-scaling.", verbose=verbose, level=3, logger=_logger, ) continue auto_cols.append(c) other_cols_to_scale = auto_cols elif isinstance(cols_to_scale, list): other_cols_to_scale = [ c for c in cols_to_scale if c not in exclude_set ] dropped = [ c for c in cols_to_scale if c in exclude_set ] if dropped: vlog( f"Dropped locked columns from scaling list: {dropped}", verbose=verbose, level=2, logger=_logger, ) else: # cols_to_scale is None other_cols_to_scale = [] # --- 4. Scale `other_cols_to_scale` if any --- if other_cols_to_scale: vlog( f"Scaling additional columns: {other_cols_to_scale}", verbose=verbose, level=2, logger=_logger, ) # Verify existence and numeric type valid_cols = [] for col in other_cols_to_scale: if col not in df_scaled.columns: raise ValueError( f"Column '{col}' not found for scaling." ) if not pd.api.types.is_numeric_dtype( df_scaled[col] ): try: df_scaled[col] = pd.to_numeric( df_scaled[col] ) vlog( f"Converted '{col}' to numeric.", verbose=verbose, level=3, logger=_logger, ) except Exception as e: raise ValueError( f"Cannot convert '{col}' to numeric: {e}" ) valid_cols.append(col) if valid_cols: if other_scaler is None: if not fit_other_scaler: raise ValueError( "other_scaler must be provided when fit_other_scaler=False" ) other_scaler = MinMaxScaler() if fit_other_scaler: df_scaled[valid_cols] = ( other_scaler.fit_transform( df_scaled[valid_cols] ) ) else: if not hasattr(other_scaler, "min_"): raise ValueError( "fit_other_scaler=False but `other_scaler` is not fitted." ) df_scaled[valid_cols] = ( other_scaler.transform( df_scaled[valid_cols] ) ) vlog( f" other_scaler.data_min_: {other_scaler.data_min_}", verbose=verbose, level=3, logger=_logger, ) vlog( f" other_scaler.data_max_: {other_scaler.data_max_}", verbose=verbose, level=3, logger=_logger, ) return df_scaled, coord_scaler, other_scaler
def increment_dates_by_horizon( df: pd.DataFrame, time_col: str, forecast_horizon: int ) -> pd.DataFrame: """ Increments the values in a datetime column by the forecast horizon. Parameters ---------- df : pd.DataFrame The DataFrame containing the time column to be adjusted. time_col : str The name of the datetime column in the DataFrame. forecast_horizon : int The forecast horizon (number of years or time steps to add). Returns ------- pd.DataFrame The DataFrame with the adjusted time column. """ # Convert the time column to datetime if it's not already if not pd.api.types.is_datetime64_any_dtype(df[time_col]): df[time_col] = pd.to_datetime(df[time_col]) # Add the forecast horizon (years) df[time_col] = df[time_col] + pd.DateOffset( years=forecast_horizon ) return df
[docs] def adjust_time_predictions( df: pd.DataFrame, time_col: str, forecast_horizon: int, coord_scaler: MinMaxScaler | None = None, inverse_transformed: bool = False, verbose: int = 1, ) -> pd.DataFrame: """ Adjusts time predictions by adding the forecast horizon to inverse normalized time. If the time column has already been inverse-transformed, skip the inverse transformation. Parameters ---------- df : pd.DataFrame The DataFrame containing the time predictions (inverse scaled). The time column specified by `time_col` should contain the time values that need to be adjusted. time_col : str The name of the time column in the DataFrame. This column will be adjusted by adding the forecast horizon. forecast_horizon : int The forecast horizon (e.g., number of years or time steps) that will be added to the time predictions. This value shifts the time predictions forward. coord_scaler : MinMaxScaler, optional The scaler that was used for the coordinates. It is necessary to reverse the scaling for the time column if it was previously normalized. If not provided, the time column should already be inverse-transformed. inverse_transformed : bool, default False If `True`, skips the inverse transformation of the time column and directly adds the forecast horizon. This is useful when the time column has already been inverse-transformed, and you only need to adjust the time by the forecast horizon. verbose : int, default 1 Verbosity level for logging. Higher values (e.g., `verbose=2`) provide more detailed information about the operation. Returns ------- pd.DataFrame The adjusted DataFrame with the time column updated to reflect the forecast horizon. The time predictions are adjusted by adding the `forecast_horizon` to each entry in the time column. Raises ------ ValueError If the time column is not found in the DataFrame or if the scaler is not available when necessary. Examples -------- >>> import pandas as pd >>> from sklearn.preprocessing import MinMaxScaler >>> # Sample data for illustration >>> df = pd.DataFrame({ >>> 'year': [0.0, 0.5, 1.0], >>> 'subsidence': [0.1, 0.2, 0.3] >>> }) >>> scaler = MinMaxScaler() >>> df_scaled = df.copy() >>> df_scaled['year'] = scaler.fit_transform(df_scaled[['year']]) >>> adjusted_df = adjust_time_predictions( >>> df_scaled, >>> time_col='year', >>> forecast_horizon=4, >>> coord_scaler=scaler, >>> inverse_transformed=False, >>> verbose=2 >>> ) >>> adjusted_df['year'] [0.0, 0.5, 1.0] -> After adjustment, will be shifted to the future. Notes ----- - The time column must be in a normalized scale if not already inverse-transformed. - If `inverse_transformed=True`, the time values will directly be adjusted by the `forecast_horizon` without applying the inverse transformation. - The forecast horizon is added directly to the time values after the necessary inverse transformation (if applicable). See Also -------- sklearn.preprocessing.MinMaxScaler : Scales features to [0,1]. """ if time_col not in df.columns: raise ValueError( f"Column '{time_col}' not found in DataFrame." ) if coord_scaler is None and not inverse_transformed: raise ValueError( "coord_scaler is required unless `inverse_transformed` is True." ) # Apply inverse scaling to the time predictions if they haven't been inverse-transformed yet time_predictions = df[time_col].values if not inverse_transformed: # Revert the time predictions from the normalized scale to the original scale time_predictions = coord_scaler.inverse_transform( time_predictions.reshape(-1, 1) ).flatten() # Add the forecast horizon to the time predictions adjusted_time_predictions = ( time_predictions + forecast_horizon ) # Update the DataFrame with the adjusted time predictions df[time_col] = adjusted_time_predictions if verbose >= 1: logger.info( f"Time predictions adjusted by {forecast_horizon} years." ) return df
[docs] def stack_quantile_predictions( q_lower: np.ndarray | Sequence, q_median: np.ndarray | Sequence, q_upper: np.ndarray | Sequence, ) -> np.ndarray: """ Stack three quantile trajectories into a single y_pred array of shape (n_samples, 3, n_timesteps), ready for PSS. Parameters ---------- q_lower, q_median, q_upper : array-like Each is either - 1D: (n_timesteps,) → interpreted as a single sample, or - 2D: (n_samples, n_timesteps) Returns ------- y_pred : np.ndarray, shape (n_samples, 3, n_timesteps) Where axis=1 indexes [lower, median, upper]. Raises ------ ValueError If the three inputs (after promotion) do not share the same shape. """ def _ensure_2d(arr): a = np.asarray(arr) if a.ndim == 1: return a.reshape(1, -1) if a.ndim == 2: return a raise ValueError( f"Each quantile array must be 1D or 2D, got shape {a.shape}" ) lower = _ensure_2d(q_lower) median = _ensure_2d(q_median) upper = _ensure_2d(q_upper) if not (lower.shape == median.shape == upper.shape): raise ValueError( "All three quantile arrays must have the same shape " f"after promotion, got {lower.shape}, {median.shape}, {upper.shape}" ) # Stack along new axis=1 → (n_samples, 3, n_timesteps) y_pred = np.stack([lower, median, upper], axis=1) return y_pred
[docs] def get_step_names( forecast_steps: Iterable[int], step_names: Mapping[Any, str] | Sequence[str] | None | None = None, default_name: str = "", ) -> dict[int, str]: r""" Build a *step → label* mapping for multi‑horizon plots. The helper reconciles an integer list ``forecast_steps`` with an optional *alias* container (*dict* or *sequence*) and returns a dictionary whose keys are the integer steps and whose values are human‑readable labels. Matching is **case‑insensitive** and tolerant to common delimiters—e.g. ``"Step 1"``, ``"step‑1"``, or ``"forecast step 1"`` will all map to integer step ``1``. Parameters ---------- forecast_steps : Iterable[int] Ordered steps, e.g. ``[1, 2, 3]``. step_names : dict | list | tuple | None, default=None Custom labels. Accepted forms * **dict** – keys may be ``int`` or *any* string representation of the step. * **sequence** – positional, where the *k*‑th element labels step ``k+1``. * **None** – no custom mapping. default_name : str, default="" Fallback label for steps missing from *step_names*. If empty, the step number itself is used (as a string). Returns ------- dict[int, str] Mapping ``{step : label}`` for every element of *forecast_steps*. Notes ----- * Dictionary keys are normalised with ``int(re.sub(r"[^0-9]", "", str(key)))`` before matching. * Duplicate keys in *step_names* are resolved by **last‐one wins** semantics. Examples -------- >>> from geoprior.utils.forecast_utils import get_step_names >>> get_step_names( ... forecast_steps=[1, 2, 3], ... step_names={"1": "Year 2021", 2: "2022", "step 3": "2023"}, ... ) {1: 'Year 2021', 2: '2022', 3: '2023'} >>> get_step_names( ... forecast_steps=[1, 2, 3, 4], ... step_names={"1": "2021", "2": "2022"}, ... ) {1: '2021', 2: '2022', 3: '3', 4: '4'} >>> get_step_names( ... [1, 2, 3, 4], ... step_names=None, ... default_name="step with no name", ... ) {1: 'step with no name', 2: 'step with no name', 3: 'step with no name', 4: 'step with no name'} See Also -------- geoprior.utils.data_utils.widen_temporal_columns : Converts long format to wide; often used with *forecast_steps* when plotting. """ # Ensure we have a concrete list to preserve order and allow # multiple passes. forecast_steps = columns_manager( forecast_steps, empty_as_none=False ) steps: list[int] = [int(s) for s in forecast_steps] lookup: dict[int, str] = {} if step_names is None: pass # remain empty elif isinstance(step_names, Mapping): for k, v in step_names.items(): idx = _to_int_key(k) # Skip keys that cannot be coerced to int (e.g. None, dict) if idx is not None: lookup[idx] = str(v) elif isinstance(step_names, Sequence) and not isinstance( step_names, str | bytes ): for idx, v in enumerate(step_names, start=1): lookup[idx] = str(v) else: raise TypeError( "`step_names` must be a mapping, a sequence, or None " f"(got {type(step_names).__name__})." ) # Build the final mapping, applying defaults where necessary. result: dict[int, str] = {} for step in steps: if step in lookup: result[step] = lookup[step] elif default_name: result[step] = default_name else: result[step] = str(step) return result
def _to_int_key(key: Any) -> int | None: """Try to coerce a mapping key to int by stripping non‑digits.""" if isinstance(key, int): return key digits = "".join(_DIGIT_RE.findall(str(key))) return int(digits) if digits else None
[docs] @isdf def format_forecast_dataframe( df: pd.DataFrame, to_wide: bool = True, time_col: str = "coord_t", spatial_cols: tuple[str] = ("coord_x", "coord_y"), value_prefixes: list[str] | None = None, _logger: logging.Logger | Callable[[str], None] | None = None, **pivot_kwargs, ) -> pd.DataFrame | str: """Auto-detects DataFrame format and conditionally pivots to wide format. This function serves as a smart wrapper. It first determines if the input DataFrame is in a 'long' or 'wide' forecast format based on its column structure. If `to_wide` is True and the format is 'long', it calls :func:`pivot_forecast_dataframe` to perform the transformation. Parameters ---------- df : pd.DataFrame The input DataFrame to check and potentially transform. to_wide : bool, default True - If ``True``, the function's goal is to return a wide-format DataFrame. It will pivot a long-format frame or return a wide-format frame as is. - If ``False``, the function only performs detection and returns a string ('wide', 'long', or 'unknown'). time_col : str, default 'coord_t' The name of the column that indicates the time step. Its presence is a primary indicator of a long-format DataFrame. value_prefixes : list of str, optional A list of prefixes for the value columns (e.g., ['subsidence', 'GWL']). If ``None``, the function will attempt to infer them from column names that do not match common ID columns. **pivot_kwargs Additional keyword arguments to pass down to the :func:`pivot_forecast_dataframe` function if it is called. Common arguments include `id_vars`, `static_actuals_cols`, `verbose`, etc. Returns ------- pd.DataFrame or str - If `to_wide` is ``True``, returns the (potentially pivoted) wide-format ``pd.DataFrame``. - If `to_wide` is ``False``, returns a string: 'wide', 'long', or 'unknown'. See Also -------- pivot_forecast_dataframe : The underlying function that performs the pivot operation. Examples -------- >>> # df_long is a typical long-format forecast output >>> df_long.columns Index(['sample_idx', 'forecast_step', 'coord_t', 'coord_x', ...]) >>> # Detect format >>> format_str = format_forecast_dataframe(df_long, to_wide=False) >>> print(format_str) 'long' >>> >>> # Convert to wide format >>> df_wide = format_forecast_dataframe( ... df_long, ... to_wide=True, ... id_vars=['sample_idx', 'coord_x', 'coord_y'], ... value_prefixes=['subsidence', 'GWL'], ... static_actuals_cols=['subsidence_actual'] ... ) >>> # print(df_wide.columns) # Index(['sample_idx', 'coord_x', 'coord_y', 'subsidence_actual', # 'GWL_2018_q50', ...], dtype='object') """ # --- Format Detection Logic --- # Heuristic 1: If time_col exists, it's very likely 'long' format. is_long_format = time_col in df.columns # Heuristic 2: Check for wide-format columns like 'prefix_YYYY_suffix' # Use a regex to look for (prefix)_(4-digit year)_... _spatial_cols = columns_manager( spatial_cols, empty_as_none=False ) if spatial_cols: spatial_cols = columns_manager(spatial_cols) check_spatial_columns(df, spatial_cols=spatial_cols) # coord_x, coord_y = spatial_cols if value_prefixes is None: # Auto-infer prefixes if not provided # Exclude common ID columns non_value_cols = { "sample_idx", *_spatial_cols, "forecast_step", time_col, } value_prefixes = sorted( list( set( [ c.split("_")[0] for c in df.columns if c not in non_value_cols ] ) ) ) wide_col_pattern = re.compile( r"(" + "|".join(re.escape(p) for p in value_prefixes) + r")_(\d{4})_?.*" ) has_wide_columns = any( wide_col_pattern.match(col) for col in df.columns ) detected_format = "unknown" if is_long_format: detected_format = "long" elif has_wide_columns: detected_format = "wide" verbose = pivot_kwargs.get("verbose", 0) vlog( f"Auto-detected DataFrame format: '{detected_format}'", level=1, verbose=verbose, logger=_logger, ) # --- Action based on mode --- if to_wide: if detected_format == "long": vlog( "`to_wide` is True and format is 'long'. " "Pivoting DataFrame...", level=1, verbose=verbose, logger=_logger, ) # Pass necessary args to the pivot function pivot_args = { "time_col": time_col, "value_prefixes": value_prefixes, **pivot_kwargs, # Pass through other args } if "id_vars" not in pivot_args: # Provide a sensible default for id_vars if not given pivot_args["id_vars"] = [ c for c in ["sample_idx", *_spatial_cols] if c in df.columns ] vlog( f"Using default id_vars: {pivot_args['id_vars']}", level=2, verbose=verbose, logger=_logger, ) return pivot_forecast_dataframe( df.copy(), _logger=_logger, **pivot_args ) elif detected_format == "wide": vlog( "`to_wide` is True but DataFrame is already in wide " "format. Returning as is.", level=1, verbose=verbose, logger=_logger, ) return df else: # 'unknown' vlog( "Warning: DataFrame format is 'unknown'. " "Cannot pivot. Returning original DataFrame.", level=1, verbose=verbose, logger=_logger, ) return df else: # to_wide is False return detected_format
[docs] @isdf def get_value_prefixes( df: pd.DataFrame, exclude_cols: list[str] | None = None, spatial_cols: tuple[str, str] = ("coord_x", "coord_y"), time_col: str = "coord_t", ) -> list[str]: """ Automatically detects the prefixes of value columns from a DataFrame. This utility inspects the column names to infer the base names of the metrics being forecasted (e.g., 'subsidence', 'GWL'), excluding common ID and coordinate columns. It works with both long and wide format forecast DataFrames. Parameters ---------- df : pd.DataFrame The DataFrame from which to detect value prefixes. exclude_cols : list of str, optional A list of columns to explicitly ignore during detection. If None, a default list of common ID/coordinate columns is used (e.g., 'sample_idx', 'coord_x', 'coord_t', etc.). Returns ------- list of str A sorted list of unique prefixes found in the column names. Examples -------- >>> from geoprior.utils.data_utils import get_values_prefixes >>> # For a long-format DataFrame >>> long_cols = ['sample_idx', 'coord_t', 'subsidence_q50', 'GWL_q50'] >>> df_long = pd.DataFrame(columns=long_cols) >>> get_value_prefixes(df_long) ['GWL', 'subsidence'] >>> # For a wide-format DataFrame >>> wide_cols = ['sample_idx', 'coord_x', 'subsidence_2022_q90', 'GWL_2022_q50'] >>> df_wide = pd.DataFrame(columns=wide_cols) >>> get_value_prefixes(df_wide) ['GWL', 'subsidence'] """ if exclude_cols is None: # Default set of columns that are not value columns exclude_cols = { "sample_idx", "forecast_step", time_col, *spatial_cols, } else: exclude_cols = set(exclude_cols) prefixes = set() for col in df.columns: if col in exclude_cols: continue # The prefix is assumed to be the part before the first underscore prefix = col.split("_")[0] prefixes.add(prefix) return sorted(list(prefixes))
def get_test_data_from( df: pd.DataFrame, time_col: str, time_steps: int, train_end_year: int | None = None, forecast_horizon: int | None = 1, strategy: str = "onwards", objective: str = "pure", # for forecasting verbose: int = 1, _logger: logging.Logger | Callable[[str], None] | None = None, ) -> pd.DataFrame: r""" Prepares the test data for forecasting by ensuring there is enough future data. It adjusts the start and end years based on the forecast horizon and strategy provided. Parameters ---------- df : pd.DataFrame The scaled dataframe containing the time series data. time_col : str The column name for time in the dataframe (e.g., year). time_steps : int The number of steps to look back for generating sequences. train_end_year : int, optional The last year used for training. If not provided, defaults to the last year in the dataset. forecast_horizon : int, optional The forecast horizon, i.e., the number of years to predict. Default is 1. strategy : {'onwards', 'lookback'}, default 'onwards' - 'onwards': Start the test data from `train_end_year + time_steps`. - 'lookback': Use the available data until `train_end_year`. verbose : int, optional Verbosity level for logging (default 1). _logger : logging.Logger or Callable, optional Logger or callable for logging the messages. Returns ------- pd.DataFrame The prepared test data, based on the forecast horizon and selected strategy. Notes ----- - If `train_end_year` is not provided, it is determined automatically from the latest year in the `time_col`. - The `strategy` parameter defines how the test data is selected: * `onwards`: Select data starting from `train_end_year + time_steps`. * `lookback`: Select data until `train_end_year`. - If there isn't enough data for the `forecast_horizon`, the function will adjust and either fetch earlier data or use the `lookback` strategy. Examples -------- >>> import pandas as pd >>> df = pd.DataFrame({ >>> 'year': [2015, 2016, 2017, 2018, 2019, 2020], >>> 'subsidence': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6] >>> }) >>> test_data = get_test_data_from( >>> df=df, >>> time_col='year', >>> time_steps=3, >>> forecast_horizon=2, >>> strategy='onwards', >>> verbose=2 >>> ) >>> print(test_data) """ def _v(msg, lvl): vlog(msg, verbose=verbose, level=lvl, logger=_logger) # Determine the last available year if not explicitly given if train_end_year is None: if pd.api.types.is_numeric_dtype(df[time_col]): train_end_year = df[ time_col ].max() # Use the last numeric year elif pd.api.types.is_datetime64_any_dtype( df[time_col] ): # Handle various datetime formats if df[time_col].dt.is_year_end.any(): train_end_year = df[ time_col ].dt.year.max() # Year format elif df[time_col].dt.is_month_end.any(): train_end_year = ( df[time_col].dt.to_period("M").max().year ) # Month format elif df[time_col].dt.week.any(): train_end_year = ( df[time_col].dt.to_period("W").max().year ) # Week format else: # Default to full datetime handling train_end_year = df[time_col].dt.year.max() if verbose >= 1: _v( "Train end year not specified." f" Using last available year: {train_end_year}", lvl=1, ) # Adjust forecast end year based on the datetime frequency forecast_end_year = train_end_year + forecast_horizon forecast_start_year = train_end_year - forecast_horizon if objective == "forecasting": forecast_end_year = train_end_year + time_steps forecast_start_year = train_end_year - time_steps if verbose >= 2: _v( f"Forecast start year: {forecast_start_year}," f" Forecast end year: {forecast_end_year}", lvl=2, ) # Handle data based on the specified strategy if strategy == "onwards": # Get the test data starting from the forecast start year # (train_end_year + time_steps) test_data = df[df[time_col] >= forecast_end_year] elif strategy == "lookback": # If 'lookback', select data that ends at `train_end_year` # (train_end_year - time_steps) if objective == "forecasting": test_data = df[ df[time_col] >= forecast_start_year ] else: test_data = df[df[time_col] > forecast_start_year] else: raise ValueError( f"Invalid strategy '{strategy}'." " Choose either 'onwards' or 'lookback'." ) # Check if enough data is available for the forecast horizon if ( test_data.empty or len(test_data[time_col].unique()) < forecast_horizon ): if verbose >= 1: _v( f"Not enough data for the forecast horizon" f" ({forecast_horizon} years). Adjusting test data.", lvl=1, ) # Fall back to using a lookback strategy if there # isn't enough data for the forecast horizon # (train_end_year - time_steps) if objective == "forecasting": test_data = df[ df[time_col] >= forecast_start_year ] else: test_data = df[df[time_col] > forecast_start_year] if verbose >= 1: _v( f"Using data from {forecast_start_year} onwards.", lvl=1, ) # Log the years included in the test data if verbose >= 1: _v( f"Test data years: {test_data[time_col].unique()}", lvl=2, ) # Return the test data return test_data
[docs] @check_empty(["data"]) @is_data_readable def pivot_forecast_dataframe( data: pd.DataFrame, id_vars: list[str], time_col: str, value_prefixes: list[str], static_actuals_cols: list[str] | None = None, time_col_is_float_year: bool | str = "auto", round_time_col: bool = False, verbose: int = 0, savefile: str | None = None, _logger: logging.Logger | Callable[[str], None] | None = None, **kws, ) -> pd.DataFrame: """Transforms a long-format forecast DataFrame to a wide format. This utility reshapes time series prediction data from a "long" format, where each row represents a single time step for a given sample, to a "wide" format, where each row represents a single sample and columns correspond to values at different time steps. Parameters ---------- data : pd.DataFrame The input long-format DataFrame. It must contain the columns specified in `id_vars` and `time_col`, as well as value columns that start with the strings in `value_prefixes`. id_vars : list of str A list of column names that uniquely identify each sample or group. These columns will be preserved in the wide-format output. For example: ``['sample_idx', 'coord_x', 'coord_y']``. time_col : str The name of the column that represents the time step or year of the forecast (e.g., 'coord_t' or 'forecast_step'). This column's values will become part of the new column names. value_prefixes : list of str A list of prefixes for the value columns that need to be pivoted. The function identifies columns starting with these prefixes. For instance, ``['subsidence', 'GWL']`` would match 'subsidence_q10', 'GWL_q50', etc. static_actuals_cols : list of str, optional A list of columns containing static "actual" or ground truth values for each sample. These values are assumed to be constant for each unique `sample_idx` and are merged back into the wide DataFrame after pivoting. Example: ``['subsidence_actual']``. time_col_is_float_year : bool or 'auto', default 'auto' Controls how the `time_col` values are formatted into new column names. - If ``'auto'``, automatically detects if `time_col` has a float dtype. - If ``True``, treats `time_col` values (e.g., 2018.0) as years and converts them to integer strings ('2018'). - If ``False``, uses the string representation of the value as is. round_time_col : bool, default False If ``True`` and `time_col` is a float type, its values will be rounded to the nearest integer before being used in column names. This is useful for cleaning up float years (e.g., 2018.0001 -> 2018). verbose : int, default 0 Controls the verbosity of logging messages. `0` is silent. Higher values print more details about the process. savefile : str, optional If a file path is provided, the final wide-format DataFrame will be saved as a CSV file to that location. Returns ------- pd.DataFrame A wide-format DataFrame with one row per unique combination of `id_vars`. New columns are created in the format `{prefix}_{time_str}{_suffix}` (e.g., 'subsidence_2018_q10'). See Also -------- pandas.pivot_table : The core function used for reshaping data. pandas.merge : Used to re-join static columns after pivoting. Notes ----- - The combination of columns in `id_vars` and `time_col` must uniquely identify each row in `df_long` for the pivot to succeed without data loss. - If using `static_actuals_cols`, the `id_vars` list must contain 'sample_idx' to correctly merge the static data back. Examples -------- >>> import pandas as pd >>> from geoprior.utils.data_utils import pivot_forecast_dataframe >>> data = { ... 'sample_idx': [0, 0, 1, 1], ... 'coord_t': [2018.0, 2019.0, 2018.0, 2019.0], ... 'coord_x': [0.1, 0.1, 0.5, 0.5], ... 'coord_y': [0.2, 0.2, 0.6, 0.6], ... 'subsidence_q50': [-8, -9, -13, -14], ... 'subsidence_actual': [-8.5, -8.5, -13.2, -13.2], ... 'GWL_q50': [1.2, 1.3, 2.2, 2.3], ... } >>> df_long_example = pd.DataFrame(data) >>> df_wide = pivot_forecast_dataframe( ... data=df_long_example, ... id_vars=['sample_idx', 'coord_x', 'coord_y'], ... time_col='coord_t', ... value_prefixes=['subsidence', 'GWL'], ... static_actuals_cols=['subsidence_actual'], ... verbose=0 ... ) >>> print(df_wide.columns) Index(['sample_idx', 'coord_x', 'coord_y', 'subsidence_actual', 'GWL_2018_q50', 'GWL_2019_q50', 'subsidence_2018_q50', 'subsidence_2019_q50'], dtype='object') """ is_frame(data, df_only=True) df_processed = data.copy() vlog( f"Starting pivot operation. Initial shape: {df_processed.shape}", level=2, verbose=verbose, logger=_logger, ) if not isinstance(df_processed, pd.DataFrame): raise TypeError( "`df_long` must be a pandas DataFrame." ) value_cols_to_pivot = [ col for col in df_processed.columns if any( col.startswith(prefix) for prefix in value_prefixes ) ] required_cols = id_vars + [time_col] + value_cols_to_pivot missing_cols = [ col for col in required_cols if col not in df_processed.columns ] if missing_cols: raise ValueError( f"Missing required columns in DataFrame: {missing_cols}" ) # Determine if the time column should be treated as a float year is_float_year = False if time_col_is_float_year == "auto": if pd.api.types.is_float_dtype( df_processed[time_col] ): is_float_year = True vlog( f"'{time_col}' auto-detected as float year.", level=2, verbose=verbose, logger=_logger, ) elif time_col_is_float_year is True: is_float_year = True # Round the time column before pivoting if requested if round_time_col and is_float_year: vlog( f"Rounding time column '{time_col}'.", level=1, verbose=verbose, logger=_logger, ) df_processed[time_col] = ( df_processed[time_col].round().astype(int) ) # After rounding, it's no longer a float year is_float_year = False elif round_time_col and not is_float_year: vlog( f"Warning: `round_time_col` is True but '{time_col}' " "is not a float year. Skipping rounding.", level=1, verbose=verbose, logger=_logger, ) static_df = None if static_actuals_cols: if "sample_idx" not in df_processed.columns: raise ValueError( "'sample_idx' must be in df_long to handle " "static_actuals_cols." ) vlog( f"Extracting static columns: {static_actuals_cols}", level=2, verbose=verbose, logger=_logger, ) static_df = ( df_processed[["sample_idx"] + static_actuals_cols] .drop_duplicates(subset=["sample_idx"]) .set_index("sample_idx") ) pivot_index = list( set(id_vars) & set(df_processed.columns) ) pivot_columns = [time_col] pivot_values = value_cols_to_pivot vlog( f"Pivoting data with index={pivot_index}, " f"columns='{time_col}'.", level=2, verbose=verbose, logger=_logger, ) try: df_pivoted = df_processed.pivot_table( index=pivot_index, columns=pivot_columns, values=pivot_values, aggfunc="first", ) except Exception as e: raise RuntimeError( "Pandas pivot_table failed. Check if `id_vars` and " f"`time_col` uniquely identify rows. Error: {e}" ) vlog( "Flattening pivoted column names.", level=2, verbose=verbose, logger=_logger, ) new_columns = [] for value_col, time_val in df_pivoted.columns: parts = value_col.split("_", 1) prefix = parts[0] suffix = f"_{parts[1]}" if len(parts) > 1 else "" time_str = str(time_val) if is_float_year: try: time_str = str(int(time_val)) except (ValueError, TypeError): pass new_col_name = f"{prefix}_{time_str}{suffix}" new_columns.append(new_col_name) df_pivoted.columns = new_columns df_pivoted = df_pivoted.reset_index() if static_df is not None: vlog( "Merging static columns back into the wide DataFrame.", level=2, verbose=verbose, logger=_logger, ) df_wide = pd.merge( df_pivoted, static_df, on="sample_idx", how="left" ) cols_order = ( id_vars + static_actuals_cols + [ c for c in df_wide.columns if c not in id_vars + static_actuals_cols ] ) df_wide = df_wide[cols_order] else: df_wide = df_pivoted vlog( f"Pivot complete. Final shape: {df_wide.shape}", level=1, verbose=verbose, logger=_logger, ) if savefile: try: vlog( f"Saving DataFrame to '{savefile}'.", level=1, verbose=verbose, logger=_logger, ) save_dir = os.path.dirname(savefile) if save_dir and not os.path.exists(save_dir): os.makedirs(save_dir, exist_ok=True) df_wide.to_csv(savefile, index=False) vlog( "Save successful.", level=2, verbose=verbose, logger=_logger, ) except Exception as e: logger.error( f"Failed to save file to '{savefile}': {e}" ) return df_wide
[docs] @isdf def get_value_prefixes_in( df: pd.DataFrame, exclude_cols: list[str] | None = None ) -> list[str]: """ Automatically detects the prefixes of value columns from a DataFrame. (This is a dependency for the function below) """ if exclude_cols is None: exclude_cols = { "sample_idx", "forecast_step", "coord_t", "coord_x", "coord_y", } else: exclude_cols = set(exclude_cols) prefixes = set() for col in df.columns: if col in exclude_cols: continue # The prefix is assumed to be the # part before the first underscore prefix = col.split("_")[0] prefixes.add(prefix) return sorted(list(prefixes))
[docs] @isdf def detect_forecast_type( df: pd.DataFrame, value_prefixes: list[str] | None = None, ) -> str: """ Auto-detects whether a DataFrame contains deterministic or quantile forecasts, supporting both long and wide formats. This utility inspects column names to determine the nature of the predictions. - It identifies a 'quantile' forecast if it finds columns containing a ``_qXX`` pattern (e.g., 'subsidence_q10', 'GWL_2022_q50'). - It identifies a 'deterministic' forecast if no quantile columns are found, but columns ending in ``_pred``, `_actual`, or matching a base prefix exist (e.g., 'subsidence_pred', 'subsidence_2022_actual', 'GWL'). Parameters ---------- df : pd.DataFrame The DataFrame to inspect. value_prefixes : list of str, optional A list of value prefixes (e.g., ['subsidence', 'GWL']) to focus the search on. If ``None``, prefixes are inferred from column names. Returns ------- str One of 'quantile', 'deterministic', or 'unknown'. Examples -------- >>> import pandas as pd >>> from geoprior.utils.forecast_utils import detect_forecast_type >>> # Long format quantile >>> df_quant_long = pd.DataFrame(columns=['subsidence_q50', 'GWL_q90']) >>> detect_forecast_type(df_quant_long) 'quantile' >>> # Wide format quantile >>> df_quant_wide = pd.DataFrame(columns=['subsidence_2022_q50']) >>> detect_forecast_type(df_quant_wide) 'quantile' >>> # Deterministic forecast >>> df_determ = pd.DataFrame(columns=['subsidence_pred', 'GWL']) >>> detect_forecast_type(df_determ) 'deterministic' """ if value_prefixes is None: # Auto-detect prefixes if not provided. value_prefixes = get_value_prefixes(df) if not value_prefixes: return "unknown" # Updated regex to handle both long (_q50) and wide (_YYYY_q50) formats # It looks for '_q' followed by one or more digits anywhere in the string. quantile_pattern = re.compile(r"_q\d+") # Regex to find deterministic suffixes like _pred or _actual at the end pred_pattern = re.compile(r"_(pred|actual)$") has_quantile = False has_pred_or_actual = False has_bare_prefix = False for col in df.columns: if quantile_pattern.search(col): # Check if it belongs to one of our prefixes for prefix in value_prefixes: if col.startswith(prefix): has_quantile = True break if has_quantile: break if has_quantile: return "quantile" # If no quantiles, check for deterministic patterns for col in df.columns: for prefix in value_prefixes: # Check for exact prefix match (e.g., 'subsidence') if col == prefix: has_bare_prefix = True # Check for patterns like 'subsidence_pred', 'GWL_2022_actual' elif col.startswith( prefix ) and pred_pattern.search(col): has_pred_or_actual = True if has_pred_or_actual or has_bare_prefix: break if has_pred_or_actual or has_bare_prefix: return "deterministic" # If neither format is detected, return 'unknown'. return "unknown"