# License: Apache-2.0
# Copyright (c) 2026-present
# Author: LKouadio <etanoyau@gmail.com>
"""
System utilities module for managing system-level operations.
This module provides utilities essential for system-level tasks such as
color management, regular expression searching, and projection validation,
along with other miscellaneous system operations.
"""
import functools
import gc
import importlib.util
import inspect
import itertools
import logging
import multiprocessing
import os
import pickle
import platform
import re
import shutil
import socket
import subprocess
import tempfile
import time
from collections.abc import Callable, Sequence
from concurrent.futures import (
ProcessPoolExecutor,
ThreadPoolExecutor,
as_completed,
)
from functools import partial
from pathlib import Path
from typing import (
Any,
)
import numpy as np
import pandas as pd
from ..api.util import get_table_size
from ..backends.selector import check_processor
from ..core.checks import is_iterable
from ..logging import get_logger
from .deps_utils import (
ensure_pkgs,
import_optional_dependency,
is_module_installed,
)
try:
    import psutil
except ImportError:
    # psutil is an optional dependency. Bind an explicit placeholder (the
    # same pattern this module uses for ``cudf``) instead of swallowing
    # every exception with a bare ``except`` and leaving the name undefined.
    psutil = None
try:
import torch
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
try:
import pyarrow # noqa
_HAVE_PYARROW = True
except ImportError:
_HAVE_PYARROW = False
# NOTE(review): presumably the table/terminal width used when formatting
# text output; `get_table_size` lives in `..api.util` — confirm there.
TW = get_table_size()
# Module-level logger used by the utilities defined in this file.
logger = get_logger(__name__)
# Public names exported by ``from <this module> import *``.
__all__ = [
"BatchDataFrameBuilder",
"WorkflowOptimizer",
"check_port_in_use",
"clean_temp_files",
"create_temp_dir",
"create_temp_file",
"environment_summary",
"find_by_regex",
"find_similar_string",
"get_cpu_usage",
"get_disk_usage",
"get_gpu_info",
"get_installed_packages",
"get_memory_usage",
"get_python_version",
"get_system_info",
"get_uptime",
"is_gpu_available",
"is_package_installed",
"is_path_accessible",
"is_port_open",
"manage_env_variable",
"manage_file_lock",
"manage_temp",
"parallelize_jobs",
"represent_callable",
"run_command",
"safe_getattr",
"safe_optimize",
"system_uptime",
"build_large_df",
]
try:
import cudf
except ImportError:
cudf = None
[docs]
class BatchDataFrameBuilder:
    r"""
    Build a large DataFrame incrementally from row dictionaries.

    Incoming rows are buffered; every time the buffer holds
    ``chunk_size`` rows it is converted into a DataFrame chunk and the
    buffer is reset. :meth:`finalize` concatenates the stored chunks.
    For :math:`N` rows and a chunk size :math:`m \le \text{chunk\_size}`
    the number of chunks is

    .. math::

        k = \left\lceil \frac{N}{m} \right\rceil

    Parameters
    ----------
    chunk_size : int, optional
        Number of buffered rows that triggers a flush into a DataFrame
        chunk. Default is 100000.
    processor : {'auto', 'cpu', 'gpu'}, optional
        Engine used to build the chunks:

        - ``'cpu'`` : always pandas.
        - ``'gpu'`` : cudf; a ``RuntimeError`` is raised when the GPU
          check or the cudf import fails.
        - ``'auto'`` : cudf when ``check_processor`` reports a GPU and
          cudf is importable, pandas otherwise.
    verbose : int, optional
        ``0`` silent, ``1`` basic messages, ``2`` also reports every
        flush. Default is 1.

    Attributes
    ----------
    gpu_enabled : bool
        Whether chunks are built with cudf.
    final_df : DataFrame or None
        Result of the most recent :meth:`finalize` call; ``None`` until
        the builder has been finalized.

    Notes
    -----
    All chunks are kept in memory until finalization. If the total data
    does not fit in (GPU) memory, write chunks to disk instead.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import BatchDataFrameBuilder
    >>> data = [{'colA': i, 'colB': i**2} for i in range(10**6)]
    >>> with BatchDataFrameBuilder(chunk_size=50000,
    ...                            processor='auto',
    ...                            verbose=2) as builder:
    ...     builder.add_rows(data)
    ...
    >>> builder.final_df.shape
    (1000000, 2)

    See Also
    --------
    pandas.DataFrame : Core pandas DataFrame object.
    cudf.DataFrame : GPU DataFrame object from RAPIDS.
    check_processor : Utility for detecting GPU availability.
    """

    def __init__(
        self, chunk_size=100_000, processor="auto", verbose=1
    ):
        """
        Store the configuration, create the empty buffers and resolve
        which DataFrame engine (pandas or cudf) will be used.
        """
        self.chunk_size = chunk_size
        self.processor = processor
        self.verbose = verbose
        # Row dictionaries waiting to be turned into a chunk.
        self._rows = []
        # DataFrame chunks waiting to be concatenated.
        self._dfs = []
        # Resolved by _initialize_processor(); pandas unless proven otherwise.
        self.gpu_enabled = False
        self._initialize_processor()
        # Populated by finalize() (directly or via __exit__).
        self.final_df = None

    def __enter__(self):
        """Return the builder itself for use in a ``with`` statement."""
        return self

    def add_row(self, row: dict):
        """
        Append one row to the buffer, flushing when it is full.

        Parameters
        ----------
        row : dict
            Mapping of column name to value for a single row.
        """
        self._rows.append(row)
        if len(self._rows) >= self.chunk_size:
            self._flush()

    def add_rows(self, rows: list):
        """
        Append several rows by calling :meth:`add_row` on each one.

        Parameters
        ----------
        rows : list of dict
            Row dictionaries sharing the structure of the final frame.
        """
        for row in rows:
            self.add_row(row)

    def finalize(self):
        """
        Flush buffered rows, concatenate all chunks and store the result.

        The result is returned and also kept in ``self.final_df``. The
        call is idempotent: when no rows were added since the previous
        call, the previously built DataFrame is returned instead of an
        empty one. This keeps ``final_df`` intact when ``finalize`` is
        called explicitly inside a ``with`` block and then again by
        :meth:`__exit__`.

        Returns
        -------
        DataFrame
            pandas or cudf DataFrame (depending on ``gpu_enabled``) built
            from the chunks accumulated since the previous finalization.
            An empty DataFrame when nothing was ever added.
        """
        self._flush()
        if not self._dfs:
            if self.final_df is None:
                self.final_df = (
                    cudf.DataFrame()
                    if self.gpu_enabled
                    else pd.DataFrame()
                )
            return self.final_df
        if self.verbose > 0:
            print(
                f"[BatchDataFrameBuilder] Concatenating "
                f"{len(self._dfs)} chunk(s)."
            )
        concat = cudf.concat if self.gpu_enabled else pd.concat
        self.final_df = concat(self._dfs, ignore_index=True)
        # Drop the chunk references so their memory can be reclaimed.
        self._dfs.clear()
        return self.final_df

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Finalize the builder on context exit and keep the result in
        ``self.final_df``. Exceptions raised inside the ``with`` block
        are not suppressed.
        """
        self.final_df = self.finalize()

    def _initialize_processor(self):
        """
        Decide between pandas and cudf according to ``self.processor``.

        Raises
        ------
        RuntimeError
            If ``processor='gpu'`` and ``check_processor`` raises
            ``RuntimeError``/``ImportError`` or cudf is not importable.
        """
        if self.processor == "cpu":
            if self.verbose > 0:
                print(
                    "[BatchDataFrameBuilder] Using CPU (pandas)."
                )
            return

        if self.processor == "gpu":
            try:
                gpu_ok = check_processor(
                    authorized="gpu",
                    error="raise",
                    verbose=self.verbose,
                )
                if not cudf:
                    raise ImportError(
                        "cudf is not installed, but 'gpu' "
                        "processor was requested."
                    )
                if gpu_ok:
                    self.gpu_enabled = True
                    if self.verbose > 0:
                        print(
                            "[BatchDataFrameBuilder] "
                            "Using GPU (cudf)."
                        )
            except (RuntimeError, ImportError) as e:
                # Chain the cause so the underlying failure stays visible.
                raise RuntimeError(
                    f"GPU required but not available: {e}"
                ) from e
            return

        # 'auto' (and any other value): probe, then fall back to pandas.
        gpu_ok = False
        try:
            gpu_ok = check_processor(
                authorized="auto",
                error="warn",
                verbose=self.verbose,
            )
        except Exception as e:
            if self.verbose > 0:
                print(
                    "[BatchDataFrameBuilder] GPU check failed,"
                    f" defaulting to CPU: {e}"
                )
        if gpu_ok and cudf:
            self.gpu_enabled = True
            if self.verbose > 0:
                print(
                    "[BatchDataFrameBuilder] GPU detected,"
                    " using cudf."
                )
        elif self.verbose > 0:
            print(
                "[BatchDataFrameBuilder] No GPU or cudf "
                "not installed, using CPU (pandas)."
            )

    def _flush(self):
        """
        Convert the buffered rows into a DataFrame chunk, store it and
        start a fresh buffer. Does nothing when the buffer is empty.
        """
        if not self._rows:
            return
        if self.verbose > 1:
            print(
                f"[BatchDataFrameBuilder] Flushing "
                f"{len(self._rows)} rows to DataFrame."
            )
        frame_cls = cudf.DataFrame if self.gpu_enabled else pd.DataFrame
        self._dfs.append(frame_cls(self._rows))
        self._rows = []
[docs]
class WorkflowOptimizer:
    r"""
    Decorator that wraps a function with optional multiprocessing,
    CPU-affinity pinning, memory cleanup and timing logs.

    The logged duration covers the whole wrapped call:

    .. math::

        T_{\text{total}} = T_{\text{start}} + T_{\text{execution}} +
        T_{\text{cleanup}}

    Parameters
    ----------
    parallelize : bool, optional
        When ``True`` and the call passes a ``data`` keyword argument,
        the function is mapped over ``data`` with
        :class:`multiprocessing.Pool`. Otherwise the function is called
        normally. Default is ``True``.
    memory_cleanup : bool, optional
        Run the module's memory cleanup routine after execution.
        Default is ``False``.
    log_level : int, optional
        Level applied to the module logger on every call. Default is
        ``logging.INFO``.
    optimize_cpu : bool, optional
        When ``True`` and ``cpu_cores`` is non-empty, pin the current
        process to ``cpu_cores`` before executing. Default is ``True``.
    num_processes : int, optional
        Pool size for the parallel path. Defaults to
        ``min(cpu_count, len(data))`` (at least 1).
    cpu_cores : list of int or None, optional
        Cores to pin the process to, e.g. ``[0, 1, 2, 3]``.
    verbose : bool, optional
        Emit start/finish log messages. Default is ``True``.

    Examples
    --------
    >>> import logging
    >>> import time
    >>> from geoprior.utils.sys_utils import WorkflowOptimizer
    >>> @WorkflowOptimizer(
    ...     parallelize=True,
    ...     memory_cleanup=True,
    ...     log_level=logging.DEBUG,
    ...     num_processes=4,
    ...     cpu_cores=[0, 1, 2, 3],
    ...     verbose=True
    ... )
    ... def process_data(data_chunk):
    ...     time.sleep(1)
    ...     return f"Processed {data_chunk}"
    ...
    >>> results = process_data(data=['chunk1', 'chunk2'])

    Notes
    -----
    In the parallel path only ``data`` is forwarded: the function is
    called once per element and other arguments are ignored. If the
    pool raises, the error is logged and the wrapper returns ``None``.

    See Also
    --------
    multiprocessing.Pool : Provides a pool of worker processes.
    psutil.Process : Allows manipulation of system processes.
    """

    def __init__(
        self,
        parallelize: bool = True,
        memory_cleanup: bool = False,
        log_level: int = logging.INFO,
        optimize_cpu: bool = True,
        num_processes: int | None = None,
        cpu_cores: list[int] | None = None,
        verbose: bool = True,
    ):
        self.parallelize = parallelize
        self.memory_cleanup = memory_cleanup
        self.log_level = log_level
        self.optimize_cpu = optimize_cpu
        self.num_processes = num_processes
        self.cpu_cores = cpu_cores
        self.verbose = verbose

    def __call__(self, func):
        """
        Wrap ``func`` with the configured optimization steps.

        Parameters
        ----------
        func : callable
            The function to decorate.

        Returns
        -------
        wrapper : callable
            Function with the same metadata as ``func`` that applies the
            configured steps around the call.
        """

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger.setLevel(self.log_level)
            start_time = time.time()
            if self.verbose:
                logger.info(
                    f"Starting workflow optimization for `{func.__name__}`..."
                )

            if self.optimize_cpu and self.cpu_cores:
                self._reset_cpu_affinity()
                logger.info(
                    f"Optimizing CPU usage, restricted to cores {self.cpu_cores}"
                )

            if self.parallelize and "data" in kwargs:
                results = self._run_parallel(func, kwargs["data"])
            else:
                # Covers both "parallelize disabled" and "no data kwarg";
                # previously the disabled case never assigned `results`.
                if self.parallelize:
                    logger.info(
                        "No parallel data found, executing normally."
                    )
                results = func(*args, **kwargs)

            if self.memory_cleanup:
                self._clean_up_memory()
                logger.info("Memory cleanup completed.")

            elapsed_time = time.time() - start_time
            if self.verbose:
                logger.info(
                    f"Workflow completed in {elapsed_time:.4f} seconds."
                )
            return results

        return wrapper

    def _run_parallel(self, func, data):
        """Map ``func`` over ``data`` in a process pool; ``None`` on failure."""
        num_processes = self.num_processes or min(
            multiprocessing.cpu_count(), len(data)
        )
        # Pool rejects processes < 1, which happened for empty `data`.
        num_processes = max(1, num_processes)
        logger.info(
            f"Parallelizing with {num_processes} processes."
        )
        if not hasattr(func, "__name__"):
            raise ValueError(
                f"Function {func} is not picklable. Ensure it"
                " is a valid function for multiprocessing."
            )
        # NOTE(review): Pool.map pickles `func` by qualified name. With the
        # `@WorkflowOptimizer(...)` syntax that name is rebound to the
        # wrapper, so pickling may fail and end up in the error branch —
        # confirm against real usage.
        with multiprocessing.Pool(processes=num_processes) as pool:
            try:
                return pool.map(func, data)
            except Exception as e:
                logger.error(
                    f"Error during parallel execution: {e}"
                )
                return None

    @ensure_pkgs(
        ["psutil"],
        extra="Setting the CPU affinity of the current process requires"
        " the `psutil` package to be installed.",
        auto_install=True,
    )
    def _reset_cpu_affinity(self):
        """
        Pin the current process to ``self.cpu_cores``; when no cores are
        configured, allow every core reported by
        ``multiprocessing.cpu_count()``.
        """
        import psutil

        process = psutil.Process(os.getpid())
        if self.cpu_cores:
            process.cpu_affinity(self.cpu_cores)
            if self.verbose:
                logger.debug(
                    f"Set CPU affinity to cores: {self.cpu_cores}"
                )
        else:
            process.cpu_affinity(
                list(range(multiprocessing.cpu_count()))
            )
            if self.verbose:
                logger.debug(
                    "Reset CPU affinity to all available cores."
                )

    def _clean_up_memory(self, temp_dir=None):
        """
        Delegate to the module-level ``_clean_up_memory`` helper.

        ``temp_dir`` is accepted for backward compatibility but is not
        used by this method.
        """
        logger.info("Starting memory cleanup...")
        _clean_up_memory(self.verbose)
[docs]
@ensure_pkgs(
    ["psutil"],
    extra="`psutil` package is required for managing large dataset.",
    auto_install=True,
)
def build_large_df(
    forecast_results: list[dict],
    dt_col: str,
    tname: str,
    spatial_cols: list[str] | None = None,
    chunk_size: int | None = None,
    verbose: int = 0,
) -> pd.DataFrame:
    r"""
    Build a dtype-optimized DataFrame from a large list of row dicts.

    The rows are split into chunks; each chunk is converted to a
    DataFrame, dtype-optimized and written to a temporary directory
    (Parquet when pyarrow is installed, CSV otherwise). The chunk files
    are then read back, concatenated, optimized once more and the
    temporary directory is removed.

    Parameters
    ----------
    forecast_results : list of dict
        One dictionary per row, mapping column name to value.
    dt_col : str
        Name of the temporal column. If the column is numeric and its
        maximum exceeds ``2e5`` it is cast to ``int32``; if it is not
        numeric, conversion with :func:`pandas.to_datetime` is attempted.
        A column that is absent from the data is ignored.
    tname : str
        Target prefix. Columns starting with ``f"{tname}_q"`` or ending
        with ``"_pred"`` are cast to ``float32``.
    spatial_cols : list of str, optional
        Columns converted to ``category`` when fewer than 10% of their
        values are unique, otherwise to ``float32`` (numeric columns
        only).
    chunk_size : int, optional
        Rows per chunk. When omitted it is

        .. math::

            C = \min\left(C_{default},
                          \frac{0.8\, M_{free}}{S_{row}}, N\right)

        with :math:`S_{row}` = 1 KB, :math:`N` the number of rows and
        :math:`C_{default}` = 100000 on machines with more than 32 GB of
        RAM, 50000 otherwise.
    verbose : int, default=0
        ``0`` silent, ``1`` summary and chunk errors, ``2`` chunking
        diagnostics, ``3`` per-chunk memory report.

    Returns
    -------
    pd.DataFrame
        The combined DataFrame, or an empty DataFrame when the input is
        empty or no chunk could be written.

    Notes
    -----
    A chunk whose conversion or serialization fails is skipped (the
    error is printed when ``verbose >= 1``); its rows are absent from
    the result. While chunking, if system memory usage exceeds 90% the
    chunk size is reduced by 20%, but not below 1000 rows.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import build_large_df
    >>> import numpy as np
    >>> data = [{'year': y, 'value_q50': np.random.randn()}
    ...         for y in range(2010, 2020) for _ in range(1000)]
    >>> df = build_large_df(data, dt_col='year', tname='value')

    See Also
    --------
    pd.DataFrame : Base DataFrame construction
    pd.concat : Chunk aggregation method
    """

    def _get_optimal_chunk_size(num_rows: int) -> int:
        """Return the caller's chunk size or derive one from free memory."""
        if chunk_size is not None:
            return chunk_size
        mem = psutil.virtual_memory()
        row_size = 1024  # assumed bytes per row
        safe_chunk = int(mem.available * 0.8 / row_size)
        total_gb = mem.total / (1024**3)
        default_chunk = 100_000 if total_gb > 32 else 50_000
        # Never drop below one row, even when no memory is reported free.
        return max(1, min(default_chunk, safe_chunk, num_rows))

    def _optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
        """Downcast temporal, prediction and spatial columns in place."""
        # A missing temporal column must not fail the whole chunk.
        if dt_col in df.columns:
            if pd.api.types.is_numeric_dtype(df[dt_col]):
                if df[dt_col].max() > 2e5:
                    df[dt_col] = df[dt_col].astype(np.int32)
            else:
                try:
                    df[dt_col] = pd.to_datetime(df[dt_col])
                except Exception as e:
                    if verbose >= 2:
                        print(
                            f"Could not convert {dt_col} to "
                            f"datetime: {str(e)}"
                        )

        quantile_prefix = f"{tname}_q"
        pred_cols = [
            c
            for c in df.columns
            if isinstance(c, str)
            and (c.startswith(quantile_prefix) or c.endswith("_pred"))
        ]
        for col in pred_cols:
            df[col] = df[col].astype(np.float32)

        for col in spatial_cols or ():
            if col not in df.columns:
                continue
            ratio = df[col].nunique() / len(df)
            if ratio < 0.1:
                df[col] = df[col].astype("category")
            elif pd.api.types.is_numeric_dtype(df[col]):
                # Non-numeric high-cardinality columns are left untouched
                # rather than crashing the float cast.
                df[col] = df[col].astype(np.float32)
        return df

    def _process_chunk(
        chunk: list[dict], temp_dir: str, chunk_number: int
    ) -> str | None:
        """Write one chunk to disk; return its path or None on failure."""
        try:
            chunk_df = _optimize_dtypes(pd.DataFrame(chunk))
            if _HAVE_PYARROW:
                chunk_path = os.path.join(
                    temp_dir, f"chunk_{chunk_number}.parquet"
                )
                chunk_df.to_parquet(chunk_path, engine="pyarrow")
            else:
                chunk_path = os.path.join(
                    temp_dir, f"chunk_{chunk_number}.csv"
                )
                chunk_df.to_csv(chunk_path, index=False)
            if verbose >= 3:
                mem = psutil.virtual_memory()
                used_gb = mem.used / (1024**3)
                total_gb = mem.total / (1024**3)
                print(
                    f"Chunk {chunk_number} processed "
                    f"({len(chunk_df)} rows) | "
                    f"Memory used: {used_gb:.1f}GB / "
                    f"{total_gb:.1f}GB"
                )
            return chunk_path
        except Exception as e:
            if verbose >= 1:
                print(
                    f"Error processing chunk {chunk_number}: "
                    f"{str(e)}"
                )
            return None

    if not forecast_results:
        if verbose >= 1:
            print("No forecast results to process.")
        return pd.DataFrame()

    chunk_size_final = _get_optimal_chunk_size(len(forecast_results))
    if verbose >= 2:
        print(
            f"Processing {len(forecast_results):,} rows in "
            f"chunks of {chunk_size_final:,}"
        )

    temp_dir = tempfile.mkdtemp()
    chunk_paths = []
    current_chunk = []
    try:
        for entry in forecast_results:
            current_chunk.append(entry)
            if len(current_chunk) < chunk_size_final:
                continue
            path = _process_chunk(
                current_chunk, temp_dir, len(chunk_paths)
            )
            if path:
                chunk_paths.append(path)
            current_chunk = []
            # Shrink subsequent chunks under memory pressure. The 1000-row
            # floor must never *increase* an already smaller chunk size.
            if psutil.virtual_memory().percent > 90:
                reduced = min(
                    chunk_size_final,
                    max(1000, int(chunk_size_final * 0.8)),
                )
                if reduced < chunk_size_final:
                    chunk_size_final = reduced
                    if verbose >= 2:
                        print(
                            "Reducing chunk size to "
                            f"{chunk_size_final} due to "
                            "memory pressure."
                        )

        # Remainder that did not fill a whole chunk.
        if current_chunk:
            path = _process_chunk(
                current_chunk, temp_dir, len(chunk_paths)
            )
            if path:
                chunk_paths.append(path)

        if not chunk_paths:
            return pd.DataFrame()

        if _HAVE_PYARROW:
            read_fn = partial(pd.read_parquet, engine="pyarrow")
        else:
            read_fn = pd.read_csv
        # os.cpu_count() may return None on some platforms.
        n_cpus = os.cpu_count() or 1
        if len(chunk_paths) > 5 and n_cpus > 1:
            with ThreadPoolExecutor(max_workers=n_cpus) as executor:
                chunks = list(executor.map(read_fn, chunk_paths))
        else:
            chunks = [read_fn(p) for p in chunk_paths]
        forecast_df = pd.concat(chunks, ignore_index=True)
    finally:
        # Remove the directory together with any orphaned chunk file
        # (e.g. one left behind by a failed write).
        try:
            shutil.rmtree(temp_dir)
        except OSError as e:
            if verbose >= 2:
                print(f"Error deleting {temp_dir}: {str(e)}")

    # Dtypes can be lost on the CSV round trip or widened by concat.
    forecast_df = _optimize_dtypes(forecast_df)
    if verbose >= 1:
        mem_usage = forecast_df.memory_usage(deep=True).sum() / (
            1024**2
        )
        print(f"Final DataFrame size: {mem_usage:.2f} MB")
    return forecast_df
[docs]
@ensure_pkgs(
    ["psutil"],
    extra="`get_cpu_usage` requires the `psutil` "
    "package for system resource monitoring",
    auto_install=True,
)
def get_cpu_usage(
    per_cpu: bool = False,
) -> float | list[float] | None:
    """
    Return the current CPU usage as a percentage.

    Parameters
    ----------
    per_cpu : bool, default=False
        If True, return one percentage per CPU; otherwise return a
        single system-wide percentage.

    Returns
    -------
    usage : float, list of float, or None
        Overall percentage when ``per_cpu`` is False, a list of
        per-CPU percentages when it is True, or ``None`` if the
        measurement failed (the error is logged).

    Notes
    -----
    The measurement calls ``psutil.cpu_percent`` with ``interval=1``,
    so this function blocks for about one second.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import get_cpu_usage
    >>> get_cpu_usage()
    1.3
    >>> get_cpu_usage(per_cpu=True)
    [20.4, 25.1, 21.3, 24.5]
    """
    try:
        usage = psutil.cpu_percent(interval=1, percpu=per_cpu)
        logger.debug(f"CPU usage retrieved: {usage}")
        return usage
    except Exception as e:
        logger.error(f"Failed to retrieve CPU usage: {e}")
        return None
[docs]
@ensure_pkgs(
    ["psutil"],
    extra="`get_memory_usage` requires the `psutil`"
    " package for system memory usage",
    auto_install=True,
)
def get_memory_usage() -> dict[str, str] | None:
    """
    Report total, used and available system memory in megabytes.

    Returns
    -------
    memory : dict of str or None
        Dictionary with the keys ``"Total"``, ``"Used"`` and
        ``"Available"``; each value is a string of the form
        ``"<value> MB"``. ``None`` is returned if the statistics could
        not be retrieved (the error is logged).

    Notes
    -----
    Values come from ``psutil.virtual_memory()`` and are converted to
    MB by dividing the byte counts by 1024**2.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import get_memory_usage
    >>> info = get_memory_usage()
    >>> sorted(info)
    ['Available', 'Total', 'Used']
    """
    try:
        mem = psutil.virtual_memory()
        bytes_per_mb = 1024**2
        total_memory = mem.total / bytes_per_mb
        used_memory = mem.used / bytes_per_mb
        available_memory = mem.available / bytes_per_mb
        # Both fragments need the f-prefix; the second one used to be a
        # plain string, so its placeholders were logged literally.
        logger.debug(
            f"Memory usage retrieved: Total: {total_memory} MB,"
            f" Used: {used_memory} MB, Available: {available_memory} MB"
        )
        return {
            "Total": f"{total_memory} MB",
            "Used": f"{used_memory} MB",
            "Available": f"{available_memory} MB",
        }
    except Exception as e:
        logger.error(f"Failed to retrieve memory usage: {e}")
        return None
[docs]
@ensure_pkgs(
    ["psutil"],
    extra="`get_disk_usage` requires the `psutil`"
    " package for disk usage statistics ",
    auto_install=True,
)
def get_disk_usage(
    path: str = "/",
) -> dict[str, str] | None:
    """
    Report total, used and free disk space for a path in gigabytes.

    Parameters
    ----------
    path : str, default='/'
        Filesystem path whose disk is inspected.

    Returns
    -------
    disk_usage : dict of str or None
        Dictionary with the keys ``"Total"``, ``"Used"`` and ``"Free"``;
        each value is a string of the form ``"<value> GB"``. ``None`` is
        returned when the path does not exist, access is denied, or any
        other error occurs; the failure is logged instead of raised.

    Notes
    -----
    Values come from ``psutil.disk_usage(path)`` and are converted to
    GB by dividing the byte counts by 1024**3.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import get_disk_usage
    >>> info = get_disk_usage(path="/")
    >>> sorted(info)
    ['Free', 'Total', 'Used']
    """
    try:
        usage = psutil.disk_usage(path)
        bytes_per_gb = 1024**3
        total_disk = usage.total / bytes_per_gb
        used_disk = usage.used / bytes_per_gb
        free_disk = usage.free / bytes_per_gb
        # Both fragments need the f-prefix; the second one used to be a
        # plain string, so its placeholders were logged literally.
        logger.debug(
            f"Disk usage for path '{path}': Total: {total_disk} GB,"
            f" Used: {used_disk} GB, Free: {free_disk} GB"
        )
        return {
            "Total": f"{total_disk} GB",
            "Used": f"{used_disk} GB",
            "Free": f"{free_disk} GB",
        }
    except FileNotFoundError:
        logger.error(f"Path not found: {path}")
        return None
    except PermissionError:
        logger.error(f"Permission denied for path: {path}")
        return None
    except Exception as e:
        logger.error(
            f"Failed to retrieve disk usage for path '{path}': {e}"
        )
        return None
[docs]
def is_gpu_available() -> bool:
    """
    Tell whether PyTorch reports a usable CUDA device.

    Returns
    -------
    available : bool
        The value of ``torch.cuda.is_available()``. ``False`` when
        PyTorch cannot be imported (a warning is logged) or when the
        check itself fails (an error is logged).

    Examples
    --------
    >>> from geoprior.utils.sys_utils import is_gpu_available
    >>> is_gpu_available()
    True
    """
    try:
        import torch

        has_gpu = torch.cuda.is_available()
        logger.debug(f"GPU availability check: {has_gpu}")
    except ImportError:
        logger.warning(
            "PyTorch is not installed. GPU availability check cannot be performed."
        )
        return False
    except Exception as err:
        logger.error(f"Failed to check GPU availability: {err}")
        return False
    else:
        return has_gpu
[docs]
@ensure_pkgs(
    "torch",
    "torch library is required for retrieving"
    " detailed information about available GPUs.",
)
def get_gpu_info() -> dict[str, str] | None:
    """
    Describe the GPUs visible to PyTorch.

    Returns
    -------
    gpu_info : dict or None
        Dictionary with the keys:

        - ``device_count`` : number of CUDA devices, as a string.
        - ``device_name`` : name of device 0.
        - ``memory_total`` : total memory of device 0, formatted as
          ``"<value> GB"`` with two decimals.
        - ``cuda_version`` : ``torch.version.cuda`` or ``"N/A"``.

        ``None`` is returned when PyTorch is unavailable, when no CUDA
        device is available, or when querying the device fails; each
        case is logged.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import get_gpu_info
    >>> get_gpu_info()
    {'device_count': '1', 'device_name': 'NVIDIA Tesla T4',
    'memory_total': '15.99 GB', 'cuda_version': '11.1'}
    """
    if not TORCH_AVAILABLE:
        logger.warning(
            "PyTorch not installed; detailed GPU information is unavailable."
        )
        return None

    if not torch.cuda.is_available():
        logger.info("No GPU is available on this system.")
        return None

    try:
        n_devices = torch.cuda.device_count()
        first_name = torch.cuda.get_device_name(0)
        total_bytes = torch.cuda.get_device_properties(0).total_memory
        details = {
            "device_count": str(n_devices),
            "device_name": first_name,
            "memory_total": f"{total_bytes / (1024**3):.2f} GB",
            "cuda_version": torch.version.cuda or "N/A",
        }
        logger.debug(f"GPU information: {details}")
        return details
    except Exception as err:
        logger.error(f"Error retrieving GPU information: {err}")
        return None
[docs]
def system_uptime() -> str:
    """
    Return the time elapsed since the last boot in readable form.

    Returns
    -------
    uptime : str
        Uptime formatted as ``"Xd:Yh:Zm:Ws"`` (days, hours, minutes,
        seconds). ``"N/A"`` is returned, and the error logged, when the
        platform is unsupported or the uptime cannot be determined.

    Notes
    -----
    The uptime source depends on ``platform.system()``:

    - Linux : first field of ``/proc/uptime``.
    - Darwin : current time minus the boot timestamp reported by
      ``sysctl -n kern.boottime``.
    - Windows : ``GetTickCount64`` from ``kernel32`` (milliseconds
      since boot).

    Examples
    --------
    >>> from geoprior.utils.sys_utils import system_uptime
    >>> system_uptime()
    '2d:10h:33m:12s'
    """
    try:
        os_name = platform.system()
        if os_name == "Linux":
            # Context manager so the file handle is always released.
            with open("/proc/uptime", encoding="ascii") as fh:
                uptime_seconds = int(float(fh.readline().split()[0]))
        elif os_name == "Darwin":
            # Output looks like "{ sec = 1700000000, usec = 0 } <date>";
            # it is the boot *timestamp*, so subtract it from now.
            raw = subprocess.check_output(
                ["sysctl", "-n", "kern.boottime"]
            ).decode()
            match = re.search(r"\bsec\s*=\s*(\d+)", raw)
            if match is None:
                raise RuntimeError(
                    f"Unexpected kern.boottime output: {raw!r}"
                )
            uptime_seconds = int(time.time() - int(match.group(1)))
        elif os_name == "Windows":
            # `net stats srv` prints a localized date, not seconds; ask
            # the kernel for the millisecond tick count instead.
            import ctypes

            get_tick_count = ctypes.windll.kernel32.GetTickCount64
            get_tick_count.restype = ctypes.c_ulonglong
            uptime_seconds = int(get_tick_count() // 1000)
        else:
            raise NotImplementedError(
                "Unsupported operating system."
            )

        days, remainder = divmod(uptime_seconds, 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes, seconds = divmod(remainder, 60)
        uptime_str = f"{days}d:{hours}h:{minutes}m:{seconds}s"
        logger.debug(f"System uptime: {uptime_str}")
        return uptime_str
    except Exception as e:
        logger.error(f"Failed to retrieve system uptime: {e}")
        return "N/A"
[docs]
def is_port_open(port: int) -> bool:
    """
    Check whether a TCP port on the local machine is free.

    Parameters
    ----------
    port : int
        The port number to check, in the range 0-65535.

    Returns
    -------
    bool
        True if nothing accepted a TCP connection on ``localhost:port``
        (the port is "open" in the sense of being available), False if
        a connection succeeded (the port is occupied).

    Raises
    ------
    ValueError
        If `port` is not an integer or lies outside 0-65535.

    Notes
    -----
    The check attempts a TCP connection with ``socket.connect_ex``; a
    non-zero result code is interpreted as "not in use".

    Examples
    --------
    >>> from geoprior.utils.sys_utils import is_port_open
    >>> is_port_open(8080)
    False
    """
    import operator

    # Validate before touching the network so callers get the documented
    # ValueError instead of an OverflowError/TypeError from the socket.
    try:
        port_number = operator.index(port)
    except TypeError:
        raise ValueError(
            f"Port must be an integer, got {type(port).__name__}."
        ) from None
    if not 0 <= port_number <= 65535:
        raise ValueError(
            f"Port must be in the range 0-65535, got {port_number}."
        )

    with socket.socket(
        socket.AF_INET, socket.SOCK_STREAM
    ) as s:
        result = s.connect_ex(("localhost", port_number))
    is_open = result != 0
    status = "open" if is_open else "occupied"
    logger.debug(f"Port {port} is {status}.")
    return is_open
[docs]
@ensure_pkgs(
    ["psutil"],
    extra="`environment_summary` requires the `psutil`"
    " package for summarizing of the current environment ",
    auto_install=True,
)
def environment_summary() -> dict[str, str]:
    """
    Summarize the current computing environment.

    Returns
    -------
    env_info : dict
        Dictionary containing environment details, including:

        - `python_version` : The version of Python in use.
        - `os` : Operating system name.
        - `os_version` : Version of the operating system.
        - `cpu_count` : Number of logical CPU cores (as a string).
        - `memory` : Total system memory in GB.
        - GPU details : every key returned by `get_gpu_info` (for
          example `device_name`, `memory_total`, `cuda_version`),
          merged in only when it returns a non-empty dict.
        - `installed_packages` : Comma-separated ``name==version``
          entries; only the first 10 are listed, followed by ``...``
          when more are installed.

    Notes
    -----
    Installed distributions are enumerated with the standard-library
    ``importlib.metadata`` module. Distributions whose metadata cannot
    be read are skipped.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import environment_summary
    >>> env_info = environment_summary()
    >>> print(env_info)
    {'python_version': '3.9.5', 'os': 'Linux', 'os_version': '5.4.0-80-generic',
    'cpu_count': '4', 'memory': '15.5 GB', 'device_count': '1',
    'device_name': 'NVIDIA Tesla T4', 'memory_total': '15.99 GB',
    'cuda_version': '11.1', 'installed_packages': 'numpy==1.21.0, pandas==1.3.0, ...'}
    """
    from importlib import metadata

    env_info = {
        "python_version": platform.python_version(),
        "os": platform.system(),
        "os_version": platform.version(),
        "cpu_count": str(psutil.cpu_count(logical=True)),
        "memory": f"{psutil.virtual_memory().total / (1024**3):.2f} GB",
    }

    # GPU information if available
    gpu_info = get_gpu_info()
    if gpu_info:
        env_info.update(gpu_info)

    installed_packages = []
    for dist in metadata.distributions():
        try:
            dist_name = dist.metadata.get("Name")
            dist_version = dist.version
        except Exception:
            # Corrupt or partial dist-info: skip it instead of failing
            # the whole summary.
            continue
        if dist_name:
            installed_packages.append(f"{dist_name}=={dist_version}")

    listed = ", ".join(installed_packages[:10])
    if len(installed_packages) > 10:
        listed += "..."
    env_info["installed_packages"] = listed

    logger.debug(f"Environment summary: {env_info}")
    return env_info
[docs]
def manage_env_variable(
var_name: str,
value: str | None = None,
default: str | None = None,
action: str = "get",
file_path: str | None = None,
overwrite: bool = False,
) -> str | None:
"""
Manages environment variables, allowing retrieval, setting, or loading
from a `.env` file.
Parameters
----------
var_name : str
The name of the environment variable to retrieve, set, or load.
value : str, optional
The value to set for the environment variable. Only used if `action`
is `"set"`. Default is None.
default : str, optional
The default value to return if the environment variable `var_name`
is not found when `action` is `"get"`. If None, returns None when the
variable is not found. Default is None.
action : str, default="get"
The action to perform. Options are:
- `"get"`: Retrieves the environment variable `var_name`.
- `"set"`: Sets the environment variable `var_name` to `value`.
- `"load"`: Loads environment variables from a `.env` file
specified by `file_path`.
file_path : str, optional
The path to the `.env` file to load variables from when `action`
is `"load"`. Required if `action` is `"load"`.
overwrite : bool, default=False
If True, allows overwriting existing environment variables when
`action` is `"load"` or `"set"`. If False, preserves the current
value of existing environment variables.
Returns
-------
result : str or None
- If `action` is `"get"`, returns the value of the environment
variable `var_name` or `default` if the variable is not set.
- If `action` is `"set"` or `"load"`, returns None.
Notes
-----
- This function is useful for managing configuration data securely by
utilizing environment variables.
- Loading from a `.env` file allows you to define multiple variables
in a single file, each defined in the `KEY=VALUE` format.
Raises
------
ValueError
If `action` is `"set"` and `value` is not provided, or if `action`
is `"load"` and `file_path` is not specified.
FileNotFoundError
If `action` is `"load"` and `file_path` does not exist.
Examples
--------
>>> from geoprior.utils.sys_utils import manage_env_variable
>>> manage_env_variable('HOME', action='get')
'/home/username'
>>> manage_env_variable('NEW_VAR', value='new_value', action='set')
>>> manage_env_variable('NEW_VAR', action='get')
'new_value'
>>> manage_env_variable('NON_EXISTENT_VAR', default='default_value', action='get')
'default_value'
>>> manage_env_variable(
var_name='', action='load', file_path='/path/to/.env', overwrite=True)
See Also
--------
os.getenv : Retrieves environment variables.
os.environ : Provides access to the environment variables.
"""
if action == "get":
# Get environment variable or return default if not found
return os.getenv(var_name, default)
elif action == "set":
if value is None:
raise ValueError(
"A value must be provided when action='set'."
)
if overwrite or var_name not in os.environ:
os.environ[var_name] = value
elif action == "load":
if file_path is None:
raise ValueError(
"file_path must be specified when action='load'."
)
if not os.path.isfile(file_path):
raise FileNotFoundError(
f"The file '{file_path}' was not found."
)
with open(file_path) as file:
for line in file:
line = line.strip()
if not line or line.startswith("#"):
continue # Skip comments and empty lines
if "=" not in line:
raise ValueError(
f"Invalid line in .env file: '{line}'"
)
file_var_name, file_value = map(
str.strip, line.split("=", 1)
)
if (
overwrite
or file_var_name not in os.environ
):
os.environ[file_var_name] = file_value
else:
raise ValueError(
f"Invalid action '{action}'. Expected 'get', 'set', or 'load'."
)
[docs]
def is_path_accessible(
    path: str, permissions: str = "r"
) -> bool:
    """
    Tell whether `path` exists and grants the requested permissions.

    Parameters
    ----------
    path : str
        The filesystem path to test.
    permissions : str, optional
        Any combination of ``'r'`` (read), ``'w'`` (write) and ``'x'``
        (execute), e.g. ``"rw"``. Default is ``"r"``.

    Returns
    -------
    accessible : bool
        False when `path` does not exist. Otherwise True only if
        ``os.access`` succeeds for every requested permission.

    Raises
    ------
    ValueError
        If `path` exists and `permissions` contains a character other
        than ``'r'``, ``'w'`` or ``'x'``.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import is_path_accessible
    >>> is_path_accessible("/path/to/file", permissions="rw")
    True
    """
    # A missing path is reported before the permission string is looked at.
    if not os.path.exists(path):
        return False

    mode_flags = {"r": os.R_OK, "w": os.W_OK, "x": os.X_OK}

    rejected = [flag for flag in permissions if flag not in mode_flags]
    if rejected:
        raise ValueError(
            f"Invalid permission '{rejected[0]}'. Use 'r', 'w', or 'x'."
        )

    for flag in permissions:
        if not os.access(path, mode_flags[flag]):
            return False
    return True
[docs]
@ensure_pkgs(
    names="fcntl",
    extra="`fcntl` is required for file locking/unlocking in Unix-based systems.",
    auto_install=False,
    dist_names="fcntl",
    # `fcntl` is standard on Unix;let the user know if unavailable.
    # so we set infer_dist_name to False
    infer_dist_name=False,
)
def manage_file_lock(
    file_path: str | int,
    action: str = "lock",
    blocking: bool = True,
    exclusive: bool = True,
) -> int | None:
    """
    Acquire or release an advisory ``flock`` lock on a file.

    Parameters
    ----------
    file_path : str or int
        For ``action="lock"``: path of the file to lock. For
        ``action="unlock"``: the integer file descriptor previously
        returned by a ``"lock"`` call.
    action : str, default="lock"
        ``"lock"`` to acquire a lock, ``"unlock"`` to release one.
    blocking : bool, default=True
        If True, wait until the lock can be acquired. If False, the
        attempt fails immediately when the lock is held elsewhere (the
        failure is logged and None is returned).
    exclusive : bool, default=True
        If True, request an exclusive lock (``LOCK_EX``); otherwise a
        shared lock (``LOCK_SH``).

    Returns
    -------
    file_descriptor : int or None
        For ``"lock"``: the open file descriptor holding the lock, or
        None if opening or locking failed. For ``"unlock"``: always
        None.

    Raises
    ------
    ValueError
        If `action` is not ``"lock"`` or ``"unlock"``.
    TypeError
        If `action` is ``"unlock"`` and `file_path` is not an integer
        file descriptor.

    Notes
    -----
    This function uses the `fcntl` module, which is only available on
    Unix-based systems. The lock lives as long as the returned
    descriptor stays open. ``OSError`` raised while opening, locking,
    unlocking, or closing is logged and reported as a None return.

    - For ``"lock"``, the file is opened with ``os.O_RDWR`` and locked;
      if locking fails the descriptor is closed again.
    - For ``"unlock"``, the lock is released and the descriptor is
      closed, even when the unlock call itself fails.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import manage_file_lock
    >>> fd = manage_file_lock("/path/to/file", action="lock", blocking=True)
    >>> if fd is not None:
    ...     print("File is locked.")
    ...     manage_file_lock(fd, action="unlock")
    ...     print("File is unlocked.")

    See Also
    --------
    os.open : Opens a file descriptor.
    fcntl.flock : Applies or removes file locks.
    """
    import fcntl

    if action not in {"lock", "unlock"}:
        raise ValueError(
            f"Invalid action '{action}'. Expected 'lock' or 'unlock'."
        )

    # Lock action: Open and lock the file
    if action == "lock":
        fd = None
        try:
            fd = os.open(file_path, os.O_RDWR)
            lock_type = (
                fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
            )
            lock_flag = 0 if blocking else fcntl.LOCK_NB
            fcntl.flock(fd, lock_type | lock_flag)
        except OSError as e:
            # Do not leak the descriptor when flock() fails after a
            # successful open().
            if fd is not None:
                try:
                    os.close(fd)
                except OSError:
                    pass
            logger.error(
                f"Failed to lock file '{file_path}': {e}"
            )
            return None
        logger.debug(f"Locked file '{file_path}' with fd {fd}")
        return fd

    # Unlock action: Unlock and close the file descriptor
    if isinstance(file_path, bool) or not isinstance(file_path, int):
        raise TypeError(
            "action='unlock' expects the integer file descriptor "
            "returned by action='lock', got "
            f"{type(file_path).__name__}."
        )
    try:
        try:
            fcntl.flock(file_path, fcntl.LOCK_UN)
        finally:
            # Closing releases the lock as well, so always close.
            os.close(file_path)
    except OSError as e:
        logger.error(
            f"Failed to unlock file descriptor '{file_path}': {e}"
        )
        return None
    logger.debug(
        f"Unlocked file '{file_path}' with fd {file_path}"
    )
    return None
[docs]
def get_system_info() -> dict[str, str]:
    """
    Retrieve basic system information: OS, Python version, CPU count,
    and GPU availability.

    Returns
    -------
    system_info : dict
        A dictionary of strings:

        - `os_name` : Name of the operating system.
        - `os_version` : Version of the operating system.
        - `python_version` : Python version.
        - `cpu_count` : Number of logical CPUs.
        - `gpu_available` : ``'True'`` or ``'False'``.

    Notes
    -----
    GPU availability is checked through PyTorch
    (``torch.cuda.is_available()``) when it can be imported; otherwise
    it is reported as False. The CPU count comes from
    ``os.cpu_count()``, so this function works without `psutil`.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import get_system_info
    >>> get_system_info()
    {'os_name': 'Linux', 'os_version': '5.4.0-81-generic', 'python_version': '3.8.5',
    'cpu_count': '8', 'gpu_available': 'True'}

    See Also
    --------
    get_python_version : Retrieves the current Python version.
    """
    try:
        import torch

        gpu_available = torch.cuda.is_available()
    except ImportError:
        gpu_available = False

    return {
        "os_name": platform.system(),
        "os_version": platform.version(),
        "python_version": platform.python_version(),
        # os.cpu_count() reports logical CPUs and, unlike psutil, is
        # always importable.
        "cpu_count": str(os.cpu_count()),
        "gpu_available": str(gpu_available),
    }
[docs]
def get_python_version() -> str:
    """
    Report the version of the running Python interpreter.

    Returns
    -------
    python_version : str
        The interpreter version as given by
        ``platform.python_version()``.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import get_python_version
    >>> get_python_version()
    '3.8.5'

    See Also
    --------
    get_system_info : Provides broader system information, including Python version.
    """
    interpreter_version = platform.python_version()
    return interpreter_version
[docs]
def get_installed_packages() -> list[str]:
    """
    List the distributions installed in the current Python environment.

    Returns
    -------
    installed_packages : list of str
        One ``package_name==version`` entry per installed distribution.

    Notes
    -----
    Distributions are enumerated with the standard-library
    ``importlib.metadata`` module. A distribution found on several
    path entries is reported once (first occurrence wins), and
    distributions whose metadata cannot be read are skipped.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import get_installed_packages
    >>> get_installed_packages()
    ['numpy==1.21.0', 'pandas==1.3.0', 'scikit-learn==0.24.2', ...]

    See Also
    --------
    environment_summary : Summarizes the environment, including installed packages.
    """
    from importlib import metadata

    installed_packages = []
    seen = set()
    for dist in metadata.distributions():
        try:
            dist_name = dist.metadata.get("Name")
            dist_version = dist.version
        except Exception:
            # Corrupt or partial dist-info: skip it instead of failing
            # the whole listing.
            continue
        if not dist_name:
            continue
        # Names compare equal regardless of case and of "-", "_", "."
        # runs, so deduplicate on that normalized form.
        key = re.sub(r"[-_.]+", "-", dist_name).lower()
        if key in seen:
            continue
        seen.add(key)
        installed_packages.append(f"{dist_name}=={dist_version}")
    return installed_packages
[docs]
def run_command(
command: str, capture_output: bool = True
) -> str | None:
"""
Runs a shell command and optionally captures its output.
Parameters
----------
command : str
The shell command to execute.
capture_output : bool, default=True
If True, captures and returns the command’s output. If False,
runs the command without capturing output, which is useful for
commands that produce a large output or run interactively.
Returns
-------
output : str or None
Returns the command output as a string if `capture_output` is True.
If `capture_output` is False, returns None.
Notes
-----
This function uses `subprocess.run` to execute shell commands, which
allows for error handling and logging. For example,
``run_command("echo Hello World")`` returns ``"Hello World\\n"``
when ``capture_output=True``.
Raises
------
subprocess.CalledProcessError
If the command exits with a non-zero status and `capture_output` is
True.
"""
try:
result = subprocess.run(
command,
shell=True,
capture_output=capture_output,
text=True,
check=True,
)
return result.stdout if capture_output else None
except subprocess.CalledProcessError as e:
print(f"Error executing command '{command}': {e}")
return None
[docs]
def create_temp_file(
    suffix: str = "", prefix: str = "tmp"
) -> str:
    """
    Create an empty temporary file that persists after the call.

    Parameters
    ----------
    suffix : str, optional
        Ending appended to the generated file name. Default is an
        empty string.
    prefix : str, optional
        Beginning of the generated file name. Default is ``"tmp"``.

    Returns
    -------
    file_path : str
        Path of the newly created file.

    Notes
    -----
    The file is created with ``delete=False`` and closed before the
    path is returned, so removing it is the caller's responsibility.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import create_temp_file
    >>> temp_file = create_temp_file()
    >>> print(temp_file)
    '/tmp/tmpabcd1234'

    See Also
    --------
    create_temp_dir : Creates a temporary directory.
    """
    # Leaving the ``with`` block closes the handle; delete=False keeps
    # the file on disk.
    with tempfile.NamedTemporaryFile(
        prefix=prefix, suffix=suffix, delete=False
    ) as handle:
        created_path = handle.name
    return created_path
[docs]
def create_temp_dir(prefix: str = "tmp") -> str:
    """
    Create a new temporary directory.

    Parameters
    ----------
    prefix : str, optional
        Beginning of the generated directory name. Default is
        ``"tmp"``.

    Returns
    -------
    dir_path : str
        Path of the newly created directory, as returned by
        ``tempfile.mkdtemp``.

    Notes
    -----
    The directory is not removed automatically; cleaning it up is the
    caller's responsibility.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import create_temp_dir
    >>> temp_dir = create_temp_dir()
    >>> print(temp_dir)
    '/tmp/tmpabcd1234'

    See Also
    --------
    create_temp_file : Creates a temporary file.
    """
    created_dir = tempfile.mkdtemp(prefix=prefix)
    return created_dir
[docs]
def clean_temp_files(directory: str | None = None) -> None:
"""
Cleans up temporary files in a specified directory.
Parameters
----------
directory : str, optional
The directory to clean up. If None, cleans the default temporary
directory.
Returns
-------
None
Notes
-----
This function is particularly useful for freeing up disk space in
data-intensive applications.
Examples
--------
>>> from geoprior.utils.sys_utils import clean_temp_files
>>> clean_temp_files("/path/to/temp/dir")
"""
dir_to_clean = directory or tempfile.gettempdir()
for item in os.listdir(dir_to_clean):
item_path = os.path.join(dir_to_clean, item)
try:
if os.path.isfile(item_path):
os.unlink(item_path)
elif os.path.isdir(item_path):
shutil.rmtree(item_path)
except Exception as e:
print(f"Failed to delete {item_path}: {e}")
[docs]
def is_package_installed(package_name: str) -> bool:
    """
    Check whether a module or package can be found for import.

    Parameters
    ----------
    package_name : str
        The import name to look up (e.g. ``"numpy"`` or a dotted
        sub-module name).

    Returns
    -------
    bool
        True if ``importlib.util.find_spec`` locates the module,
        otherwise False. Names that cannot be resolved at all (empty
        names, or dotted names whose parent package is missing) are
        reported as False rather than raising.

    Notes
    -----
    Looking up a dotted name imports its parent package, as
    ``find_spec`` needs the parent's search path.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import is_package_installed
    >>> is_package_installed("numpy")
    True
    """
    try:
        return importlib.util.find_spec(package_name) is not None
    except (ImportError, ValueError):
        # find_spec raises ModuleNotFoundError when a parent package is
        # missing and ValueError for empty or spec-less names.
        return False
[docs]
def manage_temp(
suffix: str = "",
prefix: str = "tmp",
action: str = "create_file",
directory: str | None = None,
clean_all: bool = False,
) -> str | None:
"""
Manages temporary files and directories by creating, accessing, or
cleaning them as needed.
Parameters
----------
suffix : str, optional
Suffix for the temporary file or directory, used only if `action`
is `"create_file"` or `"create_dir"`. Default is an empty string.
prefix : str, optional
Prefix for the temporary file or directory, used only if `action`
is `"create_file"` or `"create_dir"`. Default is `"tmp"`.
action : str, default="create_file"
Specifies the operation to perform. Options include:
- `"create_file"`: Creates a temporary file and returns its path.
- `"create_dir"`: Creates a temporary directory and returns its path.
- `"clean"`: Cleans temporary files in the specified directory or
the system temp directory if none is provided.
directory : str, optional
Directory to clean when `action` is `"clean"`. If `None`, uses the
system’s default temporary directory. Ignored for file or directory
creation actions.
clean_all : bool, default=False
If `True`, removes all files and directories within the specified
directory. If `False`, only deletes files or directories created by
this process. Used only when `action` is `"clean"`.
Returns
-------
temp_path : str or None
- For `"create_file"` and `"create_dir"` actions, returns the path
of the created file or directory.
- For `"clean"`, returns None.
Raises
------
ValueError
If an invalid action is specified.
Notes
-----
This function is useful for managing temporary resources in data
processing tasks, where files or directories need to be created and
cleaned up after use.
Examples
--------
>>> from geoprior.utils.sys_utils import manage_temp
>>> temp_file = manage_temp(action="create_file")
>>> print(temp_file)
'/tmp/tmpabcd1234'
>>> temp_dir = manage_temp(action="create_dir", prefix="data_")
>>> print(temp_dir)
'/tmp/data_abcd1234'
>>> manage_temp(action="clean", directory="/path/to/temp", clean_all=True)
See Also
--------
tempfile : Module for creating temporary files and directories.
shutil : High-level file operations.
"""
# Create a temporary file and return its path
if action == "create_file":
temp_file = tempfile.NamedTemporaryFile(
suffix=suffix, prefix=prefix, delete=False
)
temp_file.close()
return temp_file.name
# Create a temporary directory and return its path
elif action == "create_dir":
return tempfile.mkdtemp(suffix=suffix, prefix=prefix)
# Clean up temporary files and directories
elif action == "clean":
dir_to_clean = directory or tempfile.gettempdir()
for item in os.listdir(dir_to_clean):
item_path = os.path.join(dir_to_clean, item)
try:
if os.path.isfile(
item_path
) or os.path.islink(item_path):
os.unlink(item_path)
elif os.path.isdir(item_path) and clean_all:
shutil.rmtree(item_path)
except Exception as e:
print(f"Failed to delete {item_path}: {e}")
return None
# Handle invalid action
else:
raise ValueError(
f"Invalid action '{action}'. Expected 'create_file',"
" 'create_dir', or 'clean'."
)
[docs]
def check_port_in_use(port: int) -> bool:
    """
    Report whether something accepts TCP connections on a local port.

    Parameters
    ----------
    port : int
        The port number to probe on ``localhost``.

    Returns
    -------
    bool
        True if a TCP connection to ``localhost:port`` succeeds
        (``connect_ex`` returns 0), otherwise False.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import check_port_in_use
    >>> check_port_in_use(8080)
    False
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex reports failure through its return code.
        error_code = probe.connect_ex(("localhost", port))
    finally:
        probe.close()
    return error_code == 0
[docs]
def get_uptime() -> str:
    """
    Return the system uptime in a human-readable format.

    Returns
    -------
    uptime : str
        The system uptime formatted as ``"Xd:Yh:Zm:Ws"``, where X, Y,
        Z, and W are days, hours, minutes, and seconds respectively.

    Notes
    -----
    The uptime is the difference between the current time and the boot
    timestamp reported by ``psutil.boot_time()``; it is clamped at zero
    in case the system clock was moved backwards after boot.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import get_uptime
    >>> get_uptime()
    '2d:5h:34m:12s'
    """
    # psutil.boot_time() is an epoch timestamp, not a duration.
    uptime_seconds = max(0, int(time.time() - psutil.boot_time()))
    days, remainder = divmod(uptime_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{days}d:{hours}h:{minutes}m:{seconds}s"
# ----
[docs]
def parallelize_jobs(
function: Callable,
tasks: Sequence[dict[str, Any]] = (),
n_jobs: int | None = None,
executor_type: str = "process",
) -> list:
"""
Parallelize the execution of a callable across multiple processors,
supporting both positional and keyword arguments.
Parameters
----------
function : Callable[..., Any]
The function to execute in parallel. This function must be picklable
if using `executor_type='process'`.
tasks : Sequence[Dict[str, Any]], optional
A sequence of dictionaries, where each dictionary contains
two keys: 'args' (a tuple) for positional arguments,
and 'kwargs' (a dict) for keyword arguments, for one execution of
`function`. Defaults to an empty sequence.
n_jobs : Optional[int], optional
The number of jobs to run in parallel. `None` or `1` uses a single
processor, any positive integer specifies the
exact number of processors to use, `-1` uses all available processors.
Default is None (1 processor).
executor_type : str, optional
The type of executor to use. Can be 'process' for CPU-bound tasks or
'thread' for I/O-bound tasks. Default is 'process'.
Returns
-------
list
A list of results from the function executions.
Raises
------
ValueError
If `function` is not picklable when using 'process' as `executor_type`.
Examples
--------
>>> from geoprior.utils.sys_utils import parallelize_jobs
>>> def greet(name, greeting='Hello'):
... return f"{greeting}, {name}!"
>>> tasks = [
... {'args': ('John',), 'kwargs': {'greeting': 'Hi'}},
... {'args': ('Jane',), 'kwargs': {}}
... ]
>>> results = parallelize_jobs(greet, tasks, n_jobs=2)
>>> print(results)
['Hi, John!', 'Hello, Jane!']
"""
if executor_type == "process":
import_optional_dependency("cloudpickle")
import cloudpickle
try:
cloudpickle.dumps(function)
except cloudpickle.PicklingError:
raise ValueError(
"The function to be parallelized must be "
"picklable when using 'process' executor."
)
num_workers = (
multiprocessing.cpu_count()
if n_jobs == -1
else (1 if n_jobs is None else n_jobs)
)
ExecutorClass = (
ProcessPoolExecutor
if executor_type == "process"
else ThreadPoolExecutor
)
results = []
with ExecutorClass(max_workers=num_workers) as executor:
futures = [
executor.submit(
function,
*task.get("args", ()),
**task.get("kwargs", {}),
)
for task in tasks
]
for future in as_completed(futures):
results.append(future.result())
return results
[docs]
def find_by_regex(o, pattern, func=re.match, **kws):
    r"""Find a pattern in a string or in the items of an iterable.

    A plain string is searched directly; it is never treated as an
    iterable of characters.

    Parameters
    -----------
    o: str or iterable,
        Text literal, or an iterable of strings to test one by one.
    pattern: str,
        Regular expression to look for, e.g. ``'[_#&*@!_,;\s-]\s*'``.
    func: re callable, default=re.match
        Regular expression function called as
        ``func(pattern=pattern, string=..., **kws)``. Typically one of:

        * ``re.match``: matches only at the beginning of the string
          and yields the first occurrence.
        * ``re.search``: scans the whole string and yields the first
          occurrence.
        * ``re.findall``: returns all non-overlapping occurrences.
    kws: dict,
        Additional keyword arguments passed to `func`.

    Returns
    -------
    om: list or None
        * When `o` is a string: a one-element list holding the matched
          text for match-object results, or the list of all
          occurrences for list results such as ``re.findall``.
        * When `o` is an iterable: the items for which `func` returned
          a truthy result.
        * None when nothing matched, or when `o` is neither a string
          nor an iterable.

    Example
    --------
    >>> from geoprior.utils.sys_utils import find_by_regex
    >>> from geoprior.datasets import load_hlogs
    >>> X0, _= load_hlogs (as_frame =True )
    >>> columns = X0.columns
    >>> str_columns =','.join (columns)
    >>> find_by_regex (str_columns , pattern='depth', func=re.search)
    ... ['depth']
    >>> find_by_regex(columns, pattern ='depth', func=re.search)
    ... ['depth_top', 'depth_bottom']
    """
    if isinstance(o, str):
        found = func(pattern=pattern, string=o, **kws)
        if found is None:
            return None
        if hasattr(found, "group"):
            # Match object (re.match / re.search / re.fullmatch).
            return [found.group()]
        if isinstance(found, (list, tuple)):
            # re.findall already returns every occurrence.
            return list(found) or None
        return [found] if found else None

    if is_iterable(o):
        om = [
            s
            for s in o
            if func(pattern=pattern, string=s, **kws)
        ]
        return om or None

    return None
[docs]
def find_similar_string(
name: str,
container: list[str] | tuple[str, ...] | dict[Any, Any],
stripitems: str | list[str] | tuple[str, ...] = "_",
deep: bool = False,
) -> str | None:
"""
Find the most similar string in a container to the provided name.
This function searches for the most likely matching string in a container
based on the provided `name`. It sanitizes the `name` by stripping specified
characters and can perform a deep search to find partial matches.
Parameters
----------
name : str
The string to search for in the container.
container : list, tuple, or dict
The container with strings to search in.
stripitems : str or list of str, optional
Characters or strings to strip from `name` before searching. If a string,
multiple items can be separated by ':', ',', or ';'. Default is ``'_'``.
deep : bool, optional
If ``True``, performs a deeper search by checking if `name` is a substring
of any item in the container. Default is ``False``.
Returns
-------
result : str or None
The most similar string from the container, or ``None`` if no match is found.
Examples
--------
>>> from geoprior.utils.sys_utils import find_similar_string
>>> container = {'dipole': 1, 'quadrupole': 2}
>>> find_similar_string('dipole_', container)
'dipole'
>>> find_similar_string('dip', container, deep=True)
'dipole'
>>> find_similar_string('+dipole__', container, stripitems='+;__', deep=True)
'dipole'
Notes
-----
This function is useful when trying to find the closest matching string
in a container, especially when exact matches are not guaranteed due to
formatting inconsistencies or typos.
See Also
--------
str.strip : Returns a copy of the string with leading and trailing characters removed.
"""
# Validate inputs
if not isinstance(name, str):
raise TypeError(
"`name` must be a string, got {type(name).__name__}"
)
if not isinstance(container, list | tuple | dict):
raise TypeError(
"`container` must be a list, tuple,"
f" or dict, got {type(container).__name__}"
)
if not isinstance(stripitems, str | list | tuple):
raise TypeError(
f"`stripitems` must be a string or list/tuple of strings,"
f" got {type(stripitems).__name__}"
)
# Process stripitems
if isinstance(stripitems, str):
for sep in (":", ",", ";"):
if sep in stripitems:
stripitems = stripitems.split(sep)
break
else:
stripitems = [stripitems]
else:
stripitems = list(stripitems)
# Sanitize name
for s in stripitems:
name = name.strip(s)
# Prepare container keys
if isinstance(container, dict):
container_keys = [
key.lower() for key in container.keys()
]
keys_list = list(container.keys())
else:
container_keys = [
str(item).lower() for item in container
]
keys_list = list(container)
name_lower = name.lower()
try:
index = container_keys.index(name_lower)
result = keys_list[index]
return result
except ValueError:
pass # Not found, proceed
if deep:
for idx, item in enumerate(container_keys):
if name_lower in item:
result = keys_list[idx]
return result
return None
[docs]
def represent_callable(
obj: Callable,
skip: str | list[str] | None = None,
) -> str:
"""
Represent callable objects by formatting their signatures.
This function generates a string representation of a callable object's
signature, including parameters and default values. It supports classes,
functions, and instance methods.
Parameters
----------
obj : callable
The callable object to format.
skip : str or list of str, optional
Parameter names to skip in the representation. Useful for omitting
certain attributes.
Returns
-------
representation : str
A string representation of the callable object's signature.
Raises
------
TypeError
If `obj` is not a callable object.
Examples
--------
>>> from geoprior.utils.sys_utils import represent_callable
>>> def example_function(a, b=2):
... pass
>>> represent_callable(example_function)
'example_function(a, b=2)'
>>> class ExampleClass:
... def __init__(self, x, y=10):
... self.x = x
... self.y = y
>>> represent_callable(ExampleClass)
'ExampleClass(x, y=10)'
>>> instance = ExampleClass(5)
>>> represent_callable(instance)
'ExampleClass(x=5, y=10)'
Notes
-----
This function is useful for logging or displaying the parameters of
callable objects in a readable format.
See Also
--------
inspect.signature : Get a signature object for the callable.
"""
if not callable(obj) and not hasattr(obj, "__dict__"):
raise TypeError(
f"Object '{obj}' is not callable or does not have attributes."
)
if isinstance(skip, str):
skip = [skip]
elif skip is None:
skip = []
else:
skip = list(skip)
obj_name = (
obj.__name__
if hasattr(obj, "__name__")
else obj.__class__.__name__
)
try:
sig = inspect.signature(obj)
params = [
f"{name}={repr(param.default)}"
if param.default is not inspect.Parameter.empty
else name
for name, param in sig.parameters.items()
if name not in skip
]
representation = f"{obj_name}({', '.join(params)})"
except (TypeError, ValueError):
# If obj is an instance, get its __dict__ attributes
attrs = {
k: v
for k, v in vars(obj).items()
if not k.startswith("_") and k not in skip
}
# Limit the number of attributes displayed
if len(attrs) > 6:
displayed_attrs = (
list(attrs.items())[:3]
+ [("...", "...")]
+ list(attrs.items())[-3:]
)
else:
displayed_attrs = attrs.items()
params = [
f"{k}={repr(v)}" for k, v in displayed_attrs
]
representation = f"{obj_name}({', '.join(params)})"
return representation
[docs]
def safe_getattr(
obj: Any, name: str, default_value: Any | None = None
) -> Any:
"""
Safely get an attribute from an object, with a helpful error message.
This function attempts to retrieve an attribute from the given object.
If the attribute is not found, it can return a default value or raise
an AttributeError with a suggestion for a similar attribute.
Parameters
----------
obj : object
The object from which to retrieve the attribute.
name : str
The name of the attribute to retrieve.
default_value : any, optional
A default value to return if the attribute is not found. If ``None``,
an ``AttributeError`` will be raised.
Returns
-------
value : any
The value of the retrieved attribute or the default value.
Raises
------
AttributeError
If the attribute is not found and no default value is provided.
Examples
--------
>>> from geoprior.utils.sys_utils import safe_getattr
>>> class MyClass:
... def __init__(self, a, b):
... self.a = a
... self.b = b
>>> obj = MyClass(1, 2)
>>> safe_getattr(obj, 'a')
1
>>> safe_getattr(obj, 'c', default_value='default')
'default'
>>> safe_getattr(obj, 'c')
Traceback (most recent call last):
...
AttributeError: 'MyClass' object has no attribute 'c'. Did you mean 'a'?
Notes
-----
This function enhances the built-in `getattr` by providing helpful
suggestions when an attribute is not found.
See Also
--------
getattr : Built-in function to get an attribute from an object.
"""
if hasattr(obj, name):
return getattr(obj, name)
if default_value is not None:
return default_value
# Attempt to find a similar attribute name
similar_attr = find_similar_string(
name, vars(obj), deep=True
)
suggestion = (
f". Did you mean '{similar_attr}'?"
if similar_attr
else ""
)
raise AttributeError(
f"'{obj.__class__.__name__}' object has no attribute '{name}'{suggestion}"
)
class _SafeOptimize:
def __init__(
self,
func: Callable | None = None,
*,
parallelize: bool = True,
memory_cleanup: bool = False,
log_level: int = logging.INFO,
optimize_cpu: bool = True,
num_processes: int | None = None,
cpu_cores: list[int] | None = None,
verbose: bool = True,
mode: str = "strict",
):
self.func = func
self.parallelize = parallelize
self.memory_cleanup = memory_cleanup
self.log_level = log_level
self.optimize_cpu = optimize_cpu
self.num_processes = num_processes
self.cpu_cores = cpu_cores
self.verbose = verbose
self.mode = mode
def __call__(self, *args, **kwargs):
if (
self.func is None
and len(args) == 1
and callable(args[0])
):
# Decorator used without arguments
self.func = args[0]
return self._wrap_function(self.func)
elif self.func and callable(self.func):
# Function is already set, execute it
return self._wrap_function(self.func)(
*args, **kwargs
)
else:
# Decorator used with arguments
def wrapper(func):
self.func = func
return self._wrap_function(func)
return wrapper
def _wrap_function(self, func):
@functools.wraps(func)
def wrapped_function(*args, **kwargs):
return self._execute_function(
func, *args, **kwargs
)
return wrapped_function
def _execute_function(self, func, *args, **kwargs):
# Set up logging based on the specified log level
logger.setLevel(self.log_level)
# Record the start time
start_time = time.time()
if self.verbose:
logger.info(
f"Starting workflow optimization for '{func.__name__}'..."
)
# Apply CPU optimization if requested
if self.optimize_cpu and self.cpu_cores:
try:
_reset_cpu_affinity(self.cpu_cores)
logger.info(
f"Optimized CPU usage, restricted to cores "
f"{self.cpu_cores}."
)
except Exception as e:
logger.error(f"CPU optimization failed: {e}")
if self.mode == "strict":
raise
elif self.mode == "soft":
logger.warning(
"Falling back to default CPU settings."
)
# Check if the function is picklable
if self.parallelize:
parallelize_flag = True
if not _is_picklable(func, self.mode):
if self.mode == "strict":
raise pickle.PicklingError(
f"Function '{func.__name__}' or its arguments are "
"not picklable."
)
elif self.mode == "soft":
logger.warning(
f"Function '{func.__name__}' or its arguments are "
"not picklable. Falling back to sequential execution."
)
parallelize_flag = False
else:
parallelize_flag = False
# Apply parallelization strategy if enabled
if parallelize_flag:
try:
results = _parallelize_flow(
func, self.num_processes, *args, **kwargs
)
except Exception as e:
logger.error(
f"Parallel execution failed: {e}"
)
if self.mode == "strict":
raise
elif self.mode == "soft":
logger.warning(
"Falling back to sequential execution."
)
results = func(*args, **kwargs)
else:
# Execute function normally if parallelization is disabled
results = func(*args, **kwargs)
# If memory cleanup is requested, clean up after execution
if self.memory_cleanup:
_clean_up_memory(verbose=self.verbose)
logger.info("Memory cleanup completed.")
# Log execution time
elapsed_time = time.time() - start_time
if self.verbose:
logger.info(
f"Workflow '{func.__name__}' completed in "
f"{elapsed_time:.4f} seconds."
)
return results
[docs]
def safe_optimize(
    func: Callable | None = None,
    *,
    parallelize: bool = True,
    memory_cleanup: bool = False,
    log_level: int = logging.INFO,
    optimize_cpu: bool = True,
    num_processes: int | None = None,
    cpu_cores: list[int] | None = None,
    verbose: bool = True,
    mode: str = "strict",
) -> Callable:
    """
    Decorate a function with timing, optional parallel execution, CPU
    pinning and memory cleanup.

    This is the function-style counterpart of the class-based decorator
    `WorkflowOptimizer`. Class-based decorators can run into pickling
    problems in multiprocessing contexts, because objects such as file
    handles, open network connections or non-serializable instances cannot
    be passed between processes. ``safe_optimize`` checks picklability
    first and, depending on ``mode``, either raises or falls back to a
    sequential call.

    It can be used bare (``@safe_optimize``) or configured
    (``@safe_optimize(parallelize=False, mode='soft')``).

    Parameters
    ----------
    func : Callable, optional
        The function to decorate when used as a bare decorator. Leave as
        ``None`` when passing configuration keywords.
    parallelize : bool, optional
        Flag to enable or disable parallel processing (default is ``True``).
    memory_cleanup : bool, optional
        Whether to clean up system memory after execution (default is
        ``False``).
    log_level : int, optional
        Level of logging (default is ``logging.INFO``). Set to
        ``logging.DEBUG`` for more detailed logs.
    optimize_cpu : bool, optional
        Whether to optimize CPU core usage (default is ``True``). Only has
        an effect when ``cpu_cores`` is also given.
    num_processes : int, optional
        The number of parallel processes for execution (default is
        ``None``).
    cpu_cores : list of int, optional
        CPU cores to restrict the process to (default is ``None``).
    verbose : bool, optional
        Whether to log start and completion messages (default is ``True``).
    mode : str, optional
        Mode for handling failures: ``'strict'`` to raise errors, or
        ``'soft'`` to fall back to sequential execution with warnings
        (default is ``'strict'``).

    Returns
    -------
    decorator : Callable
        A ``_SafeOptimize`` instance configured with the given settings.

    Raises
    ------
    Exception
        This function performs no validation of its own; any error raised
        here originates from the ``_SafeOptimize`` constructor.

    Examples
    --------
    >>> from geoprior.utils.sys_utils import safe_optimize
    >>> @safe_optimize(
    ...     parallelize=False,
    ...     memory_cleanup=False,
    ...     verbose=False,
    ...     mode='soft'
    ... )
    ... def process_data(data):
    ...     # Your data processing logic here
    ...     return [d * 2 for d in data]
    >>> data = [1, 2, 3, 4, 5]
    >>> results = process_data(data)
    >>> print(results)
    [2, 4, 6, 8, 10]

    Notes
    -----
    - Parallel execution relies on process-based workers, which may not be
      suitable for every environment.
    - Ensure that the decorated function and its arguments are picklable
      when using parallelization.
    - The `mode` parameter allows handling non-picklable objects
      gracefully.

    See Also
    --------
    multiprocessing.Pool : For parallel task execution.
    psutil : For system and process utilities.
    functools.wraps : For preserving metadata of decorated functions.
    """
    # Collect the keyword-only settings once and forward them unchanged.
    settings = {
        "parallelize": parallelize,
        "memory_cleanup": memory_cleanup,
        "log_level": log_level,
        "optimize_cpu": optimize_cpu,
        "num_processes": num_processes,
        "cpu_cores": cpu_cores,
        "verbose": verbose,
        "mode": mode,
    }
    return _SafeOptimize(func, **settings)
def _is_picklable(func: Callable, mode: str) -> bool:
    """
    Check whether a function can be serialized for parallel execution.

    The standard ``pickle`` module is tried first. If it rejects the
    object and ``cloudpickle`` is installed, ``cloudpickle`` is tried as a
    second chance.

    Parameters
    ----------
    func : Callable
        The function to check for picklability.
    mode : str
        The mode of operation: 'strict' or 'soft'.

    Returns
    -------
    bool
        ``True`` if ``pickle`` or ``cloudpickle`` can serialize ``func``,
        ``False`` otherwise (non-strict modes only).

    Raises
    ------
    pickle.PicklingError
        If the function is not picklable and mode is 'strict'.
    Exception
        Any other unexpected error from ``pickle.dumps`` is re-raised
        unchanged when mode is 'strict'.
    """
    # Callables such as functools.partial have no __name__.
    func_name = getattr(func, "__name__", repr(func))

    try:
        pickle.dumps(func)
        return True
    except (pickle.PicklingError, AttributeError, TypeError) as err:
        # pickle signals "cannot serialize" with any of these three:
        # PicklingError, AttributeError (local objects) or TypeError
        # (e.g. objects holding locks). Keep the error past the except
        # clause, which unbinds ``err`` on exit.
        pickle_failure = err
    except Exception as err:
        logger.error(
            f"Unexpected error during pickling check: {err}"
        )
        if mode == "strict":
            raise
        elif mode == "soft":
            logger.warning(
                "Falling back to sequential execution."
            )
        return False

    if is_module_installed("cloudpickle"):
        import cloudpickle

        try:
            cloudpickle.dumps(func)
            return True
        except Exception as cloud_err:
            logger.error(
                f"Function '{func_name}' is not picklable: {cloud_err}"
            )
            if mode == "strict":
                raise pickle.PicklingError(
                    f"Function '{func_name}' is not picklable. {cloud_err}"
                ) from cloud_err
            elif mode == "soft":
                logger.warning(
                    f"Function '{func_name}' is not picklable. "
                    "Falling back to sequential execution."
                )
            return False

    if mode == "strict":
        raise pickle.PicklingError(
            f"Function '{func_name}' is not picklable: {pickle_failure}"
        ) from pickle_failure
    elif mode == "soft":
        logger.warning(
            f"Function '{func_name}' is not picklable: {pickle_failure}. "
            "Parallelization will be skipped."
        )
    return False
def _parallelize_flow(
func: Callable,
num_processes: int | None,
*args,
**kwargs,
):
"""
Parallelize the execution of a function across multiple processes.
Parameters
----------
func : Callable
The function to execute in parallel.
num_processes : Optional[int]
The number of parallel processes to use. If ``None``, defaults to the
number of CPU cores.
*args : tuple
Positional arguments to pass to the function.
**kwargs : dict
Keyword arguments to pass to the function.
Returns
-------
list
A list of results from each parallel execution.
Raises
------
TypeError
If the 'data' keyword argument is not a list or tuple.
"""
if "data" in kwargs:
data = kwargs["data"]
elif args:
data = args[0]
else:
logger.info(
"No data provided for parallel processing."
)
return func(*args, **kwargs)
if not isinstance(data, list | tuple):
raise TypeError(
f"'data' parameter should be a list or tuple, got "
f"{type(data).__name__!r} instead."
)
if is_module_installed("joblib"):
from joblib import Parallel, delayed
try:
# Run the function in parallel using joblib
results = Parallel(n_jobs=num_processes)(
delayed(func)(item, *args[1:], **kwargs)
for item in data
)
except Exception as e:
logger.error(f"Parallel execution failed: {e}")
logger.warning(
"Falling back to sequential execution."
)
results = func(*args, **kwargs)
else:
import multiprocessing
num_processes_ = num_processes or min(
multiprocessing.cpu_count(), len(data)
)
logger.info(
f"Parallelizing with {num_processes_} processes."
)
# Use multiprocessing Pool to parallelize tasks
with multiprocessing.Pool(
processes=num_processes_
) as pool:
results = pool.map(func, data)
return results
def _reset_cpu_affinity(cpu_cores: list[int]):
    """
    Restrict the current process to specific CPU cores.

    Parameters
    ----------
    cpu_cores : List[int]
        A list of CPU core indices to bind the process to.

    Raises
    ------
    ImportError
        If ``psutil`` is not installed.
    NotImplementedError
        If the ``psutil`` build for this platform does not expose
        ``Process.cpu_affinity``.
    psutil.Error
        If setting CPU affinity fails.
    ValueError
        If ``psutil`` rejects the requested core indices.
    """
    # The module-level psutil import is best-effort, so the name may be
    # missing; import here to fail with an explicit message instead of a
    # NameError.
    try:
        import psutil
    except ImportError as err:
        raise ImportError(
            "psutil is required to set CPU affinity but is not installed."
        ) from err

    process = psutil.Process()
    if not hasattr(process, "cpu_affinity"):
        raise NotImplementedError(
            "Setting CPU affinity is not supported on this platform."
        )

    try:
        process.cpu_affinity(cpu_cores)
    except (psutil.Error, ValueError) as e:
        logger.error(
            f"Failed to set CPU affinity to cores {cpu_cores}: {e}"
        )
        raise
def _delete_temp_files(temp_dir: str, verbose: bool = True):
    """
    Remove a temporary directory tree created during the workflow.

    Parameters
    ----------
    temp_dir : str
        The path to the temporary directory to be deleted.
    verbose : bool, optional
        Whether to also print the outcome. Default is ``True``. The
        outcome is always sent to the module logger at debug level.

    Notes
    -----
    - Uses ``shutil.rmtree`` to remove the directory and its contents.
    - Only reports, and deletes nothing, when ``temp_dir`` does not exist
      or is not a directory.
    """
    target = Path(temp_dir)
    if target.exists() and target.is_dir():
        shutil.rmtree(temp_dir)
        outcome = f"Deleted temporary directory: {temp_dir}"
    else:
        outcome = (
            f"No temporary files found to delete in '{temp_dir}'."
        )

    if verbose:
        print(outcome)
    logger.debug(outcome)
def _clear_unused_variables(verbose: bool = True):
    """
    Run a garbage-collection pass to release unreferenced objects.

    Parameters
    ----------
    verbose : bool, optional
        Whether to also print the action. Default is ``True``. The action
        is always sent to the module logger at debug level.

    Notes
    -----
    - Invokes ``gc.collect()``.
    """
    note = "Cleared unused variables (Garbage Collection)."
    gc.collect()
    if verbose:
        print(note)
    logger.debug(note)
def _clear_system_memory(verbose: bool = True):
    """
    Run garbage collection and report the process memory usage around it.

    Parameters
    ----------
    verbose : bool, optional
        Whether to also print the messages. Default is ``True``. Messages
        are always sent to the module logger at debug level.

    Notes
    -----
    - Uses `psutil` to read the resident set size before and after the
      collection.
    - When `psutil` is not installed, garbage collection still runs and
      only the memory report is skipped.
    """

    def _report(message: str) -> None:
        if verbose:
            print(message)
        logger.debug(message)

    # The module-level psutil import is best-effort, so the name may be
    # missing; import here and degrade gracefully instead of raising
    # NameError in the middle of a cleanup.
    try:
        import psutil
    except ImportError:
        gc.collect()
        _report(
            "psutil is not installed; ran garbage collection without "
            "memory usage report."
        )
        return

    process = psutil.Process(os.getpid())
    bytes_per_mb = 1024**2

    memory_before = process.memory_info().rss / bytes_per_mb
    _report(
        f"Current memory usage before cleanup: {memory_before:.2f} MB"
    )

    gc.collect()

    memory_after = process.memory_info().rss / bytes_per_mb
    _report(f"Memory usage after cleanup: {memory_after:.2f} MB")
def _clear_cuda_cache(verbose: bool = True):
    """
    Empty the PyTorch CUDA cache when PyTorch and CUDA are available.

    Parameters
    ----------
    verbose : bool, optional
        Whether to also print the action. Default is ``True``. The action
        is always sent to the module logger at debug level.

    Notes
    -----
    - Does nothing (and reports nothing) when PyTorch is importable but
      ``torch.cuda.is_available()`` is false.
    - Reports that the step was skipped when PyTorch cannot be imported.
    """
    try:
        import torch  # For PyTorch GPU memory management

        if not torch.cuda.is_available():
            return
        torch.cuda.empty_cache()
        note = "Cleared CUDA cache (PyTorch)."
        if verbose:
            print(note)
        logger.debug(note)
    except ImportError:
        note = "PyTorch is not installed; skipping CUDA cache clearing."
        if verbose:
            print(note)
        logger.debug(note)
def _clear_tensorflow_cache(verbose: bool = True):
    """
    Reset TensorFlow/Keras global state when a GPU is present.

    Parameters
    ----------
    verbose : bool, optional
        Whether to also print the action. Default is ``True``. The action
        is always sent to the module logger at debug level.

    Notes
    -----
    - Requires TensorFlow to be installed; otherwise the step is reported
      as skipped.
    - Does nothing when TensorFlow lists no physical GPU device.
    - Calls ``tf.keras.backend.clear_session()``. The previous per-GPU
      call to ``tf.config.experimental.reset_memory_growth`` was removed
      because that function is not part of the TensorFlow API and raised
      ``AttributeError``.
    """
    try:
        import tensorflow as tf  # For TensorFlow GPU memory management

        if not tf.config.list_physical_devices("GPU"):
            return

        try:
            tf.keras.backend.clear_session()
        except RuntimeError as e:
            logger.error(
                f"Error clearing TensorFlow GPU memory: {e}"
            )
            return

        note = "Cleared GPU memory (TensorFlow)."
        if verbose:
            print(note)
        logger.debug(note)
    except ImportError:
        note = (
            "TensorFlow is not installed; skipping GPU cache clearing."
        )
        if verbose:
            print(note)
        logger.debug(note)
def _clean_up_memory(verbose: bool = True):
    """
    Run every available memory-cleanup step in sequence.

    Parameters
    ----------
    verbose : bool, optional
        Forwarded to each cleanup step. Default is ``True``.

    Notes
    -----
    - Framework steps (PyTorch, then TensorFlow) run only when
      ``is_module_installed`` reports the corresponding module.
    - Garbage collection and the system-memory step always run, in that
      order.
    """
    logger.info("Starting memory cleanup...")

    # Framework-specific cleaners, checked and run in this order.
    framework_cleaners = (
        ("torch", _clear_cuda_cache),
        ("tensorflow", _clear_tensorflow_cache),
    )
    for module_name, cleaner in framework_cleaners:
        if is_module_installed(module_name):
            cleaner(verbose)

    # Framework-independent steps.
    _clear_unused_variables(verbose)
    _clear_system_memory(verbose)

    logger.info("Memory cleanup complete.")