# SPDX-License-Identifier: Apache-2.0
# GeoPrior-v3 - https://github.com/earthai-tech/geoprior-v3
# Copyright (c) 2026-present
# Author: LKouadio <https://lkouadio.com>
r"""Parallel execution helpers for GeoPrior workflows."""
from __future__ import annotations
import os
import subprocess
[docs]
def resolve_n_jobs(n_jobs: int) -> int:
try:
n = int(n_jobs)
except Exception:
return 1
if n == 0 or n == 1:
return 1
if n < 0:
return int(os.cpu_count() or 1)
return max(1, n)
[docs]
def threads_per_job(
*,
n_jobs: int,
threads: int = 0,
reserve: int = 1,
) -> int:
if int(threads) > 0:
return int(threads)
cpu = int(os.cpu_count() or 1)
cpu = max(1, cpu - int(reserve))
return max(1, cpu // max(1, int(n_jobs)))
[docs]
def apply_thread_env(
env: dict[str, str],
*,
n_jobs: int,
threads: int = 0,
reserve: int = 1,
) -> dict[str, str]:
out = dict(env)
t = threads_per_job(
n_jobs=n_jobs,
threads=threads,
reserve=reserve,
)
inter = max(1, min(4, t // 2))
out["OMP_NUM_THREADS"] = str(t)
out["MKL_NUM_THREADS"] = str(t)
out["NUMEXPR_NUM_THREADS"] = str(t)
out["TF_NUM_INTRAOP_THREADS"] = str(t)
out["TF_NUM_INTEROP_THREADS"] = str(inter)
return out
[docs]
def apply_tf_threading(
*,
intra: int,
inter: int,
) -> None:
import tensorflow as tf
tf.config.threading.set_intra_op_parallelism_threads(
int(intra)
)
tf.config.threading.set_inter_op_parallelism_threads(
int(inter)
)
def _split_ids(s: str) -> list[str]:
s = str(s).strip()
if not s:
return []
out: list[str] = []
for part in s.split(","):
p = part.strip()
if p:
out.append(p)
return out
[docs]
def detect_gpu_ids(
*,
env: dict[str, str] | None = None,
) -> list[str]:
e = env or os.environ
# Respect an existing restriction first
cvd = e.get("CUDA_VISIBLE_DEVICES", None)
if cvd is not None:
ids = _split_ids(cvd)
return ids
# Try nvidia-smi (fast, no TF import)
try:
r = subprocess.run(
["nvidia-smi", "-L"],
capture_output=True,
text=True,
check=False,
)
if r.returncode == 0 and r.stdout.strip():
lines = r.stdout.splitlines()
return [str(i) for i in range(len(lines))]
except Exception:
pass
# Fall back to TF if available
try:
import tensorflow as tf
g = tf.config.list_physical_devices("GPU")
return [str(i) for i in range(len(g))]
except Exception:
return []
[docs]
def resolve_device(
device: str,
*,
env: dict[str, str] | None = None,
) -> str:
d = str(device).strip().lower()
if d in {"cpu"}:
return "cpu"
if d in {"gpu"}:
return "gpu"
# auto
return "gpu" if detect_gpu_ids(env=env) else "cpu"
[docs]
def resolve_gpu_ids(
gpu_ids: list[str] | None,
*,
env: dict[str, str] | None = None,
) -> list[str]:
if gpu_ids:
return [
str(x).strip() for x in gpu_ids if str(x).strip()
]
return detect_gpu_ids(env=env)
[docs]
def pick_gpu_id(
idx: int,
gpu_ids: list[str],
) -> str | None:
if not gpu_ids:
return None
return gpu_ids[int(idx) % len(gpu_ids)]
[docs]
def apply_gpu_env(
env: dict[str, str],
*,
gpu_id: str | None,
allow_growth: bool = True,
) -> dict[str, str]:
out = dict(env)
if gpu_id is None:
out["CUDA_VISIBLE_DEVICES"] = ""
return out
out["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
out["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
if allow_growth:
out["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"
return out