Source code for geoprior.utils.spatial_utils

# SPDX-License-Identifier: Apache-2.0
# Author: LKouadio <etanoyau@gmail.com>
# Adapted from: earthai-tech/fusionlab-learn — https://github.com/earthai-tech/fusionlab-learn
# Modified for GeoPrior-v3 API.

"""
geospatial_utils - A collection of utilities for geospatial and positional
data analysis, filtering, and transformations.
"""

from __future__ import annotations

import warnings
from numbers import Real
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.patches import Circle, Patch
from scipy.spatial import cKDTree
from sklearn.cluster import (
    DBSCAN,
    AgglomerativeClustering,
    KMeans,
)
from sklearn.metrics import silhouette_score
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler

from ..api.types import DataFrame
from ..compat.sklearn import (
    Interval,
    StrOptions,
    validate_params,
)
from ..core.array_manager import extract_array_from
from ..core.checks import (
    are_all_frames_valid,
    assert_ratio,
    check_spatial_columns,
    exist_features,
)
from ..core.handlers import columns_manager, resolve_label
from ..core.io import SaveFile, is_data_readable
from ..decorators import Deprecated, isdf
from .generic_utils import find_id_column
from .validator import (
    filter_valid_kwargs,
    parameter_validator,
    validate_length_range,
    validate_positive_integer,
)

HAS_TQDM = True
try:
    from tqdm import tqdm
except:
    HAS_TQDM = False

__all__ = [
    "spatial_sampling",
    "extract_coordinates",
    "batch_spatial_sampling",
    "extract_zones_from",
    "filter_position",
    "create_spatial_clusters",
    "gen_negative_samples",
    "gen_buffered_negative_samples",
    "gen_negative_samples_plus",
    "extract_spatial_roi",
    "make_forecast_ready_sample",
]


def _longest_consecutive_run(
    values: np.ndarray | list[int],
) -> int:
    """Return the longest consecutive integer run."""
    if len(values) == 0:
        return 0

    years = np.asarray(values, dtype=int)
    years = np.unique(years)
    years.sort()

    if years.size == 1:
        return 1

    diffs = np.diff(years)
    longest = 1
    current = 1

    for d in diffs:
        if d == 1:
            current += 1
            longest = max(longest, current)
        else:
            current = 1

    return int(longest)


def _export_demo_sample(
    df: pd.DataFrame,
    export_path: str | None = None,
    export_format: str | None = None,
) -> pd.DataFrame:
    """
    Export a demo sample to a non-CSV format.

    Notes
    -----
    - `savefile` is still handled by `@SaveFile`
      and remains the simplest CSV export path.
    - This helper only handles explicit non-CSV
      or forced-format exports.
    """
    if export_path is None:
        return df

    path = Path(export_path).expanduser()

    fmt = export_format
    if fmt is None:
        suffix = path.suffix.lower().lstrip(".")
        if suffix in {"xlsx", "xls"}:
            fmt = "xlsx"
        elif suffix in {"pq", "parquet"}:
            fmt = "parquet"
        elif suffix in {"json"}:
            fmt = "json"
        elif suffix in {"feather", "ftr"}:
            fmt = "feather"
        elif suffix in {"pkl", "pickle"}:
            fmt = "pickle"
        else:
            fmt = "csv"

    fmt = str(fmt).strip().lower()

    path.parent.mkdir(parents=True, exist_ok=True)

    if fmt == "csv":
        df.to_csv(path, index=False)
    elif fmt in {"xlsx", "xls", "excel"}:
        df.to_excel(path, index=False)
    elif fmt == "parquet":
        df.to_parquet(path, index=False)
    elif fmt == "json":
        df.to_json(
            path,
            orient="records",
            indent=2,
        )
    elif fmt in {"feather", "ftr"}:
        df.reset_index(drop=True).to_feather(path)
    elif fmt in {"pkl", "pickle"}:
        df.to_pickle(path)
    else:
        raise ValueError(
            "Unsupported export_format. "
            "Use one of: "
            "{'csv', 'xlsx', 'parquet', "
            "'json', 'feather', 'pickle'}."
        )

    return df


[docs] @SaveFile @is_data_readable @validate_params( { "data": ["array-like"], "method": [ StrOptions({"abs", "absolute", "relative"}), None, ], "year_mode": [ StrOptions( { "all", "latest", "earliest", "random", } ), None, ], "export_format": [ StrOptions( { "csv", "xlsx", "excel", "parquet", "json", "feather", "pickle", } ), None, ], } ) @isdf def make_forecast_ready_sample( data, sample_size=0.05, time_col: str = "year", spatial_cols: tuple[str, str] | list[str] | None = None, group_cols: tuple[str, ...] | list[str] | None = None, stratify_by: list[str] | tuple[str, ...] | None = None, spatial_bins: int | tuple[int, int] | list[int] = 10, time_steps: int = 3, forecast_horizon: int = 1, require_consecutive: bool = True, keep_years: int | None = None, year_mode: str = "latest", min_groups: int = 5, max_groups: int | None = None, columns_to_keep: list[str] | tuple[str, ...] | None = None, method: str = "abs", min_relative_ratio: float = 0.01, random_state: int | None = 42, export_path: str | None = None, export_format: str | None = None, savefile: str | None = None, sort_output: bool = True, verbose: int = 1, ) -> pd.DataFrame: """ Build a compact, forecast-ready panel sample. The function samples *spatial groups* rather than individual rows, then reconstructs the full panel for the selected groups. This is much safer for demo/testing than row-wise sampling because each sampled location keeps enough temporal history for sequence construction. Parameters ---------- data : pandas.DataFrame Input panel DataFrame. sample_size : float or int, default=0.05 Group-level sample size passed to the internal spatial sampler. - float: fraction of eligible groups - int: absolute number of eligible groups time_col : str, default='year' Time column. spatial_cols : tuple/list of str, optional Spatial coordinate columns. If None, the function searches for 'longitude' and 'latitude'. group_cols : tuple/list of str, optional Group identifier columns. If None, uses `spatial_cols`. stratify_by : list/tuple of str, optional Extra group-level columns used for stratification. Typical examples: ['lithology_class'] or ['city', 'lithology_class']. spatial_bins : int or tuple/list, default=10 Spatial bins passed to `spatial_sampling()`. time_steps : int, default=3 Lookback window length. forecast_horizon : int, default=1 Forecast horizon length. require_consecutive : bool, default=True If True, each kept group must contain at least one consecutive run of length `time_steps + forecast_horizon`. keep_years : int, optional If provided, keep only this many years per group after sampling. Must be >= `time_steps + forecast_horizon`. year_mode : {'all', 'latest', 'earliest', 'random'} How to trim years when `keep_years` is given. min_groups : int, default=5 Minimum number of eligible groups required. max_groups : int, optional Hard cap on sampled groups after spatial sampling. columns_to_keep : list/tuple of str, optional Restrict the returned columns. Group and time columns are always preserved. method : {'abs', 'absolute', 'relative'}, default='abs' Same meaning as in `spatial_sampling()`. min_relative_ratio : float, default=0.01 Minimum relative sampling ratio when `method='relative'`. random_state : int, optional Random seed. export_path : str, optional Explicit path for non-CSV export. export_format : str, optional Export format for `export_path`. If None, inferred from suffix. savefile : str, optional CSV save path handled by `@SaveFile`. sort_output : bool, default=True Sort final output by group and time. verbose : int, default=1 Verbosity level. Returns ------- pandas.DataFrame A compact panel sample that preserves group-wise temporal structure for forecast demos/tests. Examples -------- >>> from geoprior.utils.spatial_utils import make_forecast_ready_sample >>> # Small demo sample, latest 4 years per location >>> demo = make_forecast_ready_sample( data=df, sample_size=0.05, stratify_by=["lithology_class"], spatial_cols=("longitude", "latitude"), time_col="year", time_steps=3, forecast_horizon=1, keep_years=4, year_mode="latest", require_consecutive=True, savefile="demo_panel.csv", ) >>> #Slightly richer test panel, keep all years, export parquet demo = make_forecast_ready_sample( data=df, sample_size=150, stratify_by=["city", "lithology_class"], spatial_cols=("longitude", "latitude"), time_col="year", time_steps=3, forecast_horizon=1, keep_years=None, export_path="demo_panel.parquet", export_format="parquet", ) """ df = data.copy() rng = np.random.RandomState(random_state) spatial_cols = columns_manager(spatial_cols) if spatial_cols is None: spatial_cols = [] if "longitude" in df.columns: spatial_cols.append("longitude") if "latitude" in df.columns: spatial_cols.append("latitude") if not spatial_cols: raise ValueError( "No spatial columns found. Provide spatial_cols " "or ensure 'longitude'/'latitude' exist." ) if len(spatial_cols) > 2: raise ValueError( "spatial_cols can contain at most two columns." ) if group_cols is None: group_cols = tuple(spatial_cols) else: group_cols = tuple(columns_manager(group_cols)) stratify_by = ( [] if stratify_by is None else list(columns_manager(stratify_by)) ) required_cols = set(group_cols) | {time_col} required_cols |= set(spatial_cols) required_cols |= set(stratify_by) missing = [ c for c in required_cols if c not in df.columns ] if missing: raise ValueError( f"Missing required columns: {missing}" ) min_required_years = int(time_steps) + int( forecast_horizon ) if min_required_years <= 1: raise ValueError( "time_steps + forecast_horizon must be >= 2." ) if keep_years is not None: keep_years = validate_positive_integer( keep_years, "keep_years", ) if keep_years < min_required_years: raise ValueError( "keep_years must be >= " "time_steps + forecast_horizon." ) min_groups = validate_positive_integer( min_groups, "min_groups", ) if max_groups is not None: max_groups = validate_positive_integer( max_groups, "max_groups", ) if not 0 < float(min_relative_ratio) <= 1: raise ValueError( "min_relative_ratio must be in (0, 1]." ) df = df.dropna(subset=list(required_cols)).copy() if df.empty: raise ValueError( "No rows remain after dropping NA values in " "required columns." ) if not pd.api.types.is_numeric_dtype(df[time_col]): dt = pd.to_datetime(df[time_col], errors="coerce") if dt.isna().all(): raise ValueError( f"Could not coerce {time_col!r} to a " "numeric or datetime-like time column." ) df[time_col] = dt.dt.year.astype(int) else: df[time_col] = df[time_col].astype(int) stats = [] grouped = df.groupby(list(group_cols), dropna=False) for key, g in grouped: years = np.asarray( sorted(pd.unique(g[time_col])), dtype=int, ) n_years = int(years.size) longest_run = _longest_consecutive_run(years) ok = n_years >= min_required_years if require_consecutive: ok = ok and (longest_run >= min_required_years) if not ok: continue row0 = g.iloc[0] record = { "__group_key__": "||".join(map(str, key)) if isinstance(key, tuple) else str(key), "__n_years__": n_years, "__min_year__": int(years.min()), "__max_year__": int(years.max()), "__longest_run__": longest_run, } for col in group_cols: record[col] = row0[col] for col in spatial_cols: record[col] = row0[col] for col in stratify_by: record[col] = row0[col] stats.append(record) eligible = pd.DataFrame(stats) if eligible.empty: raise ValueError( "No eligible groups found. Try lowering " "time_steps/forecast_horizon, or set " "require_consecutive=False." ) if len(eligible) < min_groups: raise ValueError( f"Only {len(eligible)} eligible groups found, " f"but min_groups={min_groups}." ) # Group-level spatial sampling. select_cols = list( dict.fromkeys( list(spatial_cols) + list(stratify_by) + list(group_cols) + [ "__group_key__", "__n_years__", "__min_year__", "__max_year__", "__longest_run__", ] ) ) dups = eligible.loc[:, select_cols].columns[ eligible.loc[:, select_cols].columns.duplicated() ] if len(dups) > 0: raise RuntimeError( "Duplicate columns detected before spatial_sampling: " f"{list(dups)}" ) sampled_groups = spatial_sampling( eligible.loc[:, select_cols], sample_size=sample_size, stratify_by=stratify_by or None, spatial_bins=spatial_bins, spatial_cols=spatial_cols, method=method, min_relative_ratio=min_relative_ratio, random_state=random_state, verbose=max(0, int(verbose) - 1), ) if sampled_groups.empty: raise ValueError( "The group-level sampler returned no groups." ) if ( max_groups is not None and len(sampled_groups) > max_groups ): sampled_groups = sampled_groups.sample( n=max_groups, random_state=random_state, ) sampled = df.merge( sampled_groups.loc[ :, list(group_cols) ].drop_duplicates(), on=list(group_cols), how="inner", ) if sampled.empty: raise ValueError( "No rows were recovered after merging sampled " "groups back to the source panel." ) if keep_years is not None: parts = [] for _, g in sampled.groupby( list(group_cols), dropna=False, sort=False, ): g = g.sort_values(time_col) if len(g) <= keep_years: parts.append(g) continue if year_mode == "latest": parts.append(g.tail(keep_years)) elif year_mode == "earliest": parts.append(g.head(keep_years)) elif year_mode == "random": years = np.asarray( sorted(pd.unique(g[time_col])), dtype=int, ) chosen = np.sort( rng.choice( years, size=keep_years, replace=False, ) ) parts.append( g[g[time_col].isin(chosen)].copy() ) else: # "all" parts.append(g) sampled = pd.concat( parts, axis=0, ignore_index=True, ) if columns_to_keep is not None: keep = list(columns_manager(columns_to_keep)) keep = list( dict.fromkeys( list(group_cols) + [time_col] + keep ) ) keep = [c for c in keep if c in sampled.columns] sampled = sampled.loc[:, keep].copy() if sort_output: sampled = sampled.sort_values( list(group_cols) + [time_col] ).reset_index(drop=True) if verbose: n_groups = ( sampled.loc[ :, list(group_cols), ] .drop_duplicates() .shape[0] ) msg = ( "[INFO] Built forecast-ready sample: " f"{len(sampled)} rows, " f"{n_groups} groups, " f"min_required_years={min_required_years}" ) if keep_years is not None: msg += f", keep_years={keep_years}" print(msg) sampled = _export_demo_sample( sampled, export_path=export_path, export_format=export_format, ) return sampled
[docs] def deg_to_m_from_lat(lat_deg: float) -> tuple[float, float]: """ Approx WGS84 meters per degree at reference latitude. Returns (deg_to_m_lon, deg_to_m_lat). """ phi = np.deg2rad(lat_deg) # Common geodesy approximations (meters per degree) deg_to_m_lat = ( 111132.92 - 559.82 * np.cos(2 * phi) + 1.175 * np.cos(4 * phi) - 0.0023 * np.cos(6 * phi) ) deg_to_m_lon = ( 111412.84 * np.cos(phi) - 93.5 * np.cos(3 * phi) + 0.118 * np.cos(5 * phi) ) return float(deg_to_m_lon), float(deg_to_m_lat)
[docs] @SaveFile @isdf def extract_spatial_roi( df: pd.DataFrame, x_range: tuple[float, float], y_range: tuple[float, float], x_col: str = "longitude", y_col: str = "latitude", snap_to_closest: bool = True, savefile: str | None = None, **kwargs, ) -> pd.DataFrame: """Extracts a spatial Region of Interest (ROI) from a DataFrame. This function filters a DataFrame to include only the data points that fall within a specified rectangular bounding box defined by x and y coordinate ranges. Parameters ---------- df : pd.DataFrame The input DataFrame containing the spatial data. x_range : tuple of (float, float) A tuple containing the minimum and maximum desired values for the x-coordinate (e.g., longitude). The order does not matter. y_range : tuple of (float, float) A tuple containing the minimum and maximum desired values for the y-coordinate (e.g., latitude). The order does not matter. x_col : str, default='longitude' The name of the column in `df` that contains the x-coordinates. y_col : str, default='latitude' The name of the column in `df` that contains the y-coordinates. snap_to_closest : bool, default=True If True, and a value in `x_range` or `y_range` does not exist in the data, the function will "snap" to the nearest available coordinate in the dataset. If False, it will use the exact boundaries provided. savefile : str, optional The path to save the resulting DataFrame as a CSV file. Returns ------- pd.DataFrame A new DataFrame containing only the rows that fall within the specified spatial bounding box. Raises ------ ValueError If `x_col` or `y_col` are not found in the DataFrame, or if the range tuples are not provided correctly. """ if not all(c in df.columns for c in [x_col, y_col]): raise ValueError( f"Columns '{x_col}' and/or '{y_col}' not found in DataFrame." ) from geoprior.core.checks import check_numeric_dtype # this check the whether the xcol and y_col are numeric valid check_numeric_dtype( df[[x_col, y_col]], param_names={"X": "df[x_col, y_col]]"}, ) # Ensure ranges are correctly ordered (min, max) x_min, x_max = min(x_range), max(x_range) y_min, y_max = min(y_range), max(y_range) # Optionally snap the boundaries to the closest available data points if snap_to_closest: x_min = _find_closest_value(df[x_col], x_min) x_max = _find_closest_value(df[x_col], x_max) y_min = _find_closest_value(df[y_col], y_min) y_max = _find_closest_value(df[y_col], y_max) # Create boolean masks for the x and y ranges x_mask = (df[x_col] >= x_min) & (df[x_col] <= x_max) y_mask = (df[y_col] >= y_min) & (df[y_col] <= y_max) # Filter the DataFrame using the combined mask roi_df = df[x_mask & y_mask].copy() return roi_df
def _find_closest_value( series: pd.Series, value: float ) -> float: """Finds the value in a Series closest to a target value.""" return series.iloc[(series - value).abs().argmin()]
[docs] @SaveFile @isdf def gen_negative_samples_plus( df: pd.DataFrame, target_col: str, spatial_cols: tuple[str, str] = ("longitude", "latitude"), feature_cols: list[str] | None = None, buffer_km: float = 10, neg_feature_range: tuple[float, float] = (0, 5), num_neg_per_pos: int = 1, strategy: str = "landslide", gauge_data: pd.DataFrame | None = None, elevation_data: pd.DataFrame | None = None, similarity_features: list[str] | None = None, time_col: str | None = None, cluster_method: str = "kmeans", use_gpd: bool | str = "auto", id_col="auto", view: bool = False, savefile: str | None = None, verbose: int = 1, seed: int | None = None, ) -> pd.DataFrame: """ Generates negative samples for modeling in spatial scenarios, offering multiple strategies. The function calls `gen_buffered_negative_samples` when the ``strategy`` argument is ``'landslide'``, ``'event'``, or ``'gauge'``. It also calls `generate_negative_samples` partially in the ``'hybrid'`` strategy. Internally, each sample is augmented to produce negative instances according to a chosen method. .. math:: \\text{buffer_deg} = \\frac{\\text{buffer_km}}{111.0} The above formula approximates degrees from kilometers near the equator. The output is a combined dataset containing original positives and generated negatives. The ratio of negatives per positive is controlled by ``num_neg_per_pos``. Parameters ---------- df : pandas.DataFrame Input data containing spatial coordinates and features. target_col : str Name of the classification target column. Positive samples in `df` are labeled, negatives will be generated. spatial_cols : tuple of str, optional Columns representing longitude and latitude in `df`. feature_cols : list of str, optional Additional feature columns used to drive or constrain negative sampling processes. buffer_km : float, optional Radius in kilometers for local negative sampling. Used to compute buffer degrees. neg_feature_range : tuple of float, optional Lower and upper range for continuous features in random negative generation. num_neg_per_pos : int, optional Number of negatives to generate per positive instance. strategy : str, optional Defines the sampling approach. Options include ``'landslide'``, ``'gauge'``, ``'random_global'``, ``'temporal_shift'``, ``'clustered_negatives'``, ``'environmental_similarity'``, ``'elevation_based'``, and ``'hybrid'``. See more in :ref:`User Guide <user_guide>`. gauge_data : pandas.DataFrame, optional Reference data for gauge-based or hybrid strategies. elevation_data : pandas.DataFrame, optional Elevation records, used if ``strategy='elevation_based'``. similarity_features : list of str, optional Columns for nearest neighbor computation in ``'environmental_similarity'``. time_col : str, optional Name of the time column for ``'temporal_shift'``. Required if using that strategy. cluster_method : str, optional Clustering algorithm for ``'clustered_negatives'``. Default is ``'kmeans'``. use_gpd : bool or str, optional Indicator for whether geopandas is used in buffer-based processes. id_col : str, optional Column name used as an identifier. If `'auto'`, a default is used. view : bool, optional Flag for visualizing or previewing the results. savefile : str, optional Path to save the output dataset. If None, no file is saved. verbose : int, optional Verbosity level. Higher values yield more logs. seed : int, optional Random seed for reproducibility. If None, randomness is not fixed. Returns ------- pandas.DataFrame A combined DataFrame containing the original positive samples labeled as 1 and newly generated negative samples labeled 0. Notes ----- When ``strategy='hybrid'``, partial sets of negatives come from two distinct calls to `generate_negative_samples` for ``'landslide'`` and ``'gauge'`` sub-strategies, then merged. Examples -------- >>> from geoprior.utils.spatial_utils import gen_negative_samples_plus >>> import pandas as pd >>> df_example = pd.DataFrame({{ ... "longitude": [10.1, 10.2, 10.3], ... "latitude": [45.1, 45.2, 45.3], ... "feature": [3.4, 2.1, 6.7], ... "target": [1, 1, 1] ... }}) >>> gauge_data = pd.DataFrame({{ ... 'gauge_id': ['G1', 'G2', 'G3'], ... 'latitude': np.random.uniform(24.0, 25.0, 3), ... 'longitude': np.random.uniform(113.0, 114.0, 3) ... }}) >>> # Generate random global negatives >>> result = gen_negative_samples_plus( ... df_example, ... target_col="target", ... strategy="random_global" ... ) >>> print(result.head()) See Also -------- gen_buffered_negative_samples : Generates negative samples within a buffer region around reference events or gauges. generate_negative_samples : A simpler negative sampling utility for certain strategies. """ # If the strategy is one of the recognized # types (landslide, event, gauge), we # directly call gen_buffered_negative_samples. if str(strategy).lower() in ( "landslide", "event", "gauge", ): return gen_buffered_negative_samples( df=df, target_col=target_col, spatial_cols=spatial_cols, feature_cols=feature_cols, buffer_km=buffer_km, neg_feature_range=neg_feature_range, num_neg_per_pos=num_neg_per_pos, strategy=strategy, gauge_data=gauge_data, use_gpd=use_gpd, view=view, id_col=id_col, savefile=savefile, seed=seed, verbose=verbose, ) # If not in the above categories, we proceed # with advanced strategies. np.random.seed(seed) # Validate columns and parameters via the # _validate_negative_sampling utility. checked_values = _validate_negative_sampling( df=df, target_col=target_col, spatial_cols=spatial_cols, feature_cols=feature_cols, neg_feature_range=neg_feature_range, num_neg_per_pos=num_neg_per_pos, verbose=verbose, id_col=id_col, ) ( spatial_cols, feature_cols, neg_feature_range, num_neg_per_pos, ) = checked_values # Extract column names for lon/lat. lon_col, lat_col = spatial_cols # Convert buffer kilometers to degrees, # approximating 1 deg ~ 111 km. buffer_deg = buffer_km / 111.0 # Additional advanced strategy options. add_strategies = [ "random_global", "temporal_shift", "clustered_negatives", "environmental_similarity", "elevation_based", "hybrid", ] # Validate the selected strategy from # the advanced list. strategy = parameter_validator( "strategy", target_strs=add_strategies, deep=True )(strategy) # Collect negative samples in a list. negatives = [] # ----------------------- # RANDOM_GLOBAL strategy: # Generate uniform random coordinates # across the bounding box of df. # ----------------------- if strategy == "random_global": lon_min, lon_max = ( df[lon_col].min(), df[lon_col].max(), ) lat_min, lat_max = ( df[lat_col].min(), df[lat_col].max(), ) for _ in range(len(df) * num_neg_per_pos): sample = { lon_col: np.random.uniform(lon_min, lon_max), lat_col: np.random.uniform(lat_min, lat_max), target_col: 0, } for col in feature_cols: sample[col] = np.random.uniform( *neg_feature_range ) negatives.append(sample) # ----------------------- # TEMPORAL_SHIFT strategy: # Shift time backward (e.g. 30 days) # and mark as negative samples. # ----------------------- elif strategy == "temporal_shift": if time_col is None: raise ValueError( "Time column must be provided " "for 'temporal_shift'." ) df_shifted = df.copy() df_shifted[time_col] = pd.to_datetime( df_shifted[time_col] ) - pd.Timedelta(days=30) for _, row in df_shifted.iterrows(): sample = row.to_dict() sample[target_col] = 0 negatives.append(sample) # --------------------------- # CLUSTERED_NEGATIVES: # Cluster positive points and # sample negatives around the # resulting centroids. Two # clustering options are # supported: KMeans and DBSCAN. # --------------------------- elif strategy == "clustered_negatives": coords = df[[lon_col, lat_col]].values # --- choose clustering backend if cluster_method.lower() == "kmeans": clustering = KMeans( n_clusters=max(2, len(df) // 10), random_state=seed, ).fit(coords) centers = clustering.cluster_centers_ elif cluster_method.lower() == "dbscan": clustering = DBSCAN( eps=buffer_deg * 2, # search radius (deg) min_samples=5, ).fit(coords) labels = clustering.labels_ centers = [] for lbl in np.unique(labels): if lbl == -1: # DBSCAN noise continue pts = coords[labels == lbl] centers.append(pts.mean(axis=0)) centers = np.asarray(centers) else: raise ValueError( "cluster_method must be either " "'KMeans' or 'DBSCAN'" ) # --- generate negatives near each centroid for center in centers: for _ in range(num_neg_per_pos): lat_offset = np.random.uniform( -buffer_deg, buffer_deg ) lon_offset = np.random.uniform( -buffer_deg, buffer_deg ) sample = { lat_col: center[1] + lat_offset, lon_col: center[0] + lon_offset, target_col: 0, } for col in feature_cols: sample[col] = np.random.uniform( *neg_feature_range ) negatives.append(sample) # ----------------------- # ENVIRONMENTAL_SIMILARITY: # Nearest neighbor approach on # specific feature vectors. # ----------------------- elif strategy == "environmental_similarity": if similarity_features is None: raise ValueError( "Specify similarity_features for " "'environmental_similarity'." ) nn = NearestNeighbors(n_neighbors=1) nn.fit(df[similarity_features]) for _ in range(len(df) * num_neg_per_pos): random_vector = np.random.uniform( 0, 1, size=len(similarity_features) ) dist, idx = nn.kneighbors([random_vector]) ref = df.iloc[idx[0][0]] sample = ref.copy() sample[target_col] = 0 sample[lon_col] += np.random.uniform( -buffer_deg, buffer_deg ) sample[lat_col] += np.random.uniform( -buffer_deg, buffer_deg ) negatives.append(sample.to_dict()) # ----------------------- # ELEVATION_BASED: # Pick negative samples from # low slope areas, etc. # ----------------------- elif strategy == "elevation_based": if elevation_data is None: raise ValueError( "Provide elevation data for " "'elevation_based' strategy." ) low_slope_areas = elevation_data[ elevation_data["slope"] < buffer_km ] sample_count = min( len(df) * num_neg_per_pos, len(low_slope_areas) ) for _, row in low_slope_areas.sample( n=sample_count ).iterrows(): sample = row.to_dict() sample[target_col] = 0 negatives.append(sample) # ----------------------- # HYBRID: # Combine partial sets of # negative samples from # different strategies. # ----------------------- elif strategy == "hybrid": part1 = gen_buffered_negative_samples( df, target_col, spatial_cols, feature_cols, buffer_km, neg_feature_range, num_neg_per_pos // 2, strategy="landslide", gauge_data=None, seed=seed, verbose=0, ) part2 = gen_buffered_negative_samples( df, target_col, spatial_cols, feature_cols, buffer_km, neg_feature_range, num_neg_per_pos // 2, strategy="gauge", gauge_data=gauge_data, seed=seed, verbose=0, ) combined = pd.concat( [part1, part2], ignore_index=True ) # Deduplicate based on spatial + feature + target columns dedup_cols = ( spatial_cols + (feature_cols or []) + [target_col] ) combined = combined.drop_duplicates(subset=dedup_cols) if verbose >= 1: print( "[INFO] Final dataset after deduplication:" f" {len(combined)} samples" ) if view: _visualize_negative_sampling( df_combined=combined, base_points=df if strategy == "landslide" else gauge_data, strategy=strategy, spatial_cols=spatial_cols, target_col=target_col, buffer_km=buffer_km, title="Sample Generation via Hybrid Strategy", ) return combined # Create DataFrame of negative samples. df_neg = pd.DataFrame(negatives) # Copy df for positive samples. df_pos = df.copy() df_pos[target_col] = 1 # Combine positives and negatives. combined = pd.concat([df_pos, df_neg], ignore_index=True) if view: _visualize_negative_sampling( df_combined=combined, base_points=df, strategy=strategy, spatial_cols=spatial_cols, target_col=target_col, buffer_km=buffer_km, ) # Optionally print info about the results. if verbose >= 1: print( f"[INFO] Generated {len(df_neg)} negative " f"samples using strategy '{strategy}'" ) print( f"[INFO] Total dataset: {len(combined)} samples" ) return combined
[docs] @SaveFile @isdf def gen_buffered_negative_samples( df: pd.DataFrame, target_col: str, spatial_cols: tuple[str, str] = ("longitude", "latitude"), feature_cols: list[str] | None = None, buffer_km: float = 10, neg_feature_range: tuple[float, float] = (0, 5), num_neg_per_pos: int = 1, strategy: str = "landslide", gauge_data: pd.DataFrame | None = None, use_gpd: bool | str = "auto", id_col="auto", view: bool = False, savefile: str | None = None, seed: int | None = None, verbose: int = 1, ) -> pd.DataFrame: """ Generate buffer-based negative samples around existing points or gauge stations. This function creates additional negative samples for binary spatial events. It either takes an existing landslide dataset (when `strategy` is `'landslide'`) or a separate gauge dataset (if `strategy` is `'gauge'`) to serve as the base points for generating negatives within a circular buffer. The function validates input columns and parameters via `_validate_negatives_sampling` before constructing synthetic samples. Parameters ---------- df : pandas.DataFrame The DataFrame containing positive event samples (e.g., landslides). Must include `<target_col>` and `<spatial_cols>`. target_col : str Name of the binary target column (1 for event, 0 for no event). spatial_cols : tuple of str, default ('longitude', 'latitude') Indicates which columns hold `<longitude>` and `latitude` in `df`. feature_cols : list of str, optional Additional feature columns to simulate or copy for negatives. If ``None``, all columns except `<spatial_cols>` and `target_col` are used. buffer_km : float, default 10 The radial distance in kilometers for sampling negative points around each base point. neg_feature_range : tuple of float, default (0, 5) A numeric range from which feature values are drawn for negative samples if the column is numeric. num_neg_per_pos : int, default 1 Number of negatives to generate per positive (landslide) or gauge point. strategy : str, default 'landslide' Defines the base from which negative samples are generated. Use ``'landslide'`` or ``'event'`` to sample around rows in ``df``. Use ``'gauge'`` to sample around rows in ``gauge_data``. gauge_data : pandas.DataFrame, optional Required if ``strategy`` is ``'gauge'``. Must contain ``spatial_cols``. use_gpd : bool or 'auto', default 'auto' If ``'auto'``, attempts to use GeoPandas for visualization if installed. Otherwise, falls back to Matplotlib. This parameter is forwarded to the underlying visualization function. id_col : str or list of str, default 'auto' Column(s) representing IDs in ``df``. If ``'auto'``, the function tries to detect possible ID columns. Used by ``_validate_negatives_sampling``. view : bool, default False Whether to visualize the sampled negatives around the base points. savefile : str, optional If provided, saves the final combined dataset (positives and negatives) to a CSV file at the specified path. seed : int, optional Seed for NumPy's random generator, ensuring reproducible offsets in negative sampling. verbose : int, default 1 Controls console messages: ``1`` for minimal, ``2`` for more detailed logs. Returns ------- pandas.DataFrame The combined dataset containing both the original (positive) rows, labeled with ``target_col`` = 1, and the newly generated negative rows, labeled ``target_col`` = 0. Methods ------- ``_validate_negatives_sampling`` Validates required columns and parameters, including ``num_neg_per_pos`` and ``neg_feature_range``. ``visualize_negative_sampling`` Generates a plot showing the negative samples around the base points if ``view`` is True. Notes ----- - If ``strategy`` is ``'gauge'``, ``gauge_data`` must be provided and contain columns ``longitude`` and ``latitude``. - When ``view`` is True, circles are drawn to illustrate the buffer radius. - The ratio of 1 degree to roughly 111 km is an approximation and can vary slightly by latitude. Formally, a buffer in degrees :math:`\\Delta` is computed by: .. math:: \\Delta = \\frac{\\text{buffer\\_km}}{111}, where :math:`111` is an approximate km-per-degree conversion factor. Each base point :math:`(lat, lon)` spawns :math:`n` negatives, each offset by :math:`\\delta_{lat}`, :math:`\\delta_{lon}` drawn from :math:`U(-\\Delta, \\Delta)`. Examples -------- Below is an illustration of how to generate negative samples around both existing event locations (strategy=`landslide`) and separate gauge stations (strategy=`gauge`) using ``gen_buffered_negative_samples``. First, we simulate a small DataFrame of positive landslide samples with rainfall attributes, as well as a separate DataFrame for gauge stations: >>> import numpy as np >>> import pandas as pd >>> np.random.seed(42) >>> positive_samples = pd.DataFrame({ ... 'id': [1, 2, 3, 4, 5], ... 'latitude': np.random.uniform(24.0, 25.0, 5), ... 'longitude': np.random.uniform(113.0, 114.0, 5), ... 'rainfall_day_1': np.random.randint(10, 30, 5), ... 'rainfall_day_2': np.random.randint(10, 30, 5), ... 'rainfall_day_3': np.random.randint(10, 30, 5), ... 'rainfall_day_4': np.random.randint(10, 30, 5), ... 'rainfall_day_5': np.random.randint(10, 30, 5), ... 'landslide': [1]*5 ... }) >>> gauge_data = pd.DataFrame({ ... 'gauge_id': ['G1', 'G2', 'G3'], ... 'latitude': np.random.uniform(24.0, 25.0, 3), ... 'longitude': np.random.uniform(113.0, 114.0, 3) ... }) We then call ``gen_buffered_negative_samples`` to produce negatives around these data using two different strategies: >>> from geoprior.utils.spatial_utils import gen_buffered_negative_samples >>> # Generate negatives around landslide points >>> results_landslide = generate_negative_samples_with( ... df=positive_samples, ... target_col='landslide', ... spatial_cols=('longitude', 'latitude'), ... feature_cols=[f'rainfall_day_{i+1}' for i in range(5)], ... buffer_km=10, ... num_neg_per_pos=1, ... strategy='landslide', ... verbose=1 ... ) >>> # Generate negatives around the gauge stations >>> results_gauge = gen_buffered_negative_samples( ... df=positive_samples, ... target_col='landslide', ... spatial_cols=('longitude', 'latitude'), ... feature_cols=[f'rainfall_day_{i+1}' for i in range(5)], ... buffer_km=10, ... num_neg_per_pos=1, ... strategy='gauge', ... gauge_data=gauge_data, ... verbose=1 ... ) See Also -------- generate_negative_samples: Generate synthetic negative samples for spatial binary classification tasks. _validate_negatives_sampling: Ensures inputs and parameters are correct. visualize_negative_sampling : Plots the positive and negative points for inspection. """ # See for reproducibility strategy = ( "landslide" if strategy.lower() in ("landslide", "event") else strategy ) np.random.seed(seed) # Validate input and columns checked_values = _validate_negative_sampling( df=df, target_col=target_col, spatial_cols=spatial_cols, feature_cols=feature_cols, neg_feature_range=neg_feature_range, num_neg_per_pos=num_neg_per_pos, verbose=verbose, id_col=id_col, ) ( spatial_cols, feature_cols, neg_feature_range, num_neg_per_pos, ) = checked_values lon_col, lat_col = spatial_cols if verbose >= 1: print( f"[INFO] Generating negative samples using strategy: {strategy}" ) buffer_deg = buffer_km / 111.0 negatives = [] if strategy == "gauge": if gauge_data is None: raise ValueError( "Gauge data must be provided for 'gauge' strategy." ) base_points = ( df if strategy == "landslide" else gauge_data ) if base_points is None or not all( c in base_points.columns for c in spatial_cols ): raise ValueError( "Missing gauge data or invalid spatial columns." ) for _, point in base_points.iterrows(): for _ in range(num_neg_per_pos): lat_offset = np.random.uniform( -buffer_deg, buffer_deg ) lon_offset = np.random.uniform( -buffer_deg, buffer_deg ) new_lat = point[lat_col] + lat_offset new_lon = point[lon_col] + lon_offset sample = { lat_col: new_lat, lon_col: new_lon, target_col: 0, } for col in feature_cols: sample[col] = np.random.uniform( *neg_feature_range ) negatives.append(sample) df_neg = pd.DataFrame(negatives) df_pos = df.copy() df_pos[target_col] = 1 combined = pd.concat([df_pos, df_neg], ignore_index=True) if verbose >= 1: print( f"[INFO] Generated {len(df_neg)} negative samples" ) print( f"[INFO] Total dataset: {len(combined)} samples" ) if view: _visualize_negative_sampling( df_combined=combined, base_points=base_points, strategy=strategy, spatial_cols=spatial_cols, target_col=target_col, buffer_km=buffer_km, ) return combined
def _visualize_negative_sampling( df_combined, base_points, strategy, spatial_cols, target_col, buffer_km=10, s=50, title=None, ): lon_col, lat_col = spatial_cols buffer_deg = buffer_km / 111.0 fig, ax = plt.subplots(figsize=(10, 8)) colors = {1: "red", 0: "green"} labels = {1: "Positive", 0: "Negative"} for val in [1, 0]: subset = df_combined[df_combined[target_col] == val] ax.scatter( subset[lon_col], subset[lat_col], color=colors[val], label=f"{labels[val]} Samples", s=s, ) for _, row in base_points.iterrows(): circle = Circle( (row[lon_col], row[lat_col]), buffer_deg, color="blue", alpha=0.2, ) ax.add_patch(circle) if strategy == "gauge": ax.scatter( base_points[lon_col], base_points[lat_col], color="blue", label="Gauges", marker="x", s=+20, ) if strategy == "clustered_negatives": ax.scatter( base_points[lon_col], base_points[lat_col], color="blue", label="Cluster Centers", marker="P", s=s + 30, ) ax.set_xlabel("Longitude") ax.set_ylabel("Latitude") ax.set_title( title or f"Negative Sample Generation Strategy: {strategy}" ) ax.legend( handles=[ Patch(color="red", label="Positive Samples"), Patch(color="green", label="Negative Samples"), Patch( color="blue", alpha=0.2, label=f"{buffer_km} km Buffer", ), ] ) ax.grid(True, **{"linestyle": ":", "alpha": 0.7}) plt.tight_layout() plt.show()
[docs] @SaveFile @isdf def gen_negative_samples( df: DataFrame, target_col: str, spatial_cols: tuple[str, str] = ("longitude", "latitude"), feature_cols: list[str] | None = None, buffer_km: float = 10, neg_feature_range: tuple[float, float] = (0, 5), num_neg_per_pos: int = 1, use_gpd: bool | str = "auto", view: bool = False, savefile: str | None = None, verbose: int = 1, ): r""" Generate synthetic negative samples for spatial binary classification tasks. This function creates additional samples labeled as non-events within a specified spatial buffer around the positive (event) observations. The main idea is to generate negative examples that reflect realistic conditions but have not triggered an event, thereby assisting models in distinguishing occurrences from non-occurrences. Parameters ---------- df : pandas.DataFrame Input DataFrame containing the positive samples (events). Must include both the target column and the specified spatial columns. target_col : str Column name for the binary target (e.g. `landslide`). Rows where this column is 1 (or True) are considered positive samples. spatial_cols : tuple of str, default ('longitude', 'latitude') Tuple specifying the longitude> and latitude column names in `df`. feature_cols : list of str or None, default None List of feature columns to use or to simulate for generated negatives. If ``None``, the function automatically detects numeric and categorical columns excluding `spatial_cols` and `target_col`. buffer_km : float, default 10 Spatial buffer in kilometers used to define the radius around each positive sample within which negative samples are created. neg_feature_range : tuple of int, default (0, 5) Value range (minimum, maximum) used for simulating numeric feature values in negative samples if the corresponding feature column does not exist in `<df>`. num_neg_per_pos : int, default 1 Number of negative samples to generate per positive sample. For instance, if ``num_neg_per_pos=2``, each positive sample spawns two negatives. use_gpd : str, default 'auto' If set to 'auto', the function tries to import GeoPandas for visualization. If `'none'`, no GeoPandas usage will occur. view : bool, default False Whether to visualize the generated samples on a map. Attempts to use `geopandas` if installed; falls back to `matplotlib` if `'auto'` is chosen and GeoPandas is not available. savefile : str or None, default None Path to which the resulting DataFrame is saved if provided. Handled by the decorator that wraps this function. verbose : int, default 1 Verbosity level. `0` for silent, `1` for progress indication, `2` for more messages, `3` for debugging output. Returns ------- pandas.DataFrame Combined DataFrame with both original positive samples and newly generated negative samples. The `<target_col>` is 1 for positives and 0 for negatives. Methods ------- `columns_manager` This internal function is used to handle the processing of columns for features and spatial parameters. Notes ----- - If a feature column exists in `df`, the negative samples will copy or randomly select categories for categorical columns, and sample integers within ``neg_feature_range`` for numeric columns. - If `feature_cols` is empty or does not exist in `df`, the function simulates all values for negative samples. - When `view=True`, circles depicting the buffer zone around each positive sample are drawn for visualization. - The approximation of 1 degree to roughly 111 km varies slightly depending on latitude. Mathematically, we define the spatial buffer in degrees as: .. math:: \\Delta = \\frac{\\text{buffer_km}}{111.0}, where :math:`111.0` km approximates the distance of one degree of latitude or longitude. For each positive sample at location :math:`(lat, lon)`, we generate :math:`n` new points with offsets :math:`\\delta_{lat}` and :math:`\\delta_{lon}`, each drawn from a uniform distribution :math:`U(-\\Delta, \\Delta)`: .. math:: \\begin{aligned} &lat_{new} = lat + \\delta_{lat},\\\\ &lon_{new} = lon + \\delta_{lon}. \\end{aligned} Combined with randomly sampled or inferred feature values, these new samples serve as negative examples for modeling tasks such as landslide prediction. Examples -------- >>> from geoprior.utils.spatial_utils import gen_negative_samples >>> import pandas as pd >>> import numpy as np >>> df_pos = pd.DataFrame({ ... 'latitude': np.random.uniform(24.0, 25.0, 5), ... 'longitude': np.random.uniform(113.0, 114.0, 5), ... 'rainfall_day_1': np.random.randint(10, 30, 5), ... 'rainfall_day_2': np.random.randint(10, 30, 5), ... 'rainfall_day_3': np.random.randint(10, 30, 5), ... 'rainfall_day_4': np.random.randint(10, 30, 5), ... 'rainfall_day_5': np.random.randint(10, 30, 5), ... 'landslide': 1 ... }) >>> combined = gen_negative_samples( ... df=df_pos, ... target_col='landslide', ... buffer_km=10, ... num_neg_per_pos=2, ... view=False, ... verbose=2 ... ) >>> print(combined.head()) See Also -------- check_spatial_columns : Ensures the existence of required spatial columns. exist_features : Verifies the presence of specified features in `<df>`. columns_manager : Handles both feature and spatial columns for processing. """ # Validate input and columns checked_values = _validate_negative_sampling( df=df, target_col=target_col, spatial_cols=spatial_cols, feature_cols=feature_cols, neg_feature_range=neg_feature_range, num_neg_per_pos=num_neg_per_pos, verbose=verbose, ) ( spatial_cols, feature_cols, neg_feature_range, num_neg_per_pos, ) = checked_values lon_col, lat_col = spatial_cols # Check GeoPandas HAS_GPD = False try: import geopandas as gpd from shapely.geometry import Point HAS_GPD = True except ImportError: HAS_GPD = False # Buffer conversion (km -> deg) buffer_deg = buffer_km / 111.0 if verbose >= 2: print( f"[INFO] Generating negative samples within " f"{buffer_km} km buffer zones." ) if verbose >= 3: print(f"[DEBUG] Buffer in degrees: {buffer_deg:.4f}") # Generate negative samples per row negative_samples = [] iterator = df.iterrows() if verbose >= 1 and HAS_TQDM: iterator = tqdm( iterator, total=len(df), desc="Generating negatives", ) for _, row in iterator: for _ in range(num_neg_per_pos): lat_offset = np.random.uniform( -buffer_deg, buffer_deg ) lon_offset = np.random.uniform( -buffer_deg, buffer_deg ) new_lat = row[lat_col] + lat_offset new_lon = row[lon_col] + lon_offset sample = { lat_col: new_lat, lon_col: new_lon, target_col: 0, } # Simulate or copy feature columns for negs for col in feature_cols: # Categorical features if ( df[col].dtype == "object" or df[col].dtype.name == "category" ): unique_vals = df[col].dropna().unique() if len(unique_vals) > 0: sample[col] = np.random.choice( unique_vals ) else: sample[col] = None # Numeric features elif np.issubdtype(df[col].dtype, np.number): sample[col] = np.random.randint( neg_feature_range[0], neg_feature_range[1] + 1, ) else: sample[col] = None negative_samples.append(sample) df_negative = pd.DataFrame(negative_samples) # Label positive set and combine df_positive = df.copy() df_positive[target_col] = 1 combined = pd.concat( [df_positive, df_negative], ignore_index=True ) if verbose >= 2: print( f"\n[INFO] Final dataset: {len(df_positive)} " f"positive and {len(df_negative)} negative " f"samples." ) # Optional View/Plot if view: if (use_gpd == "auto") and HAS_GPD: if verbose >= 2: print("[INFO] Visualizing with GeoPandas.") geometry = [ Point(xy) for xy in zip( combined[lon_col], combined[lat_col], strict=False, ) ] gdf = gpd.GeoDataFrame( combined, geometry=geometry ) fig, ax = plt.subplots(figsize=(10, 8)) for label, color, name in zip( [1, 0], ["red", "green"], ["Positive", "Negative"], strict=False, ): gdf[gdf[target_col] == label].plot( ax=ax, color=color, label=f"{name} Sample", markersize=50, ) for _, row in df_positive.iterrows(): circle = plt.Circle( (row[lon_col], row[lat_col]), buffer_deg, color="blue", alpha=0.2, ) ax.add_patch(circle) else: if verbose >= 2: print("[INFO] Visualizing with Matplotlib.") fig, ax = plt.subplots(figsize=(10, 8)) for label, color, name in zip( [1, 0], ["red", "green"], ["Positive", "Negative"], strict=False, ): subset = combined[ combined[target_col] == label ] ax.scatter( subset[lon_col], subset[lat_col], color=color, label=f"{name} Sample", s=50, ) for _, row in df_positive.iterrows(): circle = plt.Circle( (row[lon_col], row[lat_col]), buffer_deg, color="blue", alpha=0.2, ) ax.add_patch(circle) ax.set_title( "Negative Sample Generation with Spatial Buffers" ) ax.set_xlabel("Longitude") ax.set_ylabel("Latitude") ax.legend( handles=[ Patch(color="red", label="Positive Samples"), Patch( color="green", label="Negative Samples" ), Patch( color="blue", alpha=0.2, label=f"{buffer_km} km Buffer", ), ] ) ax.grid(True, **{"linestyle": ":", "alpha": 0.7}) plt.tight_layout() plt.show() return combined
def _validate_negative_sampling( df: DataFrame, target_col: str, spatial_cols: tuple[str, str] = ("longitude", "latitude"), feature_cols: list[str] | None = None, neg_feature_range: tuple[float, float] = (0, 5), num_neg_per_pos: int = 1, id_col: str | None = "auto", verbose: int = 0, ): """ Helper that validates input parameters and column configurations for the generation of negative samples. Ensures required columns exist, processes feature and spatial columns, and checks numeric parameters like `neg_feature_range` and `num_neg_per_pos`. """ # Validate input and columns exist_features( df, features=target_col, name=f"Target '{target_col}'" ) feature_cols = columns_manager(feature_cols) spatial_cols = columns_manager(spatial_cols) check_spatial_columns(df, spatial_cols=spatial_cols) exist_features( df, features=spatial_cols, name="Spatial columns" ) neg_feature_range = validate_length_range( neg_feature_range, param_name="neg_feature_range" ) num_neg_per_pos = validate_positive_integer( num_neg_per_pos, "num_neg_per_pos", ) # detect id columns if auto if str(id_col).lower() == "auto": id_col = find_id_column( df, strategy="naive", errors="ignore" ) id_col = columns_manager(id_col, empty_as_none=False) # If no feature_cols provided, auto-detect from remaining columns # including if feature_cols is None: excluded_cols = ( list(spatial_cols) + [target_col] + id_col ) feature_cols = [ col for col in df.columns if col not in excluded_cols ] if verbose >= 2: print( f"[INFO] Auto-detected feature_cols:" f" {feature_cols}" ) feature_cols = columns_manager( feature_cols, empty_as_none=False ) if len(feature_cols) == 0: raise ValueError( "No feature columns found. Please " "specify `feature_cols` explicitly." ) # Warn if a feature column does not exist in df for col in feature_cols: if col not in df.columns and verbose >= 2: print( f"[WARN] Feature column '{col}' not " "found in df; it will be simulated." ) return ( spatial_cols, feature_cols, neg_feature_range, num_neg_per_pos, )
[docs] @SaveFile @isdf def create_spatial_clusters( df: pd.DataFrame, spatial_cols: list[str] | None = None, cluster_col: str = "region", n_clusters: int | None = None, algorithm: str = "kmeans", view: bool = True, figsize: tuple = (14, 10), s: int = 60, plot_style: str = "seaborn", cmap: str = "tab20", show_grid: bool = True, grid_props: dict = None, auto_scale: bool = True, savefile: str | None = None, verbose: int = 1, **kwargs, ) -> pd.DataFrame: """ Cluster 2D spatial data in ``df`` using `<algorithm>` and optionally plot the results. This function, `<create_spatial_clusters>`, extracts two coordinate columns from `<df>` to form clusters via methods such as 'kmeans', 'dbscan', or 'agglo' (agglomerative). It uses the function `filter_valid_kwargs` (when relevant) to strip out invalid parameters for certain estimators, and writes cluster labels into `<cluster_col>`. Parameters ---------- df : pandas.DataFrame Input DataFrame holding spatial coordinates and optional other fields. spatial_cols : list of str, optional Two-column list for x and y coordinates. Defaults to ``['longitude','latitude']`` if None. cluster_col : str, default='region' Name of the column to store the assigned cluster labels. n_clusters : int, optional Number of clusters to form. If not provided for KMeans, it is auto-detected. For DBSCAN or Agglomerative, a warning is issued if not set. algorithm : str, default='kmeans' Choice of clustering algorithm among ``['kmeans','dbscan','agglo']``. view : bool, default=True If True, displays a scatterplot of the final clusters. figsize : tuple, default=(14, 10) Size of the displayed figure for the cluster plot. s : int, default=60 Marker size in the scatterplot. plot_style : str, default='seaborn' Matplotlib style used for the plot. cmap : str, default='tab20' Colormap name used to differentiate clusters. show_grid : bool, default=True Toggles grid lines on or off. grid_props : dict, optional Additional keyword arguments controlling the grid style. auto_scale : bool, default=True If True, standardize coordinates before clustering. savefile : str, optional File path to save the data with an additional `<cluster_col>` storing the assigned cluster labels if desired. verbose : int, default=1 Controls console logs. Higher values yield more details about scaling and cluster detection. **kwargs Additional keyword arguments passed to the chosen algorithm (filtered by `filter_valid_kwargs` for KMeans, DBSCAN, AgglomerativeClustering ). Returns ------- pandas.DataFrame A copy of `<df>` with an additional `<cluster_col>` storing the assigned cluster labels. Notes ----- If `<auto_scale>` is True, it uses a standard scaler to normalize the coordinate columns. The scatterplot is generated using the library seaborn for enhanced styling. By default, for `<algorithm>` = "kmeans", the model attempts to minimize: .. math:: J = \\sum_{i=1}^{N} \\min_{\\mu_j} \\lVert x_i - \\mu_j \\rVert^2 where :math:`x_i` are the scaled or raw 2D coordinates in `<df>`. The function can optionally auto-detect ``n_clusters`` using a silhouette and elbow analysis if not provided. Examples -------- >>> from geoprior.utils.spatial_utils import create_spatial_clusters >>> import pandas as pd >>> df = pd.DataFrame({ ... "longitude": [0.1, 0.2, 2.2, 2.3], ... "latitude": [1.0, 1.1, 2.1, 2.2] ... }) >>> # KMeans with auto scale and auto-detect k >>> result = create_spatial_clusters( ... df=df, ... algorithm="kmeans", ... view=True ... ) >>> # DBSCAN with custom arguments >>> result_db = create_spatial_clusters( ... df=df, ... algorithm="dbscan", ... eps=0.5, ... min_samples=2 ... ) See Also -------- filter_valid_kwargs : Helps discard unsupported keyword arguments for chosen estimators. """ # Confirm required columns exist in DataFrame # This prevents missing key data issues if spatial_cols is None: spatial_cols = ["longitude", "latitude"] assert all(col in df.columns for col in spatial_cols), ( "Missing spatial columns" ) assert len(spatial_cols) == 2, ( f"Need exactly 2 spatial columns. Got {len(spatial_cols)}" ) assert algorithm in ["kmeans", "dbscan", "agglo"], ( "Invalid algorithm. Expect one of ['kmeans', 'dbscan', 'agglo']." ) # Use requested plotting style plt.style.use(plot_style) # Copy DataFrame to avoid modification local_df = df.copy() # Extract coordinates from the spatial columns coords = local_df[spatial_cols].values # Debug info about data shape if verbosity is high if verbose >= 2: print(f"DataFrame shape: {local_df.shape}") print("Initial coords sample:", coords[:5]) # Scale coordinates to standardize range, if requested if auto_scale: if verbose >= 1: print("Scaling coordinates...") scaler = StandardScaler() coords = scaler.fit_transform(coords) # Debug info about scaled coords if verbosity is high if verbose >= 2: print("Scaled coords sample:", coords[:5]) # Determine an optimal number of clusters if none provided if n_clusters is None: if algorithm == "kmeans": if verbose >= 1: print( "Auto-detecting optimal number of clusters..." ) n_clusters = _auto_detect_k( coords=coords, verbose=verbose, show_grid=show_grid, grid_props=grid_props, ) else: # Warn if user expects auto-detection with non-kmeans warnings.warn( "Auto-cluster detection only supported for KMeans", stacklevel=2, ) # Notify user about the clustering approach if verbose >= 1: print(f"Clustering with {algorithm.upper()}...") # Initialize the selected clustering algorithm if algorithm == "kmeans": kwargs = filter_valid_kwargs(KMeans, kwargs) clusterer = KMeans( n_clusters=n_clusters, random_state=42, **kwargs ) elif algorithm == "dbscan": kwargs = filter_valid_kwargs(DBSCAN, kwargs) clusterer = DBSCAN(**kwargs) else: # algorithm == 'agglo' kwargs = filter_valid_kwargs( AgglomerativeClustering, kwargs ) clusterer = AgglomerativeClustering( n_clusters=n_clusters, **kwargs ) # Fit the model and predict cluster labels labels = clusterer.fit_predict(coords) local_df[cluster_col] = labels # Debug info about clustering output if verbosity is high if verbose >= 2: unique_labels = np.unique(labels) print(f"Unique cluster labels: {unique_labels}") # Plot the clusters if requested if view: _plot_clusters( df=local_df, spatial_cols=spatial_cols, cluster_col=cluster_col, figsize=figsize, cmap=cmap, algorithm=algorithm, s=s, show_grid=show_grid, grid_props=grid_props, ) # Return the DataFrame with assigned cluster labels return local_df
def _auto_detect_k( coords: np.ndarray, verbose: int, max_k: int = 10, show_grid: bool = True, grid_props: dict = None, ) -> int: # Evaluate multiple k values using # elbow (distortion) and silhouette scores distortions = [] silhouettes = [] K_range = range(2, max_k + 1) if verbose >= 1: print(f"Evaluating k from 2 to {max_k}...") for k in K_range: kmeans = KMeans( n_clusters=k, random_state=42, n_init="auto" ) kmeans.fit(coords) distortions.append(kmeans.inertia_) silhouettes.append( silhouette_score(coords, kmeans.labels_) ) # Detailed iteration log if verbosity is high if verbose >= 2: output = "k={:<5} | Distortion={:^28} | Silhouette={:^20}".format( k, f"{distortions[-1]:.3f}", f"{silhouettes[-1]:.3f}", ) print(output) # Plot elbow and silhouette analyses fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5)) ax1.plot(K_range, distortions, "bo-") ax1.set( xlabel="Number of clusters", ylabel="Distortion", title="Elbow Method", ) ax2.plot(K_range, silhouettes, "go-") ax2.set( xlabel="Number of clusters", ylabel="Silhouette Score", title="Silhouette Analysis", ) # Enable gridlines if grid_props is None: grid_props = {"linestyle": ":", "alpha": 0.7} if show_grid: ax1.grid(show_grid, **grid_props) else: ax1.grid(show_grid) if show_grid: ax2.grid(show_grid, **grid_props) else: ax2.grid(show_grid) plt.tight_layout() plt.show() # Use silhouette score peak to suggest optimal k optimal_k = np.argmax(silhouettes) + 2 if verbose >= 1: print(f"Suggested optimal k: {optimal_k}") return optimal_k def _plot_clusters( df: pd.DataFrame, spatial_cols: list[str], cluster_col: str, figsize: tuple, cmap: str, s: int, algorithm: str, show_grid: bool = True, grid_props: dict = None, ) -> None: import seaborn as sns # Create a scatterplot to visualize clustered data plt.figure(figsize=figsize) ax = sns.scatterplot( x=spatial_cols[0], y=spatial_cols[1], hue=cluster_col, palette=cmap, data=df, s=s, edgecolor="k", linewidth=0.5, alpha=0.8, legend="auto", ) # Professional labeling and presentation plt.title( f"{algorithm.upper()} Clustering - " f"{df[cluster_col].nunique()} Clusters", fontsize=14, pad=20, ) plt.xlabel(spatial_cols[0], fontsize=12) plt.ylabel(spatial_cols[1], fontsize=12) # Enable gridlines if show_grid: plt.grid( show_grid, **( grid_props or {"linestyle": ":", "alpha": 0.7} ), ) else: plt.grid(show_grid) # Adjust and position legend plt.legend( title="Cluster", bbox_to_anchor=(1.05, 1), loc="upper left", ) # Annotate cluster labels near their median positions for cluster in df[cluster_col].unique(): # Skip noise cluster (-1) for DBSCAN if cluster == -1: continue median_position = df[df[cluster_col] == cluster][ spatial_cols ].median() plt.text( median_position[0], median_position[1], str(cluster), fontdict=dict(weight="bold", size=10), bbox=dict( facecolor="white", alpha=0.8, edgecolor="none" ), ) # Hide top/right spines, tighten layout ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) ax.figure.tight_layout() # Display the final clustering plot plt.show()
[docs] @validate_params( { "threshold": [Interval(Real, 0, 1, closed="neither")], "error": [StrOptions({"raise", "warn", "ignore"})], "pos": [Real, "array-like"], } ) @isdf def filter_position( df, pos, pos_cols=None, find_closest=True, threshold=0.01, error="raise", verbose=0, ): """ filter_position is a utility that filters a pandas.DataFrame based on user-specified spatial positions. It can match positions exactly or compute distances to find the closest points within a threshold. For a single dimension, the distance is computed as: .. math:: d = |x - p| For multi-dimensional data with n coordinates, the Euclidean distance is computed as: .. math:: d = \\sqrt{\\sum_{i=1}^n (x_i - p_i)^2} Parameters ------------ df : pandas.DataFrame The DataFrame that will be filtered. This parameter is essential and must contain columns referenced by `pos_cols` if ``pos_cols`` is not None. pos : float or tuple of floats The reference position(s) to match or approximate. When `pos_cols` is None, `pos` is interpreted as an index value. Otherwise, each value in `pos` aligns with a specific column in `pos_cols`. pos_cols : str or tuple of str, optional Name(s) of the column(s) in `df` to match against `pos`. If ``pos_cols=None``, then `pos` is treated as an index filter. If multiple columns are given (e.g., latitude and longitude), each coordinate in `pos` should correspond to one column in `pos_cols`. find_closest : bool, optional If True, nearest-neighbor filtering is performed within the distance `threshold`. If False, exact matches are used. threshold : float, optional The maximum distance within which points are considered a match if `find_closest` is True. The unit corresponds to the column data (e.g., degrees for geographic lat/lon). error : {'raise', 'warn', 'ignore'}, optional Specifies how to handle dimension mismatches or missing values. If ``'raise'``, a ValueError will be raised. If ``'warn'``, a warning is printed and extra values are ignored. If ``'ignore'``, mismatches are silently ignored. verbose : int, optional Controls the level of output messages: - 0: No output - 1: Basic info - 2: Additional details - >=3: Comprehensive summary Returns ------- pandas.DataFrame A new DataFrame that contains only rows matching or approximating the specified position(s) within the given threshold if `find_closest` is True. Notes ----- When `pos_cols` is None, the function attempts to filter by DataFrame index using the first element of `pos`. This approach may fail for multi-level indexes unless ``error='warn'`` or ``error='ignore'`` is used to bypass the dimension mismatch. Examples -------- >>> from geoprior.utils.spatial_utils import filter_position >>> import pandas as pd >>> df = pd.DataFrame({ ... 'lat': [113.309998, 113.310001], ... 'lon': [22.831362, 22.831364] ... }) >>> # Exact match >>> result_exact = filter_position(df, pos=(113.309998, ... 22.831362), ... pos_cols=('lat', 'lon'), ... find_closest=False) >>> # Nearest match with threshold >>> result_close = filter_position(df, ... pos=(113.31, ... 22.83), ... pos_cols=('lat', ... 'lon'), ... find_closest=True, ... threshold=0.01) See Also -------- geoprior.utils.data_utils.truncate_data: Truncate multiple DataFrames based on spatial coordinates or index alignment with a base DataFrame. """ # Initialize filtered_df as the original DataFrame filtered_df = df.copy() # Convert pos to tuple if it's not already if not isinstance(pos, tuple | list): pos = (pos,) # Helper function to handle errors def _handle_error(msg, ex_msg=""): if error == "raise": raise ValueError(msg) elif error == "warn": warnings.warn(f"{msg}{ex_msg}", stacklevel=2) # if error == 'ignore', do nothing # If no position columns specified, # treat pos as DataFrame index if pos_cols is None: if len(pos) > 1: _handle_error( "Multiple position values provided but no pos_cols. " "Unable to match multi-level index. ", "Using the first position value only.", ) idx = pos[ 0 ] # Consider only the first if multiple are given if idx not in df.index: _handle_error( f"Index '{idx}' not found in DataFrame. ", "Returning empty DataFrame.", ) return filtered_df.iloc[ 0:0 ] # Return empty DF if not found # Filter by index filtered_df = filtered_df.loc[[idx]] return filtered_df # Ensure pos_cols is tuple for consistency if not isinstance(pos_cols, tuple | list): pos_cols = (pos_cols,) pos_cols = columns_manager(pos_cols) # Check dimension match between pos and pos_cols if len(pos) != len(pos_cols): # If mismatch, handle based on error param _handle_error( f"pos ({len(pos)}) and pos_cols ({len(pos_cols)}) lengths " "do not match.", " Extra values or columns will be ignored.", ) # If ignoring or warning, align up to the shortest length min_len = min(len(pos), len(pos_cols)) pos = pos[:min_len] pos_cols = pos_cols[:min_len] # If find_closest is disabled, do exact matching if not find_closest: # Build query for exact match mask = True for p_val, col in zip(pos, pos_cols, strict=False): mask &= filtered_df[col] == p_val filtered_df = filtered_df[mask] return filtered_df # If find_closest is enabled, use threshold-based filtering # Compute distance (Euclidean if multiple columns, absolute if single) if len(pos_cols) == 1: # Single dimension: absolute difference col = pos_cols[0] distance = (filtered_df[col] - pos[0]).abs() filtered_df = filtered_df[distance <= threshold] else: # Multi-dimensional: Euclidean distance # Summation of squared diffs for each column squared_diff = 0 for p_val, col in zip(pos, pos_cols, strict=False): squared_diff += (filtered_df[col] - p_val) ** 2 distance = squared_diff**0.5 filtered_df = filtered_df[distance <= threshold] # If verbose, provide basic info if verbose >= 1: print( f"Filtered {len(df)} rows to {len(filtered_df)} rows " f"within threshold {threshold}." ) # If verbose >= 2, show sample of distance if verbose >= 2 and not filtered_df.empty: print("Example distance values within threshold:") print(distance[distance <= threshold].head()) return filtered_df
[docs] @is_data_readable(data_to_read="data") @SaveFile @validate_params( { "z": ["array-like", str], "threshold": [ "array-like", StrOptions({"auto"}), Real, ], "condition": [ StrOptions({"auto", "above", "below", "between"}) ], "use_negative_criteria": [bool], "percentile": [Real], "x": ["array-like", str, None], "y": ["array-like", str, None], "data": ["array-like", None], } ) def extract_zones_from( z, threshold="auto", condition="auto", use_negative_criteria=True, percentile=10, x=None, y=None, data=None, view=False, plot_type="scatter", figsize=(8, 6), savefile=None, axis_off=False, show_grid=True, **kwargs, ): """ Extract zones by filtering values against a threshold rule. Parameters ---------- z : array-like, pandas.Series, or str Input data to filter. If ``z`` is a string, it is interpreted as a column name in ``data``. threshold : {'auto'} or float or int or tuple, optional Filtering criterion. Use ``'auto'`` for percentile-based thresholding, a scalar for a single cutoff, or a length-2 tuple for interval filtering. condition : {'auto', 'above', 'below', 'between'}, optional Relation between the data and the threshold. use_negative_criteria : bool, optional Controls the automatic condition when ``condition='auto'``. percentile : int or float, optional Percentile used when ``threshold='auto'``. x, y : array-like, pandas.Series, str, or None, optional Optional coordinates or column names used for plotting. data : pandas.DataFrame or None, optional Data source used when ``x``, ``y``, or ``z`` are column names. view : bool, optional Whether to visualize the filtered result. plot_type : str, optional Plot type used when ``view=True``. Common values include ``'scatter'``, ``'line'``, and ``'hist'``. figsize : tuple, optional Figure size for plotting. savefile : str or None, optional Optional path used when saving the figure. axis_off : bool, optional Whether to hide axes in the plot. show_grid : bool, optional Whether to display the plot grid. **kwargs : dict Additional plotting keyword arguments. Returns ------- object Filtered values and any optional plotting outputs defined by the implementation. Notes ----- When ``x``, ``y``, or ``z`` are passed as strings, the function relies on ``extract_array_from`` to retrieve the corresponding arrays from ``data``. """ # 1) If z, x, y are strings, extract from DataFrame if data is not None: z, x, y = extract_array_from( data, z, x, y, handle_unknown="raise", # raise if columns not found error="raise", check_size=True, # ensure x,y,z have same length ) # 2) Ensure z, x, y are arrays z = np.asarray(z) if not isinstance(z, np.ndarray) else z x = ( np.asarray(x) if (x is not None and not isinstance(x, np.ndarray)) else x ) y = ( np.asarray(y) if (y is not None and not isinstance(y, np.ndarray)) else y ) # 3) If threshold='auto', compute it using percentiles # and decide condition based on use_negative_criteria if ( isinstance(threshold, str) and threshold.lower() == "auto" ): if use_negative_criteria: thr_value = np.percentile( z, percentile ) # e.g., lower percentile condition = "below" else: thr_value = np.percentile(z, 100 - percentile) condition = "above" threshold = thr_value # 4) Build a mask depending on single-value or range threshold if ( isinstance(threshold, tuple | list) and len(threshold) == 2 ): threshold = validate_length_range( threshold, param_name="Threshold" ) if condition == "auto": condition = ( "between" # default if user didn't specify ) if condition.lower() == "between": mask = (z >= threshold[0]) & (z <= threshold[1]) else: raise ValueError( "For tuple/list threshold, specify condition='between'" " or handle logic manually." ) else: # Single numeric threshold if condition == "auto": condition = ( "below" if use_negative_criteria else "above" ) if condition.lower() == "above": mask = z > threshold elif condition.lower() == "below": mask = z < threshold else: raise ValueError( f"condition={condition} not valid for a single numeric threshold." ) # 5) Filter the arrays z_filtered = z[mask] x_filtered = x[mask] if x is not None else None y_filtered = y[mask] if y is not None else None # 6) Prepare the output # If x & y exist, return a DataFrame with columns [x, y, z]. # If only x, return [x, z]. # Otherwise, return just the filtered z as a Series. x_name = resolve_label(x, default_name="x") y_name = resolve_label(y, default_name="y") z_name = resolve_label(z, default_name="z") if x_filtered is not None and y_filtered is not None: result = pd.DataFrame( { x_name: x_filtered, y_name: y_filtered, z_name: z_filtered, } ) elif x_filtered is not None: result = pd.DataFrame( {x_name: x_filtered, z_name: z_filtered} ) else: result = pd.Series(z_filtered) # 7) If view=True, build a plot according to plot_type and data availability if view: plt.figure(figsize=figsize) # - If x & y provided, do a 2D scatter of x vs y (colored by z or sized by z). # - If only x provided, do 1D plot with z as the values. # - If neither x nor y, do a histogram of z. if x_filtered is not None and y_filtered is not None: if plot_type == "scatter": plt.scatter( x_filtered, y_filtered, c=z_filtered, **kwargs, ) plt.colorbar(label=z_name) plt.xlabel(x_name) plt.ylabel(y_name) plt.title("Filtered Scatter Plot") elif plot_type == "line": # Plot lines in x-y plane, ignoring z plt.plot(x_filtered, y_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(y_name) plt.title("Filtered Line Plot") else: # fallback plt.scatter(x_filtered, y_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(y_name) plt.title(f"Fallback to Scatter: {plot_type}") elif x_filtered is not None: # only x is given if plot_type == "scatter": plt.scatter(x_filtered, z_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(z_name) plt.title("Filtered Scatter (x vs z)") elif plot_type == "line": plt.plot(x_filtered, z_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(z_name) plt.title("Filtered Line (x vs z)") elif plot_type == "hist": plt.hist(z_filtered, **kwargs) plt.xlabel(z_name) plt.title("Filtered Histogram (z)") else: # fallback plt.scatter(x_filtered, z_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(z_name) plt.title(f"Fallback to Scatter: {plot_type}") else: # No x or y => just do a histogram of z plt.hist(z_filtered, **kwargs) plt.xlabel(z_name) plt.title("Filtered Histogram (z)") if axis_off: plt.axis("off") if not show_grid: plt.grid(False) plt.tight_layout() plt.show() return result
@SaveFile def dual_merge( df1: pd.DataFrame, df2: pd.DataFrame, feature_cols: list | tuple = ( "longitude", "latitude", ), find_closest: bool = False, force_coords: bool = False, threshold: float = 0.01, how: str = "inner", savefile: str | None = None, ) -> pd.DataFrame: """ Merge two DataFrames based on specified feature columns. The function can match the features exactly or find the closest matches within a specified distance threshold. It also allows for overwriting coordinates or feature values from one DataFrame to another when a close match is found. Parameters ---------- df1 : pd.DataFrame The first DataFrame to be merged. It contains the primary data along with the feature columns (e.g., longitude, latitude) to be merged on. df2 : pd.DataFrame The second DataFrame to be merged. It contains the data to be matched with `df1` based on the specified feature columns. feature_cols : tuple or list, default ``('longitude', 'latitude')`` The names of the columns in each DataFrame to merge on. It should contain two columns representing features, such as coordinates (longitude, latitude) or other relevant attributes. These columns will be used to match the rows from `df1` to `df2`. find_closest : bool, default ``False`` If ``True``, the function will attempt to find the closest points in ``df2`` for each point in ``df1`` within the specified distance threshold (`threshold`). If no exact match is found, the closest point within the threshold will be considered. force_coords : bool, default ``False`` If ``True``, when the closest points are found, the coordinates of `df1` will overwrite those of `df2` for the matched points. threshold : float, default ``0.01`` The maximum distance threshold within which points will be considered as "close" for the closest point matching. The value is in the same unit as the feature columns (e.g., degrees for latitude/longitude). how : str, default ``'inner'`` The type of merge to perform. Options include: - ``'inner'``: Only includes points that appear in both DataFrames. - ``'left'``: All points from `df1` are included; unmatched points from `df2` are excluded. - ``'right'``: All points from `df2` are included; unmatched points from `df1` are excluded. - ``'outer'``: Includes all points from both DataFrames, with NaN for unmatched points. Returns ------- pd.DataFrame A DataFrame containing the merged data based on the specified feature columns and merge type. If ``find_closest=True``, it will contain the closest matches within the specified threshold, with coordinates from ``df1`` overwritten if ``force_coordinates=True``. Notes ----- - This function uses a KDTree for efficient nearest-neighbor searching when ``find_closest=True``. This is useful when dealing with large datasets that may not have exact coordinate matches. - When ``force_coordinates=True``, the coordinates from ``df1`` will overwrite those from ``df2`` for the closest points. However, other feature values will be kept from ``df2``. Examples -------- >>> import pandas as pd >>> from geoprior.utils.datautils import dual_merge >>> df1 = pd.DataFrame({ >>> 'longitude': [1.1, 1.2, 1.3], >>> 'latitude': [2.1, 2.2, 2.3], >>> 'value1': [10, 20, 30] >>> }) >>> df2 = pd.DataFrame({ >>> 'longitude': [1.1, 1.4], >>> 'latitude': [2.1, 2.4], >>> 'value2': [100, 200] >>> }) >>> result = dual_merge(df1, df2, feature_cols=('longitude', 'latitude'), >>> find_closest=True, threshold=0.05) >>> print(result) longitude latitude value1 value2 0 1.1 2.1 10 100 See Also -------- scipy.spatial.cKDTree: Used for finding the closest points in `df2` when ``find_closest=True``. pandas.merge: pandas.DataFrame.merge The pandas merge function, used to merge DataFrames based on columns. """ # Ensure feature_cols are valid and both contain two elements feature_cols = columns_manager( feature_cols, empty_as_none=False ) if len(feature_cols) != 2: raise ValueError( "feature_cols must contain exactly two features (e.g., longitude, latitude)" ) # check wether df1 and df2 are both dataframes. are_all_frames_valid(df1, df2) # Extract columns from feature_cols for both DataFrames feature1_1, feature1_2 = feature_cols feature2_1, feature2_2 = feature_cols # Filter for relevant columns in both DataFrames df1_coords = df1[[feature1_1, feature1_2]] df2_coords = df2[[feature2_1, feature2_2]] if find_closest: # Use KDTree for fast nearest-neighbor search tree = cKDTree(df2_coords.values) # Query for the closest points in df2 for each point in df1 dist, indices = tree.query( df1_coords.values, distance_upper_bound=threshold ) # Filter out points that couldn't find a close match valid_idx = dist != np.inf df1_coords_closest = df1_coords.iloc[valid_idx] df2_coords_closest = df2_coords.iloc[ indices[valid_idx] ] if force_coords: # Force coordinates of df1 to overwrite df2 df2_coords_closest[feature2_1] = ( df1_coords_closest[feature1_1].values ) df2_coords_closest[feature2_2] = ( df1_coords_closest[feature1_2].values ) # Update df1 with the closest matches from df2 df1 = df1.iloc[valid_idx] df2 = df2.iloc[indices[valid_idx]] # Perform the merge based on the feature columns merged_data = pd.merge( df1, df2, how=how, left_on=[feature1_1, feature1_2], right_on=[feature2_1, feature2_2], ) return merged_data
[docs] @isdf def extract_coordinates( df: pd.DataFrame, as_frame: bool = False, drop_xy: bool = False, error: bool | str = "raise", verbose: int = 0, ) -> tuple[ tuple[float, float] | pd.DataFrame | None, pd.DataFrame, tuple[str, str], ]: """ Extract coordinate columns or their midpoint from a DataFrame. Parameters ---------- df : pandas.DataFrame DataFrame expected to contain longitude/latitude or easting/northing columns. as_frame : bool, optional If ``True``, return the coordinate columns as a DataFrame. Otherwise return their midpoint. drop_xy : bool, optional If ``True``, remove detected coordinate columns from the returned DataFrame. error : bool or {'raise', 'warn', 'ignore'}, optional Error-handling policy for invalid inputs. verbose : int, optional Verbosity level for detection messages. Returns ------- tuple Tuple containing the extracted coordinates or midpoint, the DataFrame with optional coordinate removal, and the detected coordinate-column names. Notes ----- Longitude/latitude are preferred over easting/northing when both are present. """ def rename_if_exists( val: str, col: pd.Index, default: str ) -> pd.DataFrame: """Rename column in `d` if `val` is found in column names.""" match = list(filter(lambda x: val in x.lower(), col)) if match: df.rename( columns={match[0]: default}, inplace=True ) return df # Validate input is a DataFrame if not ( hasattr(df, "columns") and hasattr(df, "__array__") ): emsg = ( "Expected a DataFrame containing coordinates (`longitude`/" "`latitude` or `easting`/`northing`). Got type: " f"{type(df).__name__!r}" ) error = str(error).lower().strip() if error == "raise": raise TypeError(emsg) if error == "warn": warnings.warn(emsg, stacklevel=2) return None, df, () # Rename columns to standardized names if they contain coordinate values for name, std_name in zip( ["lat", "lon", "east", "north"], ["latitude", "longitude", "easting", "northing"], strict=False, ): df = rename_if_exists(name, df.columns, std_name) # Check for and prioritize coordinate columns coord_columns = [] for x, y in [ ("longitude", "latitude"), ("easting", "northing"), ]: if x in df.columns and y in df.columns: coord_columns = [x, y] break # Extract coordinates as DataFrame or midpoint if coord_columns: xy = ( df[coord_columns] if as_frame else tuple( np.nanmean(df[coord_columns].values, axis=0) ) ) else: xy = None # Drop coordinates if `drop_xy=True` if drop_xy and coord_columns: df.drop(columns=coord_columns, inplace=True) # Verbose messaging if verbose > 0: print( "###", "No" if not coord_columns else coord_columns, "coordinates found.", ) return xy, df, tuple(coord_columns)
@Deprecated( reason=( "This function is deprecated and will be removed in future versions. " "Please use `extract_coordinates` instead, which provides enhanced " "flexibility and robustness for coordinates extraction." ) ) @isdf def get_xy_coordinates( df, as_frame=False, drop_xy=False, raise_exception=True, verbose=0, ): """Check whether the coordinate values x, y exist in the data. Parameters ------------ df: Dataframe Frame that is expected to contain the longitude/latitude or easting/northing coordinates. Note if all types of coordinates are included in the data frame, the longitude/latitude takes the priority. as_frame: bool, default= False, Returns the coordinates values if included in the data as a frame rather than computing the middle points of the line drop_xy: bool, default=False, Drop the coordinates in the data and return the data transformed inplace raise_exception: bool, default=True raise error message if data is not a dataframe. If set to ``False``, exception is converted to a warning instead. To mute the warning set `raise_exception` to ``mute`` verbose: int, default=0 Send message whether coordinates are detected. returns -------- xy, d, xynames: Tuple xy : tuple of float ( longitude, latitude) or (easting/northing ) if `as_frame` is set to ``True``. d: Dataframe transformed (coordinated removed ) or not xynames: str, the name of coordinates detected. Examples ---------- >>> import gofast as gf >>> from geoprior.utils.spatial_utils import get_xy_coordinates >>> testdata = gf.datasets.make_erp ( samples =7, seed =42 , as_frame=True) >>> xy, d, xynames = get_xy_coordinates ( testdata, ) >>> xy , xynames ((110.48627946874444, 26.051952363176344), ('longitude', 'latitude')) >>> xy, d, xynames = get_xy_coordinates ( testdata, as_frame =True ) >>> xy.head(2) longitude latitude easting northing 0 110.485833 26.051389 448565.380621 2.881476e+06 1 110.485982 26.051577 448580.339199 2.881497e+06 >>> # remove longitude and lat in data >>> testdata = testdata.drop (columns =['longitude', 'latitude']) >>> xy, d, xynames = get_xy_coordinates ( testdata, as_frame =True ) >>> xy.head(2) easting northing 0 448565.380621 2.881476e+06 1 448580.339199 2.881497e+06 >>> # note testdata should be transformed inplace when drop_xy is set to True >>> xy, d, xynames = get_xy_coordinates ( testdata, drop_xy =True) >>> xy, xynames ((448610.25612032827, 2881538.4380570543), ('easting', 'northing')) >>> d.head(2) station resistivity 0 0.0 1.0 1 20.0 167.5 >>> testdata.head(2) # coordinates are henceforth been dropped station resistivity 0 0.0 1.0 1 20.0 167.5 >>> xy, d, xynames = get_xy_coordinates ( testdata, drop_xy =True) >>> xy, xynames (None, ()) >>> d.head(2) station resistivity 0 0.0 1.0 1 20.0 167.5 """ def get_value_in(val, col, default): """Get the value in the frame columns if `val` exists in""" x = list(filter(lambda x: x.find(val) >= 0, col)) if len(x) != 0: # now rename col df.rename( columns={x[0]: str(default)}, inplace=True ) return df if not ( hasattr(df, "columns") and hasattr(df, "__array__") ): emsg = ( "Expect dataframe containing coordinates longitude/latitude" f" or easting/northing. Got {type(df).__name__!r}" ) raise_exception = str(raise_exception).lower().strip() if raise_exception == "true": raise TypeError(emsg) if raise_exception not in ("mute", "silence"): warnings.warn(emsg, stacklevel=2) return df # check whether coordinates exists in the data columns for name, tname in zip( ("lat", "lon", "east", "north"), ("latitude", "longitude", "easting", "northing"), strict=False, ): df = get_value_in(name, col=df.columns, default=tname) # get the exist coodinates coord_columns = [] for x, y in zip( ("longitude", "easting"), ("latitude", "northing"), strict=False, ): if x in df.columns and y in df.columns: coord_columns.extend([x, y]) xy = ( df[coord_columns] if len(coord_columns) != 0 else None ) if not as_frame and xy is not None: # take the middle of the line and if both types of # coordinates are supplied , take longitude and latitude # and drop easting and northing xy = tuple(np.nanmean(np.array(xy), axis=0))[:2] xynames = tuple(coord_columns)[:2] if drop_xy and len(coord_columns) != 0: # modifie the data inplace df.drop(columns=coord_columns, inplace=True) if verbose: print( "###", "No" if len(xynames) == 0 else (tuple(xy.columns) if as_frame else xy), "coordinates found.", ) return xy, df, xynames
[docs] @is_data_readable @validate_params( { "data": ["array-like"], "method": [ StrOptions({"abs", "absolute", "relative"}), None, ], } ) @isdf def batch_spatial_sampling( data, sample_size=0.1, n_batches=10, stratify_by=None, spatial_bins=10, spatial_cols=None, method="abs", min_relative_ratio=0.01, random_state=42, verbose=1, ): """ Create stratified spatial sample batches from a DataFrame. Parameters ---------- data : pandas.DataFrame Input DataFrame used for sampling. sample_size : float or int, optional Total sample size as a fraction or absolute count. n_batches : int, optional Number of batches to generate. stratify_by : str or list of str or None, optional Additional columns used for stratification. spatial_bins : int or sequence of int, optional Number of spatial bins used when discretizing coordinates. spatial_cols : list or tuple of str or None, optional Spatial coordinate columns. method : {'abs', 'absolute', 'relative'}, optional Strategy used to translate ``sample_size`` into per-batch sample counts. min_relative_ratio : float, optional Minimum relative group size used by ``method='relative'``. random_state : int, optional Random seed for reproducibility. verbose : int, optional Verbosity level. Returns ------- list of pandas.DataFrame Stratified batches sampled without overlap. Notes ----- Spatial coordinates are discretized with ``pandas.qcut`` and combined with ``stratify_by`` columns so batches preserve the overall data distribution as closely as possible. """ data = data.copy() total_samples = sample_size if isinstance(sample_size, float): if not 0 < sample_size < 1: raise ValueError( "When sample_size is a float, it must be between 0 and 1." ) total_samples = int(len(data) * sample_size) elif isinstance(sample_size, int): if sample_size <= 0: raise ValueError("sample_size must be positive.") else: raise ValueError( "sample_size must be a float or int." ) if total_samples > len(data): raise ValueError( "sample_size is larger than the dataset." ) if n_batches <= 0: raise ValueError( "n_batches must be a positive integer." ) sample_size_per_batch = total_samples // n_batches leftover = total_samples % n_batches batches = [] remaining_data = data.copy() sampled_indices = set() rng = np.random.RandomState(random_state) # Set default spatial columns if not specified spatial_cols = columns_manager(spatial_cols) if spatial_cols is None: spatial_cols = [] if "longitude" in data.columns: spatial_cols.append("longitude") if "latitude" in data.columns: spatial_cols.append("latitude") if not spatial_cols: raise ValueError( "No spatial columns specified and " "'longitude' and 'latitude' not found in data." ) if len(spatial_cols) == 1: warnings.warn( f"Only one spatial column '{spatial_cols[0]}' found. " "Using it for spatial stratification. " "For more accurate sampling, providing both spatial " "columns is recommended.", UserWarning, stacklevel=2, ) else: if not isinstance(spatial_cols, list | tuple): raise ValueError( "spatial_cols must be a list or tuple of column names." ) if len(spatial_cols) > 2: raise ValueError( "spatial_cols can have at most two columns." ) for col in spatial_cols: if col not in data.columns: raise ValueError( f"Spatial column '{col}' not found in data." ) if len(spatial_cols) == 1: warnings.warn( f"Only one spatial column '{spatial_cols[0]}' specified. " "For more accurate sampling, providing two spatial columns " "is recommended.", UserWarning, stacklevel=2, ) # Validate spatial_bins if isinstance(spatial_bins, int): n_bins_list = [spatial_bins] * len(spatial_cols) elif isinstance(spatial_bins, tuple | list): if len(spatial_bins) != len(spatial_cols): raise ValueError( "Length of spatial_bins must match number of spatial_cols." ) n_bins_list = list(spatial_bins) else: raise ValueError( "spatial_bins must be int or tuple/list of int." ) # Create spatial bins in the original data for col, n_bins, axis in zip( spatial_cols, n_bins_list, ["x_bin", "y_bin"], strict=False, ): data[axis] = pd.qcut( data[col], q=n_bins, duplicates="drop" ) # Create combined stratification key in original data # Create combined stratification key if stratify_by is not None: strat_columns = stratify_by + [ axis for axis in ["x_bin", "y_bin"][ : len(spatial_cols) ] ] if verbose and len(data) > 10_000: print( f"\nGenerating stratification keys for {len(data):,}" " records..." ) print( " This may take some time. Please be patient..." ) # Optimized vectorized concatenation data["strat_key"] = ( data[strat_columns] .astype(str) .agg("_".join, axis=1) ) if verbose and len(data) > 10_000: print( "Stratification keys generated" f" successfully for {len(data):,} records." ) else: data["strat_key"] = ( "all_data" # Single group for random sampling ) # Initialize remaining data remaining_data = data.copy() batches = [] # Set initial random state rng = np.random.RandomState(random_state) if verbose: print( f"\nCreating {n_batches} stratified batches with" f" a total of {total_samples:,} samples..." ) # TQDM progress bar if verbose and HAS_TQDM: progbar = tqdm( range(n_batches), total=n_batches, ascii=True, ncols=80, desc="Batch Sampling Progress", ) if method == "relative": min_relative_ratio = assert_ratio( min_relative_ratio, bounds=(0, 1), exclude_values=(0, 1), name="`min_relative_ratio`", ) for batch_idx in range(n_batches): # Adjust sample size for batches if total_samples # is not divisible by n_batches if batch_idx < leftover: batch_sample_size = sample_size_per_batch + 1 else: batch_sample_size = sample_size_per_batch # if batch_sample_size > len(remaining_data): # batch_sample_size = len(remaining_data) batch_sample_size = min( batch_sample_size, len(remaining_data) ) # Group remaining data by stratification key grouped = remaining_data.groupby("strat_key") # Calculate number of samples per group group_sizes = grouped.size() total_size = group_sizes.sum() if method in ["abs", "absolute"]: group_sample_sizes = ( (group_sizes / total_size * batch_sample_size) .round() .astype(int) ) else: # "relative" relative_scale = np.clip( batch_sample_size / len(remaining_data), min_relative_ratio, 1, ) group_sample_sizes = ( (group_sizes * relative_scale) .round() .astype(int) ) # Sample data from each group sampled_indices = [] for strat_value, group in grouped: n = group_sample_sizes.loc[strat_value] if n > 0 and len(group) > 0: sampled_group = group.sample( n=min(n, len(group)), random_state=rng.randint(0, 10000), ) sampled_indices.extend(sampled_group.index) # Create the sampled DataFrame batch_sampled_data = remaining_data.loc[ sampled_indices ] batches.append( batch_sampled_data.drop( columns=["strat_key"] + [ axis for axis in ["x_bin", "y_bin"][ : len(spatial_cols) ] ] ) ) # Remove sampled data from remaining_data remaining_data = remaining_data.drop( index=sampled_indices ) if len(remaining_data) == 0: break # No more data to sample if verbose and HAS_TQDM: progbar.update(1) if verbose and HAS_TQDM: progbar.close() if verbose: print( f"\nBatch sampling completed. {len(batches)} batches created." ) has_empty_batches = any([b.empty for b in batches]) if verbose and has_empty_batches: warnings.warn( "\nNo records were sampled. This is likely due to" " insufficient data for the specified stratification" f" columns {stratify_by}. To resolve this, consider:\n" " • Using a different stratification method.\n" " • Increasing the dataset size to include more" " representative data.\n" " • Adjusting the sample size to ensure sufficient" " records per group.\n" " • Or, setting `stratify_by=None` to perform" " random sampling instead.", stacklevel=2, ) return batches
[docs] @SaveFile @is_data_readable @validate_params( { "data": ["array-like"], "method": [ StrOptions({"abs", "absolute", "relative"}), None, ], } ) @isdf def spatial_sampling( data, sample_size=0.01, stratify_by=None, spatial_bins=10, spatial_cols=None, method="abs", min_relative_ratio=0.01, random_state=42, savefile=None, verbose=1, ): """ Sample spatial data intelligently to represent the distribution of the whole area and include different years. This function performs stratified sampling on spatial data, ensuring that the sample reflects both spatial distribution and temporal aspects of the entire dataset. It combines spatial stratification based on coordinates and additional stratification columns specified by the user. Parameters ---------- data : pandas.DataFrame The input DataFrame to sample from. Must contain spatial coordinate columns (e.g., `'longitude'`, `'latitude'`) and any columns specified in ``stratify_by``. sample_size : float or int, optional The proportion or absolute number of samples to select. If float, should be between 0.0 and 1.0 and represents the fraction of the dataset to include in the sample. If int, represents the absolute number of samples to select. Default is ``0.01`` (1% of the data). stratify_by : list of str, optional List of column names to stratify by. spatial_bins : int or tuple/list of int, optional Number of bins to divide the spatial coordinates into. If an integer, the same number of bins is used for all spatial dimensions. If a tuple or list, its length must match the number of spatial columns, specifying the number of bins for each spatial dimension. Default is ``10``. spatial_cols : list or tuple of str, optional List of spatial coordinate column names. Can accept one or two columns. If ``None``, the function checks for columns named `'longitude'` and/or `'latitude'` in ``data``. If only one spatial column is provided or found, a warning is issued, suggesting that providing both spatial columns is recommended for more accurate sampling. If more than two columns are provided, an error is raised. method : str, {'abs', 'relative'}, default='abs' Defines how the sample size is determined. ``'abs'`` or ``'absolute'`` uses a fixed sampling proportion based on ``sample_size``. ``'relative'`` scales sampling by dataset stratification so small groups still receive a proportional sample controlled by ``min_relative_ratio``. min_relative_ratio : float, default=0.01 Controls the minimum allowable fraction of records that must be sampled when ``method='relative'``. It must be between ``0`` and ``1``. For example, ``min_relative_ratio=0.05`` requests at least 5 percent of the total dataset size from each stratification group when possible; if a group is smaller than that minimum, the entire subset is sampled instead. random_state : int, optional Random seed for reproducibility. Default is ``42``. verbose : int, default=1 Controls progress-bar and status output during execution. Larger values produce more detailed messages. Returns ------- sampled_data : pandas.DataFrame A sampled DataFrame representing the distribution of the whole area and including different years. Notes ----- The function performs stratified sampling based on spatial bins and other specified stratification columns. Spatial coordinates are binned using quantile-based discretization (:func:`pandas.qcut`), ensuring each bin has approximately the same number of observations. Let :math:`N` be the total number of samples in ``data``, and :math:`n` be the desired sample size. The function calculates the number of samples to draw from each stratification group based on the proportion of the group size to the total dataset size: .. math:: n_i = \\left\\lceil \\frac{N_i}{N} \\times n \\right\\rceil where :math:`N_i` is the size of group :math:`i`, and :math:`n_i` is the number of samples to draw from group :math:`i`. The function ensures that all specified spatial and stratification columns exist in ``data``, that the number of spatial bins matches the number of spatial columns, and that the sample size is valid. A warning is issued when only one spatial column is used because two spatial columns usually give more reliable spatial sampling. Examples -------- >>> from geoprior.utils.spatial_utils import spatial_sampling >>> import pandas as pd >>> # Assume 'df' is a pandas DataFrame with columns >>> # 'longitude', 'latitude', 'year', and other data. >>> sampled_df = spatial_sampling( ... data=df, ... sample_size=0.05, ... stratify_by=['year', 'geological_category'], ... spatial_bins=(10, 15), ... spatial_cols=['longitude', 'latitude'], ... random_state=42 ... ) >>> print(sampled_df.shape) See Also -------- pandas.qcut : Quantile-based discretization function used for binning. sklearn.model_selection.StratifiedShuffleSplit : For stratified sampling. batch_spatial_sampling: Resample spatial data with batching. """ data = data.copy() # Set default spatial columns if not specified spatial_cols = columns_manager(spatial_cols) if spatial_cols is None: spatial_cols = [] if "longitude" in data.columns: spatial_cols.append("longitude") if "latitude" in data.columns: spatial_cols.append("latitude") if not spatial_cols: raise ValueError( "No spatial columns specified and " "'longitude' and 'latitude' not found in data." ) if len(spatial_cols) == 1: warnings.warn( f"Only one spatial column '{spatial_cols[0]}' found. " "Using it for spatial stratification. " "For more accurate sampling, providing both spatial " "columns is recommended.", UserWarning, stacklevel=2, ) else: if not isinstance(spatial_cols, list | tuple): raise ValueError( "spatial_cols must be a list or tuple of column names." ) if len(spatial_cols) > 2: raise ValueError( "spatial_cols can have at most two columns." ) for col in spatial_cols: if col not in data.columns: raise ValueError( f"Spatial column '{col}' not found in data." ) if len(spatial_cols) == 1: warnings.warn( f"Only one spatial column '{spatial_cols[0]}' specified. " "For more accurate sampling, providing two spatial columns " "is recommended.", UserWarning, stacklevel=2, ) # Validate spatial_bins if isinstance(spatial_bins, int): n_bins = validate_positive_integer( spatial_bins, "spatial_bins" ) n_bins_list = [n_bins] * len(spatial_cols) elif isinstance(spatial_bins, tuple | list): if len(spatial_bins) != len(spatial_cols): raise ValueError( "Length of spatial_bins must match number of spatial_cols." ) n_bins_list = [ validate_positive_integer(n, "spatial_bins") for n in spatial_bins ] else: raise ValueError( "spatial_bins must be int or tuple/list of int." ) # if verbose and HAS_TQDM are True. if verbose and HAS_TQDM: progbar = tqdm( zip( spatial_cols, n_bins_list, ["x_bin", "y_bin"], strict=False, ), total=len(spatial_cols), ascii=True, ncols=77, desc=f"{'Creating spat. bins: ' + str(len(spatial_cols)):<20}", ) # Create spatial bins for col, n_bins, axis in zip( spatial_cols, n_bins_list, ["x_bin", "y_bin"], strict=False, ): data[axis] = pd.qcut( data[col], q=n_bins, duplicates="drop" ) if verbose and HAS_TQDM: progbar.update(1) if verbose and HAS_TQDM: progbar.close() stratify_by = columns_manager( stratify_by, empty_as_none=False ) # Create combined stratification key strat_columns = stratify_by + [ axis for axis in ["x_bin", "y_bin"][: len(spatial_cols)] ] if verbose and len(data) > 10_000: print("\nGenerating stratification keys...") print( f"This may take some time for {len(data):,}" " records. Please be patient..." ) # data['strat_key'] = data[strat_columns].apply( # lambda row: '_'.join(row.values.astype(str)), # axis=1 # ) # Using .agg and .astype(str) for vectorized string concatenation data["strat_key"] = ( data[strat_columns].astype(str).agg("_".join, axis=1) ) # Verbose message when done if verbose and len(data) > 10_000: print( "Stratification keys generated successfully" f" for {len(data):,} records." ) # Determine total number of samples if isinstance(sample_size, float): if not 0 < sample_size < 1: raise ValueError( "When sample_size is a float, it must be between 0 and 1." ) n_samples = int(len(data) * sample_size) elif isinstance(sample_size, int): n_samples = validate_positive_integer( sample_size, "sample_size" ) else: raise ValueError( "sample_size must be a positive float or int." ) # Group data by stratification key grouped = data.groupby("strat_key") if verbose: print( f"Data grouped into {len(grouped):,} stratified bins." ) # Apply stratification if stratify_by is provided if stratify_by is not None: # Calculate number of samples per group group_sizes = grouped.size() total_size = group_sizes.sum() if method in ["abs", "absolute"]: group_sample_sizes = ( (group_sizes / total_size * n_samples) .round() .astype(int) ) else: # "relative" min_relative_ratio = assert_ratio( min_relative_ratio, bounds=(0, 1), exclude_values=[0, 1], name="`min_relative_ratio`", ) relative_scale = np.clip( n_samples / len(data), min_relative_ratio, 1 ) group_sample_sizes = ( (group_sizes * relative_scale) .round() .astype(int) ) # Sample data from each group sampled_indices = [] np.random.seed(random_state) # Use tqdm to wrap the grouped iterator # if verbose and HAS_TQM are True. if verbose and HAS_TQDM: progbar = tqdm( grouped, total=len(grouped), ascii=True, ncols=77, desc=f"Sampling {n_samples:,} records", ) for strat_value, group in grouped: n = group_sample_sizes.loc[strat_value] if n > 0 and len(group) > 0: sampled_group = group.sample( n=min(n, len(group)), random_state=np.random.randint(0, 10_000), ) sampled_indices.extend(sampled_group.index) if verbose and HAS_TQDM: progbar.update(1) if verbose and HAS_TQDM: progbar.close() else: sampled_indices = np.random.choice( data.index, size=n_samples, replace=False ) # Create the sampled DataFrame sampled_data = data.loc[sampled_indices] # Drop helper columns cols_to_drop = ["strat_key"] + [ axis for axis in ["x_bin", "y_bin"][: len(spatial_cols)] ] sampled_data = sampled_data.drop(columns=cols_to_drop) if verbose: print( f"\nSampling completed: {len(sampled_indices):,}" " records selected." ) if verbose and sampled_data.empty: warnings.warn( "\nNo records were sampled. This is likely due to" " insufficient data for the specified stratification" f" columns {stratify_by}. To resolve this, consider:\n" " • Using a different stratification method.\n" " • Increasing the dataset size to include more" " representative data.\n" " • Adjusting the sample size to ensure sufficient" " records per group.\n" " • Or, setting `stratify_by=None` to perform" " random sampling instead.", stacklevel=2, ) return sampled_data.reset_index(drop=True)