Source code for geoprior.utils.geo_utils

# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# https://lkouadio.com
# Copyright (c) 2026-present
# Author: LKouadio <etanoyau@gmail.com>
r"""Geospatial utility helpers for GeoPrior workflows."""

from __future__ import annotations

import os
import warnings
from collections.abc import Iterable
from pathlib import Path
from typing import (
    Any,
    Literal,
)

import numpy as np
import pandas as pd

from ..core.checks import (
    check_spatial_columns,
    exist_features,
)
from ..core.handlers import columns_manager
from ..core.io import SaveFile
from ..logging import get_logger
from .generic_utils import vlog

logger = get_logger(__name__)

PathLike = str | os.PathLike[str]
DataSource = PathLike | pd.DataFrame


__all__ = [
    "augment_city_spatiotemporal_data",
    "augment_series_features",
    "augment_spatiotemporal_data",
    "generate_dummy_pinn_data",
    "interpolate_temporal_gaps",
    "resolve_spatial_columns",
    "merge_frames_to_file",
    "unpack_frames_from_file",
]


def resolve_spatial_columns(
    df, spatial_cols=None, lon_col=None, lat_col=None
):
    """
    Resolve the longitude/latitude column names to use for ``df``.

    Two ways of naming the coordinates are accepted:

    - ``lon_col`` and ``lat_col`` given together. They win over
      ``spatial_cols``; a ``UserWarning`` is emitted when
      ``spatial_cols`` is also set.
    - ``spatial_cols`` alone, which must resolve to exactly two names
      in ``[lon_col, lat_col]`` order.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose columns are checked for the requested names.
    spatial_cols : list[str] or None
        Two-element collection ``[lon_col, lat_col]``.
    lon_col : str or None
        Name of the longitude column.
    lat_col : str or None
        Name of the latitude column.

    Returns
    -------
    (lon_col, lat_col) : tuple of str
        Validated longitude and latitude column names.

    Raises
    ------
    ValueError
        If neither a complete lon/lat pair nor ``spatial_cols`` is
        supplied, or if ``spatial_cols`` does not hold exactly two
        names. The delegated checks (``exist_features``,
        ``check_spatial_columns``) may raise their own errors.
    """
    explicit_pair = lon_col is not None and lat_col is not None

    # Nothing usable was supplied: fail before touching the frame.
    if not explicit_pair and not spatial_cols:
        raise ValueError(
            "Either lon_col & lat_col, or spatial_cols"
            " must be provided."
        )

    if explicit_pair:
        # The explicit pair has priority; tell the caller that the
        # redundant ``spatial_cols`` argument is being dropped.
        if spatial_cols:
            warnings.warn(
                "Both lon_col/lat_col and spatial_cols set;"
                " spatial_cols will be ignored.",
                UserWarning,
                stacklevel=2,
            )
        exist_features(
            df,
            features=[lon_col, lat_col],
            name="Longitude/Latitude",
        )
        return lon_col, lat_col

    # Only ``spatial_cols`` is available: normalise, then validate.
    resolved = columns_manager(spatial_cols, empty_as_none=False)
    check_spatial_columns(df, spatial_cols=resolved)
    exist_features(df, features=resolved, name="Spatial columns")
    if len(resolved) != 2:
        raise ValueError(
            "spatial_cols must contain exactly two"
            " column names"
        )
    first, second = resolved
    return first, second
@SaveFile
def interpolate_temporal_gaps(
    series_df: pd.DataFrame,
    time_col: str,
    value_cols: list[str],
    freq: str | None = None,
    method: str = "linear",
    order: int | None = None,
    fill_limit: int | None = None,
    fill_limit_direction: str = "forward",
    savefile: str | None = None,
) -> pd.DataFrame:
    r"""
    Interpolate missing values in selected columns of one time series.

    The frame is expected to describe a single spatial group (e.g. one
    monitoring location). It is copied, ``time_col`` is converted to
    datetime, rows are sorted by time and ``time_col`` becomes the
    working index. If :code:`freq` is provided, the frame is reindexed
    to that frequency with ``DataFrame.asfreq``, which creates NaN rows
    for missing time steps. These NaNs, together with any pre-existing
    NaNs in :code:`value_cols`, are then interpolated.

    Let :math:`t_1 < t_2 < \dots < t_n` be the original timestamps.
    If :code:`freq` yields a new index :math:`\{t_i'\}` that includes
    times not in the original, NaNs appear at those :math:`t_i'`.
    Then for each column :math:`v` in :code:`value_cols`:

    .. math::
       v(t) \;=\;
       \begin{cases}
         \text{interpolate}(v,\;t;\;\text{method},\;\dots)
           & \text{if } v(t) \text{ is missing,}\\
         v(t)
           & \text{if } t \in \{t_1,\dots,t_n\}\text{ and not NaN.}
       \end{cases}

    Parameters
    ----------
    series_df : pd.DataFrame
        Input frame for a single time series. :code:`time_col` must be
        convertible to datetime.
    time_col : str
        Name of the column containing datetime information.
    value_cols : list of str
        Columns whose missing values (NaNs) should be interpolated.
    freq : str or None, default None
        Target frequency (e.g. ``'D'`` daily, ``'MS'`` month start,
        ``'YS'`` year start). When given, the frame is reindexed to
        this frequency before interpolation.
    method : str, default 'linear'
        Interpolation method passed to
        ``pandas.DataFrame.interpolate``. ``'polynomial'`` and
        ``'spline'`` require :code:`order`.
    order : int or None, default None
        Order for polynomial or spline interpolation.
    fill_limit : int or None, default None
        Maximum number of consecutive NaNs to fill (``limit``).
    fill_limit_direction : str, default 'forward'
        Direction for :code:`fill_limit` (``limit_direction``).
    savefile : str or None, default None
        Not used in the function body; presumably consumed by the
        ``SaveFile`` decorator to persist the result.

    Returns
    -------
    pd.DataFrame
        Frame sorted by time with a fresh integer index; the datetime
        values stay available in :code:`time_col`. When :code:`freq`
        is set, columns outside :code:`value_cols` are forward-filled
        into the rows created by the reindexing. Columns follow the
        input order.

    Raises
    ------
    TypeError
        If :code:`series_df` is not a DataFrame or :code:`value_cols`
        is not a list of strings.
    ValueError
        If :code:`time_col` is missing from the frame, or if
        :code:`order` is not given for ``'polynomial'``/``'spline'``.
    Exception
        Any error raised by ``pandas.to_datetime`` on
        :code:`time_col` is logged and re-raised.

    Examples
    --------
    >>> import pandas as pd
    >>> from geoprior.utils.geo_utils import interpolate_temporal_gaps
    >>> df = pd.DataFrame({
    ...     'date': ['2020-01-01', '2020-01-03', '2020-01-06'],
    ...     'value': [1.0, None, 4.0]
    ... })
    >>> result = interpolate_temporal_gaps(
    ...     df, time_col='date', value_cols=['value'], freq='D'
    ... )
    >>> result.head()
            date  value
    0 2020-01-01    1.0
    1 2020-01-02    1.6
    2 2020-01-03    2.2
    3 2020-01-04    2.8
    4 2020-01-05    3.4

    Notes
    -----
    - Failures are handled on a best-effort basis: if the reindexing
      step fails, the error is logged and an unmodified copy of
      :code:`series_df` is returned; if the interpolation itself
      fails, the error is logged and the (possibly reindexed) frame
      is returned without interpolation.
    - Requested :code:`value_cols` that are absent from the frame are
      skipped with a warning.

    See Also
    --------
    pandas.DataFrame.interpolate : Core interpolation method.
    pandas.DataFrame.asfreq : Reindex DataFrame to fixed frequency.
    """
    if not isinstance(series_df, pd.DataFrame):
        raise TypeError(
            "Input 'series_df' must be a pandas DataFrame."
        )
    if not isinstance(value_cols, list) or not all(
        isinstance(col, str) for col in value_cols
    ):
        raise TypeError(
            "'value_cols' must be a list of strings."
        )
    if time_col not in series_df.columns:
        raise ValueError(
            f"Time column '{time_col}' not found in DataFrame."
        )

    df_interpolated = series_df.copy()

    # Convert the time column to datetime so it can serve as the index.
    try:
        df_interpolated[time_col] = pd.to_datetime(
            df_interpolated[time_col]
        )
    except Exception as e:
        logger.error(
            f"Could not convert time column '{time_col}' to datetime: {e}"
        )
        raise

    df_interpolated = df_interpolated.sort_values(by=time_col)

    # Remember the caller's index name so it can be put back on the
    # integer index of the returned frame.
    original_index_name = df_interpolated.index.name
    original_columns = series_df.columns.tolist()

    def _with_default_index(frame: pd.DataFrame) -> pd.DataFrame:
        """Drop the datetime index and restore the original index name."""
        frame = frame.reset_index(drop=True)
        if original_index_name:
            frame.index.name = original_index_name
        return frame

    # drop=False keeps time_col as a regular column as well.
    df_interpolated = df_interpolated.set_index(time_col, drop=False)

    if freq:
        try:
            # Reindexing may create new, all-NaN rows.
            df_interpolated = df_interpolated.asfreq(freq)

            # Propagate the last known value of every column that is
            # not interpolated into the rows created by asfreq.
            other_cols = [
                col
                for col in original_columns
                if col not in value_cols and col != time_col
            ]
            if other_cols:
                df_interpolated[other_cols] = df_interpolated[
                    other_cols
                ].ffill()

            # New rows carry NaT in time_col; rebuild it from the index.
            df_interpolated[time_col] = df_interpolated.index
        except ValueError as e:
            logger.error(
                f"Invalid frequency string: '{freq}'. Error: {e}. "
                "Skipping reindexing."
            )
            # Return an untouched copy rather than a half-modified frame.
            return series_df.copy()
        except Exception as e:
            logger.error(
                f"Error during reindexing with frequency '{freq}': {e}"
            )
            return series_df.copy()

    # Only interpolate columns that are actually present.
    missing_value_cols = [
        vc for vc in value_cols if vc not in df_interpolated.columns
    ]
    if missing_value_cols:
        logger.warning(
            f"Value columns not found in DataFrame after potential "
            f"reindexing: {missing_value_cols}. Skipping interpolation for "
            f"these."
        )
        value_cols = [
            vc for vc in value_cols if vc in df_interpolated.columns
        ]

    if not value_cols:
        logger.warning(
            "No valid value_cols left to interpolate. Returning processed "
            "DataFrame."
        )
        return _with_default_index(df_interpolated)

    interpolate_kwargs = {
        "method": method,
        "limit": fill_limit,
        "limit_direction": fill_limit_direction,
    }
    if method in ["polynomial", "spline"]:
        if order is None:
            raise ValueError(
                f"Order must be specified for '{method}' interpolation."
            )
        interpolate_kwargs["order"] = order

    if method == "time" and not isinstance(
        df_interpolated.index, pd.DatetimeIndex
    ):
        # Only warn; pandas decides whether the call can proceed.
        logger.warning(
            "Interpolation method 'time' requires a DatetimeIndex. "
            "Consider setting `freq` or ensuring `time_col` is a "
            "DatetimeIndex."
        )

    try:
        df_interpolated[value_cols] = df_interpolated[
            value_cols
        ].interpolate(**interpolate_kwargs)
    except Exception as e:
        logger.error(
            f"Error during interpolation of columns {value_cols}: {e}"
        )
        # Best effort: hand back the frame as it was before the failed
        # interpolation attempt.
        return _with_default_index(df_interpolated)

    df_interpolated = _with_default_index(df_interpolated)

    # Original columns first (in input order), then anything new.
    final_cols_ordered = [
        col
        for col in original_columns
        if col in df_interpolated.columns
    ]
    for col in df_interpolated.columns:
        if col not in final_cols_ordered:
            final_cols_ordered.append(col)

    return df_interpolated[final_cols_ordered]
@SaveFile
def augment_series_features(
    series_df: pd.DataFrame,
    feature_cols: list[str],
    noise_level: float = 0.01,
    noise_type: str = "gaussian",
    random_seed: int | None = None,
    savefile: str | None = None,
) -> pd.DataFrame:
    """
    Add random noise to selected numeric feature columns.

    Parameters
    ----------
    series_df : pandas.DataFrame
        Input DataFrame representing one or more time series. It is
        copied; the input is not modified.
    feature_cols : list of str
        Feature columns to augment.
    noise_level : float, optional
        Magnitude of the added noise. For Gaussian noise it scales the
        feature standard deviation, and for uniform noise it scales
        the feature range (noise is drawn from
        ``[-range * noise_level / 2, +range * noise_level / 2)``).
    noise_type : {'gaussian', 'uniform'}, optional
        Type of noise distribution to use.
    random_seed : int or None, optional
        When given, ``numpy.random.seed`` is called with this value,
        which reseeds NumPy's global legacy generator.
    savefile : str or None, optional
        Optional output path handled by the decorator.

    Returns
    -------
    pandas.DataFrame
        Copy of ``series_df`` with noise added to the selected
        feature columns.

    Raises
    ------
    ValueError
        If requested feature columns are missing or ``noise_type`` is
        invalid.
    TypeError
        If the main inputs are of the wrong type.

    Notes
    -----
    Non-numeric columns are skipped with a warning. Columns whose
    spread (standard deviation or range) is zero or NaN, as well as
    empty columns, are left unchanged. The noise is cast to the
    column's dtype before being added, so integer columns receive
    integer-truncated noise.
    """
    if not isinstance(series_df, pd.DataFrame):
        raise TypeError(
            "Input 'series_df' must be a pandas DataFrame."
        )
    if not isinstance(feature_cols, list) or not all(
        isinstance(col, str) for col in feature_cols
    ):
        raise TypeError(
            "'feature_cols' must be a list of strings."
        )

    missing_cols = [
        col for col in feature_cols if col not in series_df.columns
    ]
    if missing_cols:
        raise ValueError(
            f"Feature columns not found in DataFrame: {missing_cols}"
        )

    if noise_type not in ["gaussian", "uniform"]:
        raise ValueError(
            f"Invalid noise_type: '{noise_type}'. "
            "Choose 'gaussian' or 'uniform'."
        )

    if random_seed is not None:
        np.random.seed(random_seed)

    df_augmented = series_df.copy()

    for col in feature_cols:
        if not pd.api.types.is_numeric_dtype(df_augmented[col]):
            logger.warning(
                f"Column '{col}' is not numeric. "
                "Skipping noise augmentation for this column."
            )
            continue

        feature_values = df_augmented[col].values
        if feature_values.size == 0:
            # Nothing to perturb; also avoids min/max on an empty array.
            continue

        if noise_type == "gaussian":
            # Noise scale follows the column's standard deviation.
            col_std = np.std(feature_values)
            if col_std == 0 or np.isnan(col_std):
                if np.isnan(col_std):
                    logger.debug(
                        f"Std of column '{col}' is NaN. Adding zero noise."
                    )
                # Previously this path set ``noise = 0`` (a Python int)
                # and crashed on ``noise.astype``; leave the column as is.
                continue
            noise = np.random.normal(
                0,
                col_std * noise_level,
                size=feature_values.shape,
            )
        else:  # noise_type == "uniform"
            # Noise scale follows the column's value range.
            col_range = np.max(feature_values) - np.min(feature_values)
            if col_range == 0 or np.isnan(col_range):
                if np.isnan(col_range):
                    logger.debug(
                        f"Range of column '{col}' is NaN. Adding zero noise."
                    )
                continue
            half_width = col_range * noise_level / 2.0
            noise = np.random.uniform(
                -half_width,
                half_width,
                size=feature_values.shape,
            )

        # Cast the noise so the column keeps its original dtype.
        df_augmented[col] = feature_values + noise.astype(
            feature_values.dtype
        )

    return df_augmented
@SaveFile
def augment_spatiotemporal_data(
    df: pd.DataFrame,
    mode: str,
    group_by_cols: list[str] | None = None,
    time_col: str | None = None,
    value_cols_interpolate: list[str] | None = None,
    feature_cols_augment: list[str] | None = None,
    interpolation_kwargs: dict[str, Any] | None = None,
    augmentation_kwargs: dict[str, Any] | None = None,
    savefile: str | None = None,
    verbose: bool = False,
) -> pd.DataFrame:
    """
    Run per-group interpolation and/or noise augmentation on a frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Input spatiotemporal DataFrame (copied, never modified).
    mode : {'interpolate', 'augment_features', 'both'}
        Which stages to run. With ``'both'`` interpolation runs first
        and augmentation is applied to its output.
    group_by_cols : list of str or None, optional
        Columns identifying one series; required for interpolation.
    time_col : str or None, optional
        Time column; required for interpolation.
    value_cols_interpolate : list of str or None, optional
        Columns to interpolate; required for interpolation.
    feature_cols_augment : list of str or None, optional
        Columns to perturb; required for augmentation.
    interpolation_kwargs : dict or None, optional
        Extra keyword arguments for ``interpolate_temporal_gaps``.
    augmentation_kwargs : dict or None, optional
        Extra keyword arguments for ``augment_series_features``.
    savefile : str or None, optional
        Optional output path handled by the decorator.
    verbose : bool, optional
        Print a per-group progress line while interpolating.

    Returns
    -------
    pandas.DataFrame
        The processed frame. After interpolation the groups are
        concatenated with a fresh integer index.

    Raises
    ------
    ValueError
        If ``mode`` is not recognised or the arguments required by
        the selected stage(s) are missing.

    Notes
    -----
    Each group is interpolated independently. Augmentation is applied
    once to the whole (possibly interpolated) frame.
    """
    if mode not in ("interpolate", "augment_features", "both"):
        raise ValueError(
            "Invalid mode. Choose from 'interpolate', "
            "'augment_features', or 'both'."
        )

    # ``mode`` is one of the three valid values from here on.
    run_interpolation = mode != "augment_features"
    run_augmentation = mode != "interpolate"

    result = df.copy()
    interp_options = interpolation_kwargs or {}
    augment_options = augmentation_kwargs or {}

    if run_interpolation:
        if not (
            group_by_cols and time_col and value_cols_interpolate
        ):
            raise ValueError(
                "For 'interpolate' or 'both' mode, 'group_by_cols', "
                "'time_col', and 'value_cols_interpolate' must be provided."
            )

        logger.info(
            f"Starting temporal interpolation for groups: "
            f"{group_by_cols}..."
        )
        groups = result.groupby(group_by_cols, group_keys=True)
        total = len(groups)
        pieces = []

        for position, (key, chunk) in enumerate(groups, start=1):
            if verbose:
                # Carriage return keeps the progress on a single line.
                print(
                    f" Interpolating group {position}/{total}: {key}",
                    end="\r",
                )
            pieces.append(
                interpolate_temporal_gaps(
                    series_df=chunk,
                    time_col=time_col,
                    value_cols=value_cols_interpolate,
                    **interp_options,
                )
            )

        if verbose:
            print("\nInterpolation of all groups complete.")

        if not pieces:
            logger.warning(
                "No groups found for interpolation or all groups were empty."
            )
        else:
            result = pd.concat(pieces, ignore_index=True)
            logger.info(
                "Temporal interpolation applied to all groups."
            )

    if run_augmentation:
        if not feature_cols_augment:
            raise ValueError(
                "For 'augment_features' or 'both' mode, "
                "'feature_cols_augment' must be provided."
            )

        logger.info(
            f"Starting feature augmentation for columns: "
            f"{feature_cols_augment}..."
        )
        result = augment_series_features(
            series_df=result,
            feature_cols=feature_cols_augment,
            **augment_options,
        )
        logger.info("Feature augmentation applied.")

    return result
[docs] def augment_city_spatiotemporal_data( df: pd.DataFrame, city: str, mode: str = "interpolate", group_by_cols: list[str] | None = None, time_col: str | None = None, value_cols_interpolate: list[str] | None = None, feature_cols_augment: list[str] | None = None, interpolation_config: dict[str, Any] | None = None, augmentation_config: dict[str, Any] | None = None, target_name: str | None = None, interpolate_target: bool = False, verbose: bool = True, coordinate_precision: int | None = None, savefile: str | None = None, ) -> pd.DataFrame: """ Apply grouped spatiotemporal augmentation with city-aware defaults. This is a convenience wrapper around ``augment_spatiotemporal_data``. It validates the requested city, optionally rounds coordinates before grouping, and forwards interpolation and augmentation configuration dictionaries. Parameters ---------- df : pandas.DataFrame Input DataFrame containing spatial, temporal, and feature columns. city : {'nansha', 'zhongshan'} City identifier used for validation and defaults. mode : {'interpolate', 'augment_features', 'both'}, optional Processing mode forwarded to ``augment_spatiotemporal_data``. group_by_cols : list of str or None, optional Grouping columns for interpolation. time_col : str or None, optional Time column used for interpolation. value_cols_interpolate : list of str or None, optional Columns to interpolate. feature_cols_augment : list of str or None, optional Columns to augment with noise. interpolation_config : dict or None, optional Keyword arguments for ``interpolate_temporal_gaps``. Typical values include ``{'freq': 'AS', 'method': 'linear'}``. augmentation_config : dict or None, optional Keyword arguments for ``augment_series_features``. Typical values include ``{'noise_level': 0.01, 'noise_type': 'gaussian'}``. target_name : str or None, optional Optional target column name used when inferring default feature sets. 
interpolate_target : bool, optional Whether the target should be included in default interpolation columns. verbose : bool, optional Whether to emit progress information. coordinate_precision : int or None, optional Decimal precision applied to coordinates before grouping. savefile : str or None, optional Optional output CSV path handled by the decorator. Returns ------- pandas.DataFrame Augmented DataFrame. Raises ------ ValueError If ``city`` or ``mode`` is invalid, or if required arguments are missing for the selected mode. TypeError If the main inputs are of the wrong type. """ # Validate DataFrame type if not isinstance(df, pd.DataFrame): raise TypeError( "Input 'df' must be a pandas DataFrame." ) # Validate city parameter city_lower = city.strip().lower() if city_lower not in ["nansha", "zhongshan"]: raise ValueError( "`city` must be 'nansha' or 'zhongshan'." ) logger.info( f"Processing data for city: {city_lower.capitalize()}." ) # Copy input to avoid modifying original df_city = df.copy() # Ensure 'year' (or specified time_col) is datetime # Default time_col for processing, can be overridden by user _time_col = time_col or "year" if _time_col not in df_city.columns: raise ValueError( f"Time column '{_time_col}' not found in DataFrame." ) try: # Attempt to convert to datetime if not already. # If 'year' is int like 2015, format='%Y' makes it YYYY-01-01 first_val = df_city[_time_col].iloc[0] if ( pd.api.types.is_integer(first_val) and 1000 < first_val < 3000 ): df_city[_time_col] = pd.to_datetime( df_city[_time_col], format="%Y" ) else: df_city[_time_col] = pd.to_datetime( df_city[_time_col] ) logger.debug(f"Ensured '{_time_col}' is datetime.") except Exception as e: logger.warning( f"Could not convert time column '{_time_col}' to datetime: {e}. " "Proceeding, but interpolation might behave unexpectedly." 
) # --- Define default parameters --- _group_by_cols = group_by_cols or [ "longitude", "latitude", ] _group_by_cols = columns_manager( _group_by_cols, empty_as_none=False ) # Default columns to exclude from auto-selection default_exclude_cols = set(_group_by_cols + [_time_col]) # Add known categorical or ID-like columns (expand as needed) known_non_numeric_or_id_cols = { "lithology", "city", "lithology_class", # 'density_concentration', 'rainfall_category', # 'building_concentration', 'soil_thickness' } default_exclude_cols.update(known_non_numeric_or_id_cols) if target_name and target_name in df_city.columns: if not interpolate_target: # Exclude target from interpolation by default default_exclude_cols.add(target_name) numeric_cols = df_city.select_dtypes( include=np.number ).columns.tolist() _value_cols_interpolate = value_cols_interpolate if _value_cols_interpolate is None: _value_cols_interpolate = [ col for col in numeric_cols if col not in default_exclude_cols ] if ( interpolate_target and target_name and target_name in numeric_cols ): _value_cols_interpolate.append(target_name) logger.info( f"Defaulting 'value_cols_interpolate' to: " f"{_value_cols_interpolate}" ) _feature_cols_augment = feature_cols_augment if _feature_cols_augment is None: augment_exclude_cols = default_exclude_cols.copy() if target_name: augment_exclude_cols.add(target_name) _feature_cols_augment = [ col for col in numeric_cols if col not in augment_exclude_cols ] logger.info( f"Defaulting 'feature_cols_augment' to: " f"{_feature_cols_augment}" ) _interpolation_config = interpolation_config or { "freq": "AS", "method": "linear", } _augmentation_config = augmentation_config or { "noise_level": 0.01, "noise_type": "gaussian", "random_seed": None, } # --- Coordinate Precision --- if coordinate_precision is not None: if not ( isinstance(coordinate_precision, int) and coordinate_precision >= 0 ): raise ValueError( "'coordinate_precision' must be a " "non-negative integer." 
) if ( "longitude" in df_city.columns and "latitude" in df_city.columns ): df_city["longitude"] = df_city["longitude"].round( coordinate_precision ) df_city["latitude"] = df_city["latitude"].round( coordinate_precision ) logger.info( f"Coordinates rounded to {coordinate_precision} " "decimal places." ) else: logger.warning( "Longitude/Latitude columns not found for rounding, " "but coordinate_precision was set." ) logger.info( f"Original DataFrame shape for augmentation: {df_city.shape}" ) # --- Call the core augmentation function --- try: df_augmented = augment_spatiotemporal_data( df=df_city, mode=mode, group_by_cols=_group_by_cols, time_col=_time_col, value_cols_interpolate=_value_cols_interpolate, feature_cols_augment=_feature_cols_augment, interpolation_kwargs=_interpolation_config, augmentation_kwargs=_augmentation_config, verbose=verbose, ) logger.info( f"Augmented DataFrame shape: {df_augmented.shape}" ) if savefile: try: # Ensure directory exists for savefile save_dir = os.path.dirname(savefile) if save_dir and not os.path.exists(save_dir): os.makedirs(save_dir, exist_ok=True) df_augmented.to_csv(savefile, index=False) logger.info( f"Augmented DataFrame saved to: {savefile}" ) except Exception as e_save: logger.error( f"Failed to save augmented DataFrame to '{savefile}': " f"{e_save}" ) return df_augmented except ValueError as ve: logger.error( f"ValueError during core augmentation process: {ve}" ) raise except TypeError as te: logger.error( f"TypeError during core augmentation process: {te}" ) raise except Exception as e: logger.error( f"An unexpected error occurred during core augmentation: {e}" ) raise
[docs] def generate_dummy_pinn_data( n_samples: int, *, year_range: tuple[float, float] | None = None, coords_range: tuple[ tuple[float, float], tuple[float, float] ] | None = None, subs_range: tuple[float, float] | None = None, gwl_range: tuple[float, float] | None = None, rainfall_range: tuple[float, float] | None = None, vars_range: dict | None = None, ) -> dict[str, np.ndarray]: """ Generate dummy PINN data dictionary with specified or default ranges. Parameters ---------- n_samples : int Number of samples to generate. year_range : tuple[float, float], optional (min_year, max_year) for integer years. Default (2000, 2025). coords_range : tuple[tuple[float, float], tuple[float, float]], optional ((lon_min, lon_max), (lat_min, lat_max)). Default ((113.0, 113.8), (22.3, 22.8)). subs_range : tuple[float, float], optional (mean_subsidence, std_subsidence) for normal distribution. Default (-20, 15). gwl_range : tuple[float, float], optional (mean_gwl, std_gwl) for normal distribution. Default (2.5, 1.0). rainfall_range : tuple[float, float], optional (min_rain, max_rain) for uniform distribution. Default (500, 2500). vars_range : dict, optional Dictionary that may contain any of the keys: 'year_range', 'coords_range', 'subs_range', 'gwl_range', 'rainfall_range'. Missing keys will fall back to defaults or to explicitly passed arguments. 
Returns ------- dummy_data_dict : dict[str, np.ndarray] Dictionary with keys: - "year" : integer years array - "longitude" : float longitudes array - "latitude" : float latitudes array - "subsidence" : float subsidence values array - "GWL" : float groundwater level values array - "rainfall_mm" : float rainfall values array """ # Default ranges _def_year = (2000, 2025) _def_coords = ((113.0, 113.8), (22.3, 22.8)) _def_subs = (-20.0, 15.0) _def_gwl = (2.5, 1.0) _def_rain = (500.0, 2500.0) # Merge vars_range if provided vr = vars_range or {} yr_rng = ( year_range if year_range is not None else vr.get("year_range", _def_year) ) crd_rng = ( coords_range if coords_range is not None else vr.get("coords_range", _def_coords) ) sbs_rng = ( subs_range if subs_range is not None else vr.get("subs_range", _def_subs) ) gwl_rng = ( gwl_range if gwl_range is not None else vr.get("gwl_range", _def_gwl) ) rnf_rng = ( rainfall_range if rainfall_range is not None else vr.get("rainfall_range", _def_rain) ) # Unpack ranges yr_min, yr_max = float(yr_rng[0]), float(yr_rng[1]) (lon_min, lon_max), (lat_min, lat_max) = crd_rng subs_mean, subs_std = sbs_rng gwl_mean, gwl_std = gwl_rng rain_min, rain_max = rnf_rng # Generate year integers years = np.random.randint( int(yr_min), int(yr_max), size=n_samples ) # Generate coordinates uniformly longitudes = np.random.uniform( float(lon_min), float(lon_max), size=n_samples ) latitudes = np.random.uniform( float(lat_min), float(lat_max), size=n_samples ) # Generate subsidence via normal distribution subsidence = np.random.normal( float(subs_mean), float(subs_std), size=n_samples ) # Generate GWL via normal distribution gwl = np.random.normal( float(gwl_mean), float(gwl_std), size=n_samples ) # Generate rainfall uniformly rainfall_mm = np.random.uniform( float(rain_min), float(rain_max), size=n_samples ) return { "year": years, "longitude": longitudes.astype(np.float32), "latitude": latitudes.astype(np.float32), "subsidence": 
subsidence.astype(np.float32), "GWL": gwl.astype(np.float32), "rainfall_mm": rainfall_mm.astype(np.float32), }
def merge_frames_to_file(
    sources: Iterable[DataSource],
    output_path: PathLike,
    *,
    output_format: Literal[
        "parquet", "csv", "feather", "pickle"
    ] = "parquet",
    compression: str | None = "snappy",
    check_columns: Literal[
        "strict", "subset", "union"
    ] = "strict",
    excel_mode: Literal[
        "all_sheets", "first_sheet"
    ] = "all_sheets",
    sheet_names: Iterable[str] | None = None,
    add_source_label: bool = True,
    source_col: str = "source",
    sort_by: Iterable[str] | None = None,
    drop_duplicates: bool = False,
    reset_index: bool = True,
    save_kwargs: dict[str, Any] | None = None,
    verbose: int = 1,
) -> pd.DataFrame:
    """
    Merge multiple NATCOM city datasets into a single compressed file.

    Parameters
    ----------
    sources : iterable of {path-like, DataFrame}
        Input sources. Each element can be:

        * A path to a CSV/TXT file (read with
          :func:`pandas.read_csv`),
        * A path to an Excel workbook (``.xls``/``.xlsx``, one or
          many sheets),
        * A pre-loaded :class:`~pandas.DataFrame` (copied, never
          modified in place).

        A single path or DataFrame (not wrapped in a list) is
        also accepted and treated as a one-element collection.
    output_path : path-like
        Destination file path. If the path has no suffix, the
        default suffix of ``output_format`` is appended
        (``.parquet``, ``.csv``, ``.feather`` or ``.pkl``).
        Missing parent directories are created.
    output_format : {'parquet', 'csv', 'feather', 'pickle'}, optional
        Output format. Default is ``'parquet'`` for compact,
        columnar storage (recommended for Code Ocean).
    compression : str or None, optional
        Compression to use for the chosen format.

        * For ``'parquet'`` it is passed to
          :meth:`pandas.DataFrame.to_parquet`
          (e.g. ``'snappy'``, ``'gzip'``, ``'brotli'``).
        * For ``'csv'`` it is passed to
          :meth:`pandas.DataFrame.to_csv` unless it is a
          Parquet-only codec (``'snappy'``, ``'brotli'``,
          ``'lz4'``, ``'lzo'``), in which case it is ignored so
          that the default ``'snappy'`` does not break CSV output.
        * Ignored for ``'feather'`` and ``'pickle'``.
    check_columns : {'strict', 'subset', 'union'}, optional
        How to handle column consistency across sources:

        * ``'strict'`` (default): all sources must have exactly
          the same set of columns (order may differ). Columns are
          aligned to the order of the first DataFrame.
        * ``'subset'``: all columns of the *first* DataFrame must
          exist in each subsequent one; extra columns in later
          sources are dropped.
        * ``'union'``: columns are unioned across all sources in
          order of first appearance. Columns missing from a
          source are filled with ``NaN``.
    excel_mode : {'all_sheets', 'first_sheet'}, optional
        Behaviour when a source is an Excel workbook:
        ``'all_sheets'`` (default) reads every sheet as a separate
        frame; ``'first_sheet'`` reads only the first sheet.
        ``sheet_names`` takes precedence when provided.
    sheet_names : iterable of str, optional
        Explicit sheet names to read from Excel workbooks. A
        single string is treated as one sheet name.
    add_source_label : bool, optional
        If ``True`` (default), add a column named ``source_col``
        to each chunk. For files the label is the file name (plus
        ``":<sheet>"`` for Excel sheets); for in-memory frames it
        is ``'<in_memory>'``. A pre-existing ``source_col`` column
        is preserved as ``f"{source_col}_orig"``.
    source_col : str, optional
        Name of the source-label column.
    sort_by : iterable of str, optional
        Column(s) to sort the merged DataFrame by. A single
        string is treated as one column name.
    drop_duplicates : bool, optional
        If ``True``, drop duplicate rows at the end (after
        sorting).
    reset_index : bool, optional
        If ``True`` (default), the merged frame gets a fresh
        ``RangeIndex``; otherwise the concatenated index is kept.
    save_kwargs : dict, optional
        Extra keyword arguments forwarded to the corresponding
        ``to_*`` writer. The caller's dict is not modified.
    verbose : int, optional
        Verbosity level. ``0`` = silent, ``>=1`` prints basic
        progress information.

    Returns
    -------
    merged : pandas.DataFrame
        The merged DataFrame (also written to disk).

    Raises
    ------
    ValueError
        If no sources are given, ``output_format`` or
        ``check_columns`` is unknown, a source has an unsupported
        extension, or a column mismatch is detected under
        ``'strict'``/``'subset'``.
    FileNotFoundError
        If a path-like source does not exist.

    Examples
    --------
    >>> from geoprior.utils.geo_utils import merge_frames_to_file
    >>> merge_frames_to_file(
    ...     sources=[
    ...         "nansha_final_main_std.harmonized.csv",
    ...         "zhongshan_final_main_std.harmonized.csv",
    ...     ],
    ...     output_path="natcom_all_cities",
    ...     output_format="parquet",
    ...     compression="snappy",
    ...     sort_by=["city", "year", "longitude", "latitude"],
    ... )

    Notes
    -----
    * All inputs are read fully into memory before concatenation.
    * The index is never written for Parquet/CSV/Feather output.
    """
    save_kwargs = dict(save_kwargs or {})

    # A bare path or DataFrame would otherwise be iterated
    # character-by-character / column-by-column.
    if isinstance(sources, (str, os.PathLike, pd.DataFrame)):
        sources = [sources]
    else:
        sources = list(sources)
    if not sources:
        raise ValueError(
            "No sources provided to merge_frames_to_file()."
        )

    out_path = Path(output_path)
    fmt = output_format.lower()
    default_suffix = {
        "parquet": ".parquet",
        "csv": ".csv",
        "feather": ".feather",
        "pickle": ".pkl",
    }
    if fmt not in default_suffix:
        raise ValueError(
            f"Unsupported output_format={output_format!r}. "
            "Expected one of {'parquet','csv','feather','pickle'}."
        )
    # Validate up-front so a typo is reported even when only a
    # single frame is merged.
    if check_columns not in {"strict", "subset", "union"}:
        raise ValueError(
            f"Unknown check_columns={check_columns!r}"
        )

    if sheet_names is None:
        requested_sheets = None
    elif isinstance(sheet_names, str):
        requested_sheets = [sheet_names]
    else:
        requested_sheets = list(sheet_names)

    if sort_by is None:
        sort_cols: list[str] = []
    elif isinstance(sort_by, str):
        sort_cols = [sort_by]
    else:
        sort_cols = list(sort_by)

    def _iter_frames_from_source(
        src: DataSource,
    ) -> list[tuple[pd.DataFrame, str]]:
        """Expand one source into ``(frame, label)`` pairs."""
        if isinstance(src, pd.DataFrame):
            return [(src.copy(), "<in_memory>")]

        p = Path(src)
        if not p.exists():
            raise FileNotFoundError(f"Source not found: {p}")

        suffix = p.suffix.lower()
        if suffix in {".csv", ".txt"}:
            return [(pd.read_csv(p), p.name)]

        if suffix in {".xls", ".xlsx"}:
            if requested_sheets is not None:
                sheets_to_read: list[Any] = requested_sheets
            elif excel_mode == "first_sheet":
                sheets_to_read = [0]  # first sheet by position
            else:  # 'all_sheets'
                all_sheets = pd.read_excel(p, sheet_name=None)
                return [
                    (sh_df, f"{p.name}:{sh_name}")
                    for sh_name, sh_df in all_sheets.items()
                ]

            pairs: list[tuple[pd.DataFrame, str]] = []
            for sh in sheets_to_read:
                sh_df = pd.read_excel(p, sheet_name=sh)
                label = (
                    f"{p.name}:sheet{sh}"
                    if isinstance(sh, int)
                    else f"{p.name}:{sh}"
                )
                pairs.append((sh_df, label))
            return pairs

        raise ValueError(
            f"Unsupported file extension for source {p!s}. "
            "Only CSV (.csv) and Excel (.xls, .xlsx) are allowed."
        )

    # 1) Collect and validate/align columns
    frames: list[pd.DataFrame] = []
    base_cols: list[Any] | None = None

    for src in sources:
        for df_chunk, label in _iter_frames_from_source(src):
            if base_cols is None:
                base_cols = list(df_chunk.columns)
                if verbose:
                    print(
                        f"[merge_frames] Base columns "
                        f"({len(base_cols)}): {base_cols[:6]}..."
                    )
            else:
                cols_now = list(df_chunk.columns)
                set_base = set(base_cols)
                set_now = set(cols_now)

                if check_columns == "strict":
                    if set_now != set_base:
                        missing = sorted(set_base - set_now)
                        extra = sorted(set_now - set_base)
                        raise ValueError(
                            "Column mismatch under 'strict' mode.\n"
                            f"  Missing in {label}: {missing}\n"
                            f"  Extra in {label}: {extra}"
                        )
                elif check_columns == "subset":
                    missing = sorted(set_base - set_now)
                    if missing:
                        raise ValueError(
                            "Column mismatch under 'subset' mode.\n"
                            f"  Missing required in {label}: {missing}"
                        )
                else:  # 'union'
                    # Grow the column list in order of first
                    # appearance so the result is deterministic.
                    # Frames collected earlier need no patching:
                    # pd.concat outer-joins columns and fills the
                    # gaps with NaN.
                    base_cols = base_cols + [
                        c for c in cols_now if c not in set_base
                    ]

                # Align order; drops extras ('subset') and adds
                # NaN-filled missing columns ('union').
                df_chunk = df_chunk.reindex(columns=base_cols)

            if add_source_label:
                if source_col in df_chunk.columns:
                    # avoid clobbering an existing column silently
                    df_chunk[f"{source_col}_orig"] = df_chunk[
                        source_col
                    ]
                df_chunk[source_col] = label

            frames.append(df_chunk)

    if not frames:
        raise RuntimeError(
            "No frames were collected from the given sources."
        )

    # 2) Concatenate and post-process. Each step honours
    #    ``reset_index`` so no final re-indexing is needed.
    merged = pd.concat(frames, axis=0, ignore_index=reset_index)

    if sort_cols:
        merged = merged.sort_values(
            sort_cols, ignore_index=reset_index
        )

    if drop_duplicates:
        merged = merged.drop_duplicates(ignore_index=reset_index)

    # 3) Save to disk (compressed, if requested)
    if out_path.suffix == "":
        out_path = out_path.with_suffix(default_suffix[fmt])
    out_path.parent.mkdir(parents=True, exist_ok=True)

    if fmt == "parquet":
        if verbose:
            print(
                f"[merge_frames] Writing Parquet -> {out_path} "
                f"(compression={compression!r})"
            )
        merged.to_parquet(
            out_path,
            compression=compression,
            index=False,
            **save_kwargs,
        )

    elif fmt == "csv":
        csv_compression = compression
        # The function-level default ('snappy') and other
        # Parquet-only codecs are not valid for to_csv; ignore
        # them instead of failing.
        if isinstance(
            csv_compression, str
        ) and csv_compression.lower() in {
            "snappy",
            "brotli",
            "lz4",
            "lzo",
        }:
            if verbose:
                print(
                    f"[merge_frames] compression={compression!r} "
                    "is not supported for CSV; ignoring it."
                )
            csv_compression = None
        if verbose:
            print(
                f"[merge_frames] Writing CSV -> {out_path} "
                f"(compression={csv_compression!r})"
            )
        if csv_compression is not None:
            save_kwargs.setdefault("compression", csv_compression)
        merged.to_csv(out_path, index=False, **save_kwargs)

    elif fmt == "feather":
        if verbose:
            print(
                f"[merge_frames] Writing Feather -> {out_path}"
            )
        # Feather needs a default index (relevant when
        # reset_index=False); the index is not persisted for the
        # other tabular formats either.
        merged.reset_index(drop=True).to_feather(
            out_path, **save_kwargs
        )

    else:  # 'pickle'
        if verbose:
            print(
                f"[merge_frames] Writing Pickle -> {out_path}"
            )
        merged.to_pickle(out_path, **save_kwargs)

    if verbose:
        print(
            f"[merge_frames] Done. Merged shape="
            f"{merged.shape}, saved as '{fmt}' at: {out_path}"
        )

    return merged
def unpack_frames_from_file(
    merged: PathLike | pd.DataFrame,
    *,
    group_col: str = "city",
    output_dir: PathLike | None = None,
    output_format: Literal[
        "csv", "parquet", "feather", "pickle"
    ] = "csv",
    compression: str | None = None,
    use_source_col: bool = True,
    source_col: str = "source",
    filename_pattern: str = "{group_value}_split",
    drop_columns: Iterable[str] | None = None,
    keep_columns: Iterable[str] | None = None,
    save: bool = True,
    return_dict: bool = True,
    save_kwargs: dict[str, Any] | None = None,
    verbose: int = 1,
    logger: Any = None,
) -> dict[Any, pd.DataFrame]:
    """
    Reverse of :func:`merge_frames_to_file`: split an aggregated
    NATCOM dataset into per-group frames (and optionally write
    them to disk).

    Parameters
    ----------
    merged : path-like or DataFrame
        Aggregated dataset. If path-like, the reader is chosen
        from the file suffix:

        - ``.parquet`` -> :func:`pandas.read_parquet`
        - ``.csv`` -> :func:`pandas.read_csv`
        - ``.feather``/``.ft`` -> :func:`pandas.read_feather`
        - ``.pkl``/``.pickle`` -> :func:`pandas.read_pickle`

        A :class:`~pandas.DataFrame` is copied and used directly.
    group_col : str, optional
        Column used to split the dataset (default: ``'city'``).
        Each unique value (including missing values) defines one
        output chunk.
    output_dir : path-like, optional
        Directory where per-group files are written. If ``None``
        and ``merged`` is a path, the directory of ``merged`` is
        used; if ``merged`` is a DataFrame, the current working
        directory is used. Created only when ``save=True``.
    output_format : {'csv', 'parquet', 'feather', 'pickle'}, optional
        Output format for per-group files. Default is ``'csv'``.
    compression : str or None, optional
        Forwarded to :meth:`DataFrame.to_csv` (when not ``None``)
        or :meth:`DataFrame.to_parquet`. Ignored for ``'feather'``
        and ``'pickle'``.
    use_source_col : bool, optional
        If ``True`` (default) and ``source_col`` exists, a group
        with exactly one non-null source label reuses that label
        as its file name (suffix adjusted to ``output_format``).
        The helper falls back to ``filename_pattern`` when a group
        has zero or several labels, when the label is a
        placeholder such as ``'<in_memory>'``, or when several
        groups would resolve to the same file name (which would
        otherwise overwrite each other).
    source_col : str, optional
        Name of the column containing the source label (default:
        ``'source'``), as created by :func:`merge_frames_to_file`
        with ``add_source_label=True``.
    filename_pattern : str, optional
        Fallback file-name pattern. Supported placeholders:
        ``{group_value}`` (group value as a string, spaces
        replaced by underscores) and ``{group_col}``. The format
        extension is appended unless the result already ends with
        a known data extension.
    drop_columns : iterable of str, optional
        Columns to drop from each group before saving/returning.
        A single string is treated as one column name.
    keep_columns : iterable of str, optional
        If provided, only these columns are kept (applied after
        ``drop_columns``). A single string is treated as one
        column name.
    save : bool, optional
        If ``True`` (default), write each group to disk.
    return_dict : bool, optional
        If ``True`` (default), return ``{group_value: group_df}``;
        otherwise an empty dict is returned.
    save_kwargs : dict, optional
        Extra keyword arguments forwarded to the writer. The
        caller's dict is not modified.
    verbose : int, optional
        Verbosity level. ``0`` = silent, ``>=1`` emits progress
        messages.
    logger : object, optional
        Forwarded as-is to ``vlog`` together with ``verbose``.
        Defaults to ``None``.

    Returns
    -------
    out : dict
        Mapping from each group value to its
        :class:`~pandas.DataFrame`. Empty if
        ``return_dict=False``.

    Raises
    ------
    FileNotFoundError
        If ``merged`` is a path that does not exist.
    ValueError
        If the merged file extension or ``output_format`` is
        unsupported, or ``group_col`` is not present.

    Examples
    --------
    >>> from geoprior.utils.geo_utils import unpack_frames_from_file
    >>> unpack_frames_from_file(
    ...     "natcom_all_cities.parquet",
    ...     group_col="city",
    ...     output_format="csv",
    ... )
    # -> writes e.g. 'nansha_final_main_std.harmonized.csv',
    #    'zhongshan_final_main_std.harmonized.csv' (if `source`
    #    labels exist), and returns
    #    {'Nansha': df_nansha, 'Zhongshan': df_zhongshan}
    """

    def _log(mess: str) -> None:
        vlog(mess, verbose=verbose, logger=logger)

    def _as_list(cols: Iterable[str] | None) -> list[str] | None:
        """Materialise column selections; a bare string is one name."""
        if cols is None:
            return None
        if isinstance(cols, str):
            return [cols]
        return list(cols)

    save_kwargs = dict(save_kwargs or {})
    # Materialise once: a generator would be exhausted after the
    # first group.
    drop_list = _as_list(drop_columns) or []
    keep_list = _as_list(keep_columns)

    # 1) Load merged DataFrame
    merged_path: Path | None = None
    if isinstance(merged, pd.DataFrame):
        df = merged.copy()
    else:
        merged_path = Path(merged)
        if not merged_path.exists():
            raise FileNotFoundError(
                f"Merged file not found: {merged_path}"
            )
        # NOTE(security): read_pickle can execute arbitrary code;
        # only load pickle files from trusted sources.
        readers = {
            ".parquet": pd.read_parquet,
            ".csv": pd.read_csv,
            ".feather": pd.read_feather,
            ".ft": pd.read_feather,
            ".pkl": pd.read_pickle,
            ".pickle": pd.read_pickle,
        }
        suffix = merged_path.suffix.lower()
        reader = readers.get(suffix)
        if reader is None:
            raise ValueError(
                f"Unsupported merged file extension {suffix!r}. "
                "Expected one of: '.parquet', '.csv', '.feather', '.pkl', '.pickle'."
            )
        df = reader(merged_path)

    if group_col not in df.columns:
        raise ValueError(
            f"group_col={group_col!r} not found in merged DataFrame. "
            f"Available columns: {list(df.columns)}"
        )

    # 2) Validate format and resolve output directory
    fmt = output_format.lower()
    extensions = {
        "csv": ".csv",
        "parquet": ".parquet",
        "feather": ".feather",
        "pickle": ".pkl",
    }
    if fmt not in extensions:
        raise ValueError(
            f"Unsupported output_format={output_format!r}. "
            "Expected one of {'csv','parquet','feather','pickle'}."
        )
    ext = extensions[fmt]

    if output_dir is not None:
        out_dir = Path(output_dir)
    elif merged_path is not None:
        out_dir = merged_path.parent
    else:
        out_dir = Path.cwd()
    if save:
        out_dir.mkdir(parents=True, exist_ok=True)

    if fmt == "csv" and compression is not None:
        save_kwargs.setdefault("compression", compression)

    def _filename_from_source_label(label: str) -> str | None:
        """Derive a file name from a source label, or ``None``."""
        # Placeholders such as '<in_memory>' are not file names.
        if label.startswith("<") and label.endswith(">"):
            return None
        p = Path(label)
        if not p.stem:
            return None
        # Keep the exact original name when it is already a CSV.
        if fmt == "csv" and p.suffix.lower() == ".csv":
            return p.name
        return p.stem + ext

    # 3) Split, post-process, and save
    out_frames: dict[Any, pd.DataFrame] = {}
    groups = df.groupby(group_col, dropna=False)
    n_groups = len(groups)

    if verbose:
        _log(
            f"[unpack_{group_col}_frames] Splitting merged DataFrame of shape "
            f"{df.shape} into {n_groups} group(s) by {group_col!r}..."
        )

    # First pass: candidate file names from the source labels,
    # in group-iteration order (aligned with the second pass).
    source_names: list[str | None] = [None] * n_groups
    if save and use_source_col and source_col in df.columns:
        source_names = []
        for gval, label_series in groups[source_col]:
            labels = label_series.dropna().astype(str).unique()
            candidate = None
            if labels.size == 1:
                candidate = _filename_from_source_label(
                    str(labels[0])
                )
            elif labels.size > 1 and verbose:
                _log(
                    f"[unpack_{group_col}_frames] Group {group_col}={gval!r} "
                    f"has multiple source labels {labels.tolist()}; "
                    "falling back to filename_pattern."
                )
            source_names.append(candidate)

    # A candidate shared by several groups would make them
    # overwrite each other's file (compared case-insensitively).
    name_counts: dict[str, int] = {}
    for name in source_names:
        if name is not None:
            key = name.lower()
            name_counts[key] = name_counts.get(key, 0) + 1

    known_exts = (
        ".csv",
        ".parquet",
        ".feather",
        ".pkl",
        ".pickle",
    )
    n_done = 0

    for (gval, gdf), src_name in zip(groups, source_names):
        chunk = gdf.copy()

        # Optional column dropping/keeping
        cols_to_drop = [
            c for c in drop_list if c in chunk.columns
        ]
        if cols_to_drop:
            chunk = chunk.drop(columns=cols_to_drop)

        if keep_list is not None:
            chunk = chunk[
                [c for c in keep_list if c in chunk.columns]
            ]

        if save:
            filename = src_name
            if (
                filename is not None
                and name_counts[filename.lower()] > 1
            ):
                if verbose:
                    _log(
                        f"[unpack_{group_col}_frames] Source-derived file name "
                        f"{filename!r} is shared by several groups; "
                        "falling back to filename_pattern."
                    )
                filename = None

            if filename is None:
                # Fallback to pattern
                gv_str = str(gval).strip().replace(" ", "_")
                filename = filename_pattern.format(
                    group_value=gv_str,
                    group_col=group_col,
                )
                if not filename.lower().endswith(known_exts):
                    filename += ext

            out_path = out_dir / filename

            if verbose:
                _log(
                    f"[unpack_{group_col}_frames] Writing group {group_col}={gval!r} "
                    f"to {out_path} (format={fmt}, n={len(chunk)})"
                )

            if fmt == "csv":
                chunk.to_csv(
                    out_path, index=False, **save_kwargs
                )
            elif fmt == "parquet":
                chunk.to_parquet(
                    out_path,
                    compression=compression,
                    index=False,
                    **save_kwargs,
                )
            elif fmt == "feather":
                # Group slices keep the parent's index; Feather
                # needs a default one, and the index is not
                # written for CSV/Parquet either.
                chunk.reset_index(drop=True).to_feather(
                    out_path, **save_kwargs
                )
            else:  # 'pickle'
                chunk.to_pickle(out_path, **save_kwargs)

        if return_dict:
            out_frames[gval] = chunk
        n_done += 1

    if verbose:
        summary = (
            f"[unpack_{group_col}_frames] Done. Produced {n_done} "
            "group DataFrame(s)."
        )
        if save:
            summary += f" Files saved in: {out_dir}"
        _log(summary)

    return out_frames