geoprior.utils.nat_utils#

Public exports for NAT workflow utilities.

geoprior.utils.nat_utils.build_censor_mask(xb, H, idx, thresh=0.5, *, source='dynamic', reduce_time='any', align='broadcast')[source]#

Build a censor mask aligned to the forecast horizon: (B, H, 1).

Parameters:
  • source ({"dynamic", "future"}, default "dynamic") – Selects where the censoring flag is read from. "dynamic" reads xb["dynamic_features"][:, :, idx] from the history window, while "future" reads xb["future_features"][:, :, idx] from the forecast window.

  • reduce_time ({"any", "last", "all"}, default "any") – Reduction applied when source="dynamic" and the censor flag behaves like a per-sample label. "any" marks the sample as censored if any history step is flagged, "last" uses only the last history step, and "all" requires every history step to be flagged.

  • align ({"broadcast", "crop", "pad_false", "pad_edge", "error"}, default "broadcast") – Policy used when the time axis does not already match the forecast horizon H. "broadcast" repeats a single-step label across all horizon steps, "crop" keeps the last H steps, "pad_false" pads missing steps with False, "pad_edge" repeats the last available step, and "error" raises on mismatch.

  • xb (dict)

  • idx (int | None)

  • thresh (float)

Return type:

Tensor
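
The reduce_time and align policies described above can be illustrated on plain Python lists. This is a minimal sketch for a single sample, not the library implementation: reduce_history and align_to_horizon are hypothetical helper names, padding is assumed to be appended after the available steps, and any policy that cannot be applied is assumed to raise in the same way as "error".

```python
from typing import List


def reduce_history(flags: List[bool], reduce_time: str = "any") -> bool:
    """Collapse per-step history flags into one per-sample label."""
    if reduce_time == "any":
        return any(flags)
    if reduce_time == "last":
        return bool(flags[-1])
    if reduce_time == "all":
        return all(flags)
    raise ValueError(f"unknown reduce_time: {reduce_time!r}")


def align_to_horizon(steps: List[bool], H: int, align: str = "broadcast") -> List[bool]:
    """Fit a list of per-step flags to exactly H horizon steps."""
    T = len(steps)
    if T == H:
        return list(steps)  # already aligned
    if align == "broadcast" and T == 1:
        return list(steps) * H  # repeat the single label
    if align == "crop" and T > H:
        return list(steps[-H:])  # keep the last H steps
    if align == "pad_false" and T < H:
        return list(steps) + [False] * (H - T)
    if align == "pad_edge" and 0 < T < H:
        return list(steps) + [steps[-1]] * (H - T)
    # Assumption: "error" and any policy that cannot apply both raise.
    raise ValueError(f"cannot align {T} steps to H={H} with align={align!r}")


# A history window with one flagged step, broadcast over a 3-step horizon.
label = reduce_history([False, True, False], "any")
mask = align_to_horizon([label], H=3)
```

In the real function the same logic would apply per batch element, with a trailing singleton axis to give the (B, H, 1) result.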

geoprior.utils.nat_utils.ensure_input_shapes(x, mode, forecast_horizon)[source]#

Ensure presence of zero-width static/future placeholders.

Stage-1 exporters sometimes omit static_features or future_features when there are no static/future variables for a particular experiment. Keras, however, expects these inputs to exist so that the input signature remains stable.

This helper:

  • Copies the input dict to avoid in-place modification.

  • Ensures static_features is an array of shape (N, 0) if missing.

  • Ensures future_features is an array of shape (N, T_future, 0) if missing, where:

    • T_future = dynamic_features.shape[1] when mode == "tft_like" (past+future style).

    • Otherwise, T_future = forecast_horizon.

Parameters:
  • x (dict) – Dictionary containing at least dynamic_features with shape (N, T_dyn, D_dyn).

  • mode (str) – Model mode. When "tft_like" the future sequence length is inferred from the dynamic sequence.

  • forecast_horizon (int) – Forecast horizon in time steps/years for non-TFT modes.

Returns:

Shallow copy of x with guaranteed static_features and future_features entries.

Return type:

dict
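
The placeholder shape rule can be written down without any array library. The sketch below works on shape tuples only; placeholder_shapes and the mode string "some_other_mode" are hypothetical names used for illustration, and the real helper operates on arrays rather than shapes.

```python
from typing import Dict, Tuple

Shape = Tuple[int, ...]


def placeholder_shapes(
    shapes: Dict[str, Shape], mode: str, forecast_horizon: int
) -> Dict[str, Shape]:
    """Return a copy of `shapes` with zero-width placeholders filled in."""
    out = dict(shapes)  # copy, so the caller's dict is not modified
    n, t_dyn, _ = out["dynamic_features"]
    # Future length follows the dynamic window only in "tft_like" mode.
    t_future = t_dyn if mode == "tft_like" else forecast_horizon
    out.setdefault("static_features", (n, 0))
    out.setdefault("future_features", (n, t_future, 0))
    return out


given = {"dynamic_features": (8, 5, 3)}
tft = placeholder_shapes(given, "tft_like", forecast_horizon=2)
other = placeholder_shapes(given, "some_other_mode", forecast_horizon=2)
```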

geoprior.utils.nat_utils.extract_preds(model, out, *, strict=True, output_names=None)[source]#

Extract (subs_pred, gwl_pred) from GeoPrior outputs.

Supports:
  1. v3.2+ call(): {"subs_pred", "gwl_pred"}

  2. forward_with_aux(): (y_pred, aux)

  3. legacy: {"data_final"} + model.split_data_predictions

  4. predict(): list/tuple mapped via output names

If strict=True, list/tuple outputs must be mappable via output names; otherwise we raise to avoid silent swaps.

This helper normalizes the output interface across two GeoPrior generation families:

  1. New interface (preferred) model(inputs) -> {"subs_pred": ..., "gwl_pred": ...}

  2. Legacy interface (backward compatible) model(inputs) -> {"data_final": ...}, where the caller must split the tensor using model.split_data_predictions.

Parameters:
  • model (object) –

    A Keras-like model instance that may expose split_data_predictions(data_final).

    The splitter must return a tuple:

    • subs_pred with shape (B, H, 1) or (B, H, Q, 1)

    • gwl_pred with shape (B, H, 1) or (B, H, Q, 1)

  • out (dict) –

    Output returned by the model call, typically model(inputs, training=False).

    Supported keys are either:

    • {"subs_pred", "gwl_pred"} (new interface), or

    • {"data_final"} (legacy interface).

  • strict (bool)

  • output_names (Sequence[str] | None)

Returns:

  • subs_pred (Tensor) – Predicted subsidence in model space.

    Expected shapes:

    • Point mode: (B, H, 1)

    • Quantile mode: (B, H, Q, 1)

  • gwl_pred (Tensor) – Predicted groundwater/head variable in model space.

    Expected shapes:

    • Point mode: (B, H, 1)

    • Quantile mode: (B, H, Q, 1)

Raises:
  • KeyError – If out does not contain a supported key set.

  • TypeError – If out is not a mapping/dict-like object.

Return type:

tuple[Any, Any]

Notes

This function is intended for Stage-2 and Stage-3 scripts where you may load checkpoints from older experiments. It avoids fragile code that slices data_final manually.

The function does not validate tensor dtypes or numerical finiteness. Upstream code should handle NaN and Inf checks as needed. Output normalization follows the Keras model conventions documented in Keras Team [24].

Examples

New interface:

out = model_inf(xb, training=False)
s_pred, h_pred = extract_preds(
    model_inf,
    out,
)

Legacy interface:

out = model_inf(xb, training=False)
s_pred, h_pred = extract_preds(
    model_inf,
    out,
)

See also

subs_point_from_out

Convert subsidence predictions to a point forecast.
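
The dispatch over output formats can be sketched as follows. This is an illustration under stated assumptions, not the library code: extract_preds_sketch and LegacyModel are hypothetical, the forward_with_aux() case is omitted, the exception raised for unmappable lists in strict mode is assumed to be ValueError, and the positional fallback for strict=False is also an assumption.

```python
class LegacyModel:
    """Hypothetical stand-in exposing the legacy splitter."""

    def split_data_predictions(self, data_final):
        # Assumption for this sketch: data_final is a (subs, gwl) pair.
        return data_final[0], data_final[1]


def extract_preds_sketch(model, out, *, strict=True, output_names=None):
    """Return (subs_pred, gwl_pred) from dict or list/tuple outputs."""
    if isinstance(out, dict):
        if "subs_pred" in out and "gwl_pred" in out:
            return out["subs_pred"], out["gwl_pred"]  # new interface
        if "data_final" in out:
            return model.split_data_predictions(out["data_final"])  # legacy
        raise KeyError("expected 'subs_pred'/'gwl_pred' or 'data_final'")
    if isinstance(out, (list, tuple)):
        names = list(output_names or [])
        if "subs_pred" in names and "gwl_pred" in names:
            # Map by name so a reordered output list is never swapped.
            return out[names.index("subs_pred")], out[names.index("gwl_pred")]
        if strict:
            raise ValueError("cannot map list outputs without output names")
        return out[0], out[1]  # assumed positional fallback
    raise TypeError("unsupported output type")


model = LegacyModel()
new = extract_preds_sketch(model, {"subs_pred": "s", "gwl_pred": "g"})
legacy = extract_preds_sketch(model, {"data_final": ["s_old", "g_old"]})
swapped = extract_preds_sketch(
    model, ["g", "s"], output_names=["gwl_pred", "subs_pred"]
)
```

Mapping list outputs by name rather than by position is what prevents the silent swaps mentioned above.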

geoprior.utils.nat_utils.load_nat_config(root='nat.com')[source]#

Load the NATCOM configuration as a dictionary. This is the high-level helper used by NATCOM scripts.

Example

>>> from geoprior.utils.nat_utils import load_nat_config
>>> cfg = load_nat_config()
>>> CITY_NAME = cfg["CITY_NAME"]
>>> TIME_STEPS = cfg["TIME_STEPS"]
Return type:

dict[str, Any]

geoprior.utils.nat_utils.load_nat_config_payload(root='nat.com')[source]#

Return the full config.json payload, including city, model and __meta__ fields.

This is convenient when you also want to see which hash or city/model are currently active.

Return type:

dict[str, Any]

geoprior.utils.nat_utils.load_scaler_info(encoders_block)[source]#

Load the scaler_info mapping from an encoders block.

Stage-1 exporters typically store a compact description of the scalers used to normalise the data. In many cases this takes the form:

encoders = {
    "main_scaler": "/path/to/minmax.joblib",
    "coord_scaler": "/path/to/coords.joblib",
    "scaler_info": "/path/to/scaler_info.joblib",
    ...
}

where scaler_info is either a path to a joblib file or an already-loaded dictionary.

This helper returns a dictionary regardless of how it was stored, making downstream formatting/evaluation code simpler.

Parameters:

encoders_block (dict) – The encoders part of the Stage-1 manifest (M["artifacts"]["encoders"]).

Returns:

The loaded scaler_info dictionary, or None if not present / not loadable.

Return type:

dict or None

geoprior.utils.nat_utils.make_tf_dataset(X_np, y_np, batch_size, shuffle, mode, forecast_horizon, *, seed=42, drop_remainder=False, reshuffle_each_iter=True, prefetch=True, check_npz_finite=False, check_finite=False, scan_finite_batches=0, dynamic_feature_names=None, future_feature_names=None)[source]#

Build a tf.data.Dataset using NATCOM conventions.

Steps:

  1. ensure_input_shapes(…) for X.

  2. map_targets_for_training(…) for y.

  3. tf.data pipeline (shuffle/batch/prefetch).

  4. Optional finite checks (NPZ + tf batches).

Parameters:
  • X_np (dict) – Input dictionary, typically obtained from np.load on the Stage-1 *_inputs_npz file.

  • y_np (dict) – Target dictionary, typically obtained from np.load on the Stage-1 *_targets_npz file.

  • batch_size (int) – Number of samples per batch.

  • shuffle (bool) – If True, shuffle the dataset using a fixed seed for reproducibility.

  • mode (str) – Model mode passed to ensure_input_shapes().

  • forecast_horizon (int) – Forecast horizon passed to ensure_input_shapes().

  • check_npz_finite (bool) – If True, check the input and target NumPy arrays for NaN/Inf before building the dataset.

  • check_finite (bool) – If True, inserts assert_all_finite checks inside the tf.data pipeline.

  • scan_finite_batches (int) – If > 0, eagerly scan the first N batches immediately so that non-finite values fail early.

  • dynamic_feature_names (list[str] | None) – If provided, used to report bad channels for feature tensors.

  • future_feature_names (list[str] | None) – If provided, used to report bad channels for feature tensors.

  • seed (int)

  • drop_remainder (bool)

  • reshuffle_each_iter (bool)

  • prefetch (bool)

Returns:

Dataset of (X, y) pairs.

Return type:

tf.data.Dataset

Notes

TensorFlow is imported lazily inside the function so that this module remains importable in environments where TF is not installed (for example, for tooling or static analysis).

geoprior.utils.nat_utils.map_targets_for_training(y_dict, subs_key='subsidence', gwl_key='gwl', subs_pred_key='subs_pred', gwl_pred_key='gwl_pred')[source]#

Standardise target dictionaries to the Keras compile keys.

This helper enforces a small convention used throughout the NATCOM training scripts:

  • Upstream sequence builders typically export raw targets with keys subsidence and gwl.

  • The GeoPrior model is compiled with targets named subs_pred and gwl_pred.

This function accepts either style and always returns a dict keyed by subs_pred and gwl_pred for use in Keras.

Parameters:
  • y_dict (dict) – Dictionary produced by the Stage-1 sequence exporter or by a previous training script. Must contain either (subsidence, gwl) or (subs_pred, gwl_pred).

  • subs_key (str, default "subsidence") – Name of the raw subsidence key in y_dict.

  • gwl_key (str, default "gwl") – Name of the raw groundwater-level key in y_dict.

  • subs_pred_key (str, default "subs_pred") – Standardised key for the subsidence prediction target.

  • gwl_pred_key (str, default "gwl_pred") – Standardised key for the GWL prediction target.

Returns:

New dictionary with keys subs_pred and gwl_pred.

Return type:

dict

Raises:

KeyError – If the dictionary does not contain either of the expected key pairs.
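
A minimal sketch of the rekeying convention, using plain dictionaries. map_targets_sketch is a hypothetical name, and giving the already-standardised keys precedence when both styles are present is an assumption of this sketch.

```python
def map_targets_sketch(
    y_dict,
    subs_key="subsidence",
    gwl_key="gwl",
    subs_pred_key="subs_pred",
    gwl_pred_key="gwl_pred",
):
    """Return a new dict keyed by the compile-time target names."""
    # Assumption: standardised keys win if both styles are present.
    if subs_pred_key in y_dict and gwl_pred_key in y_dict:
        return {
            subs_pred_key: y_dict[subs_pred_key],
            gwl_pred_key: y_dict[gwl_pred_key],
        }
    if subs_key in y_dict and gwl_key in y_dict:
        return {subs_pred_key: y_dict[subs_key], gwl_pred_key: y_dict[gwl_key]}
    raise KeyError(
        f"expected ({subs_key!r}, {gwl_key!r}) or "
        f"({subs_pred_key!r}, {gwl_pred_key!r}) in targets"
    )


raw = {"subsidence": [0.1, 0.2], "gwl": [1.0, 1.1]}
mapped = map_targets_sketch(raw)
```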

geoprior.utils.nat_utils.name_of(obj)[source]#

Return a human-readable name for an object.

This utility is handy when serialising compile configurations (e.g., turning metric callables into simple strings for JSON logs).

Parameters:

obj (object) – Any Python object (function, class instance, etc.).

Returns:

obj.__name__ if present, otherwise the class name, and finally str(obj) as a last resort.

Return type:

str
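
The fallback chain described in Returns can be sketched in a few lines. name_of_sketch and MyMetric are hypothetical names; the real helper may differ in detail.

```python
def name_of_sketch(obj):
    """Return __name__, else the class name, else str(obj)."""
    name = getattr(obj, "__name__", None)
    if isinstance(name, str):
        return name  # functions and classes carry their own name
    cls_name = getattr(type(obj), "__name__", None)
    if cls_name:
        return cls_name  # instances fall back to their class name
    return str(obj)  # last resort


class MyMetric:
    """Hypothetical metric object without a __name__ attribute."""


from_function = name_of_sketch(len)
from_instance = name_of_sketch(MyMetric())
```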

geoprior.utils.nat_utils.resolve_hybrid_config(manifest_cfg, live_cfg, verbose=True)[source]#

Merge Manifest config (Data Authority) with Live config (Physics Authority).

Return type:

dict

geoprior.utils.nat_utils.resolve_si_affine(cfg, scaler_info, *, target_name, prefix, unit_factor_key, scale_key, bias_key)[source]#
Parameters:
  • cfg (dict)

  • scaler_info (dict)

  • target_name (str)

  • prefix (str)

  • unit_factor_key (str)

  • scale_key (str)

  • bias_key (str)

geoprior.utils.nat_utils.best_epoch_and_metrics(history, monitor='val_loss')[source]#

Return the best epoch and metrics at that epoch.

Given a History.history dictionary produced by model.fit(...), this helper identifies the index of the minimum value for the monitored quantity (by default "val_loss") and returns:

  • The epoch index (0-based).

  • A dictionary mapping each metric name to its value at that epoch.

Parameters:
  • history (dict) – The history.history attribute from Keras training.

  • monitor (str, default "val_loss") – Name of the metric to minimise.

Returns:

  • best_epoch (int or None) – Index of the best epoch, or None if monitor is not present.

  • metrics_at_best (dict) – Mapping from metric name to its value at the best epoch. Empty if monitor is not present.

Return type:

tuple[int | None, dict]
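
The selection rule amounts to an argmin over the monitored series. The sketch below uses a hand-written history dictionary with placeholder numbers; best_epoch_sketch is a hypothetical name and the values are not real training results.

```python
def best_epoch_sketch(history, monitor="val_loss"):
    """Return (best_epoch, metrics_at_best) for a History.history-like dict."""
    values = history.get(monitor)
    if not values:
        return None, {}  # monitor missing or empty
    # Index of the minimum monitored value (first one wins on ties).
    best = min(range(len(values)), key=values.__getitem__)
    metrics = {
        name: series[best] for name, series in history.items() if len(series) > best
    }
    return best, metrics


history = {
    "loss": [1.0, 0.6, 0.5],      # placeholder values
    "val_loss": [0.9, 0.4, 0.7],  # placeholder values
}
best_epoch, metrics_at_best = best_epoch_sketch(history)
```

Here epoch 1 is selected because it has the lowest val_loss, even though the training loss keeps decreasing afterwards.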

geoprior.utils.nat_utils.subs_point_from_out(model, out, quantiles=None, med_idx=None)[source]#

Convert model output into a subsidence point forecast.

This helper produces a subsidence tensor shaped (B, H, 1) in model space, regardless of whether the model emits quantiles or a point prediction.

  • If quantiles are present and the subsidence prediction is shaped (B, H, Q, 1), the function selects the median quantile slice.

  • Otherwise, it returns the point prediction directly.

Parameters:
  • model (object) – A Keras-like model instance passed to extract_preds().

  • out (dict) –

    Output returned by the model call.

    This can be either the new interface with keys "subs_pred" and "gwl_pred", or the legacy interface with key "data_final".

  • quantiles (sequence of float or None, default None) –

    Quantile levels used by the model, such as [0.1, 0.5, 0.9].

    If provided, the function may use it to interpret the rank-4 quantile output and select the median.

    If None, quantile selection is disabled unless med_idx is explicitly provided and the tensor rank indicates quantiles.

  • med_idx (int or None, default None) –

    Index along the quantile axis to use as the “point” forecast when quantiles are available.

    If None and quantiles is provided, the function selects the index closest to 0.5.

Returns:

subs_point – Subsidence point prediction in model space with shape (B, H, 1).

Return type:

Tensor

Raises:
  • ValueError – If subsidence prediction is missing or None.

  • ValueError – If a quantile tensor is detected but a valid median index cannot be resolved.

Notes

Quantile outputs are assumed to be shaped (B, H, Q, 1) where the quantile axis is the third dimension (axis=2).

If the model returns point predictions already, the function is effectively a no-op. The quantile interpretation used here follows Koenker and Bassett [25].
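
Median selection along the quantile axis can be illustrated with nested lists standing in for a (B, H, Q, 1) tensor. Both helper names are hypothetical and the sketch does not reproduce the library's rank checks.

```python
def resolve_median_index(quantiles=None, med_idx=None):
    """Pick the quantile index used as the point forecast."""
    if med_idx is not None:
        return med_idx  # an explicit index always wins
    if not quantiles:
        raise ValueError("need quantiles or med_idx to resolve the median")
    # Index of the quantile level closest to 0.5.
    return min(range(len(quantiles)), key=lambda i: abs(quantiles[i] - 0.5))


def select_point(subs_pred, idx):
    """Slice nested lists shaped (B, H, Q, 1) down to (B, H, 1)."""
    return [[step[idx] for step in sample] for sample in subs_pred]


# One sample (B=1), two horizon steps (H=2), three quantiles (Q=3).
subs_q = [[[[1.0], [2.0], [3.0]], [[4.0], [5.0], [6.0]]]]
idx = resolve_median_index([0.1, 0.5, 0.9])
subs_point = select_point(subs_q, idx)
```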

Examples

Quantile model:

out = model_inf(xb, training=False)
s_point = subs_point_from_out(
    model_inf,
    out,
    quantiles=[0.1, 0.5, 0.9],
)

Point model:

out = model_inf(xb, training=False)
s_point = subs_point_from_out(
    model_inf,
    out,
)

See also

extract_preds

Normalize outputs across new and legacy checkpoints.

geoprior.utils.nat_utils.serialize_subs_params(params, cfg=None)[source]#

Make GeoPrior subnet parameters JSON-friendly.

The training scripts typically pass a dictionary of model construction arguments, e.g. subsmodel_params, which contains objects such as LearnableMV or FixedGammaW that are not directly JSON-serialisable.

This helper replaces those objects by small dictionaries describing their type and scalar value, optionally using values from the NATCOM config dictionary.

Parameters:
  • params (dict) – Dictionary of model init parameters (e.g. subsmodel_params in training_NATCOM_GEOPRIOR.py).

  • cfg (dict, optional) –

    NATCOM config dictionary. If provided, scalar values are taken from:

    • GEOPRIOR_INIT_MV

    • GEOPRIOR_INIT_KAPPA

    • GEOPRIOR_GAMMA_W

    • GEOPRIOR_H_REF

    and used as the authoritative numbers.

Returns:

Copy of params where scalar GeoPrior parameters are replaced by JSON-friendly dictionaries.

Return type:

dict

Notes

This function does not import any of the GeoPrior classes. It only introspects attributes like initial_value or value when the corresponding config entry is missing.
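
A rough sketch of the replacement step, assuming stand-in classes and a simple output layout. The classes below only mimic the attributes named in the Notes; the parameter names "mv" and "gamma_w", their mapping to config keys, and the {"type", "value"} layout are assumptions made for illustration.

```python
import json


class LearnableMV:
    """Hypothetical stand-in exposing initial_value."""

    def __init__(self, initial_value):
        self.initial_value = initial_value


class FixedGammaW:
    """Hypothetical stand-in exposing value."""

    def __init__(self, value):
        self.value = value


def serialize_params_sketch(params, cfg=None):
    """Return a copy of params with non-JSON objects replaced by small dicts."""
    cfg = cfg or {}
    # Assumed mapping from parameter name to NATCOM config key.
    cfg_keys = {"mv": "GEOPRIOR_INIT_MV", "gamma_w": "GEOPRIOR_GAMMA_W"}
    out = dict(params)
    for name, obj in params.items():
        if isinstance(obj, (int, float, str, bool, type(None), list, dict)):
            continue  # already JSON-friendly
        value = cfg.get(cfg_keys.get(name))  # config is authoritative
        if value is None:
            # Fall back to introspecting the object itself.
            value = getattr(obj, "initial_value", getattr(obj, "value", None))
        out[name] = {"type": type(obj).__name__, "value": value}
    return out


params = {"mv": LearnableMV(1e-7), "gamma_w": FixedGammaW(9810.0), "units": 64}
serialized = serialize_params_sketch(params, cfg={"GEOPRIOR_INIT_MV": 5e-7})
as_json = json.dumps(serialized)
```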

geoprior.utils.nat_utils.save_ablation_record(outdir, city, model_name, cfg, eval_dict, phys_diag=None, per_h_mae=None, per_h_r2=None, log_fn=None)[source]#

Append a single ablation record to ablation_record.jsonl.

Each training run (e.g., different physics toggles or weights) writes one JSON line containing:

  • Basic run identifiers (city, model, timestamp).

  • Physics configuration (PDE_MODE_CONFIG, lambda weights, effective head flags, etc.).

  • Key performance metrics (R², MSE, MAE, coverage, sharpness).

  • Optional physics diagnostics (epsilon_prior, epsilon_cons).

  • Optional per-horizon MAE/R² for more detailed analysis.

Parameters:
  • outdir (str) – Base output directory for the current run. The ablation file is created under outdir / "ablation_records".

  • city (str) – City name (e.g., "nansha" or "zhongshan").

  • model_name (str) – Model identifier (e.g., "GeoPriorSubsNet").

  • cfg (dict) – Lightweight configuration dictionary containing at least the physics-related keys used below.

  • eval_dict (dict or None) – Dictionary of evaluation metrics (R², MSE, MAE, coverage80, sharpness80). If None, metrics fields default to None.

  • phys_diag (dict or None, optional) – Physics diagnostics (e.g., from evaluate()) with keys such as "epsilon_prior" and "epsilon_cons".

  • per_h_mae (dict or None, optional) – Per-horizon MAE values (e.g., keyed by year/step).

  • per_h_r2 (dict or None, optional) – Per-horizon R² values.

Return type:

None

Notes

The output file is a JSON-Lines file, so it can be loaded with load_ablation_jsonl().
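
The append-one-line-per-run pattern can be sketched with the standard library alone. append_record and read_records are hypothetical helpers, not the library functions, and the record fields and metric values are placeholders.

```python
import json
import os
import tempfile


def append_record(outdir, record):
    """Append one JSON line under outdir/ablation_records and return the path."""
    folder = os.path.join(outdir, "ablation_records")
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, "ablation_record.jsonl")
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")  # one record per line
    return path


def read_records(path):
    """Read a JSON Lines file back into a list of dicts."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    # Placeholder numbers, not real results.
    append_record(tmp, {"city": "nansha", "model": "GeoPriorSubsNet", "mae": 0.12})
    path = append_record(tmp, {"city": "zhongshan", "model": "GeoPriorSubsNet", "mae": 0.15})
    records = read_records(path)
```

Because each run appends a self-contained line, interrupted or concurrent sweeps do not need to rewrite earlier records.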

geoprior.utils.nat_utils.load_windows_npz(path)[source]#

Load Stage-1 windows as (x, y).

Supported:

  • Bundle NPZ (contains inputs+targets in one file).

  • Mapping {'inputs': <npz>, 'targets': <npz>}.

  • Inputs NPZ only (targets inferred by filename).

  • Directory containing inputs/targets NPZ.

Returns:

  • x (dict[str, np.ndarray]) – Inputs (e.g., static_features, dynamic_features, etc.)

  • y (dict[str, np.ndarray]) – Targets (e.g., subs_pred, gwl_pred)

Parameters:

path (str | Path | Mapping[str, str])

Return type:

tuple[dict[str, ndarray], dict[str, ndarray]]

geoprior.utils.nat_utils.load_tuned_hps_near_model(model_path, *, prefer='keras', required=True, log_fn=None)[source]#
Return type:

dict

geoprior.utils.nat_utils.load_trained_hps_near_model(model_path, *, allowed, required=False, log_fn=None)[source]#
Return type:

dict

geoprior.utils.nat_utils.sanitize_inputs_np(X)[source]#
Parameters:

X (dict)

Return type:

dict

geoprior.utils.nat_utils.load_hps_auto_near_model(model_path, *, allowed, prefer='keras', required=False, log_fn=None)[source]#
Return type:

dict

geoprior.utils.nat_utils.load_or_rebuild_geoprior_model(model_path, manifest, X_sample, out_s_dim, out_g_dim, mode, horizon, quantiles, city_name=None, compile_on_load=True, verbose=1)[source]#

Load a tuned or trained GeoPriorSubsNet, with robust rebuild fallback.

geoprior.utils.nat_utils.compile_for_eval(model, manifest, best_hps, quantiles, *, include_metrics=True)[source]#

Recompile a GeoPriorSubsNet instance for evaluation / diagnostics.

This is intended for:

  • tuned models loaded from a .keras archive, or

  • models rebuilt from best_hps.

It does NOT change the architecture or weights, only the compile configuration (optimizer, losses, and physics loss weights).

Parameters:
  • model (GeoPriorSubsNet) – Loaded or freshly-built GeoPriorSubsNet instance.

  • manifest (dict) – Stage-1 manifest; training config is taken from manifest['config'].

  • best_hps (dict or None) – Dictionary of tuned hyperparameters. If empty/None, reasonable defaults are inferred from the manifest.

  • quantiles (list of float or None) – Quantiles used for probabilistic subsidence/GWL outputs.

  • include_metrics (bool, default True) – If True, attach MAE/MSE + coverage/sharpness metrics to match the training script; if False, only losses are configured.

Returns:

The same model instance, compiled in-place.

Return type:

model

geoprior.utils.nat_utils.load_best_hps_near_model(model_path, *, model_name='GeoPriorSubsNet', prefer='keras', log_fn=None)[source]#

Load best hyperparameters saved near a model artifact.

Supports model names like:

  • <city>_<model_name>_H{H}_best.keras

  • <city>_<model_name>_H{H}_best.weights.h5

Parameters:
  • model_path (str) – Path to a model file or its run directory.

  • model_name (str or None, default "GeoPriorSubsNet") – Model name token in filenames.

  • prefer ({"keras", "weights"}, default "keras") – Which artifact type to infer the prefix from.

  • log_fn (callable or None, default None) – Logger (e.g. print). None disables logs.

Returns:

best_hps – Non-empty hyperparameter dictionary.

Return type:

dict
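
The filename convention can be parsed with a regular expression, for example to recover the run prefix next to which hyperparameters might be stored. This is only a sketch of the naming pattern: parse_best_artifact and the returned field names are hypothetical, and the actual lookup logic of the helper is not shown in this reference.

```python
import re


def parse_best_artifact(filename, model_name="GeoPriorSubsNet"):
    """Split '<city>_<model_name>_H{H}_best.<ext>' into its parts, or return None."""
    pattern = re.compile(
        r"^(?P<city>.+)_"
        + re.escape(model_name)
        + r"_H(?P<horizon>\d+)_best\.(?P<kind>keras|weights\.h5)$"
    )
    match = pattern.match(filename)
    if match is None:
        return None
    horizon = int(match["horizon"])
    return {
        "city": match["city"],
        "horizon": horizon,
        "kind": match["kind"],
        "prefix": f"{match['city']}_{model_name}_H{horizon}",
    }


keras_info = parse_best_artifact("nansha_GeoPriorSubsNet_H3_best.keras")
weights_info = parse_best_artifact("zhongshan_GeoPriorSubsNet_H5_best.weights.h5")
```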

geoprior.utils.nat_utils.pick_npz_for_dataset(manifest, split)[source]#

Load (inputs, targets) NPZ arrays for a given dataset split.

This is a public, reusable version of the internal helper that was previously named _pick_npz_for_dataset.

Parameters:
  • manifest (dict) –

    Stage-1 manifest dictionary with the structure:

    manifest["artifacts"]["numpy"] = {
        "train_inputs_npz": ...,
        "train_targets_npz": ...,
        "val_inputs_npz": ...,
        "val_targets_npz": ...,
        "test_inputs_npz": ... (optional),
        "test_targets_npz": ... (optional),
    }
    

  • split ({"train", "val", "test"}) – Which dataset to load.

Returns:

  • X (dict or None) – Dictionary of input arrays for the requested split, or None if the split is unavailable (e.g. test NPZ missing).

  • y (dict or None) – Dictionary of target arrays for the requested split, or None if targets are unavailable.

Raises:
  • KeyError – If the manifest does not contain the expected NPZ entries.

  • ValueError – If split is not one of {"train", "val", "test"}.

Return type:

tuple[dict | None, dict | None]
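
The key lookup implied by the manifest structure can be sketched as below. To stay dependency-free, the hypothetical npz_paths_for_split returns the stored paths instead of loading arrays; treating only the test split as optional is an assumption drawn from the structure shown above, and the paths are placeholders.

```python
def npz_paths_for_split(manifest, split):
    """Return (inputs_path, targets_path) for a split; test may be (None, None)."""
    if split not in {"train", "val", "test"}:
        raise ValueError(f"split must be 'train', 'val' or 'test', got {split!r}")
    npz = manifest["artifacts"]["numpy"]  # KeyError if the block is missing
    inputs = npz.get(f"{split}_inputs_npz")
    targets = npz.get(f"{split}_targets_npz")
    if split != "test" and (inputs is None or targets is None):
        raise KeyError(f"manifest has no NPZ entries for split {split!r}")
    return inputs, targets


manifest = {
    "artifacts": {
        "numpy": {
            "train_inputs_npz": "train_inputs.npz",    # placeholder paths
            "train_targets_npz": "train_targets.npz",
            "val_inputs_npz": "val_inputs.npz",
            "val_targets_npz": "val_targets.npz",
        }
    }
}
train_paths = npz_paths_for_split(manifest, "train")
test_paths = npz_paths_for_split(manifest, "test")
```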

geoprior.utils.nat_utils.ensure_config_json(root='nat.com')[source]#

Ensure that nat.com/config.json exists and is consistent with nat.com/config.py.

Returns:

  • config (dict) – The configuration dictionary (payload["config"]).

  • json_path (str) – Absolute path to config.json.

Behaviour

  • If config.json does not exist, it is created from config.py.

  • If it exists but the SHA-256 hash of config.py has changed, it is regenerated.

  • Otherwise the existing JSON file is reused.

Parameters:

root (str)

Return type:

tuple[dict[str, Any], str]
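
The create / regenerate / reuse decision can be sketched with hashlib. This is not the library implementation: ensure_json_sketch is hypothetical, the hash is assumed to be stored under payload["__meta__"]["sha256"], and build_config stands in for whatever turns config.py into a dictionary.

```python
import hashlib
import json
import os
import tempfile


def ensure_json_sketch(py_path, json_path, build_config):
    """Return (config, status) where status is 'reused' or 'regenerated'."""
    with open(py_path, "rb") as fh:
        digest = hashlib.sha256(fh.read()).hexdigest()
    if os.path.exists(json_path):
        with open(json_path, encoding="utf-8") as fh:
            payload = json.load(fh)
        # Assumed location of the stored hash.
        if payload.get("__meta__", {}).get("sha256") == digest:
            return payload["config"], "reused"
    # Missing file or stale hash: rebuild and overwrite.
    payload = {"config": build_config(), "__meta__": {"sha256": digest}}
    with open(json_path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh)
    return payload["config"], "regenerated"


def build():
    """Hypothetical config builder used only for this demonstration."""
    return {"TIME_STEPS": 5}


with tempfile.TemporaryDirectory() as tmp:
    py_path = os.path.join(tmp, "config.py")
    json_path = os.path.join(tmp, "config.json")
    with open(py_path, "w", encoding="utf-8") as fh:
        fh.write("TIME_STEPS = 5\n")
    first = ensure_json_sketch(py_path, json_path, build)[1]
    second = ensure_json_sketch(py_path, json_path, build)[1]
    with open(py_path, "a", encoding="utf-8") as fh:
        fh.write("# edited\n")  # any change to config.py alters the hash
    third = ensure_json_sketch(py_path, json_path, build)[1]
```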

geoprior.utils.nat_utils.get_natcom_dir(root='nat.com')[source]#

Return the directory containing NATCOM scripts and configuration, typically <repo_root>/nat.com.

Return type:

str

geoprior.utils.nat_utils.get_config_paths(root='nat.com')[source]#

Return (config_py_path, config_json_path) for NATCOM.

Return type:

tuple[str, str]

geoprior.utils.nat_utils.compile_geoprior_for_eval(model, manifest, best_hps, quantiles)[source]#

(Re)compile a GeoPriorSubsNet-like model for evaluation.

This helper uses the Stage-1 manifest and tuned hyperparameters to configure:

  • the pinball losses for subsidence and GWL outputs,

  • loss weights for the two heads,

  • physics loss weights (lambda_*),

  • learning rate and LR multipliers.

TensorFlow and geoprior are imported lazily inside this function so that nat_utils can be imported even in non-TF environments.

Parameters:
  • model (GeoPriorSubsNet-like) – An instance of the GeoPriorSubsNet model (or any model exposing the same compile signature).

  • manifest (dict) – Stage-1 manifest dictionary. The config entry is used to retrieve default loss weights and physics settings.

  • best_hps (dict) – Hyperparameters loaded from the tuning run (e.g. via load_best_hps_near_model()).

  • quantiles (list of float or None) – Quantile levels used for probabilistic outputs. If None, mean-squared error is used instead of pinball loss.

Returns:

The same model instance, compiled in-place.

Return type:

model

Raises:

ImportError – If TensorFlow or geoprior’s make_weighted_pinball cannot be imported.
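
For reference, the textbook pinball (quantile) loss for a single prediction can be written in plain Python. This is the unweighted scalar form only; it is not geoprior's make_weighted_pinball, whose weighting scheme is not described here.

```python
def pinball_loss(y_true: float, y_pred: float, q: float) -> float:
    """Pinball loss for one observation at quantile level q in (0, 1)."""
    err = y_true - y_pred
    # Under-prediction (err > 0) costs q * err,
    # over-prediction (err < 0) costs (1 - q) * |err|.
    return max(q * err, (q - 1.0) * err)


# For q = 0.9, predicting too low is penalised more than predicting too high.
under = pinball_loss(10.0, 8.0, 0.9)
over = pinball_loss(8.0, 10.0, 0.9)
```

At q = 0.5 the loss reduces to half the absolute error, which is why the median quantile behaves like an MAE-trained point forecast.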