geoprior.utils.geo_utils

Geospatial utility helpers for GeoPrior workflows.

Functions

augment_city_spatiotemporal_data(df, city[, ...])

Apply grouped spatiotemporal augmentation with city-aware defaults.

augment_series_features(series_df, feature_cols)

Add random noise to selected numeric feature columns.

augment_spatiotemporal_data(df, mode[, ...])

Apply interpolation, feature augmentation, or both to grouped data.

generate_dummy_pinn_data(n_samples, *[, ...])

Generate dummy PINN data dictionary with specified or default ranges.

interpolate_temporal_gaps(series_df, ...[, ...])

Interpolates missing values in specified columns of a time series DataFrame.

merge_frames_to_file(sources, output_path, *)

Merge multiple NATCOM city datasets into a single compressed file.

resolve_spatial_columns(df[, spatial_cols, ...])

Helper to validate and resolve spatial columns.

unpack_frames_from_file(merged, *[, ...])

Reverse of merge_frames_to_file: split an aggregated NATCOM dataset into per-city frames (and optionally write them to disk).

geoprior.utils.geo_utils.augment_city_spatiotemporal_data(df, city, mode='interpolate', group_by_cols=None, time_col=None, value_cols_interpolate=None, feature_cols_augment=None, interpolation_config=None, augmentation_config=None, target_name=None, interpolate_target=False, verbose=True, coordinate_precision=None, savefile=None)

Apply grouped spatiotemporal augmentation with city-aware defaults.

This is a convenience wrapper around augment_spatiotemporal_data. It validates the requested city, optionally rounds coordinates before grouping, and forwards interpolation and augmentation configuration dictionaries.

Parameters:
  • df (pandas.DataFrame) – Input DataFrame containing spatial, temporal, and feature columns.

  • city ({'nansha', 'zhongshan'}) – City identifier used for validation and defaults.

  • mode ({'interpolate', 'augment_features', 'both'}, optional) – Processing mode forwarded to augment_spatiotemporal_data.

  • group_by_cols (list of str or None, optional) – Grouping columns for interpolation.

  • time_col (str or None, optional) – Time column used for interpolation.

  • value_cols_interpolate (list of str or None, optional) – Columns to interpolate.

  • feature_cols_augment (list of str or None, optional) – Columns to augment with noise.

  • interpolation_config (dict or None, optional) – Keyword arguments for interpolate_temporal_gaps. Typical values include {'freq': 'AS', 'method': 'linear'}.

  • augmentation_config (dict or None, optional) – Keyword arguments for augment_series_features. Typical values include {'noise_level': 0.01, 'noise_type': 'gaussian'}.

  • target_name (str or None, optional) – Optional target column name used when inferring default feature sets.

  • interpolate_target (bool, optional) – Whether the target should be included in default interpolation columns.

  • verbose (bool, optional) – Whether to emit progress information.

  • coordinate_precision (int or None, optional) – Decimal precision applied to coordinates before grouping.

  • savefile (str or None, optional) – Optional output CSV path handled by the decorator.

Returns:

Augmented DataFrame.

Return type:

pandas.DataFrame

Raises:
  • ValueError – If city or mode is invalid, or if required arguments are missing for the selected mode.

  • TypeError – If the main inputs are of the wrong type.
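
As a rough illustration of why coordinate_precision can matter, the sketch below rounds coordinates with plain pandas before grouping. It does not call geoprior, and the column names longitude and latitude, the sample values, and the chosen precision of 4 are assumptions made for the example.

```python
import pandas as pd

# Three records; the first two are the same site up to small coordinate jitter.
df = pd.DataFrame(
    {
        "longitude": [113.12341, 113.12344, 113.50000],
        "latitude": [22.50001, 22.50004, 22.70000],
        "year": [2020, 2021, 2020],
    }
)

coord_cols = ["longitude", "latitude"]  # assumed column names

# Without rounding, every distinct float pair forms its own group.
groups_raw = df.groupby(coord_cols).ngroups

# Rounding to 4 decimals merges the two jittered records into one site.
rounded = df.copy()
rounded[coord_cols] = rounded[coord_cols].round(4)
groups_rounded = rounded.groupby(coord_cols).ngroups

print(groups_raw, groups_rounded)
```

Grouping on unrounded floats would leave each jittered record as a single-row series, which gives interpolation nothing to work with.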

geoprior.utils.geo_utils.augment_series_features(series_df, feature_cols, noise_level=0.01, noise_type='gaussian', random_seed=None, savefile=None)

Add random noise to selected numeric feature columns.

Parameters:
  • series_df (pandas.DataFrame) – Input DataFrame representing one or more time series.

  • feature_cols (list of str) – Feature columns to augment.

  • noise_level (float, optional) – Magnitude of the added noise. For Gaussian noise it scales the feature standard deviation, and for uniform noise it scales the feature range.

  • noise_type ({'gaussian', 'uniform'}, optional) – Type of noise distribution to use.

  • random_seed (int or None, optional) – Seed for reproducible noise generation.

  • savefile (str or None, optional) – Optional output path handled by the decorator.

Returns:

DataFrame with noise added to the selected feature columns.

Return type:

pandas.DataFrame

Raises:
  • ValueError – If requested feature columns are missing or noise_type is invalid.

  • TypeError – If the main inputs are of the wrong type.

Notes

Non-numeric columns are skipped, and constant or invalid numeric ranges are left unchanged.
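
The scaling rule described for noise_level can be sketched in plain Python as follows. This is not the library's implementation: the helper add_noise is hypothetical, and the use of the population standard deviation and of a symmetric uniform interval are assumptions.

```python
import random
import statistics


def add_noise(values, noise_level=0.01, noise_type="gaussian", seed=None):
    """Return a noisy copy of a numeric sequence (hypothetical helper)."""
    rng = random.Random(seed)
    spread = max(values) - min(values)
    if spread == 0:
        # Constant column: there is no scale for the noise, so leave it unchanged.
        return list(values)
    if noise_type == "gaussian":
        # Assumption: noise standard deviation = noise_level * feature std.
        scale = noise_level * statistics.pstdev(values)
        return [v + rng.gauss(0.0, scale) for v in values]
    if noise_type == "uniform":
        # Assumption: noise drawn from [-noise_level * range, +noise_level * range].
        half_width = noise_level * spread
        return [v + rng.uniform(-half_width, half_width) for v in values]
    raise ValueError(f"unknown noise_type: {noise_type!r}")


gwl = [2.1, 2.4, 2.9, 3.3]
noisy = add_noise(gwl, noise_level=0.01, noise_type="uniform", seed=0)
print(noisy)
```

Passing the same seed twice yields identical output, which mirrors the purpose of random_seed.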

geoprior.utils.geo_utils.augment_spatiotemporal_data(df, mode, group_by_cols=None, time_col=None, value_cols_interpolate=None, feature_cols_augment=None, interpolation_kwargs=None, augmentation_kwargs=None, savefile=None, verbose=False)

Apply interpolation, feature augmentation, or both to grouped data.

Parameters:
  • df (pandas.DataFrame) – Input spatiotemporal DataFrame.

  • mode ({'interpolate', 'augment_features', 'both'}) – Processing mode. Use interpolation only, feature augmentation only, or interpolation followed by augmentation.

  • group_by_cols (list of str or None, optional) – Grouping columns used for per-location processing.

  • time_col (str or None, optional) – Time column required when interpolation is requested.

  • value_cols_interpolate (list of str or None, optional) – Value columns to interpolate when interpolation is enabled.

  • feature_cols_augment (list of str or None, optional) – Feature columns to perturb when augmentation is enabled.

  • interpolation_kwargs (dict or None, optional) – Keyword arguments forwarded to interpolate_temporal_gaps.

  • augmentation_kwargs (dict or None, optional) – Keyword arguments forwarded to augment_series_features.

  • savefile (str or None, optional) – Optional output path handled by the decorator.

  • verbose (bool, optional) – Whether to emit progress information.

Returns:

Processed DataFrame assembled from all groups.

Return type:

pandas.DataFrame

Raises:

ValueError – If mode is invalid or required arguments for the selected mode are missing.

Notes

Groups are processed independently and concatenated afterward.
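
A minimal pandas sketch of the process-each-group-then-concatenate pattern is shown here. The helper process_by_group, the per-group function fill_gwl, and the sample columns are hypothetical and only illustrate the idea; the library's internals may differ.

```python
import pandas as pd


def process_by_group(df, group_by_cols, func):
    """Apply func to each group separately, then stack the results."""
    parts = [
        func(group.copy())
        for _, group in df.groupby(group_by_cols, sort=False)
    ]
    return pd.concat(parts, ignore_index=True)


df = pd.DataFrame(
    {
        "longitude": [113.1, 113.1, 113.1, 113.5, 113.5, 113.5],
        "latitude": [22.4, 22.4, 22.4, 22.7, 22.7, 22.7],
        "year": [2020, 2021, 2022, 2020, 2021, 2022],
        "GWL": [1.0, None, 3.0, 10.0, None, 30.0],
    }
)


def fill_gwl(group):
    # Interpolation only sees rows from one location at a time.
    group["GWL"] = group["GWL"].interpolate(method="linear")
    return group


out = process_by_group(df, ["longitude", "latitude"], fill_gwl)
print(out["GWL"].tolist())
```

Because each location is handled on its own, the gap at the first site is filled from 1.0 and 3.0 only and is never influenced by values from the second site.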

geoprior.utils.geo_utils.generate_dummy_pinn_data(n_samples, *, year_range=None, coords_range=None, subs_range=None, gwl_range=None, rainfall_range=None, vars_range=None)

Generate dummy PINN data dictionary with specified or default ranges.

Parameters:
  • n_samples (int) – Number of samples to generate.

  • year_range (tuple[float, float], optional) – (min_year, max_year) for integer years. Default (2000, 2025).

  • coords_range (tuple[tuple[float, float], tuple[float, float]], optional) – ((lon_min, lon_max), (lat_min, lat_max)). Default ((113.0, 113.8), (22.3, 22.8)).

  • subs_range (tuple[float, float], optional) – (mean_subsidence, std_subsidence) for normal distribution. Default (-20, 15).

  • gwl_range (tuple[float, float], optional) – (mean_gwl, std_gwl) for normal distribution. Default (2.5, 1.0).

  • rainfall_range (tuple[float, float], optional) – (min_rain, max_rain) for uniform distribution. Default (500, 2500).

  • vars_range (dict, optional) – Dictionary that may contain any of the keys: ‘year_range’, ‘coords_range’, ‘subs_range’, ‘gwl_range’, ‘rainfall_range’. Missing keys will fall back to defaults or to explicitly passed arguments.

Returns:

dummy_data_dict

Dictionary with keys:
  • "year" : integer years array

  • "longitude" : float longitudes array

  • "latitude" : float latitudes array

  • "subsidence" : float subsidence values array

  • "GWL" : float groundwater level values array

  • "rainfall_mm" : float rainfall values array

Return type:

dict[str, np.ndarray]
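
To make the documented defaults concrete, the following NumPy sketch builds a dictionary with the same keys and distributions. It is an approximation, not the function itself: the name sketch_dummy_pinn_data and the seed argument are hypothetical, and treating the upper year bound as inclusive is an assumption.

```python
import numpy as np


def sketch_dummy_pinn_data(n_samples, seed=0):
    """Build a dict shaped like the documented output (hypothetical helper)."""
    rng = np.random.default_rng(seed)
    (lon_min, lon_max), (lat_min, lat_max) = ((113.0, 113.8), (22.3, 22.8))
    return {
        # Integer years; the inclusive upper bound is an assumption.
        "year": rng.integers(2000, 2025, size=n_samples, endpoint=True),
        "longitude": rng.uniform(lon_min, lon_max, size=n_samples),
        "latitude": rng.uniform(lat_min, lat_max, size=n_samples),
        # Normal draws parameterised as (mean, std), per the documented defaults.
        "subsidence": rng.normal(-20.0, 15.0, size=n_samples),
        "GWL": rng.normal(2.5, 1.0, size=n_samples),
        # Uniform draw between the documented (min_rain, max_rain).
        "rainfall_mm": rng.uniform(500.0, 2500.0, size=n_samples),
    }


data = sketch_dummy_pinn_data(100)
print({key: values.shape for key, values in data.items()})
```

Note that subs_range and gwl_range hold a mean and a standard deviation rather than a minimum and maximum, so individual samples can fall outside any fixed interval.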

geoprior.utils.geo_utils.interpolate_temporal_gaps(series_df, time_col, value_cols, freq=None, method='linear', order=None, fill_limit=None, fill_limit_direction='forward', savefile=None)

Interpolates missing values in specified columns of a time series DataFrame.

This function is designed to work on a DataFrame representing a time series for a single spatial group (e.g., one monitoring location), sorted by time. If freq is provided, the DataFrame’s index is first reindexed to that frequency, which can create NaN values for missing time steps. These NaNs, along with any pre-existing NaNs in value_cols, are then interpolated.

Let \(t_1 < t_2 < \dots < t_n\) be the original timestamps. If freq yields a new index \(\{t_i'\}\) that includes times not in the original, NaNs appear at those \(t_i'\). Then for each column \(v\) in \(\{\text{value\_cols}\}\), we perform:

\[
v(t) =
\begin{cases}
\text{interpolate}(v,\, t;\, \text{method},\, \dots) & \text{for } t \in \{t_i'\}, \\
v(t) & \text{if } t \in \{t_1, \dots, t_n\} \text{ and not NaN.}
\end{cases}
\tag{1}
\]
Parameters:
  • series_df (pd.DataFrame) – Input DataFrame for a single time series, ideally sorted by time_col. The time_col should be convertible to datetime.

  • time_col (str) – Name of the column containing datetime information.

  • value_cols (List[str]) – List of column names whose missing values (NaNs) should be interpolated.

  • freq (str or None, default None) – The desired frequency for the time series (e.g., ‘D’ for daily, ‘MS’ for month start, ‘AS’ for year start). If provided, the DataFrame is reindexed to this frequency before interpolation. This helps identify and fill gaps where entire time steps are missing.

  • method (str, default 'linear') – Interpolation method to use. Passed to pandas.DataFrame.interpolate(). Common methods: ‘linear’, ‘time’, ‘polynomial’, ‘spline’. If ‘polynomial’ or ‘spline’, order must be specified.

  • order (int or None, default None) – Order for polynomial or spline interpolation. Required if method is ‘polynomial’ or ‘spline’.

  • fill_limit (int or None, default None) – Maximum number of consecutive NaNs to fill. Passed to pandas.DataFrame.interpolate().

  • fill_limit_direction (str, default 'forward') – Direction for fill_limit (‘forward’, ‘backward’, ‘both’). Passed to pandas.DataFrame.interpolate().

  • savefile (str or None, optional) – Optional output path handled by the decorator.

Returns:

DataFrame with specified columns interpolated. If freq was used, the DataFrame will have a DatetimeIndex. Other columns not in value_cols will be forward-filled after reindexing if freq is set, to propagate their last known values into new empty rows.

Return type:

pd.DataFrame

Raises:
  • TypeError – If series_df is not a DataFrame, if value_cols is not a list of strings, or if time_col is missing from the DataFrame.

  • ValueError – If order is required but not provided for ‘polynomial’ or ‘spline’.

Examples

>>> import pandas as pd
>>> from geoprior.utils.geo_utils import interpolate_temporal_gaps
>>> # Sample time series with missing dates
>>> df = pd.DataFrame({
...     'date': ['2020-01-01', '2020-01-03', '2020-01-06'],
...     'value': [1.0, None, 4.0]
... })
>>> df
         date  value
0 2020-01-01    1.0
1 2020-01-03    NaN
2 2020-01-06    4.0
>>> result = interpolate_temporal_gaps(
...     df, time_col='date', value_cols=['value'], freq='D'
... )
>>> result.head()
         date  value
0 2020-01-01    1.0
1 2020-01-02    1.6
2 2020-01-03    2.2
3 2020-01-04    2.8
4 2020-01-05    3.4

Notes

  • Ensure series_df pertains to a single spatial group and is sorted by time for meaningful interpolation.

  • The ‘time’ method for interpolation requires the index to be a DatetimeIndex.

  • Polynomial or spline methods require order to be specified.

See also

pandas.DataFrame.interpolate

Core interpolation method.

pandas.DataFrame.asfreq

Reindex DataFrame to fixed frequency.
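
The reindex-then-interpolate behaviour described for freq can be reproduced with plain pandas, which may help when checking results by hand. The sketch below uses only pandas.DataFrame.asfreq and pandas.Series.interpolate and does not call geoprior; the wrapper may differ in details such as the returned index or the handling of other columns.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "date": ["2020-01-01", "2020-01-03", "2020-01-06"],
        "value": [1.0, None, 4.0],
    }
)

# Parse the time column and use it as a sorted DatetimeIndex.
ts = df.assign(date=pd.to_datetime(df["date"])).set_index("date").sort_index()

# Reindex to a daily grid; days absent from the input become NaN rows.
daily = ts.asfreq("D")

# Fill both the new NaN rows and the pre-existing NaN on 2020-01-03.
daily["value"] = daily["value"].interpolate(method="linear")

print(daily)
```

With only the endpoints 1.0 (1 January) and 4.0 (6 January) known, linear interpolation advances by 0.6 per day across the five daily steps.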

geoprior.utils.geo_utils.resolve_spatial_columns(df, spatial_cols=None, lon_col=None, lat_col=None)

Helper to validate and resolve spatial columns.

Accepts either explicit lon/lat columns or a list of spatial_cols. Returns (lon_col, lat_col).

  • If lon_col and lat_col are both provided, they take precedence; a warning is issued if spatial_cols is also set.

  • Otherwise, if spatial_cols is provided, it must yield exactly two column names.

  • Otherwise, a ValueError is raised.

Parameters:
  • df (pd.DataFrame) – Input DataFrame for feature checks.

  • spatial_cols (list[str] or None) – Two-element list of [lon_col, lat_col].

  • lon_col (str or None) – Name of longitude column.

  • lat_col (str or None) – Name of latitude column.

Returns:

(lon_col, lat_col) – Validated column names for longitude and latitude.

Return type:

tuple of str

Raises:

ValueError – If neither lon_col/lat_col nor a valid spatial_cols is provided, or if spatial_cols does not contain exactly two column names.
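
The precedence rules above can be sketched in plain Python as follows. The function resolve_lon_lat is a hypothetical stand-in that takes a list of column names instead of a DataFrame, and the final existence check is an assumption about what "feature checks" means.

```python
import warnings


def resolve_lon_lat(columns, spatial_cols=None, lon_col=None, lat_col=None):
    """Return (lon_col, lat_col) following the documented precedence."""
    if lon_col is not None and lat_col is not None:
        # Explicit names win; spatial_cols is ignored with a warning.
        if spatial_cols is not None:
            warnings.warn("spatial_cols is ignored because lon_col and lat_col are set")
    elif spatial_cols is not None:
        if len(spatial_cols) != 2:
            raise ValueError("spatial_cols must contain exactly two column names")
        lon_col, lat_col = spatial_cols
    else:
        raise ValueError("provide lon_col and lat_col, or a two-element spatial_cols")
    # Assumption: the resolved names must exist among the available columns.
    missing = [name for name in (lon_col, lat_col) if name not in columns]
    if missing:
        raise ValueError(f"columns not found: {missing}")
    return lon_col, lat_col


columns = ["longitude", "latitude", "year"]
pair = resolve_lon_lat(columns, spatial_cols=["longitude", "latitude"])
print(pair)
```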

geoprior.utils.geo_utils.merge_frames_to_file(sources, output_path, *, output_format='parquet', compression='snappy', check_columns='strict', excel_mode='all_sheets', sheet_names=None, add_source_label=True, source_col='source', sort_by=None, drop_duplicates=False, reset_index=True, save_kwargs=None, verbose=1)

Merge multiple NATCOM city datasets into a single compressed file.

Parameters:
  • sources (iterable of {path-like, DataFrame}) –

    Input sources. Each element can be:

    • A path to a CSV file (e.g. "nansha_final...csv"),

    • A path to an Excel workbook (one or many sheets per city),

    • A pre-loaded DataFrame.

  • output_path (path-like) – Destination file path. If output_format='parquet' and the suffix is missing, '.parquet' is appended.

  • output_format ({'parquet', 'csv', 'feather', 'pickle'}, optional) – Output format. Default is 'parquet' for compact, columnar storage (recommended for Code Ocean).

  • compression (str or None, optional) –

    Compression to use for the chosen format.

    • For 'parquet' this is passed to pandas.DataFrame.to_parquet() (e.g. 'snappy', 'gzip', 'brotli').

    • For 'csv' it is passed to pandas.DataFrame.to_csv() via the compression keyword if non-None.

    • Ignored for 'feather' and 'pickle' (Feather uses its own defaults; pickle rarely benefits from extra compression at this layer).

  • check_columns ({'strict', 'subset', 'union'}, optional) –

    How to handle column consistency across sources:

    • 'strict' (default): all sources must have exactly the same set of columns (order may differ). Columns are then aligned to the order of the first DataFrame. A mismatch raises ValueError.

    • 'subset': all columns in the first DataFrame must exist in each subsequent DataFrame. Extra columns in later sources are dropped. Missing required columns raise ValueError.

    • 'union': columns are unioned across all sources. Any missing column in a particular source is added and filled with NaN before concatenation.

  • excel_mode ({'all_sheets', 'first_sheet'}, optional) –

    Behaviour when a source is an Excel workbook:

    • 'all_sheets' (default): read all sheets and treat each sheet as a separate DataFrame to merge.

    • 'first_sheet': read only the first sheet.

    If sheet_names is provided, it takes precedence.

  • sheet_names (iterable of str, optional) – Explicit sheet names to read from Excel workbooks. If provided, only these sheets are read.

  • add_source_label (bool, optional) – If True (default), add a column named source_col to each chunk before concatenation. For path-like inputs, the label is derived from the file name and, when applicable, sheet name (e.g. "nansha_final_main_std.harmonized.csv" or "zhongshan.xlsx:Sheet1"). For pre-loaded DataFrames, the label is '<in_memory>'.

  • source_col (str, optional) – Name of the column storing the source label when add_source_label=True.

  • sort_by (iterable of str, optional) – Optional column(s) to sort the merged DataFrame by at the end (e.g. ['city', 'year', 'longitude', 'latitude']).

  • drop_duplicates (bool, optional) – If True, drop duplicate rows at the end (after sorting).

  • reset_index (bool, optional) – If True (default), reset index after concatenation.

  • save_kwargs (dict, optional) – Extra keyword arguments forwarded to the corresponding to_* writer (e.g. to_parquet, to_csv, to_feather, to_pickle).

  • verbose (int, optional) – Verbosity level. 0 = silent, >=1 prints basic progress information.

Returns:

merged – The merged DataFrame (also written to disk).

Return type:

pandas.DataFrame

Raises:

ValueError – If check_columns='strict' or 'subset' and a column mismatch is detected.

Examples

>>> from geoprior.utils.geo_utils import merge_frames_to_file
>>> merge_frames_to_file(
...    sources=[
...        "nansha_final_main_std.harmonized.csv",
...        "zhongshan_final_main_std.harmonized.csv",
...    ],
...    output_path="natcom_all_cities",
...    output_format="parquet",
...    compression="snappy",
...    sort_by=["city", "year", "longitude", "latitude"],
... )

Notes

  • All inputs are read fully into memory before concatenation. This is acceptable for the NATCOM subsidence datasets (on the order of 10^6 to 10^7 rows), but the function could be refactored to a streaming or row-group approach if larger inputs need to be supported.

  • Using output_format='parquet' with compression (e.g. 'snappy') is recommended for Code Ocean to minimise disk usage while keeping I/O efficient.
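
The three check_columns modes described in the parameter list can be sketched on plain lists of column names. The helper align_columns is hypothetical and only returns the column order that sources would be aligned to; in particular, the first-seen ordering used for 'union' is an assumption, and the NaN filling itself is not shown.

```python
def align_columns(column_sets, check_columns="strict"):
    """Return the target column order for a list of per-source column lists."""
    first = list(column_sets[0])
    if check_columns == "strict":
        # Same set of columns everywhere; order may differ.
        for cols in column_sets[1:]:
            if set(cols) != set(first):
                diff = sorted(set(cols) ^ set(first))
                raise ValueError(f"column mismatch: {diff}")
        return first
    if check_columns == "subset":
        # Later sources may carry extras, which are dropped.
        for cols in column_sets[1:]:
            missing = [c for c in first if c not in cols]
            if missing:
                raise ValueError(f"missing required columns: {missing}")
        return first
    if check_columns == "union":
        # Keep every column seen in any source, in first-seen order.
        merged = list(first)
        for cols in column_sets[1:]:
            merged += [c for c in cols if c not in merged]
        return merged
    raise ValueError(f"unknown check_columns: {check_columns!r}")


nansha = ["city", "year", "GWL"]
zhongshan = ["year", "city", "GWL", "rainfall_mm"]
subset_cols = align_columns([nansha, zhongshan], "subset")
union_cols = align_columns([nansha, zhongshan], "union")
print(subset_cols, union_cols)
```

With these two sources, 'strict' would raise because rainfall_mm appears in only one of them.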

geoprior.utils.geo_utils.unpack_frames_from_file(merged, *, group_col='city', output_dir=None, output_format='csv', compression=None, use_source_col=True, source_col='source', filename_pattern='{group_value}_split', drop_columns=None, keep_columns=None, save=True, return_dict=True, save_kwargs=None, verbose=1, logger)

Reverse of merge_frames_to_file: split an aggregated NATCOM dataset into per-city frames (and optionally write them to disk).

Parameters:
  • merged (path-like or DataFrame) –

    Aggregated dataset. If path-like, the format is inferred from the file suffix.

    If a DataFrame is passed, it is used directly.

  • group_col (str, optional) – Column used to split the dataset (default: 'city'). Each unique value defines one output chunk.

  • output_dir (path-like, optional) – Directory where per-group files are written. If None and merged is a path, the directory of merged is used. If merged is a DataFrame and output_dir is None, the current working directory is used.

  • output_format ({'csv', 'parquet', 'feather', 'pickle'}, optional) – Output format for per-group files. Default is 'csv'.

  • compression (str or None, optional) –

    Compression to use when writing:

    • For 'csv', forwarded to DataFrame.to_csv() as the compression argument (e.g. 'gzip').

    • For 'parquet', forwarded to DataFrame.to_parquet() (e.g. 'snappy', 'gzip').

    • Ignored for 'feather' and 'pickle' (these use their own defaults).

  • use_source_col (bool, optional) –

    If True (default) and a column named source_col exists, the helper tries to reconstruct the original file name for each group:

    • If a group has a single unique, non-null source value that looks like a filename (e.g. 'nansha_final_main_std.harmonized.csv'), that base name is used for the output (with its suffix adjusted to match output_format if needed).

    • If there are multiple unique source labels within a group, it falls back to filename_pattern.

  • source_col (str, optional) – Name of the column containing the source label (default: 'source'). This should match the column created in merge_frames_to_file() when add_source_label=True.

  • filename_pattern (str, optional) –

    Pattern used when no suitable source label is available. The following placeholders are supported:

    • {group_value} : the group value as a string

    • {group_col} : the name of the grouping column

    Example: filename_pattern="{group_col}_{group_value}_data" produces "city_Nansha_data.csv".

  • drop_columns (iterable of str, optional) – Columns to drop from each group before saving/returning (e.g. ['source'] if you don’t want the bookkeeping column).

  • keep_columns (iterable of str, optional) – If provided, only these columns are kept (all others are dropped after any drop_columns processing is applied).

  • save (bool, optional) – If True (default), write each group to disk as a separate file. If False, no files are written; only the dict of DataFrames is returned (if return_dict=True).

  • return_dict (bool, optional) – If True (default), return a mapping {group_value: group_df}. If False, an empty dict is returned (useful when you only care about side-effect files).

  • save_kwargs (dict, optional) – Extra keyword arguments forwarded to the respective writer: DataFrame.to_csv(), DataFrame.to_parquet(), DataFrame.to_feather(), or DataFrame.to_pickle().

  • verbose (int, optional) – Verbosity level. 0 = silent, >=1 prints progress information.

  • logger (None)

Returns:

out – Dictionary mapping each group value to the corresponding DataFrame. Empty if return_dict=False.

Return type:

dict

Raises:

ValueError – If group_col is not present in the merged dataset.

Examples

>>> from geoprior.utils.geo_utils import unpack_frames_from_file
>>> unpack_frames_from_file(
...     "natcom_all_cities.parquet",
...     group_col="city",
...     output_format="csv",
... )
# -> writes e.g. 'nansha_final_main_std.harmonized.csv',
#    'zhongshan_final_main_std.harmonized.csv' (if `source` labels exist),
#    and returns a dict: {'Nansha': df_nansha, 'Zhongshan': df_zhongshan}
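
The file-naming rules described for use_source_col and filename_pattern can be sketched as below. The helper resolve_output_name is hypothetical: the suffix table (including '.pkl' for pickle) and the "has a file suffix" test for what looks like a filename are assumptions, not the library's documented behaviour.

```python
from pathlib import Path

# Assumed mapping from output_format to file suffix.
SUFFIXES = {"csv": ".csv", "parquet": ".parquet", "feather": ".feather", "pickle": ".pkl"}


def resolve_output_name(group_col, group_value, source_labels,
                        output_format="csv",
                        filename_pattern="{group_value}_split"):
    """Pick an output file name for one group (hypothetical helper)."""
    suffix = SUFFIXES[output_format]
    unique = {label for label in source_labels if label is not None}
    if len(unique) == 1:
        label = next(iter(unique))
        # Assumption: a label "looks like a filename" if it has a suffix.
        if Path(label).suffix:
            return str(Path(label).with_suffix(suffix))
    # Fall back to the pattern when labels are absent, mixed, or unsuitable.
    stem = filename_pattern.format(group_value=group_value, group_col=group_col)
    return stem + suffix


labels = ["nansha_final_main_std.harmonized.csv"] * 3
name_from_source = resolve_output_name("city", "Nansha", labels, "parquet")
name_from_pattern = resolve_output_name(
    "city", "Nansha", ["a.csv", "b.csv"],
    filename_pattern="{group_col}_{group_value}_data",
)
print(name_from_source, name_from_pattern)
```

Only the final suffix is swapped, so a label such as 'nansha_final_main_std.harmonized.csv' keeps its '.harmonized' part when written as Parquet.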