# SPDX-License-Identifier: Apache-2.0
# Author: LKouadio <etanoyau@gmail.com>
# Adapted from: earthai-tech/fusionlab-learn — https://github.com/earthai-tech/fusionlab-learn
# Modified for GeoPrior-v3 API.
"""
geospatial_utils - A collection of utilities for geospatial and positional
data analysis, filtering, and transformations.
"""
from __future__ import annotations
import warnings
from numbers import Real
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.patches import Circle, Patch
from scipy.spatial import cKDTree
from sklearn.cluster import (
DBSCAN,
AgglomerativeClustering,
KMeans,
)
from sklearn.metrics import silhouette_score
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
from ..api.types import DataFrame
from ..compat.sklearn import (
Interval,
StrOptions,
validate_params,
)
from ..core.array_manager import extract_array_from
from ..core.checks import (
are_all_frames_valid,
assert_ratio,
check_spatial_columns,
exist_features,
)
from ..core.handlers import columns_manager, resolve_label
from ..core.io import SaveFile, is_data_readable
from ..decorators import Deprecated, isdf
from .generic_utils import find_id_column
from .validator import (
filter_valid_kwargs,
parameter_validator,
validate_length_range,
validate_positive_integer,
)
HAS_TQDM = True
try:
from tqdm import tqdm
except:
HAS_TQDM = False
__all__ = [
"spatial_sampling",
"extract_coordinates",
"batch_spatial_sampling",
"extract_zones_from",
"filter_position",
"create_spatial_clusters",
"gen_negative_samples",
"gen_buffered_negative_samples",
"gen_negative_samples_plus",
"extract_spatial_roi",
"make_forecast_ready_sample",
]
def _longest_consecutive_run(
values: np.ndarray | list[int],
) -> int:
"""Return the longest consecutive integer run."""
if len(values) == 0:
return 0
years = np.asarray(values, dtype=int)
years = np.unique(years)
years.sort()
if years.size == 1:
return 1
diffs = np.diff(years)
longest = 1
current = 1
for d in diffs:
if d == 1:
current += 1
longest = max(longest, current)
else:
current = 1
return int(longest)
def _export_demo_sample(
df: pd.DataFrame,
export_path: str | None = None,
export_format: str | None = None,
) -> pd.DataFrame:
"""
Export a demo sample to a non-CSV format.
Notes
-----
- `savefile` is still handled by `@SaveFile`
and remains the simplest CSV export path.
- This helper only handles explicit non-CSV
or forced-format exports.
"""
if export_path is None:
return df
path = Path(export_path).expanduser()
fmt = export_format
if fmt is None:
suffix = path.suffix.lower().lstrip(".")
if suffix in {"xlsx", "xls"}:
fmt = "xlsx"
elif suffix in {"pq", "parquet"}:
fmt = "parquet"
elif suffix in {"json"}:
fmt = "json"
elif suffix in {"feather", "ftr"}:
fmt = "feather"
elif suffix in {"pkl", "pickle"}:
fmt = "pickle"
else:
fmt = "csv"
fmt = str(fmt).strip().lower()
path.parent.mkdir(parents=True, exist_ok=True)
if fmt == "csv":
df.to_csv(path, index=False)
elif fmt in {"xlsx", "xls", "excel"}:
df.to_excel(path, index=False)
elif fmt == "parquet":
df.to_parquet(path, index=False)
elif fmt == "json":
df.to_json(
path,
orient="records",
indent=2,
)
elif fmt in {"feather", "ftr"}:
df.reset_index(drop=True).to_feather(path)
elif fmt in {"pkl", "pickle"}:
df.to_pickle(path)
else:
raise ValueError(
"Unsupported export_format. "
"Use one of: "
"{'csv', 'xlsx', 'parquet', "
"'json', 'feather', 'pickle'}."
)
return df
[docs]
@SaveFile
@is_data_readable
@validate_params(
{
"data": ["array-like"],
"method": [
StrOptions({"abs", "absolute", "relative"}),
None,
],
"year_mode": [
StrOptions(
{
"all",
"latest",
"earliest",
"random",
}
),
None,
],
"export_format": [
StrOptions(
{
"csv",
"xlsx",
"excel",
"parquet",
"json",
"feather",
"pickle",
}
),
None,
],
}
)
@isdf
def make_forecast_ready_sample(
data,
sample_size=0.05,
time_col: str = "year",
spatial_cols: tuple[str, str] | list[str] | None = None,
group_cols: tuple[str, ...] | list[str] | None = None,
stratify_by: list[str] | tuple[str, ...] | None = None,
spatial_bins: int | tuple[int, int] | list[int] = 10,
time_steps: int = 3,
forecast_horizon: int = 1,
require_consecutive: bool = True,
keep_years: int | None = None,
year_mode: str = "latest",
min_groups: int = 5,
max_groups: int | None = None,
columns_to_keep: list[str]
| tuple[str, ...]
| None = None,
method: str = "abs",
min_relative_ratio: float = 0.01,
random_state: int | None = 42,
export_path: str | None = None,
export_format: str | None = None,
savefile: str | None = None,
sort_output: bool = True,
verbose: int = 1,
) -> pd.DataFrame:
"""
Build a compact, forecast-ready panel sample.
The function samples *spatial groups* rather than
individual rows, then reconstructs the full panel
for the selected groups. This is much safer for
demo/testing than row-wise sampling because each
sampled location keeps enough temporal history for
sequence construction.
Parameters
----------
data : pandas.DataFrame
Input panel DataFrame.
sample_size : float or int, default=0.05
Group-level sample size passed to the internal
spatial sampler.
- float: fraction of eligible groups
- int: absolute number of eligible groups
time_col : str, default='year'
Time column.
spatial_cols : tuple/list of str, optional
Spatial coordinate columns. If None, the function
searches for 'longitude' and 'latitude'.
group_cols : tuple/list of str, optional
Group identifier columns. If None, uses
`spatial_cols`.
stratify_by : list/tuple of str, optional
Extra group-level columns used for stratification.
Typical examples:
['lithology_class'] or ['city', 'lithology_class'].
spatial_bins : int or tuple/list, default=10
Spatial bins passed to `spatial_sampling()`.
time_steps : int, default=3
Lookback window length.
forecast_horizon : int, default=1
Forecast horizon length.
require_consecutive : bool, default=True
If True, each kept group must contain at least
one consecutive run of length
`time_steps + forecast_horizon`.
keep_years : int, optional
If provided, keep only this many years per group
after sampling. Must be >=
`time_steps + forecast_horizon`.
year_mode : {'all', 'latest', 'earliest', 'random'}
How to trim years when `keep_years` is given.
min_groups : int, default=5
Minimum number of eligible groups required.
max_groups : int, optional
Hard cap on sampled groups after spatial sampling.
columns_to_keep : list/tuple of str, optional
Restrict the returned columns. Group and time
columns are always preserved.
method : {'abs', 'absolute', 'relative'}, default='abs'
Same meaning as in `spatial_sampling()`.
min_relative_ratio : float, default=0.01
Minimum relative sampling ratio when
`method='relative'`.
random_state : int, optional
Random seed.
export_path : str, optional
Explicit path for non-CSV export.
export_format : str, optional
Export format for `export_path`. If None,
inferred from suffix.
savefile : str, optional
CSV save path handled by `@SaveFile`.
sort_output : bool, default=True
Sort final output by group and time.
verbose : int, default=1
Verbosity level.
Returns
-------
pandas.DataFrame
A compact panel sample that preserves group-wise
temporal structure for forecast demos/tests.
Examples
--------
>>> from geoprior.utils.spatial_utils import make_forecast_ready_sample
>>> # Small demo sample, latest 4 years per location
>>> demo = make_forecast_ready_sample(
data=df,
sample_size=0.05,
stratify_by=["lithology_class"],
spatial_cols=("longitude", "latitude"),
time_col="year",
time_steps=3,
forecast_horizon=1,
keep_years=4,
year_mode="latest",
require_consecutive=True,
savefile="demo_panel.csv",
)
>>> #Slightly richer test panel, keep all years, export parquet
demo = make_forecast_ready_sample(
data=df,
sample_size=150,
stratify_by=["city", "lithology_class"],
spatial_cols=("longitude", "latitude"),
time_col="year",
time_steps=3,
forecast_horizon=1,
keep_years=None,
export_path="demo_panel.parquet",
export_format="parquet",
)
"""
df = data.copy()
rng = np.random.RandomState(random_state)
spatial_cols = columns_manager(spatial_cols)
if spatial_cols is None:
spatial_cols = []
if "longitude" in df.columns:
spatial_cols.append("longitude")
if "latitude" in df.columns:
spatial_cols.append("latitude")
if not spatial_cols:
raise ValueError(
"No spatial columns found. Provide spatial_cols "
"or ensure 'longitude'/'latitude' exist."
)
if len(spatial_cols) > 2:
raise ValueError(
"spatial_cols can contain at most two columns."
)
if group_cols is None:
group_cols = tuple(spatial_cols)
else:
group_cols = tuple(columns_manager(group_cols))
stratify_by = (
[]
if stratify_by is None
else list(columns_manager(stratify_by))
)
required_cols = set(group_cols) | {time_col}
required_cols |= set(spatial_cols)
required_cols |= set(stratify_by)
missing = [
c for c in required_cols if c not in df.columns
]
if missing:
raise ValueError(
f"Missing required columns: {missing}"
)
min_required_years = int(time_steps) + int(
forecast_horizon
)
if min_required_years <= 1:
raise ValueError(
"time_steps + forecast_horizon must be >= 2."
)
if keep_years is not None:
keep_years = validate_positive_integer(
keep_years,
"keep_years",
)
if keep_years < min_required_years:
raise ValueError(
"keep_years must be >= "
"time_steps + forecast_horizon."
)
min_groups = validate_positive_integer(
min_groups,
"min_groups",
)
if max_groups is not None:
max_groups = validate_positive_integer(
max_groups,
"max_groups",
)
if not 0 < float(min_relative_ratio) <= 1:
raise ValueError(
"min_relative_ratio must be in (0, 1]."
)
df = df.dropna(subset=list(required_cols)).copy()
if df.empty:
raise ValueError(
"No rows remain after dropping NA values in "
"required columns."
)
if not pd.api.types.is_numeric_dtype(df[time_col]):
dt = pd.to_datetime(df[time_col], errors="coerce")
if dt.isna().all():
raise ValueError(
f"Could not coerce {time_col!r} to a "
"numeric or datetime-like time column."
)
df[time_col] = dt.dt.year.astype(int)
else:
df[time_col] = df[time_col].astype(int)
stats = []
grouped = df.groupby(list(group_cols), dropna=False)
for key, g in grouped:
years = np.asarray(
sorted(pd.unique(g[time_col])),
dtype=int,
)
n_years = int(years.size)
longest_run = _longest_consecutive_run(years)
ok = n_years >= min_required_years
if require_consecutive:
ok = ok and (longest_run >= min_required_years)
if not ok:
continue
row0 = g.iloc[0]
record = {
"__group_key__": "||".join(map(str, key))
if isinstance(key, tuple)
else str(key),
"__n_years__": n_years,
"__min_year__": int(years.min()),
"__max_year__": int(years.max()),
"__longest_run__": longest_run,
}
for col in group_cols:
record[col] = row0[col]
for col in spatial_cols:
record[col] = row0[col]
for col in stratify_by:
record[col] = row0[col]
stats.append(record)
eligible = pd.DataFrame(stats)
if eligible.empty:
raise ValueError(
"No eligible groups found. Try lowering "
"time_steps/forecast_horizon, or set "
"require_consecutive=False."
)
if len(eligible) < min_groups:
raise ValueError(
f"Only {len(eligible)} eligible groups found, "
f"but min_groups={min_groups}."
)
# Group-level spatial sampling.
select_cols = list(
dict.fromkeys(
list(spatial_cols)
+ list(stratify_by)
+ list(group_cols)
+ [
"__group_key__",
"__n_years__",
"__min_year__",
"__max_year__",
"__longest_run__",
]
)
)
dups = eligible.loc[:, select_cols].columns[
eligible.loc[:, select_cols].columns.duplicated()
]
if len(dups) > 0:
raise RuntimeError(
"Duplicate columns detected before spatial_sampling: "
f"{list(dups)}"
)
sampled_groups = spatial_sampling(
eligible.loc[:, select_cols],
sample_size=sample_size,
stratify_by=stratify_by or None,
spatial_bins=spatial_bins,
spatial_cols=spatial_cols,
method=method,
min_relative_ratio=min_relative_ratio,
random_state=random_state,
verbose=max(0, int(verbose) - 1),
)
if sampled_groups.empty:
raise ValueError(
"The group-level sampler returned no groups."
)
if (
max_groups is not None
and len(sampled_groups) > max_groups
):
sampled_groups = sampled_groups.sample(
n=max_groups,
random_state=random_state,
)
sampled = df.merge(
sampled_groups.loc[
:, list(group_cols)
].drop_duplicates(),
on=list(group_cols),
how="inner",
)
if sampled.empty:
raise ValueError(
"No rows were recovered after merging sampled "
"groups back to the source panel."
)
if keep_years is not None:
parts = []
for _, g in sampled.groupby(
list(group_cols),
dropna=False,
sort=False,
):
g = g.sort_values(time_col)
if len(g) <= keep_years:
parts.append(g)
continue
if year_mode == "latest":
parts.append(g.tail(keep_years))
elif year_mode == "earliest":
parts.append(g.head(keep_years))
elif year_mode == "random":
years = np.asarray(
sorted(pd.unique(g[time_col])),
dtype=int,
)
chosen = np.sort(
rng.choice(
years,
size=keep_years,
replace=False,
)
)
parts.append(
g[g[time_col].isin(chosen)].copy()
)
else: # "all"
parts.append(g)
sampled = pd.concat(
parts,
axis=0,
ignore_index=True,
)
if columns_to_keep is not None:
keep = list(columns_manager(columns_to_keep))
keep = list(
dict.fromkeys(
list(group_cols) + [time_col] + keep
)
)
keep = [c for c in keep if c in sampled.columns]
sampled = sampled.loc[:, keep].copy()
if sort_output:
sampled = sampled.sort_values(
list(group_cols) + [time_col]
).reset_index(drop=True)
if verbose:
n_groups = (
sampled.loc[
:,
list(group_cols),
]
.drop_duplicates()
.shape[0]
)
msg = (
"[INFO] Built forecast-ready sample: "
f"{len(sampled)} rows, "
f"{n_groups} groups, "
f"min_required_years={min_required_years}"
)
if keep_years is not None:
msg += f", keep_years={keep_years}"
print(msg)
sampled = _export_demo_sample(
sampled,
export_path=export_path,
export_format=export_format,
)
return sampled
[docs]
def deg_to_m_from_lat(lat_deg: float) -> tuple[float, float]:
"""
Approx WGS84 meters per degree at reference latitude.
Returns (deg_to_m_lon, deg_to_m_lat).
"""
phi = np.deg2rad(lat_deg)
# Common geodesy approximations (meters per degree)
deg_to_m_lat = (
111132.92
- 559.82 * np.cos(2 * phi)
+ 1.175 * np.cos(4 * phi)
- 0.0023 * np.cos(6 * phi)
)
deg_to_m_lon = (
111412.84 * np.cos(phi)
- 93.5 * np.cos(3 * phi)
+ 0.118 * np.cos(5 * phi)
)
return float(deg_to_m_lon), float(deg_to_m_lat)
def _find_closest_value(
series: pd.Series, value: float
) -> float:
"""Finds the value in a Series closest to a target value."""
return series.iloc[(series - value).abs().argmin()]
[docs]
@SaveFile
@isdf
def gen_negative_samples_plus(
df: pd.DataFrame,
target_col: str,
spatial_cols: tuple[str, str] = ("longitude", "latitude"),
feature_cols: list[str] | None = None,
buffer_km: float = 10,
neg_feature_range: tuple[float, float] = (0, 5),
num_neg_per_pos: int = 1,
strategy: str = "landslide",
gauge_data: pd.DataFrame | None = None,
elevation_data: pd.DataFrame | None = None,
similarity_features: list[str] | None = None,
time_col: str | None = None,
cluster_method: str = "kmeans",
use_gpd: bool | str = "auto",
id_col="auto",
view: bool = False,
savefile: str | None = None,
verbose: int = 1,
seed: int | None = None,
) -> pd.DataFrame:
"""
Generates negative samples for modeling in spatial scenarios,
offering multiple strategies. The function calls
`gen_buffered_negative_samples` when the ``strategy`` argument
is ``'landslide'``, ``'event'``, or ``'gauge'``. It also calls
`generate_negative_samples` partially in the ``'hybrid'``
strategy. Internally, each sample is augmented to produce
negative instances according to a chosen method.
.. math::
\\text{buffer_deg} = \\frac{\\text{buffer_km}}{111.0}
The above formula approximates degrees from kilometers near
the equator. The output is a combined dataset containing
original positives and generated negatives. The ratio of
negatives per positive is controlled by
``num_neg_per_pos``.
Parameters
----------
df : pandas.DataFrame
Input data containing spatial coordinates and features.
target_col : str
Name of the classification target column. Positive
samples in `df` are labeled, negatives will be generated.
spatial_cols : tuple of str, optional
Columns representing longitude and latitude in `df`.
feature_cols : list of str, optional
Additional feature columns used to drive or constrain
negative sampling processes.
buffer_km : float, optional
Radius in kilometers for local negative sampling. Used
to compute buffer degrees.
neg_feature_range : tuple of float, optional
Lower and upper range for continuous features in random
negative generation.
num_neg_per_pos : int, optional
Number of negatives to generate per positive instance.
strategy : str, optional
Defines the sampling approach. Options include
``'landslide'``, ``'gauge'``, ``'random_global'``,
``'temporal_shift'``, ``'clustered_negatives'``,
``'environmental_similarity'``, ``'elevation_based'``,
and ``'hybrid'``.
See more in :ref:`User Guide <user_guide>`.
gauge_data : pandas.DataFrame, optional
Reference data for gauge-based or hybrid strategies.
elevation_data : pandas.DataFrame, optional
Elevation records, used if ``strategy='elevation_based'``.
similarity_features : list of str, optional
Columns for nearest neighbor computation in
``'environmental_similarity'``.
time_col : str, optional
Name of the time column for ``'temporal_shift'``. Required
if using that strategy.
cluster_method : str, optional
Clustering algorithm for ``'clustered_negatives'``. Default
is ``'kmeans'``.
use_gpd : bool or str, optional
Indicator for whether geopandas is used in buffer-based
processes.
id_col : str, optional
Column name used as an identifier. If `'auto'`, a default is
used.
view : bool, optional
Flag for visualizing or previewing the results.
savefile : str, optional
Path to save the output dataset. If None, no file is saved.
verbose : int, optional
Verbosity level. Higher values yield more logs.
seed : int, optional
Random seed for reproducibility. If None, randomness is not
fixed.
Returns
-------
pandas.DataFrame
A combined DataFrame containing the original positive samples
labeled as 1 and newly generated negative samples labeled 0.
Notes
-----
When ``strategy='hybrid'``, partial sets of negatives come from
two distinct calls to `generate_negative_samples` for
``'landslide'`` and ``'gauge'`` sub-strategies, then merged.
Examples
--------
>>> from geoprior.utils.spatial_utils import gen_negative_samples_plus
>>> import pandas as pd
>>> df_example = pd.DataFrame({{
... "longitude": [10.1, 10.2, 10.3],
... "latitude": [45.1, 45.2, 45.3],
... "feature": [3.4, 2.1, 6.7],
... "target": [1, 1, 1]
... }})
>>> gauge_data = pd.DataFrame({{
... 'gauge_id': ['G1', 'G2', 'G3'],
... 'latitude': np.random.uniform(24.0, 25.0, 3),
... 'longitude': np.random.uniform(113.0, 114.0, 3)
... }})
>>> # Generate random global negatives
>>> result = gen_negative_samples_plus(
... df_example,
... target_col="target",
... strategy="random_global"
... )
>>> print(result.head())
See Also
--------
gen_buffered_negative_samples : Generates negative samples
within a buffer region around reference events or gauges.
generate_negative_samples : A simpler negative sampling
utility for certain strategies.
"""
# If the strategy is one of the recognized
# types (landslide, event, gauge), we
# directly call gen_buffered_negative_samples.
if str(strategy).lower() in (
"landslide",
"event",
"gauge",
):
return gen_buffered_negative_samples(
df=df,
target_col=target_col,
spatial_cols=spatial_cols,
feature_cols=feature_cols,
buffer_km=buffer_km,
neg_feature_range=neg_feature_range,
num_neg_per_pos=num_neg_per_pos,
strategy=strategy,
gauge_data=gauge_data,
use_gpd=use_gpd,
view=view,
id_col=id_col,
savefile=savefile,
seed=seed,
verbose=verbose,
)
# If not in the above categories, we proceed
# with advanced strategies.
np.random.seed(seed)
# Validate columns and parameters via the
# _validate_negative_sampling utility.
checked_values = _validate_negative_sampling(
df=df,
target_col=target_col,
spatial_cols=spatial_cols,
feature_cols=feature_cols,
neg_feature_range=neg_feature_range,
num_neg_per_pos=num_neg_per_pos,
verbose=verbose,
id_col=id_col,
)
(
spatial_cols,
feature_cols,
neg_feature_range,
num_neg_per_pos,
) = checked_values
# Extract column names for lon/lat.
lon_col, lat_col = spatial_cols
# Convert buffer kilometers to degrees,
# approximating 1 deg ~ 111 km.
buffer_deg = buffer_km / 111.0
# Additional advanced strategy options.
add_strategies = [
"random_global",
"temporal_shift",
"clustered_negatives",
"environmental_similarity",
"elevation_based",
"hybrid",
]
# Validate the selected strategy from
# the advanced list.
strategy = parameter_validator(
"strategy", target_strs=add_strategies, deep=True
)(strategy)
# Collect negative samples in a list.
negatives = []
# -----------------------
# RANDOM_GLOBAL strategy:
# Generate uniform random coordinates
# across the bounding box of df.
# -----------------------
if strategy == "random_global":
lon_min, lon_max = (
df[lon_col].min(),
df[lon_col].max(),
)
lat_min, lat_max = (
df[lat_col].min(),
df[lat_col].max(),
)
for _ in range(len(df) * num_neg_per_pos):
sample = {
lon_col: np.random.uniform(lon_min, lon_max),
lat_col: np.random.uniform(lat_min, lat_max),
target_col: 0,
}
for col in feature_cols:
sample[col] = np.random.uniform(
*neg_feature_range
)
negatives.append(sample)
# -----------------------
# TEMPORAL_SHIFT strategy:
# Shift time backward (e.g. 30 days)
# and mark as negative samples.
# -----------------------
elif strategy == "temporal_shift":
if time_col is None:
raise ValueError(
"Time column must be provided "
"for 'temporal_shift'."
)
df_shifted = df.copy()
df_shifted[time_col] = pd.to_datetime(
df_shifted[time_col]
) - pd.Timedelta(days=30)
for _, row in df_shifted.iterrows():
sample = row.to_dict()
sample[target_col] = 0
negatives.append(sample)
# ---------------------------
# CLUSTERED_NEGATIVES:
# Cluster positive points and
# sample negatives around the
# resulting centroids. Two
# clustering options are
# supported: KMeans and DBSCAN.
# ---------------------------
elif strategy == "clustered_negatives":
coords = df[[lon_col, lat_col]].values
# --- choose clustering backend
if cluster_method.lower() == "kmeans":
clustering = KMeans(
n_clusters=max(2, len(df) // 10),
random_state=seed,
).fit(coords)
centers = clustering.cluster_centers_
elif cluster_method.lower() == "dbscan":
clustering = DBSCAN(
eps=buffer_deg * 2, # search radius (deg)
min_samples=5,
).fit(coords)
labels = clustering.labels_
centers = []
for lbl in np.unique(labels):
if lbl == -1: # DBSCAN noise
continue
pts = coords[labels == lbl]
centers.append(pts.mean(axis=0))
centers = np.asarray(centers)
else:
raise ValueError(
"cluster_method must be either "
"'KMeans' or 'DBSCAN'"
)
# --- generate negatives near each centroid
for center in centers:
for _ in range(num_neg_per_pos):
lat_offset = np.random.uniform(
-buffer_deg, buffer_deg
)
lon_offset = np.random.uniform(
-buffer_deg, buffer_deg
)
sample = {
lat_col: center[1] + lat_offset,
lon_col: center[0] + lon_offset,
target_col: 0,
}
for col in feature_cols:
sample[col] = np.random.uniform(
*neg_feature_range
)
negatives.append(sample)
# -----------------------
# ENVIRONMENTAL_SIMILARITY:
# Nearest neighbor approach on
# specific feature vectors.
# -----------------------
elif strategy == "environmental_similarity":
if similarity_features is None:
raise ValueError(
"Specify similarity_features for "
"'environmental_similarity'."
)
nn = NearestNeighbors(n_neighbors=1)
nn.fit(df[similarity_features])
for _ in range(len(df) * num_neg_per_pos):
random_vector = np.random.uniform(
0, 1, size=len(similarity_features)
)
dist, idx = nn.kneighbors([random_vector])
ref = df.iloc[idx[0][0]]
sample = ref.copy()
sample[target_col] = 0
sample[lon_col] += np.random.uniform(
-buffer_deg, buffer_deg
)
sample[lat_col] += np.random.uniform(
-buffer_deg, buffer_deg
)
negatives.append(sample.to_dict())
# -----------------------
# ELEVATION_BASED:
# Pick negative samples from
# low slope areas, etc.
# -----------------------
elif strategy == "elevation_based":
if elevation_data is None:
raise ValueError(
"Provide elevation data for "
"'elevation_based' strategy."
)
low_slope_areas = elevation_data[
elevation_data["slope"] < buffer_km
]
sample_count = min(
len(df) * num_neg_per_pos, len(low_slope_areas)
)
for _, row in low_slope_areas.sample(
n=sample_count
).iterrows():
sample = row.to_dict()
sample[target_col] = 0
negatives.append(sample)
# -----------------------
# HYBRID:
# Combine partial sets of
# negative samples from
# different strategies.
# -----------------------
elif strategy == "hybrid":
part1 = gen_buffered_negative_samples(
df,
target_col,
spatial_cols,
feature_cols,
buffer_km,
neg_feature_range,
num_neg_per_pos // 2,
strategy="landslide",
gauge_data=None,
seed=seed,
verbose=0,
)
part2 = gen_buffered_negative_samples(
df,
target_col,
spatial_cols,
feature_cols,
buffer_km,
neg_feature_range,
num_neg_per_pos // 2,
strategy="gauge",
gauge_data=gauge_data,
seed=seed,
verbose=0,
)
combined = pd.concat(
[part1, part2], ignore_index=True
)
# Deduplicate based on spatial + feature + target columns
dedup_cols = (
spatial_cols + (feature_cols or []) + [target_col]
)
combined = combined.drop_duplicates(subset=dedup_cols)
if verbose >= 1:
print(
"[INFO] Final dataset after deduplication:"
f" {len(combined)} samples"
)
if view:
_visualize_negative_sampling(
df_combined=combined,
base_points=df
if strategy == "landslide"
else gauge_data,
strategy=strategy,
spatial_cols=spatial_cols,
target_col=target_col,
buffer_km=buffer_km,
title="Sample Generation via Hybrid Strategy",
)
return combined
# Create DataFrame of negative samples.
df_neg = pd.DataFrame(negatives)
# Copy df for positive samples.
df_pos = df.copy()
df_pos[target_col] = 1
# Combine positives and negatives.
combined = pd.concat([df_pos, df_neg], ignore_index=True)
if view:
_visualize_negative_sampling(
df_combined=combined,
base_points=df,
strategy=strategy,
spatial_cols=spatial_cols,
target_col=target_col,
buffer_km=buffer_km,
)
# Optionally print info about the results.
if verbose >= 1:
print(
f"[INFO] Generated {len(df_neg)} negative "
f"samples using strategy '{strategy}'"
)
print(
f"[INFO] Total dataset: {len(combined)} samples"
)
return combined
[docs]
@SaveFile
@isdf
def gen_buffered_negative_samples(
df: pd.DataFrame,
target_col: str,
spatial_cols: tuple[str, str] = ("longitude", "latitude"),
feature_cols: list[str] | None = None,
buffer_km: float = 10,
neg_feature_range: tuple[float, float] = (0, 5),
num_neg_per_pos: int = 1,
strategy: str = "landslide",
gauge_data: pd.DataFrame | None = None,
use_gpd: bool | str = "auto",
id_col="auto",
view: bool = False,
savefile: str | None = None,
seed: int | None = None,
verbose: int = 1,
) -> pd.DataFrame:
"""
Generate buffer-based negative samples around existing
points or gauge stations.
This function creates additional negative samples
for binary spatial events. It either takes an existing
landslide dataset (when `strategy` is `'landslide'`)
or a separate gauge dataset (if `strategy` is
`'gauge'`) to serve as the base points for generating
negatives within a circular buffer. The function
validates input columns and parameters via
`_validate_negatives_sampling` before constructing
synthetic samples.
Parameters
----------
df : pandas.DataFrame
The DataFrame containing positive event samples
(e.g., landslides). Must include `<target_col>`
and `<spatial_cols>`.
target_col : str
Name of the binary target column (1 for event,
0 for no event).
spatial_cols : tuple of str, default ('longitude', 'latitude')
Indicates which columns hold `<longitude>` and
`latitude` in `df`.
feature_cols : list of str, optional
Additional feature columns to simulate or copy
for negatives. If ``None``, all columns except
`<spatial_cols>` and `target_col` are used.
buffer_km : float, default 10
The radial distance in kilometers for sampling
negative points around each base point.
neg_feature_range : tuple of float, default (0, 5)
A numeric range from which feature values are
drawn for negative samples if the column
is numeric.
num_neg_per_pos : int, default 1
Number of negatives to generate per positive
(landslide) or gauge point.
strategy : str, default 'landslide'
Defines the base from which negative samples are generated.
Use ``'landslide'`` or ``'event'`` to sample around rows in
``df``. Use ``'gauge'`` to sample around rows in
``gauge_data``.
gauge_data : pandas.DataFrame, optional
Required if ``strategy`` is ``'gauge'``. Must
contain ``spatial_cols``.
use_gpd : bool or 'auto', default 'auto'
If ``'auto'``, attempts to use GeoPandas for
visualization if installed. Otherwise,
falls back to Matplotlib. This parameter
is forwarded to the underlying visualization
function.
id_col : str or list of str, default 'auto'
Column(s) representing IDs in ``df``. If
``'auto'``, the function tries to detect
possible ID columns. Used by
``_validate_negatives_sampling``.
view : bool, default False
Whether to visualize the sampled negatives
around the base points.
savefile : str, optional
If provided, saves the final combined dataset
(positives and negatives) to a CSV file at the
specified path.
seed : int, optional
Seed for NumPy's random generator, ensuring
reproducible offsets in negative sampling.
verbose : int, default 1
Controls console messages: ``1`` for minimal,
``2`` for more detailed logs.
Returns
-------
pandas.DataFrame
The combined dataset containing both the original
(positive) rows, labeled with ``target_col`` = 1,
and the newly generated negative rows, labeled
``target_col`` = 0.
Methods
-------
``_validate_negatives_sampling``
Validates required columns and parameters,
including ``num_neg_per_pos`` and
``neg_feature_range``.
``visualize_negative_sampling``
Generates a plot showing the negative samples
around the base points if ``view`` is True.
Notes
-----
- If ``strategy`` is ``'gauge'``, ``gauge_data`` must
be provided and contain columns ``longitude`` and
``latitude``.
- When ``view`` is True, circles are drawn to
illustrate the buffer radius.
- The ratio of 1 degree to roughly 111 km is an
approximation and can vary slightly by latitude.
Formally, a buffer in degrees
:math:`\\Delta` is computed by:
.. math::
\\Delta = \\frac{\\text{buffer\\_km}}{111},
where :math:`111` is an approximate km-per-degree
conversion factor. Each base point
:math:`(lat, lon)` spawns :math:`n` negatives, each
offset by :math:`\\delta_{lat}`, :math:`\\delta_{lon}`
drawn from :math:`U(-\\Delta, \\Delta)`.
Examples
--------
Below is an illustration of how to generate negative samples
around both existing event locations (strategy=`landslide`)
and separate gauge stations (strategy=`gauge`) using
``gen_buffered_negative_samples``.
First, we simulate a small DataFrame of positive
landslide samples with rainfall attributes, as well
as a separate DataFrame for gauge stations:
>>> import numpy as np
>>> import pandas as pd
>>> np.random.seed(42)
>>> positive_samples = pd.DataFrame({
... 'id': [1, 2, 3, 4, 5],
... 'latitude': np.random.uniform(24.0, 25.0, 5),
... 'longitude': np.random.uniform(113.0, 114.0, 5),
... 'rainfall_day_1': np.random.randint(10, 30, 5),
... 'rainfall_day_2': np.random.randint(10, 30, 5),
... 'rainfall_day_3': np.random.randint(10, 30, 5),
... 'rainfall_day_4': np.random.randint(10, 30, 5),
... 'rainfall_day_5': np.random.randint(10, 30, 5),
... 'landslide': [1]*5
... })
>>> gauge_data = pd.DataFrame({
... 'gauge_id': ['G1', 'G2', 'G3'],
... 'latitude': np.random.uniform(24.0, 25.0, 3),
... 'longitude': np.random.uniform(113.0, 114.0, 3)
... })
We then call ``gen_buffered_negative_samples`` to
produce negatives around these data using two
different strategies:
>>> from geoprior.utils.spatial_utils import gen_buffered_negative_samples
>>> # Generate negatives around landslide points
>>> results_landslide = generate_negative_samples_with(
... df=positive_samples,
... target_col='landslide',
... spatial_cols=('longitude', 'latitude'),
... feature_cols=[f'rainfall_day_{i+1}' for i in range(5)],
... buffer_km=10,
... num_neg_per_pos=1,
... strategy='landslide',
... verbose=1
... )
>>> # Generate negatives around the gauge stations
>>> results_gauge = gen_buffered_negative_samples(
... df=positive_samples,
... target_col='landslide',
... spatial_cols=('longitude', 'latitude'),
... feature_cols=[f'rainfall_day_{i+1}' for i in range(5)],
... buffer_km=10,
... num_neg_per_pos=1,
... strategy='gauge',
... gauge_data=gauge_data,
... verbose=1
... )
See Also
--------
generate_negative_samples:
Generate synthetic negative samples for spatial
binary classification tasks.
_validate_negatives_sampling:
Ensures inputs and parameters are correct.
visualize_negative_sampling :
Plots the positive and negative points for inspection.
"""
# See for reproducibility
strategy = (
"landslide"
if strategy.lower() in ("landslide", "event")
else strategy
)
np.random.seed(seed)
# Validate input and columns
checked_values = _validate_negative_sampling(
df=df,
target_col=target_col,
spatial_cols=spatial_cols,
feature_cols=feature_cols,
neg_feature_range=neg_feature_range,
num_neg_per_pos=num_neg_per_pos,
verbose=verbose,
id_col=id_col,
)
(
spatial_cols,
feature_cols,
neg_feature_range,
num_neg_per_pos,
) = checked_values
lon_col, lat_col = spatial_cols
if verbose >= 1:
print(
f"[INFO] Generating negative samples using strategy: {strategy}"
)
buffer_deg = buffer_km / 111.0
negatives = []
if strategy == "gauge":
if gauge_data is None:
raise ValueError(
"Gauge data must be provided for 'gauge' strategy."
)
base_points = (
df if strategy == "landslide" else gauge_data
)
if base_points is None or not all(
c in base_points.columns for c in spatial_cols
):
raise ValueError(
"Missing gauge data or invalid spatial columns."
)
for _, point in base_points.iterrows():
for _ in range(num_neg_per_pos):
lat_offset = np.random.uniform(
-buffer_deg, buffer_deg
)
lon_offset = np.random.uniform(
-buffer_deg, buffer_deg
)
new_lat = point[lat_col] + lat_offset
new_lon = point[lon_col] + lon_offset
sample = {
lat_col: new_lat,
lon_col: new_lon,
target_col: 0,
}
for col in feature_cols:
sample[col] = np.random.uniform(
*neg_feature_range
)
negatives.append(sample)
df_neg = pd.DataFrame(negatives)
df_pos = df.copy()
df_pos[target_col] = 1
combined = pd.concat([df_pos, df_neg], ignore_index=True)
if verbose >= 1:
print(
f"[INFO] Generated {len(df_neg)} negative samples"
)
print(
f"[INFO] Total dataset: {len(combined)} samples"
)
if view:
_visualize_negative_sampling(
df_combined=combined,
base_points=base_points,
strategy=strategy,
spatial_cols=spatial_cols,
target_col=target_col,
buffer_km=buffer_km,
)
return combined
def _visualize_negative_sampling(
df_combined,
base_points,
strategy,
spatial_cols,
target_col,
buffer_km=10,
s=50,
title=None,
):
lon_col, lat_col = spatial_cols
buffer_deg = buffer_km / 111.0
fig, ax = plt.subplots(figsize=(10, 8))
colors = {1: "red", 0: "green"}
labels = {1: "Positive", 0: "Negative"}
for val in [1, 0]:
subset = df_combined[df_combined[target_col] == val]
ax.scatter(
subset[lon_col],
subset[lat_col],
color=colors[val],
label=f"{labels[val]} Samples",
s=s,
)
for _, row in base_points.iterrows():
circle = Circle(
(row[lon_col], row[lat_col]),
buffer_deg,
color="blue",
alpha=0.2,
)
ax.add_patch(circle)
if strategy == "gauge":
ax.scatter(
base_points[lon_col],
base_points[lat_col],
color="blue",
label="Gauges",
marker="x",
s=+20,
)
if strategy == "clustered_negatives":
ax.scatter(
base_points[lon_col],
base_points[lat_col],
color="blue",
label="Cluster Centers",
marker="P",
s=s + 30,
)
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
ax.set_title(
title
or f"Negative Sample Generation Strategy: {strategy}"
)
ax.legend(
handles=[
Patch(color="red", label="Positive Samples"),
Patch(color="green", label="Negative Samples"),
Patch(
color="blue",
alpha=0.2,
label=f"{buffer_km} km Buffer",
),
]
)
ax.grid(True, **{"linestyle": ":", "alpha": 0.7})
plt.tight_layout()
plt.show()
[docs]
@SaveFile
@isdf
def gen_negative_samples(
df: DataFrame,
target_col: str,
spatial_cols: tuple[str, str] = ("longitude", "latitude"),
feature_cols: list[str] | None = None,
buffer_km: float = 10,
neg_feature_range: tuple[float, float] = (0, 5),
num_neg_per_pos: int = 1,
use_gpd: bool | str = "auto",
view: bool = False,
savefile: str | None = None,
verbose: int = 1,
):
r"""
Generate synthetic negative samples for spatial binary
classification tasks.
This function creates additional samples labeled as
non-events within a specified spatial buffer around
the positive (event) observations. The main idea is to
generate negative examples that reflect realistic
conditions but have not triggered an event, thereby
assisting models in distinguishing occurrences from
non-occurrences.
Parameters
----------
df : pandas.DataFrame
Input DataFrame containing the positive samples
(events). Must include both the target column
and the specified spatial columns.
target_col : str
Column name for the binary target (e.g.
`landslide`). Rows where this column is
1 (or True) are considered positive samples.
spatial_cols : tuple of str, default ('longitude', 'latitude')
Tuple specifying the longitude> and latitude
column names in `df`.
feature_cols : list of str or None, default None
List of feature columns to use or to simulate
for generated negatives. If ``None``, the
function automatically detects numeric and
categorical columns excluding `spatial_cols`
and `target_col`.
buffer_km : float, default 10
Spatial buffer in kilometers used to define
the radius around each positive sample
within which negative samples are created.
neg_feature_range : tuple of int, default (0, 5)
Value range (minimum, maximum) used for
simulating numeric feature values in negative
samples if the corresponding feature column
does not exist in `<df>`.
num_neg_per_pos : int, default 1
Number of negative samples to generate
per positive sample. For instance, if
``num_neg_per_pos=2``, each positive sample
spawns two negatives.
use_gpd : str, default 'auto'
If set to 'auto', the function tries to
import GeoPandas for visualization. If
`'none'`, no GeoPandas usage will occur.
view : bool, default False
Whether to visualize the generated samples
on a map. Attempts to use `geopandas` if
installed; falls back to `matplotlib` if
`'auto'` is chosen and GeoPandas is not
available.
savefile : str or None, default None
Path to which the resulting DataFrame is
saved if provided. Handled by the decorator
that wraps this function.
verbose : int, default 1
Verbosity level. `0` for silent,
`1` for progress indication, `2` for
more messages, `3` for debugging output.
Returns
-------
pandas.DataFrame
Combined DataFrame with both original positive
samples and newly generated negative samples. The
`<target_col>` is 1 for positives and 0 for negatives.
Methods
-------
`columns_manager`
This internal function is used to handle the
processing of columns for features and
spatial parameters.
Notes
-----
- If a feature column exists in `df`, the negative
samples will copy or randomly select categories for
categorical columns, and sample integers within
``neg_feature_range`` for numeric columns.
- If `feature_cols` is empty or does not exist
in `df`, the function simulates all values for
negative samples.
- When `view=True`, circles depicting the buffer
zone around each positive sample are drawn for
visualization.
- The approximation of 1 degree to roughly 111 km
varies slightly depending on latitude.
Mathematically, we define the spatial buffer in degrees
as:
.. math::
\\Delta = \\frac{\\text{buffer_km}}{111.0},
where :math:`111.0` km approximates the distance of
one degree of latitude or longitude. For each
positive sample at location :math:`(lat, lon)`,
we generate :math:`n` new points with offsets
:math:`\\delta_{lat}` and :math:`\\delta_{lon}`, each
drawn from a uniform distribution
:math:`U(-\\Delta, \\Delta)`:
.. math::
\\begin{aligned}
&lat_{new} = lat + \\delta_{lat},\\\\
&lon_{new} = lon + \\delta_{lon}.
\\end{aligned}
Combined with randomly sampled or inferred feature
values, these new samples serve as negative examples
for modeling tasks such as landslide prediction.
Examples
--------
>>> from geoprior.utils.spatial_utils import gen_negative_samples
>>> import pandas as pd
>>> import numpy as np
>>> df_pos = pd.DataFrame({
... 'latitude': np.random.uniform(24.0, 25.0, 5),
... 'longitude': np.random.uniform(113.0, 114.0, 5),
... 'rainfall_day_1': np.random.randint(10, 30, 5),
... 'rainfall_day_2': np.random.randint(10, 30, 5),
... 'rainfall_day_3': np.random.randint(10, 30, 5),
... 'rainfall_day_4': np.random.randint(10, 30, 5),
... 'rainfall_day_5': np.random.randint(10, 30, 5),
... 'landslide': 1
... })
>>> combined = gen_negative_samples(
... df=df_pos,
... target_col='landslide',
... buffer_km=10,
... num_neg_per_pos=2,
... view=False,
... verbose=2
... )
>>> print(combined.head())
See Also
--------
check_spatial_columns : Ensures the existence of
required spatial columns.
exist_features : Verifies the presence of
specified features in `<df>`.
columns_manager : Handles both feature and spatial
columns for processing.
"""
# Validate input and columns
checked_values = _validate_negative_sampling(
df=df,
target_col=target_col,
spatial_cols=spatial_cols,
feature_cols=feature_cols,
neg_feature_range=neg_feature_range,
num_neg_per_pos=num_neg_per_pos,
verbose=verbose,
)
(
spatial_cols,
feature_cols,
neg_feature_range,
num_neg_per_pos,
) = checked_values
lon_col, lat_col = spatial_cols
# Check GeoPandas
HAS_GPD = False
try:
import geopandas as gpd
from shapely.geometry import Point
HAS_GPD = True
except ImportError:
HAS_GPD = False
# Buffer conversion (km -> deg)
buffer_deg = buffer_km / 111.0
if verbose >= 2:
print(
f"[INFO] Generating negative samples within "
f"{buffer_km} km buffer zones."
)
if verbose >= 3:
print(f"[DEBUG] Buffer in degrees: {buffer_deg:.4f}")
# Generate negative samples per row
negative_samples = []
iterator = df.iterrows()
if verbose >= 1 and HAS_TQDM:
iterator = tqdm(
iterator,
total=len(df),
desc="Generating negatives",
)
for _, row in iterator:
for _ in range(num_neg_per_pos):
lat_offset = np.random.uniform(
-buffer_deg, buffer_deg
)
lon_offset = np.random.uniform(
-buffer_deg, buffer_deg
)
new_lat = row[lat_col] + lat_offset
new_lon = row[lon_col] + lon_offset
sample = {
lat_col: new_lat,
lon_col: new_lon,
target_col: 0,
}
# Simulate or copy feature columns for negs
for col in feature_cols:
# Categorical features
if (
df[col].dtype == "object"
or df[col].dtype.name == "category"
):
unique_vals = df[col].dropna().unique()
if len(unique_vals) > 0:
sample[col] = np.random.choice(
unique_vals
)
else:
sample[col] = None
# Numeric features
elif np.issubdtype(df[col].dtype, np.number):
sample[col] = np.random.randint(
neg_feature_range[0],
neg_feature_range[1] + 1,
)
else:
sample[col] = None
negative_samples.append(sample)
df_negative = pd.DataFrame(negative_samples)
# Label positive set and combine
df_positive = df.copy()
df_positive[target_col] = 1
combined = pd.concat(
[df_positive, df_negative], ignore_index=True
)
if verbose >= 2:
print(
f"\n[INFO] Final dataset: {len(df_positive)} "
f"positive and {len(df_negative)} negative "
f"samples."
)
# Optional View/Plot
if view:
if (use_gpd == "auto") and HAS_GPD:
if verbose >= 2:
print("[INFO] Visualizing with GeoPandas.")
geometry = [
Point(xy)
for xy in zip(
combined[lon_col],
combined[lat_col],
strict=False,
)
]
gdf = gpd.GeoDataFrame(
combined, geometry=geometry
)
fig, ax = plt.subplots(figsize=(10, 8))
for label, color, name in zip(
[1, 0],
["red", "green"],
["Positive", "Negative"],
strict=False,
):
gdf[gdf[target_col] == label].plot(
ax=ax,
color=color,
label=f"{name} Sample",
markersize=50,
)
for _, row in df_positive.iterrows():
circle = plt.Circle(
(row[lon_col], row[lat_col]),
buffer_deg,
color="blue",
alpha=0.2,
)
ax.add_patch(circle)
else:
if verbose >= 2:
print("[INFO] Visualizing with Matplotlib.")
fig, ax = plt.subplots(figsize=(10, 8))
for label, color, name in zip(
[1, 0],
["red", "green"],
["Positive", "Negative"],
strict=False,
):
subset = combined[
combined[target_col] == label
]
ax.scatter(
subset[lon_col],
subset[lat_col],
color=color,
label=f"{name} Sample",
s=50,
)
for _, row in df_positive.iterrows():
circle = plt.Circle(
(row[lon_col], row[lat_col]),
buffer_deg,
color="blue",
alpha=0.2,
)
ax.add_patch(circle)
ax.set_title(
"Negative Sample Generation with Spatial Buffers"
)
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
ax.legend(
handles=[
Patch(color="red", label="Positive Samples"),
Patch(
color="green", label="Negative Samples"
),
Patch(
color="blue",
alpha=0.2,
label=f"{buffer_km} km Buffer",
),
]
)
ax.grid(True, **{"linestyle": ":", "alpha": 0.7})
plt.tight_layout()
plt.show()
return combined
def _validate_negative_sampling(
df: DataFrame,
target_col: str,
spatial_cols: tuple[str, str] = ("longitude", "latitude"),
feature_cols: list[str] | None = None,
neg_feature_range: tuple[float, float] = (0, 5),
num_neg_per_pos: int = 1,
id_col: str | None = "auto",
verbose: int = 0,
):
"""
Helper that validates input parameters and column configurations
for the generation of negative samples. Ensures required columns exist,
processes feature and spatial columns, and checks numeric parameters
like `neg_feature_range` and `num_neg_per_pos`.
"""
# Validate input and columns
exist_features(
df, features=target_col, name=f"Target '{target_col}'"
)
feature_cols = columns_manager(feature_cols)
spatial_cols = columns_manager(spatial_cols)
check_spatial_columns(df, spatial_cols=spatial_cols)
exist_features(
df, features=spatial_cols, name="Spatial columns"
)
neg_feature_range = validate_length_range(
neg_feature_range, param_name="neg_feature_range"
)
num_neg_per_pos = validate_positive_integer(
num_neg_per_pos,
"num_neg_per_pos",
)
# detect id columns if auto
if str(id_col).lower() == "auto":
id_col = find_id_column(
df, strategy="naive", errors="ignore"
)
id_col = columns_manager(id_col, empty_as_none=False)
# If no feature_cols provided, auto-detect from remaining columns
# including
if feature_cols is None:
excluded_cols = (
list(spatial_cols) + [target_col] + id_col
)
feature_cols = [
col
for col in df.columns
if col not in excluded_cols
]
if verbose >= 2:
print(
f"[INFO] Auto-detected feature_cols:"
f" {feature_cols}"
)
feature_cols = columns_manager(
feature_cols, empty_as_none=False
)
if len(feature_cols) == 0:
raise ValueError(
"No feature columns found. Please "
"specify `feature_cols` explicitly."
)
# Warn if a feature column does not exist in df
for col in feature_cols:
if col not in df.columns and verbose >= 2:
print(
f"[WARN] Feature column '{col}' not "
"found in df; it will be simulated."
)
return (
spatial_cols,
feature_cols,
neg_feature_range,
num_neg_per_pos,
)
[docs]
@SaveFile
@isdf
def create_spatial_clusters(
df: pd.DataFrame,
spatial_cols: list[str] | None = None,
cluster_col: str = "region",
n_clusters: int | None = None,
algorithm: str = "kmeans",
view: bool = True,
figsize: tuple = (14, 10),
s: int = 60,
plot_style: str = "seaborn",
cmap: str = "tab20",
show_grid: bool = True,
grid_props: dict = None,
auto_scale: bool = True,
savefile: str | None = None,
verbose: int = 1,
**kwargs,
) -> pd.DataFrame:
"""
Cluster 2D spatial data in ``df`` using `<algorithm>`
and optionally plot the results.
This function, `<create_spatial_clusters>`, extracts
two coordinate columns from `<df>` to form clusters
via methods such as 'kmeans', 'dbscan', or 'agglo'
(agglomerative). It uses the function
`filter_valid_kwargs` (when relevant) to strip out
invalid parameters for certain estimators, and
writes cluster labels into `<cluster_col>`.
Parameters
----------
df : pandas.DataFrame
Input DataFrame holding spatial coordinates
and optional other fields.
spatial_cols : list of str, optional
Two-column list for x and y coordinates.
Defaults to ``['longitude','latitude']`` if
None.
cluster_col : str, default='region'
Name of the column to store the assigned
cluster labels.
n_clusters : int, optional
Number of clusters to form. If not provided
for KMeans, it is auto-detected. For DBSCAN
or Agglomerative, a warning is issued if not
set.
algorithm : str, default='kmeans'
Choice of clustering algorithm among
``['kmeans','dbscan','agglo']``.
view : bool, default=True
If True, displays a scatterplot of the final
clusters.
figsize : tuple, default=(14, 10)
Size of the displayed figure for the
cluster plot.
s : int, default=60
Marker size in the scatterplot.
plot_style : str, default='seaborn'
Matplotlib style used for the plot.
cmap : str, default='tab20'
Colormap name used to differentiate clusters.
show_grid : bool, default=True
Toggles grid lines on or off.
grid_props : dict, optional
Additional keyword arguments controlling
the grid style.
auto_scale : bool, default=True
If True, standardize coordinates before
clustering.
savefile : str, optional
File path to save the data with an additional
`<cluster_col>` storing the assigned
cluster labels if desired.
verbose : int, default=1
Controls console logs. Higher values yield
more details about scaling and cluster
detection.
**kwargs
Additional keyword arguments passed to the
chosen algorithm (filtered by
`filter_valid_kwargs` for KMeans, DBSCAN,
AgglomerativeClustering ).
Returns
-------
pandas.DataFrame
A copy of `<df>` with an additional
`<cluster_col>` storing the assigned
cluster labels.
Notes
-----
If `<auto_scale>` is True, it uses a standard
scaler to normalize the coordinate columns. The
scatterplot is generated using the library
seaborn for enhanced styling.
By default, for `<algorithm>` = "kmeans", the model
attempts to minimize:
.. math::
J = \\sum_{i=1}^{N} \\min_{\\mu_j} \\lVert x_i
- \\mu_j \\rVert^2
where :math:`x_i` are the scaled or raw 2D
coordinates in `<df>`. The function can optionally
auto-detect ``n_clusters`` using a silhouette and
elbow analysis if not provided.
Examples
--------
>>> from geoprior.utils.spatial_utils import create_spatial_clusters
>>> import pandas as pd
>>> df = pd.DataFrame({
... "longitude": [0.1, 0.2, 2.2, 2.3],
... "latitude": [1.0, 1.1, 2.1, 2.2]
... })
>>> # KMeans with auto scale and auto-detect k
>>> result = create_spatial_clusters(
... df=df,
... algorithm="kmeans",
... view=True
... )
>>> # DBSCAN with custom arguments
>>> result_db = create_spatial_clusters(
... df=df,
... algorithm="dbscan",
... eps=0.5,
... min_samples=2
... )
See Also
--------
filter_valid_kwargs : Helps discard unsupported
keyword arguments for chosen estimators.
"""
# Confirm required columns exist in DataFrame
# This prevents missing key data issues
if spatial_cols is None:
spatial_cols = ["longitude", "latitude"]
assert all(col in df.columns for col in spatial_cols), (
"Missing spatial columns"
)
assert len(spatial_cols) == 2, (
f"Need exactly 2 spatial columns. Got {len(spatial_cols)}"
)
assert algorithm in ["kmeans", "dbscan", "agglo"], (
"Invalid algorithm. Expect one of ['kmeans', 'dbscan', 'agglo']."
)
# Use requested plotting style
plt.style.use(plot_style)
# Copy DataFrame to avoid modification
local_df = df.copy()
# Extract coordinates from the spatial columns
coords = local_df[spatial_cols].values
# Debug info about data shape if verbosity is high
if verbose >= 2:
print(f"DataFrame shape: {local_df.shape}")
print("Initial coords sample:", coords[:5])
# Scale coordinates to standardize range, if requested
if auto_scale:
if verbose >= 1:
print("Scaling coordinates...")
scaler = StandardScaler()
coords = scaler.fit_transform(coords)
# Debug info about scaled coords if verbosity is high
if verbose >= 2:
print("Scaled coords sample:", coords[:5])
# Determine an optimal number of clusters if none provided
if n_clusters is None:
if algorithm == "kmeans":
if verbose >= 1:
print(
"Auto-detecting optimal number of clusters..."
)
n_clusters = _auto_detect_k(
coords=coords,
verbose=verbose,
show_grid=show_grid,
grid_props=grid_props,
)
else:
# Warn if user expects auto-detection with non-kmeans
warnings.warn(
"Auto-cluster detection only supported for KMeans",
stacklevel=2,
)
# Notify user about the clustering approach
if verbose >= 1:
print(f"Clustering with {algorithm.upper()}...")
# Initialize the selected clustering algorithm
if algorithm == "kmeans":
kwargs = filter_valid_kwargs(KMeans, kwargs)
clusterer = KMeans(
n_clusters=n_clusters, random_state=42, **kwargs
)
elif algorithm == "dbscan":
kwargs = filter_valid_kwargs(DBSCAN, kwargs)
clusterer = DBSCAN(**kwargs)
else: # algorithm == 'agglo'
kwargs = filter_valid_kwargs(
AgglomerativeClustering, kwargs
)
clusterer = AgglomerativeClustering(
n_clusters=n_clusters, **kwargs
)
# Fit the model and predict cluster labels
labels = clusterer.fit_predict(coords)
local_df[cluster_col] = labels
# Debug info about clustering output if verbosity is high
if verbose >= 2:
unique_labels = np.unique(labels)
print(f"Unique cluster labels: {unique_labels}")
# Plot the clusters if requested
if view:
_plot_clusters(
df=local_df,
spatial_cols=spatial_cols,
cluster_col=cluster_col,
figsize=figsize,
cmap=cmap,
algorithm=algorithm,
s=s,
show_grid=show_grid,
grid_props=grid_props,
)
# Return the DataFrame with assigned cluster labels
return local_df
def _auto_detect_k(
coords: np.ndarray,
verbose: int,
max_k: int = 10,
show_grid: bool = True,
grid_props: dict = None,
) -> int:
# Evaluate multiple k values using
# elbow (distortion) and silhouette scores
distortions = []
silhouettes = []
K_range = range(2, max_k + 1)
if verbose >= 1:
print(f"Evaluating k from 2 to {max_k}...")
for k in K_range:
kmeans = KMeans(
n_clusters=k, random_state=42, n_init="auto"
)
kmeans.fit(coords)
distortions.append(kmeans.inertia_)
silhouettes.append(
silhouette_score(coords, kmeans.labels_)
)
# Detailed iteration log if verbosity is high
if verbose >= 2:
output = "k={:<5} | Distortion={:^28} | Silhouette={:^20}".format(
k,
f"{distortions[-1]:.3f}",
f"{silhouettes[-1]:.3f}",
)
print(output)
# Plot elbow and silhouette analyses
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
ax1.plot(K_range, distortions, "bo-")
ax1.set(
xlabel="Number of clusters",
ylabel="Distortion",
title="Elbow Method",
)
ax2.plot(K_range, silhouettes, "go-")
ax2.set(
xlabel="Number of clusters",
ylabel="Silhouette Score",
title="Silhouette Analysis",
)
# Enable gridlines
if grid_props is None:
grid_props = {"linestyle": ":", "alpha": 0.7}
if show_grid:
ax1.grid(show_grid, **grid_props)
else:
ax1.grid(show_grid)
if show_grid:
ax2.grid(show_grid, **grid_props)
else:
ax2.grid(show_grid)
plt.tight_layout()
plt.show()
# Use silhouette score peak to suggest optimal k
optimal_k = np.argmax(silhouettes) + 2
if verbose >= 1:
print(f"Suggested optimal k: {optimal_k}")
return optimal_k
def _plot_clusters(
df: pd.DataFrame,
spatial_cols: list[str],
cluster_col: str,
figsize: tuple,
cmap: str,
s: int,
algorithm: str,
show_grid: bool = True,
grid_props: dict = None,
) -> None:
import seaborn as sns
# Create a scatterplot to visualize clustered data
plt.figure(figsize=figsize)
ax = sns.scatterplot(
x=spatial_cols[0],
y=spatial_cols[1],
hue=cluster_col,
palette=cmap,
data=df,
s=s,
edgecolor="k",
linewidth=0.5,
alpha=0.8,
legend="auto",
)
# Professional labeling and presentation
plt.title(
f"{algorithm.upper()} Clustering - "
f"{df[cluster_col].nunique()} Clusters",
fontsize=14,
pad=20,
)
plt.xlabel(spatial_cols[0], fontsize=12)
plt.ylabel(spatial_cols[1], fontsize=12)
# Enable gridlines
if show_grid:
plt.grid(
show_grid,
**(
grid_props or {"linestyle": ":", "alpha": 0.7}
),
)
else:
plt.grid(show_grid)
# Adjust and position legend
plt.legend(
title="Cluster",
bbox_to_anchor=(1.05, 1),
loc="upper left",
)
# Annotate cluster labels near their median positions
for cluster in df[cluster_col].unique():
# Skip noise cluster (-1) for DBSCAN
if cluster == -1:
continue
median_position = df[df[cluster_col] == cluster][
spatial_cols
].median()
plt.text(
median_position[0],
median_position[1],
str(cluster),
fontdict=dict(weight="bold", size=10),
bbox=dict(
facecolor="white", alpha=0.8, edgecolor="none"
),
)
# Hide top/right spines, tighten layout
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.figure.tight_layout()
# Display the final clustering plot
plt.show()
[docs]
@validate_params(
{
"threshold": [Interval(Real, 0, 1, closed="neither")],
"error": [StrOptions({"raise", "warn", "ignore"})],
"pos": [Real, "array-like"],
}
)
@isdf
def filter_position(
df,
pos,
pos_cols=None,
find_closest=True,
threshold=0.01,
error="raise",
verbose=0,
):
"""
filter_position is a utility that filters a
pandas.DataFrame based on user-specified spatial
positions. It can match positions exactly or compute
distances to find the closest points within a threshold.
For a single dimension, the distance is computed as:
.. math::
d = |x - p|
For multi-dimensional data with n coordinates, the
Euclidean distance is computed as:
.. math::
d = \\sqrt{\\sum_{i=1}^n (x_i - p_i)^2}
Parameters
------------
df : pandas.DataFrame
The DataFrame that will be filtered. This parameter
is essential and must contain columns referenced by
`pos_cols` if ``pos_cols`` is not None.
pos : float or tuple of floats
The reference position(s) to match or approximate.
When `pos_cols` is None, `pos` is interpreted as an
index value. Otherwise, each value in `pos` aligns
with a specific column in `pos_cols`.
pos_cols : str or tuple of str, optional
Name(s) of the column(s) in `df` to match against
`pos`. If ``pos_cols=None``, then `pos` is treated
as an index filter. If multiple columns are given
(e.g., latitude and longitude), each coordinate in
`pos` should correspond to one column in `pos_cols`.
find_closest : bool, optional
If True, nearest-neighbor filtering is performed
within the distance `threshold`. If False, exact
matches are used.
threshold : float, optional
The maximum distance within which points are
considered a match if `find_closest` is True. The
unit corresponds to the column data (e.g., degrees
for geographic lat/lon).
error : {'raise', 'warn', 'ignore'}, optional
Specifies how to handle dimension mismatches or
missing values. If ``'raise'``, a ValueError will be
raised. If ``'warn'``, a warning is printed and extra
values are ignored. If ``'ignore'``, mismatches are
silently ignored.
verbose : int, optional
Controls the level of output messages:
- 0: No output
- 1: Basic info
- 2: Additional details
- >=3: Comprehensive summary
Returns
-------
pandas.DataFrame
A new DataFrame that contains only rows matching or
approximating the specified position(s) within the
given threshold if `find_closest` is True.
Notes
-----
When `pos_cols` is None, the function attempts to filter
by DataFrame index using the first element of `pos`. This
approach may fail for multi-level indexes unless
``error='warn'`` or ``error='ignore'`` is used to bypass
the dimension mismatch.
Examples
--------
>>> from geoprior.utils.spatial_utils import filter_position
>>> import pandas as pd
>>> df = pd.DataFrame({
... 'lat': [113.309998, 113.310001],
... 'lon': [22.831362, 22.831364]
... })
>>> # Exact match
>>> result_exact = filter_position(df, pos=(113.309998,
... 22.831362),
... pos_cols=('lat', 'lon'),
... find_closest=False)
>>> # Nearest match with threshold
>>> result_close = filter_position(df,
... pos=(113.31,
... 22.83),
... pos_cols=('lat',
... 'lon'),
... find_closest=True,
... threshold=0.01)
See Also
--------
geoprior.utils.data_utils.truncate_data:
Truncate multiple DataFrames based on spatial
coordinates or index alignment with a base DataFrame.
"""
# Initialize filtered_df as the original DataFrame
filtered_df = df.copy()
# Convert pos to tuple if it's not already
if not isinstance(pos, tuple | list):
pos = (pos,)
# Helper function to handle errors
def _handle_error(msg, ex_msg=""):
if error == "raise":
raise ValueError(msg)
elif error == "warn":
warnings.warn(f"{msg}{ex_msg}", stacklevel=2)
# if error == 'ignore', do nothing
# If no position columns specified,
# treat pos as DataFrame index
if pos_cols is None:
if len(pos) > 1:
_handle_error(
"Multiple position values provided but no pos_cols. "
"Unable to match multi-level index. ",
"Using the first position value only.",
)
idx = pos[
0
] # Consider only the first if multiple are given
if idx not in df.index:
_handle_error(
f"Index '{idx}' not found in DataFrame. ",
"Returning empty DataFrame.",
)
return filtered_df.iloc[
0:0
] # Return empty DF if not found
# Filter by index
filtered_df = filtered_df.loc[[idx]]
return filtered_df
# Ensure pos_cols is tuple for consistency
if not isinstance(pos_cols, tuple | list):
pos_cols = (pos_cols,)
pos_cols = columns_manager(pos_cols)
# Check dimension match between pos and pos_cols
if len(pos) != len(pos_cols):
# If mismatch, handle based on error param
_handle_error(
f"pos ({len(pos)}) and pos_cols ({len(pos_cols)}) lengths "
"do not match.",
" Extra values or columns will be ignored.",
)
# If ignoring or warning, align up to the shortest length
min_len = min(len(pos), len(pos_cols))
pos = pos[:min_len]
pos_cols = pos_cols[:min_len]
# If find_closest is disabled, do exact matching
if not find_closest:
# Build query for exact match
mask = True
for p_val, col in zip(pos, pos_cols, strict=False):
mask &= filtered_df[col] == p_val
filtered_df = filtered_df[mask]
return filtered_df
# If find_closest is enabled, use threshold-based filtering
# Compute distance (Euclidean if multiple columns, absolute if single)
if len(pos_cols) == 1:
# Single dimension: absolute difference
col = pos_cols[0]
distance = (filtered_df[col] - pos[0]).abs()
filtered_df = filtered_df[distance <= threshold]
else:
# Multi-dimensional: Euclidean distance
# Summation of squared diffs for each column
squared_diff = 0
for p_val, col in zip(pos, pos_cols, strict=False):
squared_diff += (filtered_df[col] - p_val) ** 2
distance = squared_diff**0.5
filtered_df = filtered_df[distance <= threshold]
# If verbose, provide basic info
if verbose >= 1:
print(
f"Filtered {len(df)} rows to {len(filtered_df)} rows "
f"within threshold {threshold}."
)
# If verbose >= 2, show sample of distance
if verbose >= 2 and not filtered_df.empty:
print("Example distance values within threshold:")
print(distance[distance <= threshold].head())
return filtered_df
@SaveFile
def dual_merge(
df1: pd.DataFrame,
df2: pd.DataFrame,
feature_cols: list | tuple = (
"longitude",
"latitude",
),
find_closest: bool = False,
force_coords: bool = False,
threshold: float = 0.01,
how: str = "inner",
savefile: str | None = None,
) -> pd.DataFrame:
"""
Merge two DataFrames based on specified feature columns. The function
can match the features exactly or find the closest matches within a
specified distance threshold. It also allows for overwriting coordinates
or feature values from one DataFrame to another when a close match is found.
Parameters
----------
df1 : pd.DataFrame
The first DataFrame to be merged. It contains the primary data
along with the feature columns (e.g., longitude, latitude) to be
merged on.
df2 : pd.DataFrame
The second DataFrame to be merged. It contains the data to be
matched with `df1` based on the specified feature columns.
feature_cols : tuple or list, default ``('longitude', 'latitude')``
The names of the columns in each DataFrame to merge on. It should
contain two columns representing features, such as coordinates
(longitude, latitude) or other relevant attributes. These columns
will be used to match the rows from `df1` to `df2`.
find_closest : bool, default ``False``
If ``True``, the function will attempt to find the closest points
in ``df2`` for each point in ``df1`` within the specified distance
threshold (`threshold`). If no exact match is found, the closest
point within the threshold will be considered.
force_coords : bool, default ``False``
If ``True``, when the closest points are found, the coordinates
of `df1` will overwrite those of `df2` for the matched points.
threshold : float, default ``0.01``
The maximum distance threshold within which points will be considered
as "close" for the closest point matching. The value is in the same
unit as the feature columns (e.g., degrees for latitude/longitude).
how : str, default ``'inner'``
The type of merge to perform. Options include:
- ``'inner'``: Only includes points that appear in both DataFrames.
- ``'left'``: All points from `df1` are included; unmatched points
from `df2` are excluded.
- ``'right'``: All points from `df2` are included; unmatched points
from `df1` are excluded.
- ``'outer'``: Includes all points from both DataFrames, with NaN
for unmatched points.
Returns
-------
pd.DataFrame
A DataFrame containing the merged data based on the specified
feature columns and merge type. If ``find_closest=True``, it will
contain the closest matches within the specified threshold, with
coordinates from ``df1`` overwritten if ``force_coordinates=True``.
Notes
-----
- This function uses a KDTree for efficient nearest-neighbor searching
when ``find_closest=True``. This is useful when dealing with large
datasets that may not have exact coordinate matches.
- When ``force_coordinates=True``, the coordinates from ``df1`` will
overwrite those from ``df2`` for the closest points. However, other
feature values will be kept from ``df2``.
Examples
--------
>>> import pandas as pd
>>> from geoprior.utils.datautils import dual_merge
>>> df1 = pd.DataFrame({
>>> 'longitude': [1.1, 1.2, 1.3],
>>> 'latitude': [2.1, 2.2, 2.3],
>>> 'value1': [10, 20, 30]
>>> })
>>> df2 = pd.DataFrame({
>>> 'longitude': [1.1, 1.4],
>>> 'latitude': [2.1, 2.4],
>>> 'value2': [100, 200]
>>> })
>>> result = dual_merge(df1, df2, feature_cols=('longitude', 'latitude'),
>>> find_closest=True, threshold=0.05)
>>> print(result)
longitude latitude value1 value2
0 1.1 2.1 10 100
See Also
--------
scipy.spatial.cKDTree: Used for finding the closest points in `df2`
when ``find_closest=True``.
pandas.merge: pandas.DataFrame.merge
The pandas merge function, used to merge DataFrames based on columns.
"""
# Ensure feature_cols are valid and both contain two elements
feature_cols = columns_manager(
feature_cols, empty_as_none=False
)
if len(feature_cols) != 2:
raise ValueError(
"feature_cols must contain exactly two features (e.g., longitude, latitude)"
)
# check wether df1 and df2 are both dataframes.
are_all_frames_valid(df1, df2)
# Extract columns from feature_cols for both DataFrames
feature1_1, feature1_2 = feature_cols
feature2_1, feature2_2 = feature_cols
# Filter for relevant columns in both DataFrames
df1_coords = df1[[feature1_1, feature1_2]]
df2_coords = df2[[feature2_1, feature2_2]]
if find_closest:
# Use KDTree for fast nearest-neighbor search
tree = cKDTree(df2_coords.values)
# Query for the closest points in df2 for each point in df1
dist, indices = tree.query(
df1_coords.values, distance_upper_bound=threshold
)
# Filter out points that couldn't find a close match
valid_idx = dist != np.inf
df1_coords_closest = df1_coords.iloc[valid_idx]
df2_coords_closest = df2_coords.iloc[
indices[valid_idx]
]
if force_coords:
# Force coordinates of df1 to overwrite df2
df2_coords_closest[feature2_1] = (
df1_coords_closest[feature1_1].values
)
df2_coords_closest[feature2_2] = (
df1_coords_closest[feature1_2].values
)
# Update df1 with the closest matches from df2
df1 = df1.iloc[valid_idx]
df2 = df2.iloc[indices[valid_idx]]
# Perform the merge based on the feature columns
merged_data = pd.merge(
df1,
df2,
how=how,
left_on=[feature1_1, feature1_2],
right_on=[feature2_1, feature2_2],
)
return merged_data
@Deprecated(
reason=(
"This function is deprecated and will be removed in future versions. "
"Please use `extract_coordinates` instead, which provides enhanced "
"flexibility and robustness for coordinates extraction."
)
)
@isdf
def get_xy_coordinates(
df,
as_frame=False,
drop_xy=False,
raise_exception=True,
verbose=0,
):
"""Check whether the coordinate values x, y exist in the data.
Parameters
------------
df: Dataframe
Frame that is expected to contain the longitude/latitude or
easting/northing coordinates. Note if all types of coordinates are
included in the data frame, the longitude/latitude takes the
priority.
as_frame: bool, default= False,
Returns the coordinates values if included in the data as a frame rather
than computing the middle points of the line
drop_xy: bool, default=False,
Drop the coordinates in the data and return the data transformed inplace
raise_exception: bool, default=True
raise error message if data is not a dataframe. If set to ``False``,
exception is converted to a warning instead. To mute the warning set
`raise_exception` to ``mute``
verbose: int, default=0
Send message whether coordinates are detected.
returns
--------
xy, d, xynames: Tuple
xy : tuple of float ( longitude, latitude) or (easting/northing )
if `as_frame` is set to ``True``.
d: Dataframe transformed (coordinated removed ) or not
xynames: str, the name of coordinates detected.
Examples
----------
>>> import gofast as gf
>>> from geoprior.utils.spatial_utils import get_xy_coordinates
>>> testdata = gf.datasets.make_erp ( samples =7, seed =42 , as_frame=True)
>>> xy, d, xynames = get_xy_coordinates ( testdata, )
>>> xy , xynames
((110.48627946874444, 26.051952363176344), ('longitude', 'latitude'))
>>> xy, d, xynames = get_xy_coordinates ( testdata, as_frame =True )
>>> xy.head(2)
longitude latitude easting northing
0 110.485833 26.051389 448565.380621 2.881476e+06
1 110.485982 26.051577 448580.339199 2.881497e+06
>>> # remove longitude and lat in data
>>> testdata = testdata.drop (columns =['longitude', 'latitude'])
>>> xy, d, xynames = get_xy_coordinates ( testdata, as_frame =True )
>>> xy.head(2)
easting northing
0 448565.380621 2.881476e+06
1 448580.339199 2.881497e+06
>>> # note testdata should be transformed inplace when drop_xy is set to True
>>> xy, d, xynames = get_xy_coordinates ( testdata, drop_xy =True)
>>> xy, xynames
((448610.25612032827, 2881538.4380570543), ('easting', 'northing'))
>>> d.head(2)
station resistivity
0 0.0 1.0
1 20.0 167.5
>>> testdata.head(2) # coordinates are henceforth been dropped
station resistivity
0 0.0 1.0
1 20.0 167.5
>>> xy, d, xynames = get_xy_coordinates ( testdata, drop_xy =True)
>>> xy, xynames
(None, ())
>>> d.head(2)
station resistivity
0 0.0 1.0
1 20.0 167.5
"""
def get_value_in(val, col, default):
"""Get the value in the frame columns if `val` exists in"""
x = list(filter(lambda x: x.find(val) >= 0, col))
if len(x) != 0:
# now rename col
df.rename(
columns={x[0]: str(default)}, inplace=True
)
return df
if not (
hasattr(df, "columns") and hasattr(df, "__array__")
):
emsg = (
"Expect dataframe containing coordinates longitude/latitude"
f" or easting/northing. Got {type(df).__name__!r}"
)
raise_exception = str(raise_exception).lower().strip()
if raise_exception == "true":
raise TypeError(emsg)
if raise_exception not in ("mute", "silence"):
warnings.warn(emsg, stacklevel=2)
return df
# check whether coordinates exists in the data columns
for name, tname in zip(
("lat", "lon", "east", "north"),
("latitude", "longitude", "easting", "northing"),
strict=False,
):
df = get_value_in(name, col=df.columns, default=tname)
# get the exist coodinates
coord_columns = []
for x, y in zip(
("longitude", "easting"),
("latitude", "northing"),
strict=False,
):
if x in df.columns and y in df.columns:
coord_columns.extend([x, y])
xy = (
df[coord_columns] if len(coord_columns) != 0 else None
)
if not as_frame and xy is not None:
# take the middle of the line and if both types of
# coordinates are supplied , take longitude and latitude
# and drop easting and northing
xy = tuple(np.nanmean(np.array(xy), axis=0))[:2]
xynames = tuple(coord_columns)[:2]
if drop_xy and len(coord_columns) != 0:
# modifie the data inplace
df.drop(columns=coord_columns, inplace=True)
if verbose:
print(
"###",
"No"
if len(xynames) == 0
else (tuple(xy.columns) if as_frame else xy),
"coordinates found.",
)
return xy, df, xynames
[docs]
@is_data_readable
@validate_params(
{
"data": ["array-like"],
"method": [
StrOptions({"abs", "absolute", "relative"}),
None,
],
}
)
@isdf
def batch_spatial_sampling(
data,
sample_size=0.1,
n_batches=10,
stratify_by=None,
spatial_bins=10,
spatial_cols=None,
method="abs",
min_relative_ratio=0.01,
random_state=42,
verbose=1,
):
"""
Create stratified spatial sample batches from a DataFrame.
Parameters
----------
data : pandas.DataFrame
Input DataFrame used for sampling.
sample_size : float or int, optional
Total sample size as a fraction or absolute count.
n_batches : int, optional
Number of batches to generate.
stratify_by : str or list of str or None, optional
Additional columns used for stratification.
spatial_bins : int or sequence of int, optional
Number of spatial bins used when discretizing coordinates.
spatial_cols : list or tuple of str or None, optional
Spatial coordinate columns.
method : {'abs', 'absolute', 'relative'}, optional
Strategy used to translate ``sample_size`` into per-batch sample
counts.
min_relative_ratio : float, optional
Minimum relative group size used by ``method='relative'``.
random_state : int, optional
Random seed for reproducibility.
verbose : int, optional
Verbosity level.
Returns
-------
list of pandas.DataFrame
Stratified batches sampled without overlap.
Notes
-----
Spatial coordinates are discretized with ``pandas.qcut`` and combined
with ``stratify_by`` columns so batches preserve the overall data
distribution as closely as possible.
"""
data = data.copy()
total_samples = sample_size
if isinstance(sample_size, float):
if not 0 < sample_size < 1:
raise ValueError(
"When sample_size is a float, it must be between 0 and 1."
)
total_samples = int(len(data) * sample_size)
elif isinstance(sample_size, int):
if sample_size <= 0:
raise ValueError("sample_size must be positive.")
else:
raise ValueError(
"sample_size must be a float or int."
)
if total_samples > len(data):
raise ValueError(
"sample_size is larger than the dataset."
)
if n_batches <= 0:
raise ValueError(
"n_batches must be a positive integer."
)
sample_size_per_batch = total_samples // n_batches
leftover = total_samples % n_batches
batches = []
remaining_data = data.copy()
sampled_indices = set()
rng = np.random.RandomState(random_state)
# Set default spatial columns if not specified
spatial_cols = columns_manager(spatial_cols)
if spatial_cols is None:
spatial_cols = []
if "longitude" in data.columns:
spatial_cols.append("longitude")
if "latitude" in data.columns:
spatial_cols.append("latitude")
if not spatial_cols:
raise ValueError(
"No spatial columns specified and "
"'longitude' and 'latitude' not found in data."
)
if len(spatial_cols) == 1:
warnings.warn(
f"Only one spatial column '{spatial_cols[0]}' found. "
"Using it for spatial stratification. "
"For more accurate sampling, providing both spatial "
"columns is recommended.",
UserWarning,
stacklevel=2,
)
else:
if not isinstance(spatial_cols, list | tuple):
raise ValueError(
"spatial_cols must be a list or tuple of column names."
)
if len(spatial_cols) > 2:
raise ValueError(
"spatial_cols can have at most two columns."
)
for col in spatial_cols:
if col not in data.columns:
raise ValueError(
f"Spatial column '{col}' not found in data."
)
if len(spatial_cols) == 1:
warnings.warn(
f"Only one spatial column '{spatial_cols[0]}' specified. "
"For more accurate sampling, providing two spatial columns "
"is recommended.",
UserWarning,
stacklevel=2,
)
# Validate spatial_bins
if isinstance(spatial_bins, int):
n_bins_list = [spatial_bins] * len(spatial_cols)
elif isinstance(spatial_bins, tuple | list):
if len(spatial_bins) != len(spatial_cols):
raise ValueError(
"Length of spatial_bins must match number of spatial_cols."
)
n_bins_list = list(spatial_bins)
else:
raise ValueError(
"spatial_bins must be int or tuple/list of int."
)
# Create spatial bins in the original data
for col, n_bins, axis in zip(
spatial_cols,
n_bins_list,
["x_bin", "y_bin"],
strict=False,
):
data[axis] = pd.qcut(
data[col], q=n_bins, duplicates="drop"
)
# Create combined stratification key in original data
# Create combined stratification key
if stratify_by is not None:
strat_columns = stratify_by + [
axis
for axis in ["x_bin", "y_bin"][
: len(spatial_cols)
]
]
if verbose and len(data) > 10_000:
print(
f"\nGenerating stratification keys for {len(data):,}"
" records..."
)
print(
" This may take some time. Please be patient..."
)
# Optimized vectorized concatenation
data["strat_key"] = (
data[strat_columns]
.astype(str)
.agg("_".join, axis=1)
)
if verbose and len(data) > 10_000:
print(
"Stratification keys generated"
f" successfully for {len(data):,} records."
)
else:
data["strat_key"] = (
"all_data" # Single group for random sampling
)
# Initialize remaining data
remaining_data = data.copy()
batches = []
# Set initial random state
rng = np.random.RandomState(random_state)
if verbose:
print(
f"\nCreating {n_batches} stratified batches with"
f" a total of {total_samples:,} samples..."
)
# TQDM progress bar
if verbose and HAS_TQDM:
progbar = tqdm(
range(n_batches),
total=n_batches,
ascii=True,
ncols=80,
desc="Batch Sampling Progress",
)
if method == "relative":
min_relative_ratio = assert_ratio(
min_relative_ratio,
bounds=(0, 1),
exclude_values=(0, 1),
name="`min_relative_ratio`",
)
for batch_idx in range(n_batches):
# Adjust sample size for batches if total_samples
# is not divisible by n_batches
if batch_idx < leftover:
batch_sample_size = sample_size_per_batch + 1
else:
batch_sample_size = sample_size_per_batch
# if batch_sample_size > len(remaining_data):
# batch_sample_size = len(remaining_data)
batch_sample_size = min(
batch_sample_size, len(remaining_data)
)
# Group remaining data by stratification key
grouped = remaining_data.groupby("strat_key")
# Calculate number of samples per group
group_sizes = grouped.size()
total_size = group_sizes.sum()
if method in ["abs", "absolute"]:
group_sample_sizes = (
(group_sizes / total_size * batch_sample_size)
.round()
.astype(int)
)
else: # "relative"
relative_scale = np.clip(
batch_sample_size / len(remaining_data),
min_relative_ratio,
1,
)
group_sample_sizes = (
(group_sizes * relative_scale)
.round()
.astype(int)
)
# Sample data from each group
sampled_indices = []
for strat_value, group in grouped:
n = group_sample_sizes.loc[strat_value]
if n > 0 and len(group) > 0:
sampled_group = group.sample(
n=min(n, len(group)),
random_state=rng.randint(0, 10000),
)
sampled_indices.extend(sampled_group.index)
# Create the sampled DataFrame
batch_sampled_data = remaining_data.loc[
sampled_indices
]
batches.append(
batch_sampled_data.drop(
columns=["strat_key"]
+ [
axis
for axis in ["x_bin", "y_bin"][
: len(spatial_cols)
]
]
)
)
# Remove sampled data from remaining_data
remaining_data = remaining_data.drop(
index=sampled_indices
)
if len(remaining_data) == 0:
break # No more data to sample
if verbose and HAS_TQDM:
progbar.update(1)
if verbose and HAS_TQDM:
progbar.close()
if verbose:
print(
f"\nBatch sampling completed. {len(batches)} batches created."
)
has_empty_batches = any([b.empty for b in batches])
if verbose and has_empty_batches:
warnings.warn(
"\nNo records were sampled. This is likely due to"
" insufficient data for the specified stratification"
f" columns {stratify_by}. To resolve this, consider:\n"
" • Using a different stratification method.\n"
" • Increasing the dataset size to include more"
" representative data.\n"
" • Adjusting the sample size to ensure sufficient"
" records per group.\n"
" • Or, setting `stratify_by=None` to perform"
" random sampling instead.",
stacklevel=2,
)
return batches
[docs]
@SaveFile
@is_data_readable
@validate_params(
{
"data": ["array-like"],
"method": [
StrOptions({"abs", "absolute", "relative"}),
None,
],
}
)
@isdf
def spatial_sampling(
data,
sample_size=0.01,
stratify_by=None,
spatial_bins=10,
spatial_cols=None,
method="abs",
min_relative_ratio=0.01,
random_state=42,
savefile=None,
verbose=1,
):
"""
Sample spatial data intelligently to represent the distribution
of the whole area and include different years.
This function performs stratified sampling on spatial data,
ensuring that the sample reflects both spatial distribution
and temporal aspects of the entire dataset. It combines spatial
stratification based on coordinates and additional stratification
columns specified by the user.
Parameters
----------
data : pandas.DataFrame
The input DataFrame to sample from. Must contain spatial
coordinate columns (e.g., `'longitude'`, `'latitude'`) and
any columns specified in ``stratify_by``.
sample_size : float or int, optional
The proportion or absolute number of samples to select.
If float, should be between 0.0 and 1.0 and represents the
fraction of the dataset to include in the sample.
If int, represents the absolute number of samples to select.
Default is ``0.01`` (1% of the data).
stratify_by : list of str, optional
List of column names to stratify by.
spatial_bins : int or tuple/list of int, optional
Number of bins to divide the spatial coordinates into.
If an integer, the same number of bins is used for all spatial
dimensions. If a tuple or list, its length must match the number
of spatial columns, specifying the number of bins for each spatial
dimension. Default is ``10``.
spatial_cols : list or tuple of str, optional
List of spatial coordinate column names. Can accept one or two
columns. If ``None``, the function checks for columns named
`'longitude'` and/or `'latitude'` in ``data``. If only one spatial
column is provided or found, a warning is issued, suggesting that
providing both spatial columns is recommended for more accurate
sampling. If more than two columns are provided, an error is raised.
method : str, {'abs', 'relative'}, default='abs'
Defines how the sample size is determined. ``'abs'`` or
``'absolute'`` uses a fixed sampling proportion based on
``sample_size``. ``'relative'`` scales sampling by dataset
stratification so small groups still receive a proportional
sample controlled by ``min_relative_ratio``.
min_relative_ratio : float, default=0.01
Controls the minimum allowable fraction of records that must
be sampled when ``method='relative'``. It must be between
``0`` and ``1``. For example, ``min_relative_ratio=0.05``
requests at least 5 percent of the total dataset size from
each stratification group when possible; if a group is smaller
than that minimum, the entire subset is sampled instead.
random_state : int, optional
Random seed for reproducibility. Default is ``42``.
verbose : int, default=1
Controls progress-bar and status output during execution.
Larger values produce more detailed messages.
Returns
-------
sampled_data : pandas.DataFrame
A sampled DataFrame representing the distribution of the whole
area and including different years.
Notes
-----
The function performs stratified sampling based on spatial bins
and other specified stratification columns. Spatial coordinates
are binned using quantile-based discretization (:func:`pandas.qcut`),
ensuring each bin has approximately the same number of observations.
Let :math:`N` be the total number of samples in ``data``, and
:math:`n` be the desired sample size. The function calculates the
number of samples to draw from each stratification group based on
the proportion of the group size to the total dataset size:
.. math::
n_i = \\left\\lceil \\frac{N_i}{N} \\times n \\right\\rceil
where :math:`N_i` is the size of group :math:`i`, and :math:`n_i`
is the number of samples to draw from group :math:`i`.
The function ensures that all specified spatial and stratification
columns exist in ``data``, that the number of spatial bins matches
the number of spatial columns, and that the sample size is valid.
A warning is issued when only one spatial column is used because
two spatial columns usually give more reliable spatial sampling.
Examples
--------
>>> from geoprior.utils.spatial_utils import spatial_sampling
>>> import pandas as pd
>>> # Assume 'df' is a pandas DataFrame with columns
>>> # 'longitude', 'latitude', 'year', and other data.
>>> sampled_df = spatial_sampling(
... data=df,
... sample_size=0.05,
... stratify_by=['year', 'geological_category'],
... spatial_bins=(10, 15),
... spatial_cols=['longitude', 'latitude'],
... random_state=42
... )
>>> print(sampled_df.shape)
See Also
--------
pandas.qcut : Quantile-based discretization function used for binning.
sklearn.model_selection.StratifiedShuffleSplit : For stratified sampling.
batch_spatial_sampling: Resample spatial data with batching.
"""
data = data.copy()
# Set default spatial columns if not specified
spatial_cols = columns_manager(spatial_cols)
if spatial_cols is None:
spatial_cols = []
if "longitude" in data.columns:
spatial_cols.append("longitude")
if "latitude" in data.columns:
spatial_cols.append("latitude")
if not spatial_cols:
raise ValueError(
"No spatial columns specified and "
"'longitude' and 'latitude' not found in data."
)
if len(spatial_cols) == 1:
warnings.warn(
f"Only one spatial column '{spatial_cols[0]}' found. "
"Using it for spatial stratification. "
"For more accurate sampling, providing both spatial "
"columns is recommended.",
UserWarning,
stacklevel=2,
)
else:
if not isinstance(spatial_cols, list | tuple):
raise ValueError(
"spatial_cols must be a list or tuple of column names."
)
if len(spatial_cols) > 2:
raise ValueError(
"spatial_cols can have at most two columns."
)
for col in spatial_cols:
if col not in data.columns:
raise ValueError(
f"Spatial column '{col}' not found in data."
)
if len(spatial_cols) == 1:
warnings.warn(
f"Only one spatial column '{spatial_cols[0]}' specified. "
"For more accurate sampling, providing two spatial columns "
"is recommended.",
UserWarning,
stacklevel=2,
)
# Validate spatial_bins
if isinstance(spatial_bins, int):
n_bins = validate_positive_integer(
spatial_bins, "spatial_bins"
)
n_bins_list = [n_bins] * len(spatial_cols)
elif isinstance(spatial_bins, tuple | list):
if len(spatial_bins) != len(spatial_cols):
raise ValueError(
"Length of spatial_bins must match number of spatial_cols."
)
n_bins_list = [
validate_positive_integer(n, "spatial_bins")
for n in spatial_bins
]
else:
raise ValueError(
"spatial_bins must be int or tuple/list of int."
)
# if verbose and HAS_TQDM are True.
if verbose and HAS_TQDM:
progbar = tqdm(
zip(
spatial_cols,
n_bins_list,
["x_bin", "y_bin"],
strict=False,
),
total=len(spatial_cols),
ascii=True,
ncols=77,
desc=f"{'Creating spat. bins: ' + str(len(spatial_cols)):<20}",
)
# Create spatial bins
for col, n_bins, axis in zip(
spatial_cols,
n_bins_list,
["x_bin", "y_bin"],
strict=False,
):
data[axis] = pd.qcut(
data[col], q=n_bins, duplicates="drop"
)
if verbose and HAS_TQDM:
progbar.update(1)
if verbose and HAS_TQDM:
progbar.close()
stratify_by = columns_manager(
stratify_by, empty_as_none=False
)
# Create combined stratification key
strat_columns = stratify_by + [
axis
for axis in ["x_bin", "y_bin"][: len(spatial_cols)]
]
if verbose and len(data) > 10_000:
print("\nGenerating stratification keys...")
print(
f"This may take some time for {len(data):,}"
" records. Please be patient..."
)
# data['strat_key'] = data[strat_columns].apply(
# lambda row: '_'.join(row.values.astype(str)),
# axis=1
# )
# Using .agg and .astype(str) for vectorized string concatenation
data["strat_key"] = (
data[strat_columns].astype(str).agg("_".join, axis=1)
)
# Verbose message when done
if verbose and len(data) > 10_000:
print(
"Stratification keys generated successfully"
f" for {len(data):,} records."
)
# Determine total number of samples
if isinstance(sample_size, float):
if not 0 < sample_size < 1:
raise ValueError(
"When sample_size is a float, it must be between 0 and 1."
)
n_samples = int(len(data) * sample_size)
elif isinstance(sample_size, int):
n_samples = validate_positive_integer(
sample_size, "sample_size"
)
else:
raise ValueError(
"sample_size must be a positive float or int."
)
# Group data by stratification key
grouped = data.groupby("strat_key")
if verbose:
print(
f"Data grouped into {len(grouped):,} stratified bins."
)
# Apply stratification if stratify_by is provided
if stratify_by is not None:
# Calculate number of samples per group
group_sizes = grouped.size()
total_size = group_sizes.sum()
if method in ["abs", "absolute"]:
group_sample_sizes = (
(group_sizes / total_size * n_samples)
.round()
.astype(int)
)
else: # "relative"
min_relative_ratio = assert_ratio(
min_relative_ratio,
bounds=(0, 1),
exclude_values=[0, 1],
name="`min_relative_ratio`",
)
relative_scale = np.clip(
n_samples / len(data), min_relative_ratio, 1
)
group_sample_sizes = (
(group_sizes * relative_scale)
.round()
.astype(int)
)
# Sample data from each group
sampled_indices = []
np.random.seed(random_state)
# Use tqdm to wrap the grouped iterator
# if verbose and HAS_TQM are True.
if verbose and HAS_TQDM:
progbar = tqdm(
grouped,
total=len(grouped),
ascii=True,
ncols=77,
desc=f"Sampling {n_samples:,} records",
)
for strat_value, group in grouped:
n = group_sample_sizes.loc[strat_value]
if n > 0 and len(group) > 0:
sampled_group = group.sample(
n=min(n, len(group)),
random_state=np.random.randint(0, 10_000),
)
sampled_indices.extend(sampled_group.index)
if verbose and HAS_TQDM:
progbar.update(1)
if verbose and HAS_TQDM:
progbar.close()
else:
sampled_indices = np.random.choice(
data.index, size=n_samples, replace=False
)
# Create the sampled DataFrame
sampled_data = data.loc[sampled_indices]
# Drop helper columns
cols_to_drop = ["strat_key"] + [
axis
for axis in ["x_bin", "y_bin"][: len(spatial_cols)]
]
sampled_data = sampled_data.drop(columns=cols_to_drop)
if verbose:
print(
f"\nSampling completed: {len(sampled_indices):,}"
" records selected."
)
if verbose and sampled_data.empty:
warnings.warn(
"\nNo records were sampled. This is likely due to"
" insufficient data for the specified stratification"
f" columns {stratify_by}. To resolve this, consider:\n"
" • Using a different stratification method.\n"
" • Increasing the dataset size to include more"
" representative data.\n"
" • Adjusting the sample size to ensure sufficient"
" records per group.\n"
" • Or, setting `stratify_by=None` to perform"
" random sampling instead.",
stacklevel=2,
)
return sampled_data.reset_index(drop=True)