# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# https://lkouadio.com
# Copyright (c) 2026-present
# Author: LKouadio <etanoyau@gmail.com>
r"""Sequence-building helpers for temporal model inputs."""
from __future__ import annotations
import logging
import os
import warnings
from collections.abc import Callable
from typing import (
Any,
Literal,
)
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view
from sklearn.preprocessing import MinMaxScaler
from geoprior._optdeps import HAS_TQDM, with_progress
from ..core.checks import (
check_datetime,
exist_features,
)
from ..decorators import isdf
from .generic_utils import vlog
__all__ = [
"check_sequence_feasibility",
"get_sequence_counts",
"generate_pinn_sequences",
"generate_ts_sequences",
"build_future_sequences_npz",
]
[docs]
@isdf
def build_future_sequences_npz(
df_scaled: pd.DataFrame,
*,
time_col: str,
time_col_num: str | None,
lon_col: str,
lat_col: str,
time_steps: int,
train_end_time: object | None = None,
forecast_start_time: object | None = None,
forecast_horizon: int | None = None,
subs_col: str | None = None,
gwl_col: str | None = None,
h_field_col: str | None = None,
static_features: list[str] | None = None,
dynamic_features: list[str] | None = None,
future_features: list[str] | None = None,
group_id_cols: list[str] | None = None,
mode: str | None = None,
model_name: str | None = None,
artifacts_dir: str | None = None,
prefix: str = "future",
future_mode: str = "auto",
normalize_coords: bool = False,
coord_scaler: Any | None = None,
verbose: int = 1,
logger=None,
stop_check: Callable[[], bool] = None,
progress_hook: Callable[[float], None] | None = None,
**kws,
) -> dict:
"""
Build history–future sequences and save them as compressed NPZ files.
This helper constructs, for each spatial group, a sliding window of
`time_steps` "history" points followed by a multi–step forecast
horizon and exports the resulting NumPy arrays to disk. It is
time-agnostic: the `time_col` can be numeric (e.g. year, index),
year-like floats, datetimes, or strings, as long as equality on
that column is meaningful.
If `train_end_time`, `forecast_start_time`, or `forecast_horizon`
are not provided, they are inferred from the sorted unique values
in ``df_scaled[time_col]``:
* ``train_end_time``: by default the second-to-last unique time,
leaving at least one future step.
* ``forecast_start_time``: by default the first time strictly
after ``train_end_time``.
* ``forecast_horizon``: by default one time step ahead, clipped to
the number of available future points.
For each valid group, the function builds history dynamic features of
shape ``(time_steps, n_dynamic)``, future features of shape
``(time_steps + H, n_future)`` when ``mode`` starts with ``"tft"``
or ``(H, n_future)`` otherwise, one static feature vector of shape
``(n_static,)``, coordinates over the horizon of shape ``(H, 3)``
with columns ``[time_num, lon, lat]``, an ``H_field`` array of shape
``(H, 1)``, and optional subsidence and groundwater targets of shape
``(H, 1)`` each.
All per-group arrays are stacked along a new batch dimension and
written as two NPZ files:
* ``<prefix>_inputs.npz``: coordinates, dynamic, static, future
features and H field.
* ``<prefix>_targets.npz``: subsidence and groundwater targets.
Parameters
----------
df_scaled : pandas.DataFrame
Pre-processed (typically scaled) dataframe containing all
required columns: time, spatial coordinates, static/dynamic/
future features and optional targets.
time_col : str
Name of the column encoding the temporal index (e.g.
``"year"``, ``"date"``, ``"t_index"``). May be numeric,
datetime, or string.
time_col_num : str or None
Optional numeric time column used as a tie-breaker when
multiple rows share the same ``time_col`` value. If provided
and present in a group, the last row sorted by this column is
selected for that time.
lon_col : str
Name of the longitude (or x-coordinate) column.
lat_col : str
Name of the latitude (or y-coordinate) column.
time_steps : int
Length of the history window (number of time steps in the
past). Must be strictly positive.
train_end_time : object, optional
Effective end of the training period. If ``None``, it is
inferred as the second-to-last unique value in
``df_scaled[time_col]`` (after sorting).
forecast_start_time : object, optional
First time step of the forecast horizon. If ``None``, it is
inferred as the first unique time strictly greater than
``train_end_time``.
forecast_horizon : int, optional
Number of future time steps to include. If ``None``, a
default horizon of ``1`` is used and clipped to the maximum
number of available future time points.
subs_col : str, optional
Name of the subsidence target column. If ``None`` or missing
from a group, subsidence targets are filled with ``NaN``.
gwl_col : str, optional
Name of the groundwater-level target column. If ``None`` or
missing from a group, groundwater targets are filled with
``NaN``.
h_field_col : str, optional
Name of the hydraulic-head field column used as an additional
horizon-level input (``H_field``). If ``None`` or missing,
a zero field is used.
static_features : list of str, optional
Names of static (time-invariant) feature columns. Any names
not present in the dataframe are silently ignored.
dynamic_features : list of str, optional
Names of dynamic (history) feature columns used to build the
``(time_steps, n_dynamic)`` sequence. Missing columns are
ignored.
future_features : list of str, optional
Names of future covariate columns used to build the
history+future or future-only sequence, depending on
``mode``. Missing columns are ignored.
group_id_cols : list of str, optional
Columns used to define spatial (or logical) groups, typically
something like ``["lon", "lat"]`` or a station identifier.
If ``None`` or empty, the entire dataframe is treated as a
single global group.
mode : str, optional
Controls how future features are constructed. If the
lower-cased value starts with ``"tft"`` (e.g. ``"tft_like"``),
future features are built on top of both history and future
rows. Otherwise, only the forecast horizon rows are used.
model_name : str, optional
Optional model identifier used only in logging messages.
artifacts_dir : str, optional
Directory where NPZ files are written. If ``None`` or empty,
the current working directory is used.
prefix : str, default="future"
Prefix for the output NPZ filenames:
``"<prefix>_inputs.npz"`` and ``"<prefix>_targets.npz"``.
future_mode : {'auto', 'pure-inference', 'pure-data-driven'}, \
default 'auto'
Strategy used to construct the future (forecast) portion of
the sequences.
- ``'pure-data-driven'``:
Use only time points that actually exist in ``df_scaled``
strictly after the history window. All future time indices
must be present in the data; otherwise a ``ValueError`` is
raised. This corresponds to the original, strictly
data-driven behaviour.
- ``'pure-inference'``:
Always synthesize future time points from the last history
time, using the median positive time step (or ``1.0`` as a
fallback). Future inputs are built by re-using the last
available history row (for ``future_features``, ``H_field``,
etc.), and future targets (e.g. subsidence, GWL) are filled
with ``NaN`` since the true future is unknown. This mode
does not require any rows beyond ``train_end_time``.
- ``'auto'``:
Try data-driven mode first. If there are enough actual
future time points after ``train_end_time`` to cover the
requested ``forecast_horizon``, behave like
``'pure-data-driven'``. If not, automatically fall back to
the synthetic ``'pure-inference'`` behaviour described
above and emit an informational log message via ``vlog``.
verbose : int, default=1
Verbosity level forwarded to :func:`geoprior.utils.vlog`. A
value ``>= 3`` provides detailed progress logs (temporal
inference, per-group status, dropped groups, etc.).
logger : logging.Logger or callable, optional
Optional logger or logging function used by
:func:`geoprior.utils.vlog`. If ``None``, messages are
printed to standard output.
**kws
Reserved for future extensions. Currently ignored.
Returns
-------
dict
A small dictionary with the absolute paths to the written NPZ
files:
``{"future_inputs_npz": <path>, "future_targets_npz": <path>}``.
Raises
------
ValueError
If there are not enough history points before
``train_end_time`` to satisfy ``time_steps``, if no future
points are available after ``forecast_start_time``, or if all
groups are dropped due to incomplete history/horizon windows.
Notes
-----
Groups that do not contain all required history and future times
are silently dropped, but the number of dropped groups is reported
via :func:`geoprior.utils.vlog` when ``verbose > 0``.
Examples
--------
>>> from geoprior.nn.pinn.sequences import (
... build_future_sequences_npz,
... )
>>> result = build_future_sequences_npz(
... df_scaled=df_scaled,
... time_col="year",
... time_col_num="t_index",
... lon_col="lon",
... lat_col="lat",
... time_steps=5,
... # Let the function infer times/horizon:
... train_end_time=None,
... forecast_start_time=None,
... forecast_horizon=None,
... subs_col="subsidence",
... gwl_col="gwl",
... h_field_col="H_field",
... static_features=["lithology_class"],
... dynamic_features=["rainfall_mm", "GWL_depth_bgs_z"],
... future_features=["normalized_urban_load_proxy"],
... group_id_cols=["lon", "lat"],
... mode="tft_like",
... model_name="GeoPriorSubsNet",
... artifacts_dir="results/zhongshan/future_npz",
... prefix="zhongshan_future",
... verbose=2,
... )
>>> result["future_inputs_npz"]
'results/zhongshan/future_npz/zhongshan_future_inputs.npz'
>>> result["future_targets_npz"]
'results/zhongshan/future_npz/zhongshan_future_targets.npz'
"""
def _p(frac: float) -> None:
if progress_hook is not None:
# clamp, avoid crashing
try:
f = max(0.0, min(1.0, float(frac)))
progress_hook(f)
except Exception:
pass
_p(0.0) # start
# ------------------------------------------------------------------
# Small helpers
# ------------------------------------------------------------------
def _save_npz(path: str, arrays: dict) -> str:
np.savez_compressed(path, **arrays)
return path
def _to_time_index(series: pd.Series) -> np.ndarray:
"""Convert a time column to a numeric index (year-like)."""
if pd.api.types.is_datetime64_any_dtype(series):
return series.dt.year.to_numpy(dtype=float)
try:
return pd.to_numeric(
series, errors="coerce"
).to_numpy(dtype=float)
except Exception:
vals, _ = pd.factorize(series)
return vals.astype(float)
future_mode_norm = (future_mode or "auto").lower()
if future_mode_norm not in (
"auto",
"pure-inference",
"pure-data-driven",
):
raise ValueError(
"build_future_sequences_npz: 'future_mode' must be one of "
"{'auto', 'pure-inference', 'pure-data-driven'}; "
f"got {future_mode!r}."
)
if (
artifacts_dir is None
or not str(artifacts_dir).strip()
):
artifacts_dir = os.getcwd()
normalize_coords = bool(normalize_coords)
if normalize_coords:
if coord_scaler is None or not hasattr(
coord_scaler, "transform"
):
raise ValueError(
"build_future_sequences_npz: normalize_coords=True requires a "
"fitted `coord_scaler` (same one used in Stage-1)."
)
# ------------------------------------------------------------------
# Step 1: infer temporal configuration (history + future windows)
# ------------------------------------------------------------------
exist_features(
df_scaled,
features=time_col,
message="Time col{time_col} column is missing.",
)
vlog(
"Validating time-series dataset...",
level=1,
verbose=verbose,
)
check_datetime(
df_scaled,
dt_cols=time_col,
ops="check_only",
consider_dt_as="numeric",
accept_dt=True,
allow_int=True,
)
t_series = df_scaled[time_col]
t_idx_all = _to_time_index(t_series)
mask_finite = np.isfinite(t_idx_all)
if not mask_finite.any():
raise ValueError(
"build_future_sequences_npz: time column contains no finite "
"values after conversion."
)
unique_times = np.unique(t_idx_all[mask_finite])
unique_times.sort()
T = int(time_steps)
if T <= 0:
raise ValueError(
"build_future_sequences_npz: 'time_steps' must be > 0; "
f"got {time_steps!r}."
)
# Resolve train_end_time
if train_end_time is None:
train_end_idx = float(unique_times.max())
else:
try:
train_end_idx = float(train_end_time)
except Exception as e:
raise ValueError(
"build_future_sequences_npz: could not convert "
f"train_end_time={train_end_time!r} to numeric index."
) from e
# History times: last T distinct times <= train_end_idx
hist_candidates = unique_times[
unique_times <= train_end_idx
]
if hist_candidates.size < T:
raise ValueError(
"build_future_sequences_npz: not enough past time points "
f"<= train_end_time to build history of length {T}. "
f"Available={hist_candidates.size}."
)
hist_times = hist_candidates[-T:]
last_hist = hist_times[-1]
# Step (for synthetic future): median positive diff or 1.0
if hist_times.size >= 2:
diffs = np.diff(hist_times)
step = (
float(np.median(diffs[diffs > 0]))
if np.any(diffs > 0)
else 1.0
)
else:
step = 1.0
# All future times *in data* (strictly after last_hist)
data_future_all = unique_times[unique_times > last_hist]
# Horizon H
if forecast_horizon is None:
H = (
int(data_future_all.size)
if data_future_all.size > 0
else 1
)
else:
H = int(forecast_horizon)
if H <= 0:
raise ValueError(
"build_future_sequences_npz: 'forecast_horizon' must be > 0; "
f"got {forecast_horizon!r}."
)
# Resolve forecast_start_time index
if forecast_start_time is None:
if data_future_all.size > 0:
f_start_idx = float(data_future_all[0])
else:
f_start_idx = float(last_hist + step)
else:
try:
f_start_idx = float(forecast_start_time)
except Exception as e:
raise ValueError(
"build_future_sequences_npz: could not convert "
f"forecast_start_time={forecast_start_time!r} to numeric index."
) from e
# Candidate data-driven future times (>= f_start_idx, after last_hist)
data_future_needed = data_future_all[
data_future_all >= f_start_idx
][:H]
has_enough_future_data = data_future_needed.size == H
_p(0.1)
# Decide effective mode + future times
using_synthetic_future = False
if future_mode_norm == "pure-data-driven":
if not has_enough_future_data:
raise ValueError(
"build_future_sequences_npz: pure-data-driven mode "
f"requested but only {data_future_needed.size} future "
f"time point(s) found starting from {f_start_idx}; "
f"need {H}."
)
fut_times = data_future_needed
eff_future_mode = "pure-data-driven"
elif future_mode_norm == "pure-inference":
using_synthetic_future = True
eff_future_mode = "pure-inference"
start = max(float(last_hist), float(f_start_idx))
fut_times = np.array(
[start + step * (i + 1) for i in range(H)],
dtype=float,
)
else: # auto
if has_enough_future_data:
fut_times = data_future_needed
eff_future_mode = "pure-data-driven"
else:
using_synthetic_future = True
eff_future_mode = "pure-inference"
start = max(float(last_hist), float(f_start_idx))
fut_times = np.array(
[start + step * (i + 1) for i in range(H)],
dtype=float,
)
# Window actually needed in df_scaled
if using_synthetic_future:
all_times_needed = hist_times
else:
all_times_needed = np.concatenate(
[hist_times, fut_times]
)
if verbose:
vlog(
"[Future] Step 1/3: temporal config resolved.\n"
f" hist_times = {hist_times.tolist()}\n"
f" fut_times = {fut_times.tolist()}\n"
f" history length = {T}\n"
f" horizon length = {H}\n"
f" effective mode = {eff_future_mode}\n"
f" train_end_time = {train_end_time!r}\n"
f" forecast_start = {forecast_start_time!r}",
verbose=verbose,
level=1,
logger=logger,
)
if using_synthetic_future:
vlog(
"[Future] No usable future time points after train_end_time; "
"falling back to synthetic 'pure-inference' horizon "
"(targets will be NaN).",
verbose=verbose,
level=2,
logger=logger,
)
# ------------------------------------------------------------------
# Step 2: subset df_scaled to the required window
# ------------------------------------------------------------------
mask_window = np.isin(t_idx_all, all_times_needed)
df_win = df_scaled.loc[mask_window].copy()
if df_win.empty:
raise ValueError(
"build_future_sequences_npz: no rows in df_scaled for the "
f"required time window {all_times_needed.tolist()}."
)
# Normalize lists
static_features = (
list(static_features)
if static_features is not None
else []
)
dynamic_features = (
list(dynamic_features)
if dynamic_features is not None
else []
)
future_features = (
list(future_features)
if future_features is not None
else []
)
group_id_cols = (
list(group_id_cols)
if group_id_cols is not None
else []
)
static_cols = [
c for c in static_features if c in df_win.columns
]
dyn_cols = [
c for c in dynamic_features if c in df_win.columns
]
fut_cols = [
c for c in future_features if c in df_win.columns
]
mode_norm = (mode or "").lower()
is_tft_like = mode_norm.startswith("tft")
if verbose:
vlog(
"[Future] Step 2/3: feature and grouping config.\n"
f" static_cols = {static_cols}\n"
f" dyn_cols = {dyn_cols}\n"
f" fut_cols = {fut_cols}\n"
f" group_id_cols = {group_id_cols or ['<global>']}\n"
f" mode = {mode_norm or 'None'}",
verbose=verbose,
level=1,
logger=logger,
)
# ------------------------------------------------------------------
# Step 3: per-group construction
# ------------------------------------------------------------------
def _rows_for_times(
required: np.ndarray, g_df: pd.DataFrame
) -> list | None:
"""Pick one row per required time from a group."""
g_time_idx = _to_time_index(g_df[time_col])
order = np.argsort(g_time_idx)
g_sorted = g_df.iloc[order].reset_index(drop=True)
g_time_sorted = g_time_idx[order]
rows: list[pd.Series] = []
for t_req in required:
m = g_time_sorted == t_req
if not np.any(m):
return None
sub = g_sorted.loc[m]
if (
time_col_num is not None
and time_col_num in sub.columns
):
sub = sub.sort_values(time_col_num)
rows.append(sub.iloc[-1])
return rows
def _build_synthetic_future_rows(
hist_rows: list[pd.Series],
fut_times_arr: np.ndarray,
) -> list[pd.Series]:
"""Replicate last history row and overwrite time columns."""
if not hist_rows:
return []
last_row = hist_rows[-1]
out = []
for t_val in fut_times_arr:
row = last_row.copy()
row[time_col] = t_val
if (
time_col_num is not None
and time_col_num in row.index
):
try:
row[time_col_num] = float(t_val)
except Exception:
# keep original numeric coord
pass
out.append(row)
return out
coords_list, dyn_list, fut_list = [], [], []
static_list, H_list = [], []
subs_target_list, gwl_target_list = [], []
if group_id_cols:
grouped = df_win.groupby(group_id_cols)
n_groups = grouped.ngroups
group_iter = grouped
else:
n_groups = 1
group_iter = [(None, df_win)]
# Progress logging frequency for non-tqdm fallback
log_every = max(1, n_groups // 10)
# Small adapter so tqdm output goes into your logger instead of terminal
def _log_progress_line(msg: str) -> None:
# Use vlog so it respects verbose + user logger
vlog(
msg,
verbose=verbose,
level=2,
logger=logger,
)
# Optionally wrap with tqdm, but redirect its ASCII bar to the log
use_tqdm = HAS_TQDM and verbose >= 1 and n_groups > 1
if use_tqdm:
iter_groups = with_progress(
group_iter,
total=n_groups,
desc=f"[Future] groups ({model_name or ''})".strip(),
leave=False,
ascii=True,
log_fn=_log_progress_line,
mininterval=1.0, # optional: don't spam too often
)
else:
iter_groups = group_iter
dropped_groups = 0
for gi, (gid, g) in enumerate(iter_groups, start=1):
# after finishing this group:
_p(0.1 + 0.8 * (gi + 1) / n_groups)
if stop_check and stop_check():
raise InterruptedError(
"Sequence generation aborted."
)
# If tqdm is present, keep vlog quieter; if not, use your previous pattern
if not use_tqdm:
if verbose and (
gi == 1
or gi % log_every == 0
or gi == n_groups
):
vlog(
f"[Future] Processing group {gi}/{n_groups} gid={gid!r}",
verbose=verbose,
level=2,
logger=logger,
)
else:
vlog(
f"[Future] Proc.subset group {gi}/{n_groups} gid={gid!r}",
verbose=verbose,
level=6,
logger=logger,
)
# 1) History rows (must exist in data)
hist_rows = _rows_for_times(hist_times, g)
if hist_rows is None:
dropped_groups += 1
if verbose >= 2:
vlog(
f"[Future] Dropping group {gid!r}: incomplete history "
f"for times={hist_times.tolist()}.",
verbose=verbose,
level=2,
logger=logger,
)
continue
# 2) Future rows: data-driven or synthetic
if using_synthetic_future:
fut_rows = _build_synthetic_future_rows(
hist_rows, fut_times
)
else:
fut_rows = _rows_for_times(fut_times, g)
if fut_rows is None:
dropped_groups += 1
if verbose >= 2:
vlog(
f"[Future] Dropping group {gid!r}: incomplete future "
f"for times={fut_times.tolist()} in data-driven mode.",
verbose=verbose,
level=2,
logger=logger,
)
continue
# Dynamic features: history only
if dyn_cols:
dyn_seq = np.stack(
[
row[dyn_cols].to_numpy(dtype=np.float32)
for row in hist_rows
],
axis=0,
)
else:
dyn_seq = np.zeros(
(len(hist_rows), 0), dtype=np.float32
)
# Future features
if fut_cols:
if is_tft_like:
rows_for_future = hist_rows + fut_rows
else:
rows_for_future = fut_rows
fut_seq = np.stack(
[
row[fut_cols].to_numpy(dtype=np.float32)
for row in rows_for_future
],
axis=0,
)
else:
T_fut = (
len(hist_rows) + len(fut_rows)
if is_tft_like
else len(fut_rows)
)
fut_seq = np.zeros((T_fut, 0), dtype=np.float32)
# Coords for horizon (t, lon, lat)
t_vals = []
for row in fut_rows:
if (
time_col_num is not None
and time_col_num in row.index
):
t_vals.append(row[time_col_num])
else:
t_vals.append(row[time_col])
t_vals = np.array(t_vals, dtype=np.float32)
lon_vals = np.array(
[row[lon_col] for row in fut_rows],
dtype=np.float32,
)
lat_vals = np.array(
[row[lat_col] for row in fut_rows],
dtype=np.float32,
)
coords = np.stack(
[t_vals, lon_vals, lat_vals], axis=-1
)
# H_field over horizon
if (
h_field_col is not None
and h_field_col in g.columns
):
h_vals = np.array(
[row[h_field_col] for row in fut_rows],
dtype=np.float32,
)
else:
h_vals = np.zeros(len(fut_rows), dtype=np.float32)
H_seq = h_vals.reshape(-1, 1)
# Static features (one vector per group)
if static_cols:
static_vec = hist_rows[0][static_cols].to_numpy(
dtype=np.float32
)
else:
static_vec = np.zeros(0, dtype=np.float32)
# Targets over horizon
if using_synthetic_future:
subs_vals = np.full(
len(fut_rows), np.nan, dtype=np.float32
)
gwl_vals = np.full(
len(fut_rows), np.nan, dtype=np.float32
)
else:
if subs_col is not None and subs_col in g.columns:
subs_vals = np.array(
[row[subs_col] for row in fut_rows],
dtype=np.float32,
)
else:
subs_vals = np.full(
len(fut_rows), np.nan, dtype=np.float32
)
if gwl_col is not None and gwl_col in g.columns:
gwl_vals = np.array(
[row[gwl_col] for row in fut_rows],
dtype=np.float32,
)
else:
gwl_vals = np.full(
len(fut_rows), np.nan, dtype=np.float32
)
subs_seq = subs_vals.reshape(-1, 1)
gwl_seq = gwl_vals.reshape(-1, 1)
coords_list.append(coords)
dyn_list.append(dyn_seq)
fut_list.append(fut_seq)
static_list.append(static_vec)
H_list.append(H_seq)
subs_target_list.append(subs_seq)
gwl_target_list.append(gwl_seq)
if not coords_list:
raise ValueError(
"build_future_sequences_npz: no valid groups with complete "
"history window (and future, if required)."
)
if dropped_groups and verbose:
vlog(
f"[Future] Dropped {dropped_groups} group(s) due to incomplete "
"history/future windows.",
verbose=verbose,
level=1,
logger=logger,
)
# ------------------------------------------------------------------
# Stack and save
# ------------------------------------------------------------------
coords_arr = np.stack(coords_list, axis=0)
dyn_arr = np.stack(dyn_list, axis=0)
fut_arr = np.stack(fut_list, axis=0)
static_arr = np.stack(static_list, axis=0)
H_arr = np.stack(H_list, axis=0)
subs_targets_arr = np.stack(subs_target_list, axis=0)
gwl_targets_arr = np.stack(gwl_target_list, axis=0)
# --------------------------------------------------------------
# Normalize coords exactly like Stage-1 (if requested)
# coords_arr shape: (N, H, 3) with order [t, x, y]
# --------------------------------------------------------------
if normalize_coords:
coords_flat = coords_arr.reshape(-1, 3).astype(
np.float32, copy=False
)
if not np.isfinite(coords_flat).all():
raise ValueError(
"build_future_sequences_npz: coords contain non-finite values; "
"cannot apply coord_scaler.transform()."
)
coords_flat = coord_scaler.transform(coords_flat)
coords_arr = coords_flat.reshape(
coords_arr.shape
).astype(np.float32, copy=False)
else:
coords_arr = coords_arr.astype(np.float32, copy=False)
future_inputs_np = {
"coords": coords_arr,
"dynamic_features": dyn_arr,
"static_features": static_arr,
"future_features": fut_arr,
"H_field": H_arr,
}
future_targets_np = {
"subsidence": subs_targets_arr,
"gwl": gwl_targets_arr,
}
os.makedirs(artifacts_dir, exist_ok=True)
future_inputs_npz = os.path.join(
artifacts_dir, f"{prefix}_inputs.npz"
)
future_targets_npz = os.path.join(
artifacts_dir, f"{prefix}_targets.npz"
)
_save_npz(future_inputs_npz, future_inputs_np)
_save_npz(future_targets_npz, future_targets_np)
if verbose:
vlog(
"[Future] Step 3/3: NPZs saved.\n"
f" inputs : {future_inputs_npz}\n"
f" targets: {future_targets_npz}\n"
f" mode : {eff_future_mode}"
+ (
" (synthetic future targets=NaN)"
if using_synthetic_future
else ""
),
verbose=verbose,
level=1,
logger=logger,
)
# After saving NPZs:
_p(1.0)
return {
f"{prefix}_inputs_npz": future_inputs_npz,
f"{prefix}_targets_npz": future_targets_npz,
}
[docs]
@isdf
def check_sequence_feasibility(
df: pd.DataFrame,
*,
time_col: str,
group_id_cols: list[str] | None = None,
time_steps: int = 12,
forecast_horizon: int = 3,
engine: Literal[
"vectorized", "native", "pyarrow"
] = "vectorized",
mode: str | None = None,
logger: Callable[[str], None] = print,
verbose: int = 0,
error: Literal["raise", "warn", "ignore"] = "warn",
) -> tuple[bool, dict[str | tuple, int]]:
"""
Quick pre-flight feasibility check for sliding-window sequence
generation
Checks whether the input table is *long enough*—per group—to yield at
least one `(look-back + horizon)` sliding window, **without** allocating
large NumPy tensors. It is typically called immediately before
:func:`prepare_pinn_data_sequences` or similar generators to “fail
fast’’ on data shortages.
Parameters
----------
df : pandas.DataFrame
Tidy time-series table in **long** format. Every row represents one
observation timestamp (and optionally one entity when
*group_id_cols* is given). The function never mutates *df*.
time_col : str
Column that defines temporal order inside each trajectory. Must be
sortable; no other assumptions (numeric, datetime, …) are made.
group_id_cols : list of str or None, default None
Column names that jointly identify independent trajectories
(e.g. ``["well_id"]`` or ``["site", "layer_id"]``). When *None* the
whole DataFrame is treated as a single group.
time_steps : int, default 12
Look-back window :math:`T_\text{past}` consumed by the encoder.
forecast_horizon : int, default 3
Prediction horizon :math:`H` produced by the decoder.
engine : {'vectorized', 'loop', 'pyarrow'}, default 'vectorized'
* **'vectorized'** – fastest; single :meth:`DataFrame.groupby.size`
call (C-level) plus NumPy math.
* **'native'** – reproduces the original Python loop for
debuggability.
* **'pyarrow'** – forces pandas’ Arrow backend, then runs the same
vectorised logic; ~20 % faster on very wide frames when
*pyarrow* ≥ 14 is installed.
mode : {'pihal_like', 'tft_like'} or None, optional
Present only for API symmetry. **Ignored** – feasibility depends
*solely* on ``time_steps + forecast_horizon``.
logger : callable, default :func:`print`
Sink for human-readable log messages. Must accept a single `str`.
verbose : int, default 0
Verbosity level:
0 → silent, 1 → summary lines, 2 → per-group detail.
error : {'raise', 'warn', 'ignore'}, default 'warn'
Action when *no* group is long enough.
* ``'raise'`` – raise :class:`SequenceGeneratorError`.
* ``'warn'`` – emit :class:`UserWarning`, return ``False``.
* ``'ignore'`` – stay silent, return ``False``.
Returns
-------
feasible : bool
``True`` iff *at least one* sequence can be produced,
otherwise ``False``.
counts : dict
Mapping **group key → # sequences**.
The key is a tuple of the group values—or *None* when
*group_id_cols* is *None*.
Raises
------
SequenceGeneratorError
Raised only when ``error='raise'`` *and* all groups fail the length
check.
Notes
-----
A group passes the check iff
.. math::
\\text{len(group)} \\;\\ge\\; T_\\text{past} + H
No validation of time-gaps, duplicates, or NaNs is performed; those are
deferred to the full preparation routine.
The **Arrow backend** (``engine='pyarrow'``) can accelerate very wide
frames because each column is represented as a contiguous Arrow array
with cheap zero-copy slicing.
Examples
--------
* Minimal usage
>>> from geoprior.utils.sequence_utils import check_sequence_feasibility
>>> ok, counts = check_sequence_feasibility(
... df,
... time_col="date",
... group_id_cols=["site"],
... time_steps=6,
... forecast_horizon=3,
... )
>>> ok
True
>>> counts # doctest: +ELLIPSIS
{'A': 9, 'B': 9}
* Fail-fast behaviour
>>> check_sequence_feasibility(
... df_small,
... time_col="t",
... time_steps=10,
... forecast_horizon=5,
... error="raise",
... )
Traceback (most recent call last):
...
SequenceGeneratorError: No group is long enough ...
* Switching engines
>>> _ , _ = check_sequence_feasibility(
... df,
... time_col="ts",
... group_id_cols=None,
... engine="pyarrow", # requires pandas 2.1+, pyarrow installed
... verbose=1,
... )
✅ Feasible: 1 234 567 sequences possible.
References
----------
* McKinney, W. *pandas 2.0 User Guide*, sec. “GroupBy: split-apply-combine’’.
* Arrow Project. (2025). *Arrow Columnar Memory Format v2*.
"""
# --- tiny inline logger
def _v(msg: str, *, lvl: int = 1) -> None:
vlog(msg, verbose=verbose, level=lvl, logger=logger)
min_len = time_steps + forecast_horizon
_v(f"Required length per group: {min_len}", lvl=2)
# deterministic ordering
# or just df; sorting not needed for counts
# sort_cols = (group_id_cols or []) + [time_col]
# df_sorted = df.sort_values(sort_cols)
# inside your feasibility function
total_sequences, counts, sizes = get_sequence_counts(
df,
group_id_cols=group_id_cols,
min_len=min_len,
engine=engine,
verbose=verbose,
logger=logger,
)
if total_sequences == 0:
longest = int(sizes.max()) if not sizes.empty else 0
msg = (
"No group is long enough to create any sequence.\n"
f"Each trajectory needs >= {min_len} consecutive records "
f"(time_steps={time_steps}, horizon={forecast_horizon}), "
f"but the longest has only {longest}.\n"
"-> Reduce `time_steps` / `forecast_horizon`, "
"or supply more data."
)
# _v("❌ " + msg.splitlines()[0], lvl=1)
_v("" + msg.splitlines()[0], lvl=1)
if error == "raise":
raise SequenceGeneratorError(msg)
if error == "warn":
warnings.warn(msg, UserWarning, stacklevel=2)
return False, counts
# _v(rf"✅ Feasible: {total_sequences} sequences possible.", lvl=1)
_v(
f" Feasible: {total_sequences} sequences possible.",
lvl=1,
)
return True, counts
def _sequence_counts_fast(
df: pd.DataFrame,
group_id_cols: list[str] | None,
min_len: int,
) -> tuple[int, dict[str | tuple, int], pd.Series]:
"""Vectorised: one C call → group sizes, then NumPy math."""
if group_id_cols:
sizes = df.groupby(group_id_cols, sort=False).size()
else:
sizes = pd.Series([len(df)], index=[None])
n_seq_series = np.maximum(sizes - min_len + 1, 0)
return (
int(n_seq_series.sum()),
n_seq_series.to_dict(),
sizes,
)
def _sequence_counts_loop(
df: pd.DataFrame,
group_id_cols: list[str] | None,
min_len: int,
) -> tuple[int, dict[str | tuple, int], pd.Series]:
"""Original Python loop – slower but easy to single-step."""
if group_id_cols:
iterator = df.groupby(group_id_cols)
else:
iterator = [(None, df)]
counts: dict[str | tuple, int] = {}
sizes_dict: dict[str | tuple, int] = {}
total_sequences = 0
for g_key, g_df in iterator:
n_pts = len(g_df)
n_seq = max(n_pts - min_len + 1, 0)
counts[g_key] = n_seq
sizes_dict[g_key] = n_pts
total_sequences += n_seq
sizes = pd.Series(sizes_dict)
return total_sequences, counts, sizes
[docs]
@isdf
def get_sequence_counts(
df: pd.DataFrame,
*,
group_id_cols: list[str] | None,
min_len: int,
engine: Literal[
"vectorized", "native", "pyarrow"
] = "vectorized",
verbose: int = 0,
logger=print,
) -> tuple[int, dict[str | tuple, int], pd.Series]:
"""
Return the **total** number of feasible sliding-window sequences and
a mapping *group → count* using the requested execution *engine*.
Parameters
----------
engine : {'vectorized', 'native', 'pyarrow'}, default 'vectorized'
Execution backend.
* **'vectorized'** – fast C-level :meth:`DataFrame.groupby.size`
(recommended).
* **'native'** – original Python loop (easier to debug, slower).
* **'pyarrow'** – forces pandas’ Arrow backend *if available*,
then runs the vectorised path. Falls back silently to
``'vectorized'`` when *pyarrow* is not installed.
"""
def _v(msg: str, lvl: int = 1) -> None:
vlog(msg, verbose=verbose, level=lvl, logger=logger)
if engine == "pyarrow":
try:
import pyarrow # noqa: F401
except ImportError: # ⇢ graceful fallback
# _v("⚠ pyarrow not installed — reverting to 'vectorized'.", lvl=1)
_v(
" pyarrow not installed — reverting to 'vectorized'.",
lvl=1,
)
engine = "vectorized"
if engine == "pyarrow":
old_backend = pd.options.mode.dtype_backend
pd.options.mode.dtype_backend = "pyarrow"
try:
total, counts, sizes = _sequence_counts_fast(
df, group_id_cols, min_len
)
finally:
pd.options.mode.dtype_backend = old_backend
elif engine == "vectorized":
total, counts, sizes = _sequence_counts_fast(
df, group_id_cols, min_len
)
elif engine == "native":
total, counts, sizes = _sequence_counts_loop(
df, group_id_cols, min_len
)
else: # pragma: no cover
raise ValueError(
f"Unknown engine='{engine}'. "
"Choose 'vectorized', 'loop', or 'pyarrow'."
)
if verbose >= 2:
for g_key, n_seq in counts.items():
size_str = sizes[g_key]
_v(
f"Group {g_key if g_key is not None else '<whole>'}: "
f"{size_str} pts -> {n_seq} seq.",
lvl=2,
)
return total, counts, sizes
[docs]
@isdf
def generate_pinn_sequences(
df: pd.DataFrame,
time_col: str,
subsidence_col: str,
gwl_col: str,
dynamic_cols: list[str],
static_cols: list[str] | None = None,
future_cols: list[str] | None = None,
spatial_cols: tuple[str, str] | None = None,
group_id_cols: list[str] | None = None,
time_steps: int = 12,
forecast_horizon: int = 3,
output_subsidence_dim: int = 1,
output_gwl_dim: int = 1,
mode: str = "pihal_like",
normalize_coords: bool = True,
cols_to_scale: list[str] | str | None = None,
method: str = "rolling",
stride: int = 1,
random_samples: int | None = None,
expand_step: int = 1,
n_bootstrap: int = 0,
progress_hook: Callable[[float], None] | None = None,
stop_check: Callable[[], bool] | None = None,
verbose: int = 1,
_logger: logging.Logger
| Callable[[str], None]
| None = None,
**kwargs,
) -> tuple[
dict[str, np.ndarray],
dict[str, np.ndarray],
MinMaxScaler | None,
]:
"""
Generate input/target arrays for PINN models using various sampling
methods (rolling, strided, random, expanding, bootstrap).
Parameters
----------
df : pd.DataFrame
Full time-series data.
time_col : str
Name of the time coordinate column.
subsidence_col : str
Name of the subsidence target column.
gwl_col : str
Name of the groundwater level target column.
dynamic_cols : list[str]
Names of past-covariate columns.
static_cols : list[str], optional
Names of static feature columns.
future_cols : list[str], optional
Names of known-future feature columns.
spatial_cols : (str, str), optional
Tuple of (lon_col, lat_col) for spatial coords.
group_id_cols : list[str], optional
Column(s) identifying independent time-series groups.
time_steps : int, default 12
Look-back window length T.
forecast_horizon : int, default 3
Prediction horizon H.
output_subsidence_dim : int, default 1
Last-dim of subsidence target.
output_gwl_dim : int, default 1
Last-dim of GWL target.
mode : {'pihal_like','tft_like'}, default 'pihal_like'
Shapes the “future” window length for TFT vs. PIHALNet.
normalize_coords : bool, default True
Apply MinMax scaling to (t,x,y) across all sequences.
cols_to_scale : list[str] or 'auto' or None
Additional columns to scale via MinMax.
method : {'rolling','strided','random','expanding','bootstrap'}
Sequence-generation strategy.
stride : int, default 1
Step size for 'strided' sampling.
random_samples : int, optional
Number of random start indices for 'random' sampling.
expand_step : int, default 1
Increment size for 'expanding' sampling.
n_bootstrap : int, default 0
Number of blocks for 'bootstrap' sampling.
progress_hook : callable, optional
Called with float in [0,1] to report overall progress.
stop_check : callable, optional
If returns True, aborts sequence generation early.
verbose : int, default 1
Verbosity level (higher = more logs).
_logger : logging.Logger or callable, optional
Logger or print‐style function for vlog().
**kwargs
Passed to helper.
Returns
-------
inputs : dict[str, np.ndarray]
Contains 'coords', 'dynamic_features', optionally
'static_features' and 'future_features'.
targets : dict[str, np.ndarray]
Contains 'subsidence' and 'gwl' arrays.
coord_scaler : MinMaxScaler or None
Fitted scaler for coords, if normalization was applied.
"""
def _v(msg, lvl):
vlog(msg, verbose=verbose, level=lvl, logger=_logger)
# Optionally allow early abort
if stop_check and stop_check():
_v("Sequence generation aborted before start.", 1)
return {}, {}, None
# Split into groups
groups = (
[g for _, g in df.groupby(group_id_cols)]
if group_id_cols
else [df]
)
sequences: list[tuple[pd.DataFrame, int]] = []
L = time_steps + forecast_horizon
total_groups = len(groups)
for gi, gdf in enumerate(groups):
if stop_check and stop_check():
_v("Sequence generation aborted.", 1)
break
length = len(gdf)
max_start = length - L
if max_start < 0:
_v(
f"Group {gi} too short (len={length}); skipping.",
2,
)
continue
# Determine start indices by method
if method == "rolling":
starts = range(0, max_start + 1)
elif method == "strided":
starts = range(0, max_start + 1, stride)
elif method == "random":
all_starts = list(range(0, max_start + 1))
if random_samples is None or random_samples > len(
all_starts
):
starts = all_starts
else:
starts = np.random.choice(
all_starts, random_samples, replace=False
)
elif method == "expanding":
starts = list(
range(0, max_start + 1, expand_step)
)
elif method == "bootstrap":
block_size = L
blocks = list(
range(0, length - block_size + 1, block_size)
)
starts = np.random.choice(
blocks, n_bootstrap, replace=True
)
else:
raise ValueError(f"Unknown method '{method}'")
for i in starts:
sequences.append((gdf, int(i)))
# Report group‐level progress
if progress_hook:
progress_hook((gi + 1) / total_groups * 0.5)
# Build arrays from these starts
inputs, targets, coord_scaler = _build_from_starts(
sequences,
time_col,
time_steps,
forecast_horizon,
subsidence_col,
gwl_col,
dynamic_cols,
static_cols or [],
future_cols or [],
spatial_cols,
mode,
normalize_coords,
cols_to_scale,
output_subsidence_dim,
output_gwl_dim,
verbose,
_logger,
)
# Final progress
if progress_hook:
progress_hook(1.0)
return inputs, targets, coord_scaler
def _build_from_starts(
seqs: list[tuple[pd.DataFrame, int]],
time_col: str,
T: int,
H: int,
subs_col: str,
gwl_col: str,
dyn_cols: list[str],
stat_cols: list[str],
fut_cols: list[str],
spatial_cols: tuple[str, str] | None,
mode: str,
norm_coords: bool,
cols_to_scale: list[str] | str | None,
out_sub_dim: int,
out_gwl_dim: int,
verbose: int = 1,
_logger=None,
) -> tuple[
dict[str, np.ndarray],
dict[str, np.ndarray],
MinMaxScaler | None,
]:
def _v(msg, lvl):
vlog(msg, verbose=verbose, level=lvl, logger=_logger)
N = len(seqs)
_v(f"Building {N} sequences (T={T}, H={H})", 1)
# Allocate arrays
coords = np.zeros((N, H, 3), dtype=np.float32)
dyn = np.zeros((N, T, len(dyn_cols)), dtype=np.float32)
stat = (
np.zeros((N, len(stat_cols)), dtype=np.float32)
if stat_cols
else None
)
fut_len = T + H if mode == "tft_like" else H
fut = (
np.zeros(
(N, fut_len, len(fut_cols)), dtype=np.float32
)
if fut_cols
else None
)
subs = np.zeros((N, H, out_sub_dim), dtype=np.float32)
gwl_a = np.zeros((N, H, out_gwl_dim), dtype=np.float32)
# Fit coordinate scaler if needed
if norm_coords and spatial_cols:
all_blocks = []
for gdf, i in seqs:
window = gdf.iloc[i : i + T + H]
block = np.stack(
[
window[time_col].values[:H],
window[spatial_cols[0]].values[:H],
window[spatial_cols[1]].values[:H],
],
axis=1,
)
all_blocks.append(block)
flat = np.vstack(all_blocks)
coord_scl = MinMaxScaler().fit(flat)
else:
coord_scl = None
# Fill arrays
for idx, (gdf, i) in enumerate(seqs):
window = gdf.iloc[i : i + T + H]
dyn[idx] = window.iloc[:T][dyn_cols].values
if stat is not None:
stat[idx] = gdf.iloc[0][stat_cols].values
if fut is not None:
if mode == "tft_like":
fut[idx] = window.iloc[: T + H][
fut_cols
].values
else:
fut[idx] = window.iloc[T : T + H][
fut_cols
].values
# Coordinates
block = np.stack(
[
window[time_col].values[:H],
(
window[spatial_cols[0]].values[:H]
if spatial_cols
else np.zeros(H)
),
(
window[spatial_cols[1]].values[:H]
if spatial_cols
else np.zeros(H)
),
],
axis=1,
)
coords[idx] = (
coord_scl.transform(block) if coord_scl else block
)
# Targets
subs[idx] = window.iloc[T : T + H][
subs_col
].values.reshape(H, out_sub_dim)
gwl_a[idx] = window.iloc[T : T + H][
gwl_col
].values.reshape(H, out_gwl_dim)
if verbose >= 3 and idx % 1000 == 0:
_v(f" → Processed sequence {idx + 1}/{N}", 2)
inputs = {"coords": coords, "dynamic_features": dyn}
if stat is not None:
inputs["static_features"] = stat
if fut is not None:
inputs["future_features"] = fut
targets = {"subsidence": subs, "gwl": gwl_a}
_v("Sequence building complete.", 1)
return inputs, targets, coord_scl
[docs]
@isdf
def generate_ts_sequences(
df: pd.DataFrame,
time_col: str,
dynamic_cols: list[str],
static_cols: list[str] | None = None,
future_cols: list[str] | None = None,
spatial_cols: tuple[str, str] | None = None,
group_id_cols: list[str] | None = None,
time_steps: int = 12,
forecast_horizon: int = 1,
normalize_coords: bool = True,
cols_to_scale: list[str] | str | None = None,
method: str = "rolling",
stride: int = 1,
random_samples: int | None = None,
expand_step: int = 1,
n_bootstrap: int = 0,
progress_hook: Callable[[float], None] | None = None,
stop_check: Callable[[], bool] | None = None,
verbose: int = 1,
_logger: Callable[[str], None] | None = None,
**kwargs,
) -> tuple[
dict[str, np.ndarray],
dict[str, np.ndarray],
MinMaxScaler | None,
]:
"""
Generate time-series windows for encoder/decoder and covariates.
Supports rolling, strided, random, expanding, and bootstrap.
Parameters
----------
df : pd.DataFrame
Input frame with time and feature columns.
time_col : str
Name of the time coordinate column.
dynamic_cols : list[str]
Past-covariate columns for encoder inputs.
static_cols : list[str] or None
Static covariate columns, repeated per window.
future_cols : list[str] or None
Known-future covariates for decoder inputs.
spatial_cols : tuple(str,str) or None
(lon, lat) column names for spatial coords.
group_id_cols : list[str] or None
Columns to group by for independent series.
time_steps : int
Number of past steps (T) per window.
forecast_horizon : int
Number of future steps (H) per window.
normalize_coords : bool
If True, MinMax-scale spatial coords.
cols_to_scale : list[str] or 'auto' or None
Other columns to MinMax-scale.
method : str
'rolling','strided','random','expanding','bootstrap'.
stride : int
Step size for 'strided' windows.
random_samples : int or None
Number of random windows if method='random'.
expand_step : int
Increment for 'expanding' windows.
n_bootstrap : int
Number of bootstrap samples if method='bootstrap'.
progress_hook : callable or None
Receives float [0,1] as work progresses.
stop_check : callable or None
If returns True, aborts generation.
verbose : int
Verbosity level. >0 logs progress.
_logger : callable or None
Logger to use for messages.
Returns
-------
inputs : dict of np.ndarray
'encoder_inputs','static','future','coords'.
targets : dict of np.ndarray
'decoder_targets'.
coord_scaler : MinMaxScaler or None
Fitted scaler for coords, if normalized.
Raises
------
SequenceGeneratorError
If no valid windows could be generated.
"""
def _v(msg, lvl):
vlog(msg, verbose=verbose, level=lvl, logger=_logger)
# split into groups
if group_id_cols:
groups = [g for _, g in df.groupby(group_id_cols)]
else:
groups = [df]
all_enc, all_dec = [], []
all_stat, all_fut, all_coord = [], [], []
L = time_steps + forecast_horizon
total = 0
for gdf in groups:
if stop_check and stop_check():
_v("Generation aborted by stop_check()", 1)
break
M = len(gdf)
if M < L:
_v(f"Group too small (len={M}), skip", 2)
continue
dyn = gdf[dynamic_cols].values
win = sliding_window_view(dyn, window_shape=L, axis=0)
idx = np.arange(win.shape[0])
if method == "strided":
idx = idx[::stride]
elif method == "random":
if random_samples and random_samples < len(idx):
idx = np.random.choice(
idx, random_samples, replace=False
)
elif method == "expanding":
idx = idx[::expand_step]
elif method == "bootstrap":
idx = np.random.randint(
0, len(idx), size=n_bootstrap
)
elif method != "rolling":
raise ValueError(f"Unknown method {method}")
if idx.size == 0:
continue
enc = win[idx, :time_steps]
dec = win[idx, time_steps:]
all_enc.append(enc)
all_dec.append(dec)
if static_cols:
st = gdf.iloc[0][static_cols].values.astype(
np.float32
)
all_stat.append(
np.repeat(st[None, :], len(idx), 0)
)
if future_cols:
fut = gdf[future_cols].values
fw = sliding_window_view(
fut, window_shape=forecast_horizon, axis=0
)
fw = fw[time_steps : time_steps + len(idx)]
all_fut.append(fw)
if spatial_cols:
t, x, y = (
gdf[c].values
for c in (time_col,) + spatial_cols
)
coord = np.stack(
[
sliding_window_view(t, L, 0)[
idx, time_steps:
],
sliding_window_view(x, L, 0)[
idx, time_steps:
],
sliding_window_view(y, L, 0)[
idx, time_steps:
],
],
1,
)
all_coord.append(coord)
total += len(idx)
if progress_hook:
progress_hook(
min(1.0, total / (len(df) // L + 1))
)
if not all_enc:
raise SequenceGeneratorError(
"No sequences generated (series too short)"
)
Xe = np.concatenate(all_enc, 0)
Xd = np.concatenate(all_dec, 0)
inputs = {"encoder_inputs": Xe}
targets = {"decoder_targets": Xd}
if static_cols:
inputs["static"] = np.concatenate(all_stat, 0)
if future_cols:
inputs["future"] = np.concatenate(all_fut, 0)
if spatial_cols:
coords = np.concatenate(all_coord, 0)
if normalize_coords:
flat = coords.reshape(-1, 3)
coord_scl = MinMaxScaler().fit(flat)
inputs["coords"] = coord_scl.transform(
flat
).reshape(coords.shape)
else:
coord_scl = None
inputs["coords"] = coords
else:
coord_scl = None
_v(f"Generated {Xe.shape[0]} windows", 1)
return inputs, targets, coord_scl
def _generate_ts_sequences(
series: np.ndarray | pd.Series,
time_steps: int = 12,
forecast_horizon: int = 1,
method: str = "rolling", # 'rolling','strided','random','expanding','bootstrap'
stride: int = 1, # for 'strided'
random_samples: int | None = None, # for 'random'
expand_step: int = 1, # for 'expanding'
n_bootstrap: int = 0, # for 'bootstrap'
shuffle: bool = False, # whether to shuffle final arrays
) -> tuple[np.ndarray, np.ndarray]:
"""
Generate (X, y) arrays from a 1D series.
X has shape (N, time_steps),
y has shape (N, forecast_horizon).
Parameters
----------
series
1D array or pandas Series of length M.
time_steps
Number of past steps (T) for each input window.
forecast_horizon
Number of future steps (H) for each target window.
method
Sampling strategy:
- 'rolling': every possible window,
- 'strided': every `stride` windows,
- 'random': random subset of starts,
- 'expanding': windows starting at 0,expand by `expand_step`,
- 'bootstrap': `n_bootstrap` random blocks of size T+H.
stride
Step size for 'strided'.
random_samples
Number of random windows if method='random'.
expand_step
Increment for 'expanding'.
n_bootstrap
Number of bootstrap samples if method='bootstrap'.
shuffle
Shuffle output windows.
Returns
-------
X : ndarray, shape (N, time_steps)
y : ndarray, shape (N, forecast_horizon)
Raises
------
ValueError
If `method` is unknown or if the series is too short.
"""
# ensure numpy array
arr = (
series.values
if isinstance(series, pd.Series)
else np.asarray(series)
)
M = arr.shape[0]
L = time_steps + forecast_horizon
max_start = M - L
if max_start < 0:
# not enough data for even one window
return np.empty((0, time_steps)), np.empty(
(0, forecast_horizon)
)
# rolling windows via stride_tricks
windows = sliding_window_view(arr, window_shape=L)
if method == "rolling":
idx = np.arange(max_start + 1)
elif method == "strided":
idx = np.arange(0, max_start + 1, stride)
elif method == "random":
all_idx = np.arange(max_start + 1)
if random_samples is None or random_samples >= len(
all_idx
):
idx = all_idx
else:
idx = np.random.choice(
all_idx, random_samples, replace=False
)
elif method == "expanding":
idx = np.arange(0, max_start + 1, expand_step)
elif method == "bootstrap":
# pick random blocks of length L
idx = np.random.randint(
0, max_start + 1, size=n_bootstrap
)
else:
raise ValueError(f"Unknown method: {method}")
# slice out X and y
X = windows[idx, :time_steps]
y = windows[
idx, time_steps : time_steps + forecast_horizon
]
if shuffle:
p = np.random.permutation(len(X))
X, y = X[p], y[p]
return X, y
class SequenceGeneratorError(RuntimeError):
"""Raised when no sequence can be generated with the given settings."""