Source code for geoprior.cli.build_add_zsurf_from_coords

# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 — https://github.com/earthai-tech/geoprior-v3
# Copyright (c) 2026-present
# Author: LKouadio <https://lkouadio.com>

"""Build harmonized datasets enriched with surface elevation.

This command merges a main tabular dataset with a coordinate-to-
elevation lookup on rounded longitude/latitude pairs, then optionally
computes hydraulic head when a depth-below-ground-surface column is
available.
"""

from __future__ import annotations

import argparse
import json
from dataclasses import dataclass
from pathlib import Path

import pandas as pd

from .config import (
    add_city_arg,
    add_config_args,
    add_outdir_arg,
    bootstrap_runtime_config,
    ensure_outdir,
)


@dataclass(frozen=True)
class CityPaths:
    """Immutable bundle of the input files resolved for a single city.

    Instances are created by ``_resolve_city_paths`` and passed to
    ``enrich_city_dataset``. Being a frozen dataclass, fields cannot be
    reassigned after construction and instances are hashable.
    """

    # City label (already stripped of surrounding whitespace).
    city: str
    # Main harmonized tabular dataset to enrich.
    main_csv: Path
    # Coordinate-to-elevation lookup table for the same city.
    elev_csv: Path
def _pair_items_to_map(
    items: list[str] | None,
    *,
    label: str,
) -> dict[str, Path]:
    """Parse repeated ``CITY=PATH`` items into a ``{city: path}`` mapping.

    Parameters
    ----------
    items : list of str or None
        Raw values collected from a repeatable CLI option. ``None`` (the
        option was never given) yields an empty mapping.
    label : str
        Option name used in error messages (e.g. ``"--main-csv"``).

    Returns
    -------
    dict of str to Path
        City name mapped to an expanded, absolute path. Both the city and
        the path part are stripped of surrounding whitespace. When a city
        is repeated, the last occurrence wins.

    Raises
    ------
    SystemExit
        If an item has no ``=``, or its city or path part is empty.
    """
    out: dict[str, Path] = {}
    for item in items or []:
        if "=" not in item:
            raise SystemExit(
                f"Each {label} must be CITY=PATH. Got: {item!r}"
            )
        city, raw = item.split("=", 1)
        city = city.strip()
        raw = raw.strip()
        if not city:
            raise SystemExit(
                f"Invalid empty city in {label}: {item!r}"
            )
        if not raw:
            # Path("") resolves to the current working directory, which
            # passes an ``exists()`` check and later fails as a confusing
            # read error on a folder.
            raise SystemExit(
                f"Invalid empty path in {label}: {item!r}"
            )
        out[city] = Path(raw).expanduser().resolve()
    return out


def _append_suffix(path: Path, suffix: str) -> Path:
    """Append ``suffix`` to the file name of ``path``.

    When ``suffix`` ends with ``.csv`` and ``path`` has a ``.csv``
    extension (case-insensitive), the original extension is dropped so it
    is not duplicated: ``a.csv`` + ``.with_zsurf.csv`` gives
    ``a.with_zsurf.csv``. Otherwise ``suffix`` is appended to the full
    file name verbatim.
    """
    if (
        suffix.endswith(".csv")
        and path.suffix.lower() == ".csv"
    ):
        stem = path.name[: -len(path.suffix)]
        return path.with_name(f"{stem}{suffix}")
    return path.with_name(f"{path.name}{suffix}")


def _round_coords(
    df: pd.DataFrame,
    *,
    lon_col: str,
    lat_col: str,
    decimals: int,
) -> pd.DataFrame:
    """Return a copy of ``df`` with numeric, rounded coordinate columns.

    Values in ``lon_col``/``lat_col`` that cannot be parsed as numbers
    become NaN (``errors="coerce"``). The input frame is not modified.
    """
    out = df.copy()
    out[lon_col] = pd.to_numeric(
        out[lon_col],
        errors="coerce",
    ).round(decimals)
    out[lat_col] = pd.to_numeric(
        out[lat_col],
        errors="coerce",
    ).round(decimals)
    return out


def _resolve_root(
    cli_value: object,
    cfg_value: object,
) -> Path | None:
    """Pick a root directory from the CLI value or, failing that, config.

    A truthy CLI value wins. Otherwise a non-blank config string is used.
    A blank config string is ignored rather than silently resolving to the
    current working directory. Returns ``None`` when neither applies.
    """
    if cli_value:
        return Path(str(cli_value)).expanduser().resolve()
    if isinstance(cfg_value, str) and cfg_value.strip():
        return Path(cfg_value).expanduser().resolve()
    return None


def _resolve_city_paths(
    args: argparse.Namespace,
    cfg: dict[str, object],
) -> list[CityPaths]:
    """Resolve per-city main/elevation input paths.

    Cities are taken, in priority order, from:

    1. ``args.cities`` (the repeatable city option);
    2. the cities present in *both* the ``--main-csv`` and ``--elev-csv``
       mappings, sorted by name;
    3. the config ``CITY_NAME`` string.

    For each city, an explicit ``CITY=PATH`` mapping wins. Otherwise the
    path is built from ``--data-root``/``--coords-root`` (falling back to
    config ``DATA_ROOT``/``COORDS_ROOT``) plus ``args.main_pattern`` or
    ``args.elev_pattern`` formatted with ``city=``. Blank and repeated
    city names are skipped, so each city is processed once.

    Raises
    ------
    SystemExit
        If no usable city is resolved, or if a city has neither an
        explicit path nor a root directory to build one from.
    """
    main_map = _pair_items_to_map(
        args.main_csvs,
        label="--main-csv",
    )
    elev_map = _pair_items_to_map(
        args.elev_csvs,
        label="--elev-csv",
    )

    no_city_msg = (
        "No city was resolved. Provide --city, or matched "
        "CITY=PATH pairs via --main-csv and --elev-csv."
    )
    cities = list(args.cities or [])
    if not cities:
        cities = sorted(set(main_map) & set(elev_map))
    if not cities:
        cfg_city = cfg.get("CITY_NAME")
        if isinstance(cfg_city, str) and cfg_city.strip():
            cities = [cfg_city.strip()]
    if not cities:
        raise SystemExit(no_city_msg)

    data_root = _resolve_root(args.data_root, cfg.get("DATA_ROOT"))
    coords_root = _resolve_root(
        args.coords_root,
        cfg.get("COORDS_ROOT"),
    )

    out: list[CityPaths] = []
    seen: set[str] = set()
    for raw_city in cities:
        city = str(raw_city).strip()
        # Skip blanks and repeats (e.g. ``--city a --city a``) so the
        # same city is not enriched and written twice.
        if not city or city in seen:
            continue
        seen.add(city)

        main_csv = main_map.get(city)
        if main_csv is None:
            if data_root is None:
                raise SystemExit(
                    f"Missing main CSV for {city!r}. Provide "
                    "--main-csv city=PATH or --data-root."
                )
            main_csv = data_root / args.main_pattern.format(
                city=city,
            )

        elev_csv = elev_map.get(city)
        if elev_csv is None:
            if coords_root is None:
                raise SystemExit(
                    f"Missing elevation CSV for {city!r}. Provide "
                    "--elev-csv city=PATH or --coords-root."
                )
            elev_csv = coords_root / args.elev_pattern.format(
                city=city,
            )

        out.append(
            CityPaths(
                city=city,
                main_csv=main_csv,
                elev_csv=elev_csv,
            )
        )

    if not out:
        # Every requested city name was blank.
        raise SystemExit(no_city_msg)
    return out


def _reduce_elevation(
    elev: pd.DataFrame,
    *,
    lon_col: str,
    lat_col: str,
    elev_col: str,
    reducer: str,
) -> pd.DataFrame:
    """Collapse duplicate coordinates in the elevation lookup.

    Groups ``elev`` by ``(lon_col, lat_col)`` and aggregates ``elev_col``
    with ``reducer`` (``"mean"``, ``"median"`` or ``"first"``). The result
    has exactly the columns ``lon_col``, ``lat_col`` and ``elev_col``, with
    one row per coordinate pair. With pandas' default ``dropna=True``,
    rows whose coordinates are NaN are dropped.

    Raises
    ------
    ValueError
        If ``reducer`` is not one of the supported names.
    """
    grouped = elev.groupby(
        [lon_col, lat_col],
        as_index=False,
    )[elev_col]
    if reducer == "mean":
        return grouped.mean()
    if reducer == "median":
        return grouped.median()
    if reducer == "first":
        return grouped.first()
    raise ValueError(
        "Unsupported reducer. Expected one of: "
        "mean, median, first."
    )


def _pick_depth_col(
    merged: pd.DataFrame,
    args: argparse.Namespace,
) -> str | None:
    """Choose the depth-below-ground-surface column used to compute head.

    When ``args.depth_col`` is non-empty, only those candidates are tried,
    in order, and the built-in names are not consulted. Otherwise the
    built-in candidates ``GWL_depth_bgs``, ``GWL_depth_bgs_m`` and
    ``gwl_depth_bgs`` are tried in that order. Returns the first column
    present in ``merged``, or ``None``.
    """
    if args.depth_col:
        for col in args.depth_col:
            if col in merged.columns:
                return col
        return None
    for col in (
        "GWL_depth_bgs",
        "GWL_depth_bgs_m",
        "gwl_depth_bgs",
    ):
        if col in merged.columns:
            return col
    return None
def enrich_city_dataset(
    paths: CityPaths,
    *,
    lon_col: str,
    lat_col: str,
    elev_col: str,
    zsurf_col: str,
    round_decimals: int,
    reducer: str,
    compute_head: bool,
    head_col: str,
    args: argparse.Namespace,
) -> tuple[pd.DataFrame, dict[str, object]]:
    """Merge one city dataset with its surface elevation lookup.

    Both CSVs are read. Their ``lon_col``/``lat_col`` values are coerced
    to numbers and rounded to ``round_decimals`` places. Duplicate lookup
    coordinates are collapsed with ``reducer``, and the lookup is
    left-joined onto the main table. The lookup's ``elev_col`` appears as
    ``zsurf_col`` in the result. When ``compute_head`` is true and a depth
    column is found, ``head_col`` is set to ``zsurf_col - depth``, with
    both operands coerced to numeric.

    Notes
    -----
    The returned frame carries the *rounded*, numeric-coerced coordinate
    values, not the originals from ``paths.main_csv``. If the main table
    already has a ``zsurf_col`` column, it is replaced by the lookup
    value. A main-table column named ``elev_col`` is left unchanged.

    Parameters
    ----------
    paths : CityPaths
        City label and the two input CSV paths.
    lon_col, lat_col : str
        Coordinate column names; both tables must contain them.
    elev_col : str
        Elevation column name in the lookup CSV.
    zsurf_col : str
        Output column name for surface elevation. Must differ from the
        coordinate columns.
    round_decimals : int
        Decimal places applied to coordinates before joining.
    reducer : str
        ``"mean"``, ``"median"`` or ``"first"``; used for duplicate lookup
        coordinates.
    compute_head : bool
        Whether to attempt the hydraulic-head computation.
    head_col : str
        Output column name for hydraulic head.
    args : argparse.Namespace
        Parsed CLI arguments; only ``args.depth_col`` is consulted, to
        choose the depth column.

    Returns
    -------
    merged : pandas.DataFrame
        One row per main-table row (the join is validated as many-to-one),
        enriched with ``zsurf_col`` and optionally ``head_col``.
    diag : dict
        Merge diagnostics: paths, row count, missing ``zsurf_col`` count
        and rate (``0.0`` for an empty table), whether head was computed
        and from which column, and the rounding/reducer settings.

    Raises
    ------
    FileNotFoundError
        If either input CSV does not exist.
    ValueError
        If a coordinate column or ``elev_col`` is missing, if
        ``zsurf_col`` clashes with a coordinate column, or if ``reducer``
        is unsupported.
    """
    if not paths.main_csv.exists():
        raise FileNotFoundError(
            f"Main dataset not found: {paths.main_csv}"
        )
    if not paths.elev_csv.exists():
        raise FileNotFoundError(
            f"Elevation dataset not found: {paths.elev_csv}"
        )
    if zsurf_col in (lon_col, lat_col):
        raise ValueError(
            f"zsurf_col {zsurf_col!r} must differ from the "
            "coordinate columns."
        )

    main = pd.read_csv(paths.main_csv)
    elev = pd.read_csv(paths.elev_csv)

    for frame, label in ((main, "main"), (elev, "elev")):
        missing = {lon_col, lat_col} - set(frame.columns)
        if missing:
            raise ValueError(
                f"{label} is missing coordinate columns: "
                f"{sorted(missing)}"
            )
    if elev_col not in elev.columns:
        raise ValueError(
            f"Elevation file is missing {elev_col!r}."
        )

    main_r = _round_coords(
        main,
        lon_col=lon_col,
        lat_col=lat_col,
        decimals=round_decimals,
    )
    elev_r = _round_coords(
        elev,
        lon_col=lon_col,
        lat_col=lat_col,
        decimals=round_decimals,
    )
    elev_r = _reduce_elevation(
        elev_r,
        lon_col=lon_col,
        lat_col=lat_col,
        elev_col=elev_col,
        reducer=reducer,
    )

    # Give the lookup value its final name *before* joining. Renaming
    # after the merge broke whenever ``main`` already held a column named
    # ``elev_col`` (pandas emits ``_x``/``_y`` suffixes, so the rename
    # misses and ``merged[zsurf_col]`` raises KeyError) or ``zsurf_col``
    # (duplicate labels make the numeric coercion below fail).
    elev_r = elev_r.rename(columns={elev_col: zsurf_col})
    if zsurf_col in main_r.columns:
        # Re-enriching an already enriched table: refresh the column.
        main_r = main_r.drop(columns=[zsurf_col])

    merged = main_r.merge(
        elev_r,
        on=[lon_col, lat_col],
        how="left",
        validate="m:1",
    )

    depth_col = (
        _pick_depth_col(merged, args) if compute_head else None
    )
    if depth_col is not None:
        merged[head_col] = pd.to_numeric(
            merged[zsurf_col],
            errors="coerce",
        ) - pd.to_numeric(
            merged[depth_col],
            errors="coerce",
        )

    zsurf_missing = merged[zsurf_col].isna()
    n_rows = int(len(merged))
    n_miss = int(zsurf_missing.sum())
    # ``Series.mean()`` of an empty table is NaN, which prints as ``nan%``
    # and is serialized by ``json.dumps`` as non-standard ``NaN``.
    miss_rate = n_miss / n_rows if n_rows else 0.0

    diag = {
        "city": paths.city,
        "main_csv": str(paths.main_csv),
        "elev_csv": str(paths.elev_csv),
        "rows": n_rows,
        "missing_zsurf": n_miss,
        "missing_rate": miss_rate,
        "computed_head": depth_col is not None,
        "depth_col": depth_col,
        "round_decimals": round_decimals,
        "reducer": reducer,
    }
    return merged, diag
def _default_output_path(
    main_csv: Path,
    *,
    outdir: Path | None,
    out_suffix: str,
) -> Path:
    """Build the destination path for one city's enriched CSV.

    The file name is ``main_csv``'s name with ``out_suffix`` applied via
    ``_append_suffix``. It is placed in ``outdir`` when that is truthy,
    otherwise in the directory that holds ``main_csv``.
    """
    directory = outdir if outdir else main_csv.parent
    return _append_suffix(directory / main_csv.name, out_suffix)
def _build_zsurf_parser() -> argparse.ArgumentParser:
    """Create the argument parser for ``add-zsurf-from-coords``.

    Options are registered in a fixed order: config, city, input
    discovery, output directory, then output/column/merge controls.
    ``--help`` lists them in that order.
    """
    parser = argparse.ArgumentParser(
        prog="add-zsurf-from-coords",
        description=(
            "Merge coordinate elevation into harmonized main "
            "datasets and optionally compute hydraulic head."
        ),
    )
    add_config_args(parser)
    add_city_arg(
        parser,
        required=False,
        action="append",
        dest="cities",
        help=(
            "City name. Repeat to process multiple cities. "
            "When omitted, the command tries config CITY_NAME "
            "or the shared keys from --main-csv and --elev-csv."
        ),
    )

    # (flags, add_argument keyword options) used to locate input files.
    input_options = [
        (
            ("--data-root", "--data-dir"),
            dict(
                dest="data_root",
                help=(
                    "Root directory used to resolve main dataset files "
                    "through --main-pattern."
                ),
            ),
        ),
        (
            ("--coords-root",),
            dict(
                dest="coords_root",
                help=(
                    "Root directory used to resolve elevation lookup files "
                    "through --elev-pattern."
                ),
            ),
        ),
        (
            ("--main-pattern",),
            dict(
                default="{city}_final_main_std.harmonized.csv",
                help=(
                    "Filename pattern under --data-root. Use {city} as "
                    "the placeholder."
                ),
            ),
        ),
        (
            ("--elev-pattern",),
            dict(
                default="{city}_coords_with_elevation.csv",
                help=(
                    "Filename pattern under --coords-root. Use {city} as "
                    "the placeholder."
                ),
            ),
        ),
        (
            ("--main-csv",),
            dict(
                dest="main_csvs",
                action="append",
                help=(
                    "Explicit CITY=PATH mapping for a main dataset CSV. "
                    "Repeat as needed."
                ),
            ),
        ),
        (
            ("--elev-csv",),
            dict(
                dest="elev_csvs",
                action="append",
                help=(
                    "Explicit CITY=PATH mapping for an elevation lookup "
                    "CSV. Repeat as needed."
                ),
            ),
        ),
    ]
    for flags, options in input_options:
        parser.add_argument(*flags, **options)

    add_outdir_arg(
        parser,
        required=False,
        help=(
            "Optional output directory. When omitted, each output "
            "is written next to its main dataset."
        ),
    )

    # (flags, add_argument keyword options) for outputs and merge tuning.
    output_options = [
        (
            ("--out-suffix",),
            dict(
                default=".with_zsurf.csv",
                help=(
                    "Suffix appended to each input main CSV name when "
                    "building output files."
                ),
            ),
        ),
        (
            ("--summary-json",),
            dict(
                help=(
                    "Optional JSON path for merge diagnostics across all "
                    "processed cities."
                ),
            ),
        ),
        (
            ("--longitude-col", "--lon-col"),
            dict(
                dest="lon_col",
                default="longitude",
                help="Longitude column name shared by both tables.",
            ),
        ),
        (
            ("--latitude-col", "--lat-col"),
            dict(
                dest="lat_col",
                default="latitude",
                help="Latitude column name shared by both tables.",
            ),
        ),
        (
            ("--elevation-col",),
            dict(
                dest="elev_col",
                default="elevation",
                help="Elevation column name in the lookup CSV.",
            ),
        ),
        (
            ("--zsurf-col",),
            dict(
                default="z_surf",
                help="Output column name for surface elevation.",
            ),
        ),
        (
            ("--head-col",),
            dict(
                default="head_m",
                help="Output hydraulic head column name.",
            ),
        ),
        (
            ("--depth-col",),
            dict(
                action="append",
                help=(
                    "Candidate depth-below-ground-surface column. Repeat "
                    "to provide multiple fallback names."
                ),
            ),
        ),
        (
            ("--round-decimals",),
            dict(
                type=int,
                default=6,
                help="Coordinate rounding decimals before merging.",
            ),
        ),
        (
            ("--reducer",),
            dict(
                choices=("mean", "median", "first"),
                default="mean",
                help="Reducer used for duplicate lookup coordinates.",
            ),
        ),
        (
            ("--no-head",),
            dict(
                action="store_true",
                help="Disable hydraulic-head computation.",
            ),
        ),
    ]
    for flags, options in output_options:
        parser.add_argument(*flags, **options)
    return parser


def _print_city_report(
    city: str,
    diag: dict[str, object],
    *,
    zsurf_col: str,
    head_col: str,
    out_path: Path,
) -> None:
    """Print the console summary for one city after its CSV is saved."""
    tag = f"[{city}]"
    print(f"{tag} rows: {diag['rows']}")
    print(
        f"{tag} missing {zsurf_col}: {diag['missing_zsurf']} "
        f"({diag['missing_rate']:.2%})"
    )
    if diag["computed_head"]:
        print(f"{tag} computed {head_col} from {diag['depth_col']}")
    else:
        print(f"{tag} head not computed")
    print(f"{tag} saved: {out_path}")


def build_add_zsurf_main(
    argv: list[str] | None = None,
) -> None:
    """CLI entry point for z_surf enrichment.

    Parses ``argv`` and resolves the input CSVs of every requested city.
    Each city's main dataset is enriched with ``enrich_city_dataset`` and
    written to the path chosen by ``_default_output_path``, and a short
    report is printed. When ``--summary-json`` is given, the list of
    per-city diagnostics (including ``output_csv``) is also written as
    JSON.
    """
    args = _build_zsurf_parser().parse_args(argv)
    cfg = bootstrap_runtime_config(args, field_map={})
    city_paths = _resolve_city_paths(args, cfg)
    outdir = ensure_outdir(args.outdir) if args.outdir else None

    diagnostics: list[dict[str, object]] = []
    written: list[str] = []
    for paths in city_paths:
        merged, diag = enrich_city_dataset(
            paths,
            lon_col=args.lon_col,
            lat_col=args.lat_col,
            elev_col=args.elev_col,
            zsurf_col=args.zsurf_col,
            round_decimals=args.round_decimals,
            reducer=args.reducer,
            compute_head=not args.no_head,
            head_col=args.head_col,
            args=args,
        )
        out_path = _default_output_path(
            paths.main_csv,
            outdir=outdir,
            out_suffix=args.out_suffix,
        )
        out_path.parent.mkdir(parents=True, exist_ok=True)
        merged.to_csv(out_path, index=False)

        out_text = str(out_path)
        diag["output_csv"] = out_text
        diagnostics.append(diag)
        written.append(out_text)
        _print_city_report(
            paths.city,
            diag,
            zsurf_col=args.zsurf_col,
            head_col=args.head_col,
            out_path=out_path,
        )

    if args.summary_json:
        summary_path = Path(args.summary_json).expanduser().resolve()
        summary_path.parent.mkdir(parents=True, exist_ok=True)
        summary_path.write_text(
            json.dumps(diagnostics, indent=2),
            encoding="utf-8",
        )
        print(f"[OK] wrote summary: {summary_path}")

    print("[OK] completed z_surf enrichment for:")
    for item in written:
        print(f" - {item}")
def main(argv: list[str] | None = None) -> None:
    """Alias entry point for the ``add-zsurf-from-coords`` command.

    Forwards ``argv`` unchanged to ``build_add_zsurf_main``. ``None``
    makes argparse read the process command line.
    """
    build_add_zsurf_main(argv=argv)


if __name__ == "__main__":
    main()