geoprior.utils.geo_utils
Geospatial utility helpers for GeoPrior workflows.
Functions
augment_city_spatiotemporal_data | Apply grouped spatiotemporal augmentation with city-aware defaults.
augment_series_features | Add random noise to selected numeric feature columns.
augment_spatiotemporal_data | Apply interpolation, feature augmentation, or both to grouped data.
generate_dummy_pinn_data | Generate dummy PINN data dictionary with specified or default ranges.
interpolate_temporal_gaps | Interpolates missing values in specified columns of a time series DataFrame.
merge_frames_to_file | Merge multiple NATCOM city datasets into a single compressed file.
resolve_spatial_columns | Helper to validate and resolve spatial columns.
unpack_frames_from_file | Reverse of merge_frames_to_file: split an aggregated NATCOM dataset into per-city frames (and optionally write them to disk).
- geoprior.utils.geo_utils.augment_city_spatiotemporal_data(df, city, mode='interpolate', group_by_cols=None, time_col=None, value_cols_interpolate=None, feature_cols_augment=None, interpolation_config=None, augmentation_config=None, target_name=None, interpolate_target=False, verbose=True, coordinate_precision=None, savefile=None)
Apply grouped spatiotemporal augmentation with city-aware defaults.
This is a convenience wrapper around augment_spatiotemporal_data. It validates the requested city, optionally rounds coordinates before grouping, and forwards interpolation and augmentation configuration dictionaries.
- Parameters:
df (pandas.DataFrame) – Input DataFrame containing spatial, temporal, and feature columns.
city ({'nansha', 'zhongshan'}) – City identifier used for validation and defaults.
mode ({'interpolate', 'augment_features', 'both'}, optional) – Processing mode forwarded to augment_spatiotemporal_data.
group_by_cols (list of str or None, optional) – Grouping columns for interpolation.
time_col (str or None, optional) – Time column used for interpolation.
value_cols_interpolate (list of str or None, optional) – Columns to interpolate.
feature_cols_augment (list of str or None, optional) – Columns to augment with noise.
interpolation_config (dict or None, optional) – Keyword arguments for interpolate_temporal_gaps. Typical values include {'freq': 'AS', 'method': 'linear'}.
augmentation_config (dict or None, optional) – Keyword arguments for augment_series_features. Typical values include {'noise_level': 0.01, 'noise_type': 'gaussian'}.
target_name (str or None, optional) – Optional target column name used when inferring default feature sets.
interpolate_target (bool, optional) – Whether the target should be included in default interpolation columns.
verbose (bool, optional) – Whether to emit progress information.
coordinate_precision (int or None, optional) – Decimal precision applied to coordinates before grouping.
savefile (str or None, optional) – Optional output CSV path handled by the decorator.
- Returns:
Augmented DataFrame.
- Return type:
pandas.DataFrame
- Raises:
ValueError – If city or mode is invalid, or if required arguments are missing for the selected mode.
TypeError – If the main inputs are of the wrong type.
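The effect of rounding coordinates before grouping can be pictured with plain pandas. This is a minimal sketch, not the wrapper's implementation: the column names longitude and latitude and the use of DataFrame.round are assumptions made for illustration.

```python
import pandas as pd

# Hypothetical frame: the first two rows sit at almost the same location.
df = pd.DataFrame({
    "longitude": [113.40012, 113.40014, 113.52000],
    "latitude": [22.70001, 22.70003, 22.61000],
    "year": [2019, 2020, 2019],
})

# Assumed equivalent of coordinate_precision=3: round before grouping so that
# near-identical coordinates fall into the same spatial group.
rounded = df.round({"longitude": 3, "latitude": 3})

n_groups_raw = df.groupby(["longitude", "latitude"]).ngroups
n_groups_rounded = rounded.groupby(["longitude", "latitude"]).ngroups
```

Without rounding, each slightly different coordinate pair forms its own one-row group, which leaves nothing to interpolate within a location.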
- geoprior.utils.geo_utils.augment_series_features(series_df, feature_cols, noise_level=0.01, noise_type='gaussian', random_seed=None, savefile=None)
Add random noise to selected numeric feature columns.
- Parameters:
series_df (pandas.DataFrame) – Input DataFrame representing one or more time series.
noise_level (float, optional) – Magnitude of the added noise. For Gaussian noise it scales the feature standard deviation, and for uniform noise it scales the feature range.
noise_type ({'gaussian', 'uniform'}, optional) – Type of noise distribution to use.
random_seed (int or None, optional) – Seed for reproducible noise generation.
savefile (str or None, optional) – Optional output path handled by the decorator.
- Returns:
DataFrame with noise added to the selected feature columns.
- Return type:
pandas.DataFrame
- Raises:
ValueError – If requested feature columns are missing or noise_type is invalid.
TypeError – If the main inputs are of the wrong type.
Notes
Non-numeric columns are skipped, and constant or invalid numeric ranges are left unchanged.
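The noise scaling described above can be sketched with NumPy. The helper add_noise is hypothetical and is not the library function; in particular, drawing uniform noise symmetrically from [-noise_level * range, +noise_level * range] is an assumption, since the exact formula is not documented here.

```python
import numpy as np

def add_noise(values, noise_level=0.01, noise_type="gaussian", seed=None):
    """Illustrative only: perturb a 1-D numeric array."""
    values = np.asarray(values, dtype=float)
    rng = np.random.default_rng(seed)
    if noise_type == "gaussian":
        # Noise standard deviation is a fraction of the feature's own spread.
        scale = noise_level * np.nanstd(values)
        if not np.isfinite(scale) or scale == 0:
            return values.copy()  # constant or invalid column: unchanged
        return values + rng.normal(0.0, scale, size=values.shape)
    if noise_type == "uniform":
        # Assumed symmetric interval sized by the feature's range.
        half_width = noise_level * (np.nanmax(values) - np.nanmin(values))
        if not np.isfinite(half_width) or half_width == 0:
            return values.copy()
        return values + rng.uniform(-half_width, half_width, size=values.shape)
    raise ValueError(f"Unknown noise_type: {noise_type!r}")

noisy = add_noise([10.0, 12.0, 11.0, 13.0], noise_level=0.01, seed=0)
constant = add_noise([5.0, 5.0, 5.0], seed=0)
```

Scaling by each column's spread keeps the perturbation proportionate across features with different units, and a fixed seed makes the result repeatable.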
- geoprior.utils.geo_utils.augment_spatiotemporal_data(df, mode, group_by_cols=None, time_col=None, value_cols_interpolate=None, feature_cols_augment=None, interpolation_kwargs=None, augmentation_kwargs=None, savefile=None, verbose=False)
Apply interpolation, feature augmentation, or both to grouped data.
- Parameters:
df (pandas.DataFrame) – Input spatiotemporal DataFrame.
mode ({'interpolate', 'augment_features', 'both'}) – Processing mode. Use interpolation only, feature augmentation only, or interpolation followed by augmentation.
group_by_cols (list of str or None, optional) – Grouping columns used for per-location processing.
time_col (str or None, optional) – Time column required when interpolation is requested.
value_cols_interpolate (list of str or None, optional) – Value columns to interpolate when interpolation is enabled.
feature_cols_augment (list of str or None, optional) – Feature columns to perturb when augmentation is enabled.
interpolation_kwargs (dict or None, optional) – Keyword arguments forwarded to interpolate_temporal_gaps.
augmentation_kwargs (dict or None, optional) – Keyword arguments forwarded to augment_series_features.
savefile (str or None, optional) – Optional output path handled by the decorator.
verbose (bool, optional) – Whether to emit progress information.
- Returns:
Processed DataFrame assembled from all groups.
- Return type:
pandas.DataFrame
- Raises:
ValueError – If mode is invalid or required arguments for the selected mode are missing.
Notes
Groups are processed independently and concatenated afterward.
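The split, process, and concatenate pattern from the note above can be sketched as follows. The column names and the process_group helper are hypothetical stand-ins, and this is not the library's code.

```python
import pandas as pd

df = pd.DataFrame({
    "longitude": [113.4, 113.4, 113.4, 113.5, 113.5],
    "latitude": [22.7, 22.7, 22.7, 22.6, 22.6],
    "year": [2018, 2019, 2020, 2018, 2020],
    "subsidence": [-10.0, None, -14.0, -3.0, -5.0],
})

def process_group(group):
    # Stand-in for the per-group step: fill gaps within one location only.
    group = group.sort_values("year").copy()
    group["subsidence"] = group["subsidence"].interpolate(method="linear")
    return group

# Each location is handled independently, then the pieces are stacked.
pieces = [
    process_group(group)
    for _, group in df.groupby(["longitude", "latitude"], sort=False)
]
result = pd.concat(pieces, ignore_index=True)
```

Processing per group prevents values from one location from leaking into the interpolation of another.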
- geoprior.utils.geo_utils.generate_dummy_pinn_data(n_samples, *, year_range=None, coords_range=None, subs_range=None, gwl_range=None, rainfall_range=None, vars_range=None)
Generate dummy PINN data dictionary with specified or default ranges.
- Parameters:
n_samples (int) – Number of samples to generate.
year_range (tuple[float, float], optional) – (min_year, max_year) for integer years. Default (2000, 2025).
coords_range (tuple[tuple[float, float], tuple[float, float]], optional) – ((lon_min, lon_max), (lat_min, lat_max)). Default ((113.0, 113.8), (22.3, 22.8)).
subs_range (tuple[float, float], optional) – (mean_subsidence, std_subsidence) for normal distribution. Default (-20, 15).
gwl_range (tuple[float, float], optional) – (mean_gwl, std_gwl) for normal distribution. Default (2.5, 1.0).
rainfall_range (tuple[float, float], optional) – (min_rain, max_rain) for uniform distribution. Default (500, 2500).
vars_range (dict, optional) – Dictionary that may contain any of the keys: 'year_range', 'coords_range', 'subs_range', 'gwl_range', 'rainfall_range'. Missing keys will fall back to defaults or to explicitly passed arguments.
- Returns:
dummy_data_dict – Dictionary with keys:
"year" : integer years array
"longitude" : float longitudes array
"latitude" : float latitudes array
"subsidence" : float subsidence values array
"GWL" : float groundwater level values array
"rainfall_mm" : float rainfall values array
- Return type:
dict[str, np.ndarray]
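A dictionary of the documented shape can be produced directly with NumPy, which may help when the package is unavailable. This sketch uses the default ranges listed above. It assumes that years include both endpoints and that coordinates are drawn uniformly, neither of which is stated in the reference.

```python
import numpy as np

rng = np.random.default_rng(42)  # seed chosen only for reproducibility
n_samples = 100

dummy = {
    # Assumption: integer years with both endpoints included.
    "year": rng.integers(2000, 2025, size=n_samples, endpoint=True),
    # Assumption: coordinates sampled uniformly inside the bounding box.
    "longitude": rng.uniform(113.0, 113.8, size=n_samples),
    "latitude": rng.uniform(22.3, 22.8, size=n_samples),
    # Normal draws parameterised as (mean, std), as documented.
    "subsidence": rng.normal(-20.0, 15.0, size=n_samples),
    "GWL": rng.normal(2.5, 1.0, size=n_samples),
    # Uniform draw parameterised as (min, max), as documented.
    "rainfall_mm": rng.uniform(500.0, 2500.0, size=n_samples),
}
```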
- geoprior.utils.geo_utils.interpolate_temporal_gaps(series_df, time_col, value_cols, freq=None, method='linear', order=None, fill_limit=None, fill_limit_direction='forward', savefile=None)
Interpolates missing values in specified columns of a time series DataFrame.
This function is designed to work on a DataFrame representing a time series for a single spatial group (e.g., one monitoring location), sorted by time. If freq is provided, the DataFrame’s index is first reindexed to that frequency, which can create NaN values for missing time steps. These NaNs, along with any pre-existing NaNs in value_cols, are then interpolated.
Let \(t_1 < t_2 < \dots < t_n\) be the original timestamps. If freq yields a new index \(\{t_i'\}\) that includes times not in the original, NaNs appear at those \(t_i'\). Then for each column \(v\) in \(\{\text{value\_cols}\}\), we perform:
\[
v(t) =
\begin{cases}
\text{interpolate}(v,\, t;\, \text{method},\, \dots) & \text{if } t \in \{t_i'\} \text{ and } v(t) \text{ is NaN}, \\
v(t) & \text{if } t \in \{t_1, \dots, t_n\} \text{ and } v(t) \text{ is not NaN.}
\end{cases}
\tag{1}
\]
- Parameters:
series_df (pd.DataFrame) – Input DataFrame for a single time series, ideally sorted by time_col. The time_col should be convertible to datetime.
time_col (str) – Name of the column containing datetime information.
value_cols (List[str]) – List of column names whose missing values (NaNs) should be interpolated.
freq (str or None, default None) – The desired frequency for the time series (e.g., 'D' for daily, 'MS' for month start, 'AS' for year start). If provided, the DataFrame is reindexed to this frequency before interpolation. This helps identify and fill gaps where entire time steps are missing.
method (str, default 'linear') – Interpolation method to use. Passed to pandas.DataFrame.interpolate(). Common methods: 'linear', 'time', 'polynomial', 'spline'. If 'polynomial' or 'spline', order must be specified.
order (int or None, default None) – Order for polynomial or spline interpolation. Required if method is 'polynomial' or 'spline'.
fill_limit (int or None, default None) – Maximum number of consecutive NaNs to fill. Passed to pandas.DataFrame.interpolate().
fill_limit_direction (str, default 'forward') – Direction for fill_limit ('forward', 'backward', 'both'). Passed to pandas.DataFrame.interpolate().
savefile (str | None)
- Returns:
DataFrame with specified columns interpolated. If freq was used, the DataFrame will have a DatetimeIndex. Other columns not in value_cols will be forward-filled after reindexing if freq is set, to propagate their last known values into new empty rows.
- Return type:
pd.DataFrame
- Raises:
TypeError – If series_df is not a DataFrame or if value_cols is not a list of strings. Also if time_col is missing from the DataFrame.
ValueError – If order is required but not provided for 'polynomial' or 'spline'.
Examples
>>> import pandas as pd
>>> from geoprior.utils.geo_utils import interpolate_temporal_gaps
>>> # Sample time series with missing dates
>>> df = pd.DataFrame({
...     'date': ['2020-01-01', '2020-01-03', '2020-01-06'],
...     'value': [1.0, None, 4.0]
... })
>>> df
         date  value
0  2020-01-01    1.0
1  2020-01-03    NaN
2  2020-01-06    4.0
>>> result = interpolate_temporal_gaps(
...     df, time_col='date', value_cols=['value'], freq='D'
... )
>>> result.head()
        date  value
0 2020-01-01    1.0
1 2020-01-02    1.6
2 2020-01-03    2.2
3 2020-01-04    2.8
4 2020-01-05    3.4
Notes
Ensure series_df pertains to a single spatial group and is sorted by time for meaningful interpolation.
The 'time' method for interpolation requires the index to be a DatetimeIndex.
Polynomial or spline methods require order to be specified.
See also
pandas.DataFrame.interpolate – Core interpolation method.
pandas.DataFrame.asfreq – Reindex DataFrame to fixed frequency.
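The reindex-then-interpolate step of Eq. (1) can be reproduced with the two pandas methods listed above. This is a minimal sketch of the technique on a single series, not the function's implementation, and the variable names are illustrative.

```python
import pandas as pd

df = pd.DataFrame({
    "date": ["2020-01-01", "2020-01-03", "2020-01-06"],
    "value": [1.0, None, 4.0],
})

# Parse the time column and use it as a DatetimeIndex.
series = df.assign(date=pd.to_datetime(df["date"])).set_index("date")

# Reindex to daily frequency: missing days appear as rows of NaN.
daily = series.asfreq("D")

# Fill every NaN (new rows and the pre-existing gap) by linear interpolation.
filled = daily.interpolate(method="linear")
```

With only the endpoints known (1.0 on 2020-01-01 and 4.0 on 2020-01-06), the five daily intervals are filled in equal steps of 0.6.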
- geoprior.utils.geo_utils.resolve_spatial_columns(df, spatial_cols=None, lon_col=None, lat_col=None)
Helper to validate and resolve spatial columns.
Accepts either explicit lon/lat columns or a list of spatial_cols. Returns (lon_col, lat_col).
If lon_col and lat_col are both provided, they take precedence (a warning is issued if spatial_cols is also set).
Else if spatial_cols is provided, it must yield exactly two column names.
Otherwise, a ValueError is raised.
- Parameters:
- Returns:
(lon_col, lat_col) – Validated column names for longitude and latitude.
- Return type:
- Raises:
ValueError – If neither lon_col/lat_col nor a valid spatial_cols is provided, or if spatial_cols does not contain exactly two names.
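The precedence rules above can be restated as a short function. The helper resolve_columns is hypothetical and takes a plain list of column names rather than a DataFrame. Two points are assumptions: that spatial_cols is ordered as (longitude, latitude), and that the resolved names are checked against the available columns.

```python
import warnings

def resolve_columns(columns, spatial_cols=None, lon_col=None, lat_col=None):
    """Illustrative restatement of the precedence rules, not the library code."""
    if lon_col is not None and lat_col is not None:
        # Explicit lon/lat win; spatial_cols is ignored with a warning.
        if spatial_cols is not None:
            warnings.warn("lon_col/lat_col take precedence over spatial_cols.")
        pair = (lon_col, lat_col)
    elif spatial_cols is not None:
        if len(spatial_cols) != 2:
            raise ValueError("spatial_cols must contain exactly two names.")
        # Assumption: the order is (longitude, latitude).
        pair = tuple(spatial_cols)
    else:
        raise ValueError("Provide lon_col and lat_col, or spatial_cols.")
    # Assumption: the resolved names must exist among the available columns.
    missing = [name for name in pair if name not in columns]
    if missing:
        raise ValueError(f"Columns not found: {missing}")
    return pair

available = ["longitude", "latitude", "year"]
from_list = resolve_columns(available, spatial_cols=["longitude", "latitude"])
explicit = resolve_columns(available, lon_col="longitude", lat_col="latitude")
```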
- geoprior.utils.geo_utils.merge_frames_to_file(sources, output_path, *, output_format='parquet', compression='snappy', check_columns='strict', excel_mode='all_sheets', sheet_names=None, add_source_label=True, source_col='source', sort_by=None, drop_duplicates=False, reset_index=True, save_kwargs=None, verbose=1)
Merge multiple NATCOM city datasets into a single compressed file.
- Parameters:
sources (iterable of {path-like, DataFrame}) – Input sources. Each element can be:
  - A path to a CSV file (e.g. "nansha_final...csv"),
  - A path to an Excel workbook (one or many sheets per city),
  - A pre-loaded DataFrame.
output_path (path-like) – Destination file path. If output_format='parquet' and the suffix is missing, '.parquet' is appended.
output_format ({'parquet', 'csv', 'feather', 'pickle'}, optional) – Output format. Default is 'parquet' for compact, columnar storage (recommended for Code Ocean).
compression (str or None, optional) – Compression to use for the chosen format.
  - For 'parquet' this is passed to pandas.DataFrame.to_parquet() (e.g. 'snappy', 'gzip', 'brotli').
  - For 'csv' it is passed to pandas.DataFrame.to_csv() via the compression keyword if non-None.
  - Ignored for 'feather' and 'pickle' (Feather uses its own defaults; pickle rarely benefits from extra compression at this layer).
check_columns ({'strict', 'subset', 'union'}, optional) – How to handle column consistency across sources:
  - 'strict' (default): all sources must have exactly the same set of columns (order may differ). Columns are then aligned to the order of the first DataFrame. A mismatch raises ValueError.
  - 'subset': all columns in the first DataFrame must exist in each subsequent DataFrame. Extra columns in later sources are dropped. Missing required columns raise ValueError.
  - 'union': columns are unioned across all sources. Any missing column in a particular source is added and filled with NaN before concatenation.
excel_mode ({'all_sheets', 'first_sheet'}, optional) – Behaviour when a source is an Excel workbook:
  - 'all_sheets' (default): read all sheets and treat each sheet as a separate DataFrame to merge.
  - 'first_sheet': read only the first sheet.
  If sheet_names is provided, it takes precedence.
sheet_names (iterable of str, optional) – Explicit sheet names to read from Excel workbooks. If provided, only these sheets are read.
add_source_label (bool, optional) – If True (default), add a column named source_col to each chunk before concatenation. For path-like inputs, the label is derived from the file name and, when applicable, sheet name (e.g. "nansha_final_main_std.harmonized.csv" or "zhongshan.xlsx:Sheet1"). For pre-loaded DataFrames, the label is '<in_memory>'.
source_col (str, optional) – Name of the column storing the source label when add_source_label=True.
sort_by (iterable of str, optional) – Optional column(s) to sort the merged DataFrame by at the end (e.g. ['city', 'year', 'longitude', 'latitude']).
drop_duplicates (bool, optional) – If True, drop duplicate rows at the end (after sorting).
reset_index (bool, optional) – If True (default), reset index after concatenation.
save_kwargs (dict, optional) – Extra keyword arguments forwarded to the corresponding to_* writer (e.g. to_parquet, to_csv, to_feather, to_pickle).
verbose (int, optional) – Verbosity level. 0 = silent, >=1 prints basic progress information.
- Returns:
merged – The merged DataFrame (also written to disk).
- Return type:
pandas.DataFrame
- Raises:
ValueError – If check_columns='strict' or 'subset' and a column mismatch is detected.
Examples
>>> from geoprior.utils.geo_utils import merge_frames_to_file
>>> merge_frames_to_file(
...     sources=[
...         "nansha_final_main_std.harmonized.csv",
...         "zhongshan_final_main_std.harmonized.csv",
...     ],
...     output_path="natcom_all_cities",
...     output_format="parquet",
...     compression="snappy",
...     sort_by=["city", "year", "longitude", "latitude"],
... )
Notes
All inputs are read fully into memory before concatenation. This is acceptable for the NATCOM subsidence datasets (O(10^6 - 10^7) rows) but can be refactored to a streaming/row-group approach if needed later.
Using output_format='parquet' with compression (e.g. 'snappy') is recommended for Code Ocean to minimise disk usage while keeping I/O efficient.
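The three check_columns policies documented for this function can be illustrated with a small alignment helper. The name align_columns and its structure are hypothetical; it restates the documented rules and is not the library's code.

```python
import pandas as pd

def align_columns(frames, check_columns="strict"):
    """Illustrative alignment of frames before concatenation."""
    first_cols = list(frames[0].columns)
    if check_columns == "strict":
        # Same column set everywhere; order follows the first frame.
        for frame in frames[1:]:
            if set(frame.columns) != set(first_cols):
                raise ValueError("Column sets differ under 'strict'.")
        return [frame[first_cols] for frame in frames]
    if check_columns == "subset":
        # Later frames may carry extras, which are dropped.
        for frame in frames[1:]:
            missing = [c for c in first_cols if c not in frame.columns]
            if missing:
                raise ValueError(f"Missing required columns: {missing}")
        return [frame[first_cols] for frame in frames]
    if check_columns == "union":
        all_cols = list(first_cols)
        for frame in frames[1:]:
            all_cols += [c for c in frame.columns if c not in all_cols]
        # reindex adds any absent column, filled with NaN.
        return [frame.reindex(columns=all_cols) for frame in frames]
    raise ValueError(f"Unknown check_columns: {check_columns!r}")

a = pd.DataFrame({"city": ["nansha"], "year": [2020]})
b = pd.DataFrame({"year": [2021], "city": ["zhongshan"], "extra": [1.0]})

merged_subset = pd.concat(align_columns([a, b], "subset"), ignore_index=True)
aligned_union = align_columns([a, b], "union")
```

Here 'strict' would reject the pair because b has an extra column, 'subset' drops that column, and 'union' keeps it and pads a with NaN.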
- geoprior.utils.geo_utils.unpack_frames_from_file(merged, *, group_col='city', output_dir=None, output_format='csv', compression=None, use_source_col=True, source_col='source', filename_pattern='{group_value}_split', drop_columns=None, keep_columns=None, save=True, return_dict=True, save_kwargs=None, verbose=1, logger)
Reverse of merge_frames_to_file: split an aggregated NATCOM dataset into per-city frames (and optionally write them to disk).
- Parameters:
merged (path-like or DataFrame) – Aggregated dataset. If path-like, the format is inferred from the file suffix:
  - .parquet → pandas.read_parquet()
  - .csv → pandas.read_csv()
  - .feather → pandas.read_feather()
  - .pkl / .pickle → pandas.read_pickle()
  If a DataFrame is passed, it is used directly.
group_col (str, optional) – Column used to split the dataset (default: 'city'). Each unique value defines one output chunk.
output_dir (path-like, optional) – Directory where per-group files are written. If None and merged is a path, the directory of merged is used. If merged is a DataFrame and output_dir is None, the current working directory is used.
output_format ({'csv', 'parquet', 'feather', 'pickle'}, optional) – Output format for per-group files. Default is 'csv'.
compression (str or None, optional) – Compression to use when writing:
  - For 'csv', forwarded to DataFrame.to_csv() as the compression argument (e.g. 'gzip').
  - For 'parquet', forwarded to DataFrame.to_parquet() (e.g. 'snappy', 'gzip').
  - Ignored for 'feather' and 'pickle' (these use their own defaults).
use_source_col (bool, optional) – If True (default) and a column named source_col exists, the helper tries to reconstruct the original file name for each group:
  - If a group has a single unique, non-null source value that looks like a filename (e.g. 'nansha_final_main_std.harmonized.csv'), that base name is used for the output (with its suffix adjusted to match output_format if needed).
  - If there are multiple unique source labels within a group, it falls back to filename_pattern.
source_col (str, optional) – Name of the column containing the source label (default: 'source'). This should match the column created in merge_frames_to_file() when add_source_label=True.
filename_pattern (str, optional) – Pattern used when no suitable source label is available. The following placeholders are supported:
  - {group_value}: the group value as a string
  - {group_col}: the name of the grouping column
  Example: filename_pattern="{group_col}_{group_value}_data" → "city_Nansha_data.csv".
drop_columns (iterable of str, optional) – Columns to drop from each group before saving/returning (e.g. ['source'] if you don’t want the bookkeeping column).
keep_columns (iterable of str, optional) – If provided, only these columns are kept (all others are dropped after any drop_columns processing is applied).
save (bool, optional) – If True (default), write each group to disk as a separate file. If False, no files are written; only the dict of DataFrames is returned (if return_dict=True).
return_dict (bool, optional) – If True (default), return a mapping {group_value: group_df}. If False, an empty dict is returned (useful when you only care about side-effect files).
save_kwargs (dict, optional) – Extra keyword arguments forwarded to the respective writer: DataFrame.to_csv(), DataFrame.to_parquet(), DataFrame.to_feather(), or DataFrame.to_pickle().
verbose (int, optional) – Verbosity level. 0 = silent, >=1 prints progress information.
logger (None)
- Returns:
out – Dictionary mapping each group value to the corresponding DataFrame. Empty if return_dict=False.
- Return type:
dict
- Raises:
ValueError – If group_col is not present in the merged dataset.
Examples
>>> from geoprior.utils.geo_utils import unpack_frames_from_file
>>> unpack_frames_from_file(
...     "natcom_all_cities.parquet",
...     group_col="city",
...     output_format="csv",
... )
# -> writes e.g. 'nansha_final_main_std.harmonized.csv',
#    'zhongshan_final_main_std.harmonized.csv' (if `source` labels exist),
#    and returns a dict: {'Nansha': df_nansha, 'Zhongshan': df_zhongshan}
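The output naming described under use_source_col and filename_pattern can be sketched with the standard library. The helper output_name is hypothetical. Two rules in it are assumptions: the test for a filename-like label (a suffix and no sheet separator), and appending "." plus output_format as the extension.

```python
from pathlib import Path

def output_name(group_col, group_value, source_label=None,
                pattern="{group_value}_split", output_format="csv"):
    """Illustrative naming rule; the library's exact logic may differ."""
    suffix = "." + output_format  # assumption: extension equals format name
    # Assumption: a label is filename-like if it has a suffix and is not a
    # workbook:sheet label such as "zhongshan.xlsx:Sheet1".
    if source_label and ":" not in source_label and Path(source_label).suffix:
        # Reuse the original base name, swapping only the last suffix.
        return str(Path(source_label).with_suffix(suffix))
    # Fall back to the pattern with its two supported placeholders.
    stem = pattern.format(group_value=group_value, group_col=group_col)
    return stem + suffix

from_pattern = output_name("city", "Nansha", pattern="{group_col}_{group_value}_data")
from_source = output_name(
    "city", "Nansha",
    source_label="nansha_final_main_std.harmonized.csv",
    output_format="parquet",
)
```

The first call reproduces the documented example name "city_Nansha_data.csv". The second keeps the original base name and changes only the final extension.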