Source code for geoprior.utils.sys_utils

# License: Apache-2.0
# Copyright (c) 2026-present
# Author: LKouadio <etanoyau@gmail.com>

"""
System utilities module for managing system-level operations.

This module provides utilities essential for system-level tasks such as
color management, regular expression searching, and projection validation,
along with other miscellaneous system operations.
"""

import functools
import gc
import importlib.util
import inspect
import itertools
import logging
import multiprocessing
import os
import pickle
import platform
import re
import shutil
import socket
import subprocess
import tempfile
import time
from collections.abc import Callable, Sequence
from concurrent.futures import (
    ProcessPoolExecutor,
    ThreadPoolExecutor,
    as_completed,
)
from functools import partial
from pathlib import Path
from typing import (
    Any,
)

import numpy as np
import pandas as pd

from ..api.util import get_table_size
from ..backends.selector import check_processor
from ..core.checks import is_iterable
from ..logging import get_logger
from .deps_utils import (
    ensure_pkgs,
    import_optional_dependency,
    is_module_installed,
)

try:
    import psutil
except:
    pass
try:
    import torch

    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
try:
    import pyarrow  # noqa

    _HAVE_PYARROW = True
except ImportError:
    _HAVE_PYARROW = False

TW = get_table_size()

logger = get_logger(__name__)

__all__ = [
    "BatchDataFrameBuilder",
    "WorkflowOptimizer",
    "check_port_in_use",
    "clean_temp_files",
    "create_temp_dir",
    "create_temp_file",
    "environment_summary",
    "find_by_regex",
    "find_similar_string",
    "get_cpu_usage",
    "get_disk_usage",
    "get_gpu_info",
    "get_installed_packages",
    "get_memory_usage",
    "get_python_version",
    "get_system_info",
    "get_uptime",
    "is_gpu_available",
    "is_package_installed",
    "is_path_accessible",
    "is_port_open",
    "manage_env_variable",
    "manage_file_lock",
    "manage_temp",
    "parallelize_jobs",
    "represent_callable",
    "run_command",
    "safe_getattr",
    "safe_optimize",
    "system_uptime",
    "build_large_df",
]

try:
    import cudf
except ImportError:
    cudf = None


[docs] class BatchDataFrameBuilder: r""" Manages incremental construction of a large DataFrame in controlled-size chunks. This can reduce peak memory usage and allow GPU-accelerated libraries (e.g., ``cudf``) if they are available and desired. The approach can be expressed mathematically as a chunking process that partitions an incoming stream of :math:`N` row-dictionaries into :math:`k` subsets of size :math:`m\ (\text{<=}\ \text{chunk\_size})`: .. math:: k = \left\lceil \frac{N}{m} \right\rceil Each subset is converted into a DataFrame, stored, and released from memory, and then concatenated at finalization time. Parameters ---------- chunk_size : int, optional The maximum number of rows to hold in the internal buffer before converting them into a DataFrame chunk. Default is 100000. processor : {'auto', 'cpu', 'gpu'}, optional Controls the engine used to build the DataFrame: - ``'cpu'`` : Always use pandas. - ``'gpu'`` : Attempt to use cudf (raise an error if not available). - ``'auto'`` : Use cudf if a GPU is detected and cudf is installed; otherwise fallback to pandas. verbose : int, optional Verbosity level. Default is 1: - 0 : Silent. - 1 : Basic information. - 2 : Debug / detailed printing. Notes ----- This object is intended for situations where the total row count can be very large, potentially in the millions. By breaking data into chunks, you can avoid excessive memory usage and keep the system more responsive. If `processor` is ``'auto'`` or ``'gpu'``, the module calls ``check_processor`` to verify GPU availability, then uses cudf if appropriate. .. note:: If the total data is larger than your available memory (whether RAM or GPU), consider writing out each chunk to disk as a partitioned file (e.g., Parquet or Feather) instead of storing them all in memory. Examples -------- >>> from geoprior.utils.sys_utils import BatchDataFrameBuilder >>> # Suppose we have a large list of dictionaries >>> data = [ ... {'colA': i, 'colB': i**2} for i in range(10**6) ... ] >>> with BatchDataFrameBuilder(chunk_size=50000, ... processor='auto', ... verbose=2) as builder: ... builder.add_rows(data) ... >>> # After exiting the context, the final DataFrame is >>> # automatically built and stored in builder.final_df >>> final_df = builder.final_df >>> print(final_df.shape) (1000000, 2) See Also -------- pandas.DataFrame : Core pandas DataFrame object. cudf.DataFrame : GPU DataFrame object from RAPIDS. check_processor : Utility for detecting GPU availability. """
[docs] def __init__( self, chunk_size=100_000, processor="auto", verbose=1 ): """ Initializes the builder, setting up chunk size, processor preference, and verbosity. Checks for GPU availability if requested. """ self.chunk_size = chunk_size self.processor = processor self.verbose = verbose # This list will collect row dictionaries until we # reach the chunk_size. self._rows = [] # We'll store chunked DataFrames here for concatenation. self._dfs = [] # Decide whether to use GPU or CPU by calling # check_processor and verifying cudf if needed. self.gpu_enabled = False self._initialize_processor() # Will hold the final DataFrame once we exit context. self.final_df = None
[docs] def __enter__(self): """ Enters the context manager. Returns `self` so we can use it in a with-statement scope. """ return self
[docs] def add_row(self, row: dict): r""" Adds a single row to the internal buffer. This method appends the given dictionary `row` to the in-memory buffer. If the buffer reaches `self.chunk_size`, it is automatically flushed. Parameters ---------- row : dict A row in dictionary form, where keys correspond to column names and values represent the row data. Notes ----- Internally calls :meth:`_flush` once the buffer has reached its maximum size. """ self._rows.append(row) if len(self._rows) >= self.chunk_size: self._flush()
[docs] def add_rows(self, rows: list): r""" Adds multiple rows to the internal buffer. This method iterates over the list of dictionaries `rows`. For each element, :meth:`add_row` is called, which may trigger a flush if the buffer is full. Parameters ---------- rows : list of dict Each dictionary should have the same structure as a typical row in the final DataFrame. Notes ----- This method is merely a convenience layer over :meth:`add_row`. """ for row in rows: self.add_row(row)
[docs] def finalize(self): r""" Flushes remaining rows and concatenates all chunks. Once the remaining rows in `_rows` are processed into a chunk, this method concatenates all stored chunk DataFrames (either pandas or cudf) into one final DataFrame. The resulting DataFrame is returned. Returns ------- DataFrame The final DataFrame, which may be a pandas DataFrame or a cudf DataFrame (if `processor` is set to allow GPU usage and cudf is available). Notes ----- After concatenation, all chunk DataFrames are cleared from memory. This method is called automatically upon exiting the context (i.e., in :meth:`__exit__`). """ self._flush() if not self._dfs: if self.gpu_enabled: return cudf.DataFrame() return pd.DataFrame() if self.verbose > 0: print( f"[BatchDataFrameBuilder] Concatenating " f"{len(self._dfs)} chunk(s)." ) if self.gpu_enabled: final_df = cudf.concat( self._dfs, ignore_index=True ) else: final_df = pd.concat(self._dfs, ignore_index=True) self._dfs.clear() return final_df
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """ Exits the context manager. Automatically finalizes the DataFrame by calling :meth:`finalize`, storing the result in `self.final_df`. """ self.final_df = self.finalize()
def _initialize_processor(self): """ Internal helper to set up GPU or CPU usage according to `self.processor`. Uses the `check_processor` function and verifies the presence of cudf if GPU is requested. """ # from typing import Literal # from subprocess import PIPE, CalledProcessError, run # from pathlib import Path if self.processor == "cpu": if self.verbose > 0: print( "[BatchDataFrameBuilder] Using CPU (pandas)." ) return if self.processor == "gpu": try: gpu_ok = check_processor( authorized="gpu", error="raise", verbose=self.verbose, ) if not cudf: raise ImportError( "cudf is not installed, but 'gpu' " "processor was requested." ) if gpu_ok: self.gpu_enabled = True if self.verbose > 0: print( "[BatchDataFrameBuilder] " "Using GPU (cudf)." ) except (RuntimeError, ImportError) as e: raise RuntimeError( f"GPU required but not available: {e}" ) return # 'auto' gpu_ok = False try: gpu_ok = check_processor( authorized="auto", error="warn", verbose=self.verbose, ) except Exception as e: if self.verbose > 0: print( "[BatchDataFrameBuilder] GPU check failed," f" defaulting to CPU: {e}" ) if gpu_ok and cudf: self.gpu_enabled = True if self.verbose > 0: print( "[BatchDataFrameBuilder] GPU detected," " using cudf." ) else: if self.verbose > 0: print( "[BatchDataFrameBuilder] No GPU or cudf " "not installed, using CPU (pandas)." ) def _flush(self): """ Converts the current buffer to a DataFrame and stores it, then resets the buffer. This is triggered automatically by :meth:`add_row` or :meth:`finalize`. """ if not self._rows: return if self.verbose > 1: print( f"[BatchDataFrameBuilder] Flushing " f"{len(self._rows)} rows to DataFrame." ) if self.gpu_enabled: chunk_df = cudf.DataFrame(self._rows) else: chunk_df = pd.DataFrame(self._rows) self._dfs.append(chunk_df) self._rows = []
[docs] class WorkflowOptimizer: """ WorkflowOptimizer is a decorator class designed to optimize the execution of computationally intensive functions by enabling parallelization, managing CPU and memory resources, and performing cleanup tasks. It provides flexibility through various parameters that allow users to customize optimization strategies according to their workflow requirements. .. math:: T_{\text{total}} = T_{\text{start}} + T_{\text{execution}} + T_{\text{cleanup}} Here, :math:`T_{\text{total}}` is the total workflow time, :math:`T_{\text{start}}` is the initialization time, :math:`T_{\text{execution}}` is the main execution time, and :math:`T_{\text{cleanup}}` is the cleanup time. Parameters ---------- parallelize : bool, optional Flag to enable or disable parallel processing. If set to ``True``, the decorator will attempt to parallelize the execution of the decorated function using multiprocessing. Default is ``True``. memory_cleanup : bool, optional Whether to clean up system memory after the execution of the decorated function. This includes triggering garbage collection and clearing GPU caches if applicable. Default is ``False``. log_level : int, optional Level of logging verbosity. Accepts standard logging levels such as ``logging.INFO``, ``logging.DEBUG``, etc. Default is ``logging.INFO``. optimize_cpu : bool, optional Whether to optimize CPU usage by setting CPU affinity to restrict the process to specific CPU cores. If ``True``, the decorator will bind the process to the cores specified in ``cpu_cores``. Default is ``True``. num_processes : int, optional The number of parallel processes to use when ``parallelize`` is enabled. If not specified, it defaults to the minimum of the number of available CPU cores and the length of the ``data`` iterable passed to the function. Default is ``None``. cpu_cores : list or None, optional A list of specific CPU cores to bind the process to for optimized CPU usage. If ``None``, the process is allowed to run on all available CPU cores. Example: ``[0, 1, 2, 3]``. Default is ``None``. verbose : bool, optional Whether to print detailed logs during execution. If set to ``False``, only essential information will be logged based on the ``log_level``. Default is ``True``. Examples -------- >>> from geoprior.utils.sys_utils import WorkflowOptimizer >>> import time >>> >>> @WorkflowOptimizer( ... parallelize=True, ... memory_cleanup=True, ... log_level=logging.DEBUG, ... num_processes=4, ... cpu_cores=[0, 1, 2, 3], ... verbose=True ... ) >>> def process_data(data_chunk): ... '''Simulate a time-consuming data processing function.''' ... time.sleep(1) # Simulate a time-consuming task ... return f"Processed {data_chunk}" ... >>> data_chunks = ['chunk1', 'chunk2', 'chunk3', 'chunk4'] >>> results = process_data(data=data_chunks) >>> print(results) ['Processed chunk1', 'Processed chunk2', 'Processed chunk3', 'Processed chunk4'] Notes ----- The decorator checks for the presence of a ``data`` keyword argument to decide whether parallelization should be applied. When ``parallelize=True``, the decorated function should be compatible with multiprocessing, meaning it should be picklable. Memory cleanup can be useful in long-running workflows, and CPU affinity may improve performance by reducing context switching and cache misses. Logging behavior follows the standard Python logging model. See Also -------- multiprocessing.Pool: Provides a pool of worker processes. psutil.Process: Allows manipulation of system processes. """
[docs] def __init__( self, parallelize: bool = True, memory_cleanup: bool = False, log_level: int = logging.INFO, optimize_cpu: bool = True, num_processes: int | None = None, cpu_cores: list[int] | None = None, verbose: bool = True, ): self.parallelize = parallelize self.memory_cleanup = memory_cleanup self.log_level = log_level self.optimize_cpu = optimize_cpu self.num_processes = num_processes self.cpu_cores = cpu_cores self.verbose = verbose
[docs] def __call__(self, func): """ Makes the class instance callable so it can be used as a decorator. Parameters ---------- func : function The function to be decorated and optimized. Returns ------- wrapper : function The wrapped function with optimization strategies applied. """ @functools.wraps(func) def wrapper(*args, **kwargs): # Set up logging based on the specified log level logger.setLevel(self.log_level) # Record the start time start_time = time.time() if self.verbose: logger.info( f"Starting workflow optimization for `{func.__name__}`..." ) # Apply CPU optimization if requested if self.optimize_cpu and self.cpu_cores: self._reset_cpu_affinity() logger.info( f"Optimizing CPU usage, restricted to cores {self.cpu_cores}" ) # Apply parallelization strategy if enabled if self.parallelize: if "data" in kwargs: data = kwargs["data"] num_processes = self.num_processes or min( multiprocessing.cpu_count(), len(data) ) logger.info( f"Parallelizing with {num_processes} processes." ) # Ensure that the function is picklable for multiprocessing if not hasattr(func, "__name__"): raise ValueError( f"Function {func} is not picklable. Ensure it" " is a valid function for multiprocessing." ) # Use multiprocessing Pool to parallelize tasks with multiprocessing.Pool( processes=num_processes ) as pool: try: # Apply function to each data chunk results = pool.map(func, data) except Exception as e: logger.error( f"Error during parallel execution: {e}" ) results = None else: logger.info( "No parallel data found, executing normally." ) results = func(*args, **kwargs) # If memory cleanup is requested, clean up after execution if self.memory_cleanup: self._clean_up_memory() logger.info("Memory cleanup completed.") # Log execution time elapsed_time = time.time() - start_time if self.verbose: logger.info( f"Workflow completed in {elapsed_time:.4f} seconds." ) return results return wrapper
@ensure_pkgs( ["psutil"], extra="Sets the CPU affinity of the current process requires the `" " `psutil to be installed.", auto_install=True, ) def _reset_cpu_affinity(self): """ Sets the CPU affinity of the current process to the specified CPU cores. If no specific cores are provided, it resets affinity to use all available CPUs. """ import psutil process = psutil.Process(os.getpid()) if self.cpu_cores: process.cpu_affinity(self.cpu_cores) if self.verbose: logger.debug( f"Set CPU affinity to cores: {self.cpu_cores}" ) else: process.cpu_affinity( range(multiprocessing.cpu_count()) ) if self.verbose: logger.debug( "Reset CPU affinity to all available cores." ) def _clean_up_memory(self, temp_dir=None): """Cleans up memory by clearing caches, releasing unused resources, and deleting temporary files if a temporary directory is specified.""" logger.info("Starting memory cleanup...") _clean_up_memory(self.verbose)
[docs] @ensure_pkgs( ["psutil"], extra="`psutil` package is required for managing large dataset.", auto_install=True, ) def build_large_df( forecast_results: list[dict], dt_col: str, tname: str, spatial_cols: list[str] | None = None, chunk_size: int | None = None, verbose: int = 0, ) -> pd.DataFrame: """ Construct memory-optimized DataFrame from large forecast results using chunked processing. Implements dynamic chunk sizing and dtype optimization to handle datasets exceeding available memory. Uses temporary storage and parallel processing for efficient resource utilization. If pyarrow is installed, the function uses parquet I/O; otherwise, CSV files are used as a fallback. Parameters ---------- forecast_results : List[Dict] Input data as list of dictionary records. Each dictionary represents a row with column-value pairs. Minimum 1000 entries recommended for chunking benefits. dt_col : str Name of temporal column. Accepts numeric years (e.g., ``2023``) or datetime strings. Automatic type detection with fallback to :class:`numpy.int32` for years >200000. tname : str Target variable prefix for prediction columns. Quantile columns are expected in the form ``f"{tname}_q{quantile}"`` such as ``"subs_q10"``, while point predictions use ``f"{tname}_pred"``. spatial_cols : List[str], optional Geographic columns (e.g., ``['longitude', 'latitude']``). Auto-detects categorical ( <10% unique values) vs continuous spatial data, using :class:`pandas.Category` or :class:`numpy.float32` dtypes respectively. chunk_size : int, optional Maximum rows per chunk. Auto-calculated using .. math:: C_{optimal} = \\min\\left(10^5, \\frac{0.8M_{free}}{S_{row}}\\right) where :math:`M_{free}` is available memory in bytes and :math:`S_{row}` is the estimated row size, assumed to be about 1 KB by default. verbose : int, default=0 Logging verbosity. Use ``0`` for silent mode, ``1`` for memory reports, ``2`` for chunk diagnostics, and ``3`` for per-chunk metrics. Returns ------- pd.DataFrame Combined DataFrame with optimized dtypes, preserving original column order. Memory footprint reduced by 40-60% compared to naive construction. Examples -------- >>> from geoprior.utils.sys_utils import build_large_df >>> import numpy as np # Basic usage with 1M rows >>> data = [{'year': y, 'value_q50': np.random.randn()} ... for y in range(2010, 2020) for _ in range(100000)] >>> df = build_large_df(data, dt_col='year', tname='value') # With spatial columns >>> geo_data = [{'lat': np.random.uniform(-90, 90), ... 'lon': np.random.uniform(-180, 180), ... 'pred': np.random.randn()} ... for _ in range(500000)] >>> df = build_large_df(geo_data, dt_col='date', tname='pred', ... spatial_cols=['lat', 'lon'], verbose=2) Notes ----- Key implementation features include dynamic chunk adjustment using :func:`psutil.virtual_memory`, concurrent chunk reading with :class:`ThreadPoolExecutor` when many chunks are present, dtype inference for temporal and spatial columns, and guaranteed tempfile cleanup via ``try...finally`` blocks. See Also -------- pd.DataFrame : Base DataFrame construction pd.concat : Chunk aggregation method geoprior.nn.utils.generate_forecast : Primary data source geoprior.utils.memory_optimizer.reduce_mem_usage : Detailed dtype optimization """ def _get_optimal_chunk_size(num_rows: int) -> int: """(No doc; private)""" if chunk_size is not None: return chunk_size mem = psutil.virtual_memory() free_mem = mem.available / (1024**3) # GB total_mem = mem.total / (1024**3) # Estimate memory usage (~1 KB per row) row_size = 1024 safe_chunk = int( (free_mem * 0.8 * 1024**3) / row_size ) default_chunk = 100_000 if total_mem > 32 else 50_000 return min(default_chunk, safe_chunk, num_rows) def _optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame: """(No doc; private)""" # Handle dt_col if pd.api.types.is_numeric_dtype(df[dt_col]): if df[dt_col].max() > 2e5: df[dt_col] = df[dt_col].astype(np.int32) else: try: df[dt_col] = pd.to_datetime(df[dt_col]) except Exception as e: if verbose >= 2: print( f"Could not convert {dt_col} to " f"datetime: {str(e)}" ) # Handle prediction columns pred_cols = [ c for c in df.columns if c.startswith(f"{tname}_q") or c.endswith("_pred") ] for col in pred_cols: df[col] = df[col].astype(np.float32) # Handle spatial columns if spatial_cols: for col in spatial_cols: if col in df.columns: nunique = df[col].nunique() ratio = nunique / len(df) if ratio < 0.1: df[col] = df[col].astype("category") else: df[col] = df[col].astype(np.float32) return df def _process_chunk( chunk: list[dict], temp_dir: str, chunk_number: int ) -> str | None: """(No doc; private)""" try: chunk_df = pd.DataFrame(chunk) chunk_df = _optimize_dtypes(chunk_df) if _HAVE_PYARROW: # Parquet I/O chunk_path = os.path.join( temp_dir, f"chunk_{chunk_number}.parquet" ) chunk_df.to_parquet( chunk_path, engine="pyarrow" ) else: # CSV fallback chunk_path = os.path.join( temp_dir, f"chunk_{chunk_number}.csv" ) chunk_df.to_csv(chunk_path, index=False) if verbose >= 3: mem = psutil.virtual_memory() used_gb = mem.used / (1024**3) total_gb = mem.total / (1024**3) print( f"Chunk {chunk_number} processed " f"({len(chunk_df)} rows) | " f"Memory used: {used_gb:.1f}GB / " f"{total_gb:.1f}GB" ) return chunk_path except Exception as e: if verbose >= 1: print( f"Error processing chunk {chunk_number}: " f"{str(e)}" ) return None # Main logic # Early exit if empty if not forecast_results: if verbose >= 1: print("No forecast results to process.") return pd.DataFrame() chunk_size_final = _get_optimal_chunk_size( len(forecast_results) ) if verbose >= 2: print( f"Processing {len(forecast_results):,} rows in " f"chunks of {chunk_size_final:,}" ) temp_dir = tempfile.mkdtemp() chunk_paths = [] current_chunk = [] try: # Build chunks for _idx, entry in enumerate(forecast_results): current_chunk.append(entry) if len(current_chunk) >= chunk_size_final: chunk_num = len(chunk_paths) path = _process_chunk( current_chunk, temp_dir, chunk_num ) if path: chunk_paths.append(path) current_chunk = [] # Check memory mem = psutil.virtual_memory() if mem.percent > 90: chunk_size_final = max( 1000, int(chunk_size_final * 0.8) ) if verbose >= 2: print( "Reducing chunk size to " f"{chunk_size_final} due to " "memory pressure." ) # Process remainder if current_chunk: chunk_num = len(chunk_paths) path = _process_chunk( current_chunk, temp_dir, chunk_num ) if path: chunk_paths.append(path) # If no chunks written, return empty if not chunk_paths: return pd.DataFrame() # Combine chunks if len(chunk_paths) > 5 and os.cpu_count() > 1: if _HAVE_PYARROW: read_fn = partial( pd.read_parquet, engine="pyarrow" ) else: read_fn = partial(pd.read_csv) with ThreadPoolExecutor( max_workers=os.cpu_count() ) as executor: chunks = list( executor.map(read_fn, chunk_paths) ) else: if _HAVE_PYARROW: chunks = [ pd.read_parquet(p, engine="pyarrow") for p in chunk_paths ] else: chunks = [pd.read_csv(p) for p in chunk_paths] forecast_df = pd.concat(chunks, ignore_index=True) finally: # Cleanup for path in chunk_paths: try: if os.path.exists(path): os.remove(path) except Exception as e: if verbose >= 2: print(f"Error deleting {path}: {str(e)}") try: os.rmdir(temp_dir) except OSError: pass # Final optimization forecast_df = _optimize_dtypes(forecast_df) if verbose >= 1: mem_usage = forecast_df.memory_usage( deep=True ).sum() / (1024**2) print(f"Final DataFrame size: {mem_usage:.2f} MB") return forecast_df
[docs] @ensure_pkgs( ["psutil"], extra="`get_cpu_usage` requires the `psutil` " "package for system resource monitoring", auto_install=True, ) def get_cpu_usage(per_cpu: bool = False) -> float | None: """ Returns the current CPU usage as a percentage, optionally providing per-core usage for systems with multiple cores. Parameters ---------- per_cpu : bool, default=False If True, returns a list with the CPU usage percentage for each core. If False, returns the overall CPU usage as a single percentage. Returns ------- usage : float or list of float, optional If `per_cpu` is False, returns the overall CPU usage as a float percentage. If `per_cpu` is True, returns a list with each entry corresponding to the usage percentage of an individual core. Notes ----- This function uses the `psutil` library to retrieve CPU usage information and requires an interval of 1 second to calculate the usage accurately. Examples -------- >>> from geoprior.utils.sys_utils import get_cpu_usage >>> get_cpu_usage() 1.3 >>> get_cpu_usage(per_cpu=True) [20.4, 25.1, 21.3, 24.5] """ try: usage = psutil.cpu_percent(interval=1, percpu=per_cpu) logger.debug(f"CPU usage retrieved: {usage}") return usage except Exception as e: logger.error(f"Failed to retrieve CPU usage: {e}") return None
[docs] @ensure_pkgs( ["psutil"], extra="`get_memory_usage` requires the `psutil`" " package for system memory usage", auto_install=True, ) def get_memory_usage() -> tuple[float, float, float] | None: """ Retrieves system memory usage statistics, providing the total, used, and available memory in megabytes (MB). Returns ------- memory : tuple of float A tuple containing: - `total_memory` : Total memory in MB. - `used_memory` : Used memory in MB. - `available_memory` : Available memory in MB. Notes ----- This function leverages the `psutil` library for retrieving memory usage information. The conversion to MB is performed by dividing each value by 1024^2. Examples -------- >>> from geoprior.utils.sys_utils import get_memory_usage >>> total, used, available = get_memory_usage() >>> print(f"Total: {total} MB, Used: {used} MB, Available: {available} MB") Total: 8192 MB, Used: 4096 MB, Available: 4096 MB """ try: mem = psutil.virtual_memory() total_memory = mem.total / (1024**2) # Convert to MB used_memory = mem.used / (1024**2) # Convert to MB available_memory = mem.available / ( 1024**2 ) # Convert to MB logger.debug( f"Memory usage retrieved: Total: {total_memory} MB," " Used: {used_memory} MB, Available: {available_memory} MB" ) memory_infos = { "Total": f"{total_memory} MB", "Used": f"{used_memory} MB", "Available": f"{available_memory} MB", } return memory_infos except Exception as e: logger.error(f"Failed to retrieve memory usage: {e}") return None
[docs] @ensure_pkgs( ["psutil"], extra="`get_disk_usage` requires the `psutil`" " package for disk usage statistics ", auto_install=True, ) def get_disk_usage( path: str = "/", ) -> tuple[float, float, float] | None: """ Returns disk usage statistics for a specified filesystem path, including total, used, and free disk space in gigabytes (GB). Parameters ---------- path : str, default='/' The filesystem path for which to check disk usage statistics. By default, it uses the root directory (`/`). Returns ------- disk_usage : tuple of float, optional A tuple containing: - `total_disk` : Total disk space in GB. - `used_disk` : Used disk space in GB. - `free_disk` : Free disk space in GB. Notes ----- Disk usage information is gathered using the `psutil` library. Disk space is converted to gigabytes (GB) by dividing the values by 1024^3. Raises ------ FileNotFoundError If the specified path does not exist on the filesystem. PermissionError If the program does not have permission to access the specified path. Examples -------- >>> from geoprior.utils.sys_utils import get_disk_usage >>> total, used, free = get_disk_usage(path="/") >>> print(f"Total: {total} GB, Used: {used} GB, Free: {free} GB") Total: 256 GB, Used: 128 GB, Free: 128 GB """ try: usage = psutil.disk_usage(path) total_disk = usage.total / (1024**3) # Convert to GB used_disk = usage.used / (1024**3) # Convert to GB free_disk = usage.free / (1024**3) # Convert to GB logger.debug( f"Disk usage for path '{path}': Total: {total_disk} GB," " Used: {used_disk} GB, Free: {free_disk} GB" ) disk_usage_infos = { "Total": f"{total_disk} GB", "Used": f"{used_disk} GB", "Free": f"{free_disk} GB", } return disk_usage_infos except FileNotFoundError: logger.error(f"Path not found: {path}") return None except PermissionError: logger.error(f"Permission denied for path: {path}") return None except Exception as e: logger.error( f"Failed to retrieve disk usage for path '{path}': {e}" ) return None
[docs] def is_gpu_available() -> bool: """ Checks if a GPU is available for computation on the system, using the PyTorch library if it is installed. Returns ------- available : bool True if a GPU is available, False otherwise. Notes ----- This function relies on the `torch` library (PyTorch) to detect GPU availability. If PyTorch is not installed, it logs a warning and returns False. Raises ------ ImportError If PyTorch is not installed and thus the GPU availability check cannot be performed. Examples -------- >>> from geoprior.utils.sys_utils import is_gpu_available >>> is_gpu_available() True """ try: import torch gpu_available = torch.cuda.is_available() logger.debug( f"GPU availability check: {gpu_available}" ) return gpu_available except ImportError: logger.warning( "PyTorch is not installed. GPU availability check cannot be performed." ) return False except Exception as e: logger.error(f"Failed to check GPU availability: {e}") return False
[docs] @ensure_pkgs( "torch", "torch library is required for retrieving" " detailed information about available GPUs.", ) def get_gpu_info() -> dict[str, str] | None: """ Provides detailed information about available GPUs, including device name, memory capacity, and CUDA version (if PyTorch is installed). Returns ------- gpu_info : dict or None Dictionary containing GPU details, including: - `device_count` : Number of available GPU devices. - `device_name` : Name of the first GPU device. - `memory_total` : Total memory of the first GPU device in GB. - `cuda_version` : CUDA version, if available. If no GPU is available or PyTorch is not installed, returns None. Notes ----- This function requires PyTorch to check for GPU availability. If PyTorch is not installed, it logs a warning and returns None. Raises ------ ImportError If PyTorch is not installed on the system. RuntimeError If there is an issue retrieving GPU properties. Examples -------- >>> from geoprior.utils.sys_utils import get_gpu_info >>> gpu_info = get_gpu_info() >>> print(gpu_info) {'device_count': '1', 'device_name': 'NVIDIA Tesla T4', 'memory_total': '15.99 GB', 'cuda_version': '11.1'} """ if not TORCH_AVAILABLE: logger.warning( "PyTorch not installed; detailed GPU information is unavailable." ) return None if not torch.cuda.is_available(): logger.info("No GPU is available on this system.") return None try: gpu_info = { "device_count": str(torch.cuda.device_count()), "device_name": torch.cuda.get_device_name(0), "memory_total": ( f"{torch.cuda.get_device_properties(0).total_memory / (1024**3):.2f} GB" ), "cuda_version": torch.version.cuda or "N/A", } logger.debug(f"GPU information: {gpu_info}") return gpu_info except Exception as e: logger.error(f"Error retrieving GPU information: {e}") return None
[docs] def system_uptime() -> str: """ Retrieves the system uptime, which is the duration the system has been running since the last boot, in a human-readable format. Returns ------- uptime : str System uptime in the format "Xd:Yh:Zm:Ws", where X, Y, Z, and W represent days, hours, minutes, and seconds, respectively. Notes ----- This function is cross-platform and works on Windows, macOS (Darwin), and Linux. It uses different commands to retrieve uptime based on the operating system. Raises ------ NotImplementedError If the function is called on an unsupported operating system. RuntimeError If an error occurs while retrieving uptime. Examples -------- >>> from geoprior.utils.sys_utils import system_uptime >>> system_uptime() '2d:10h:33m:12s' """ try: uptime_seconds = None if platform.system() == "Windows": uptime_seconds = int( subprocess.check_output( "net stats srv", shell=True ) .decode() .split("since")[1] .strip() .split()[0] ) elif platform.system() == "Linux": uptime_seconds = int( float( open("/proc/uptime").readline().split()[0] ) ) elif platform.system() == "Darwin": uptime_seconds = int( subprocess.check_output( "sysctl -n kern.boottime", shell=True ) .decode() .split(",")[0] .split(" ")[4] ) if uptime_seconds is not None: days, remainder = divmod(uptime_seconds, 86400) hours, remainder = divmod(remainder, 3600) minutes, seconds = divmod(remainder, 60) uptime_str = ( f"{days}d:{hours}h:{minutes}m:{seconds}s" ) logger.debug(f"System uptime: {uptime_str}") return uptime_str else: raise NotImplementedError( "Unsupported operating system." ) except Exception as e: logger.error(f"Failed to retrieve system uptime: {e}") return "N/A"
[docs] def is_port_open(port: int) -> bool: """ Checks if a specified network port is open or occupied on the local machine. Parameters ---------- port : int The port number to check for availability. Returns ------- bool Returns True if the port is open (not in use), otherwise False. Notes ----- This function uses a socket connection to check if the specified port is open. It is helpful in applications where network services or applications need to bind to a specific port. Raises ------ ValueError If an invalid port number is provided. Examples -------- >>> from geoprior.utils.sys_utils import is_port_open >>> is_port_open(8080) False """ with socket.socket( socket.AF_INET, socket.SOCK_STREAM ) as s: result = s.connect_ex(("localhost", port)) is_open = result != 0 status = "open" if is_open else "occupied" logger.debug(f"Port {port} is {status}.") return is_open
[docs] @ensure_pkgs( ["psutil"], extra="`get_disk_usage` requires the `psutil`" " package for summarizing of the current environment ", auto_install=True, ) def environment_summary() -> dict[str, str]: """ Provides a summary of the current computing environment, including information on Python version, OS, CPU, memory, available GPU(s), and a list of installed Python packages. Returns ------- env_info : dict Dictionary containing environment details, including: - `python_version` : The version of Python in use. - `os` : Operating system name. - `os_version` : Version of the operating system. - `cpu_count` : Number of logical CPU cores. - `memory` : Total system memory in GB. - `device_count`, `device_name`, `memory_total`, `cuda_version` (if available) : GPU details from `detailed_gpu_info`. - `installed_packages` : List of installed Python packages (first 10) in `name==version` format. Notes ----- The function attempts to load installed packages using `pkg_resources`. If `pkg_resources` is not available, it defaults to "N/A" for installed packages. Raises ------ ImportError If `pkg_resources` is not installed. RuntimeError If an error occurs while gathering environment information. Examples -------- >>> from geoprior.utils.sys_utils import environment_summary >>> env_info = environment_summary() >>> print(env_info) {'python_version': '3.9.5', 'os': 'Linux', 'os_version': '5.4.0-80-generic', 'cpu_count': '4', 'memory': '15.5 GB', 'device_count': '1', 'device_name': 'NVIDIA Tesla T4', 'memory_total': '15.99 GB', 'cuda_version': '11.1', 'installed_packages': 'numpy==1.21.0, pandas==1.3.0, ...'} """ env_info = { "python_version": platform.python_version(), "os": platform.system(), "os_version": platform.version(), "cpu_count": str(psutil.cpu_count(logical=True)), "memory": f"{psutil.virtual_memory().total / (1024**3):.2f} GB", } # GPU information if available gpu_info = get_gpu_info() if gpu_info: env_info.update(gpu_info) try: # List installed packages if possible import pkg_resources installed_packages = [ f"{d.project_name}=={d.version}" for d in pkg_resources.working_set ] env_info["installed_packages"] = ( ", ".join(installed_packages[:10]) + "..." if len(installed_packages) > 10 else ", ".join(installed_packages) ) except ImportError: env_info["installed_packages"] = "N/A" logger.debug(f"Environment summary: {env_info}") return env_info
[docs] def manage_env_variable( var_name: str, value: str | None = None, default: str | None = None, action: str = "get", file_path: str | None = None, overwrite: bool = False, ) -> str | None: """ Manages environment variables, allowing retrieval, setting, or loading from a `.env` file. Parameters ---------- var_name : str The name of the environment variable to retrieve, set, or load. value : str, optional The value to set for the environment variable. Only used if `action` is `"set"`. Default is None. default : str, optional The default value to return if the environment variable `var_name` is not found when `action` is `"get"`. If None, returns None when the variable is not found. Default is None. action : str, default="get" The action to perform. Options are: - `"get"`: Retrieves the environment variable `var_name`. - `"set"`: Sets the environment variable `var_name` to `value`. - `"load"`: Loads environment variables from a `.env` file specified by `file_path`. file_path : str, optional The path to the `.env` file to load variables from when `action` is `"load"`. Required if `action` is `"load"`. overwrite : bool, default=False If True, allows overwriting existing environment variables when `action` is `"load"` or `"set"`. If False, preserves the current value of existing environment variables. Returns ------- result : str or None - If `action` is `"get"`, returns the value of the environment variable `var_name` or `default` if the variable is not set. - If `action` is `"set"` or `"load"`, returns None. Notes ----- - This function is useful for managing configuration data securely by utilizing environment variables. - Loading from a `.env` file allows you to define multiple variables in a single file, each defined in the `KEY=VALUE` format. Raises ------ ValueError If `action` is `"set"` and `value` is not provided, or if `action` is `"load"` and `file_path` is not specified. FileNotFoundError If `action` is `"load"` and `file_path` does not exist. Examples -------- >>> from geoprior.utils.sys_utils import manage_env_variable >>> manage_env_variable('HOME', action='get') '/home/username' >>> manage_env_variable('NEW_VAR', value='new_value', action='set') >>> manage_env_variable('NEW_VAR', action='get') 'new_value' >>> manage_env_variable('NON_EXISTENT_VAR', default='default_value', action='get') 'default_value' >>> manage_env_variable( var_name='', action='load', file_path='/path/to/.env', overwrite=True) See Also -------- os.getenv : Retrieves environment variables. os.environ : Provides access to the environment variables. """ if action == "get": # Get environment variable or return default if not found return os.getenv(var_name, default) elif action == "set": if value is None: raise ValueError( "A value must be provided when action='set'." ) if overwrite or var_name not in os.environ: os.environ[var_name] = value elif action == "load": if file_path is None: raise ValueError( "file_path must be specified when action='load'." ) if not os.path.isfile(file_path): raise FileNotFoundError( f"The file '{file_path}' was not found." ) with open(file_path) as file: for line in file: line = line.strip() if not line or line.startswith("#"): continue # Skip comments and empty lines if "=" not in line: raise ValueError( f"Invalid line in .env file: '{line}'" ) file_var_name, file_value = map( str.strip, line.split("=", 1) ) if ( overwrite or file_var_name not in os.environ ): os.environ[file_var_name] = file_value else: raise ValueError( f"Invalid action '{action}'. Expected 'get', 'set', or 'load'." )
[docs] def is_path_accessible( path: str, permissions: str = "r" ) -> bool: """ Checks if a specified path is accessible with the given permissions. Parameters ---------- path : str The path to check for accessibility. permissions : str, optional The permission types to check: `'r'` for read, `'w'` for write, `'x'` for execute. Multiple permissions can be specified, e.g., `"rw"`. Default is `"r"`. Returns ------- accessible : bool True if the path is accessible with the specified permissions, otherwise False. Notes ----- This function verifies file permissions in the current user context, ensuring flexibility for multi-user environments. Examples -------- >>> from geoprior.utils.sys_utils import is_path_accessible >>> is_path_accessible("/path/to/file", permissions="rw") True """ if not os.path.exists(path): return False permission_checks = { "r": os.R_OK, "w": os.W_OK, "x": os.X_OK, } # Validate the permissions argument for perm in permissions: if perm not in permission_checks: raise ValueError( f"Invalid permission '{perm}'. Use 'r', 'w', or 'x'." ) # Check each specified permission return all( os.access(path, permission_checks[perm]) for perm in permissions )
[docs] @ensure_pkgs( names="fcntl", extra="`fcntl` is required for file locking/unlocking in Unix-based systems.", auto_install=False, dist_names="fcntl", # `fcntl` is standard on Unix;let the user know if unavailable. # so we set infer_dist_name to False infer_dist_name=False, ) def manage_file_lock( file_path: str, action: str = "lock", blocking: bool = True, exclusive: bool = True, ) -> int | None: """ Manages file locking and unlocking to prevent concurrent access. This function allows both locking and unlocking actions on a file to prevent or allow concurrent access. It opens the file and applies an exclusive lock or shared lock, depending on the parameters specified. Parameters ---------- file_path : str Path to the file that needs to be locked or unlocked. action : str, default="lock" Specifies the action to perform: `"lock"` to acquire a lock, or `"unlock"` to release a previously acquired lock. blocking : bool, default=True If True, the lock will block until it can be acquired. If False, the lock will raise an exception if it cannot be acquired immediately. exclusive : bool, default=True If True, an exclusive lock is applied. If False, a shared lock is applied (other processes can read the file simultaneously). Returns ------- file_descriptor : int or None If `action` is `"lock"`, returns the file descriptor on success; otherwise, None if `action` is `"unlock"` or if locking fails. Notes ----- This function uses the `fcntl` module for locking, which is only available on Unix-based systems. The lock is maintained as long as the file descriptor remains open. - For `"lock"`, the function opens the file and applies a lock. - For `"unlock"`, it removes the lock and closes the file descriptor. Raises ------ ValueError If the `action` parameter is not one of `"lock"` or `"unlock"`. OSError or IOError If locking or unlocking the file fails. Examples -------- >>> from geoprior.utils.sys_utils import manage_file_lock >>> fd = manage_file_lock("/path/to/file", action="lock", blocking=True) >>> if fd: ... print("File is locked.") ... manage_file_lock(fd, action="unlock") ... print("File is unlocked.") See Also -------- os.open : Opens a file descriptor. fcntl.flock : Applies or removes file locks. """ import fcntl if action not in {"lock", "unlock"}: raise ValueError( f"Invalid action '{action}'. Expected 'lock' or 'unlock'." ) # Lock action: Open and lock the file if action == "lock": try: fd = os.open(file_path, os.O_RDWR) lock_type = ( fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH ) lock_flag = 0 if blocking else fcntl.LOCK_NB fcntl.flock(fd, lock_type | lock_flag) logger.debug( f"Locked file '{file_path}' with fd {fd}" ) return fd except OSError as e: logger.error( f"Failed to lock file '{file_path}': {e}" ) return None # Unlock action: Unlock and close the file descriptor elif action == "unlock": try: fcntl.flock(file_path, fcntl.LOCK_UN) os.close(file_path) logger.debug( f"Unlocked file '{file_path}' with fd {file_path}" ) return None except OSError as e: logger.error( f"Failed to unlock file descriptor '{file_path}': {e}" ) return None
[docs] def get_system_info() -> dict[str, str]: """ Retrieves basic system information including OS, Python version, CPU details, and GPU availability. Returns ------- system_info : dict A dictionary containing basic system information: - `os_name` : Name of the operating system. - `os_version` : Version of the operating system. - `python_version` : Python version. - `cpu_count` : Number of logical CPUs. - `gpu_available` : Whether a GPU is available (`True` or `False`). Notes ----- This function checks for GPU availability via PyTorch if installed, otherwise it defaults to False. Examples -------- >>> from geoprior.utils.sys_utils import get_system_info >>> get_system_info() {'os_name': 'Linux', 'os_version': '5.4.0-81-generic', 'python_version': '3.8.5', 'cpu_count': '8', 'gpu_available': 'True'} See Also -------- get_python_version : Retrieves the current Python version. """ gpu_available = False try: import torch gpu_available = torch.cuda.is_available() except ImportError: pass system_info = { "os_name": platform.system(), "os_version": platform.version(), "python_version": platform.python_version(), "cpu_count": str(psutil.cpu_count(logical=True)), "gpu_available": str(gpu_available), } return system_info
[docs] def get_python_version() -> str: """ Returns the version of Python being used in the current environment. Returns ------- python_version : str The version of Python currently in use. Examples -------- >>> from geoprior.utils.sys_utils import get_python_version >>> get_python_version() '3.8.5' See Also -------- get_system_info : Provides broader system information, including Python version. """ return platform.python_version()
[docs] def get_installed_packages() -> list[str]: """ Lists all installed packages along with their versions in the current Python environment. Returns ------- installed_packages : list of str A list of installed packages and their versions in the format `package_name==version`. Notes ----- This function is useful for dependency management and tracking installed packages, especially in data science and production environments. Examples -------- >>> from geoprior.utils.sys_utils import get_installed_packages >>> get_installed_packages() ['numpy==1.21.0', 'pandas==1.3.0', 'scikit-learn==0.24.2', ...] See Also -------- environment_summary : Summarizes the environment, including installed packages. """ try: import pkg_resources installed_packages = [ f"{d.project_name}=={d.version}" for d in pkg_resources.working_set ] return installed_packages except ImportError: logger.warning( "pkg_resources is not available. Cannot list installed packages." ) return []
[docs] def run_command( command: str, capture_output: bool = True ) -> str | None: """ Runs a shell command and optionally captures its output. Parameters ---------- command : str The shell command to execute. capture_output : bool, default=True If True, captures and returns the command’s output. If False, runs the command without capturing output, which is useful for commands that produce a large output or run interactively. Returns ------- output : str or None Returns the command output as a string if `capture_output` is True. If `capture_output` is False, returns None. Notes ----- This function uses `subprocess.run` to execute shell commands, which allows for error handling and logging. For example, ``run_command("echo Hello World")`` returns ``"Hello World\\n"`` when ``capture_output=True``. Raises ------ subprocess.CalledProcessError If the command exits with a non-zero status and `capture_output` is True. """ try: result = subprocess.run( command, shell=True, capture_output=capture_output, text=True, check=True, ) return result.stdout if capture_output else None except subprocess.CalledProcessError as e: print(f"Error executing command '{command}': {e}") return None
[docs] def create_temp_file( suffix: str = "", prefix: str = "tmp" ) -> str: """ Creates a temporary file and returns its path. Parameters ---------- suffix : str, optional The suffix for the temporary file. Default is an empty string. prefix : str, optional The prefix for the temporary file. Default is `"tmp"`. Returns ------- file_path : str The full path of the created temporary file. Notes ----- This function is useful for handling data temporarily in applications where files need to be stored and accessed for a short time. Examples -------- >>> from geoprior.utils.sys_utils import create_temp_file >>> temp_file = create_temp_file() >>> print(temp_file) '/tmp/tmpabcd1234' See Also -------- create_temp_dir : Creates a temporary directory. """ temp_file = tempfile.NamedTemporaryFile( suffix=suffix, prefix=prefix, delete=False ) temp_file.close() return temp_file.name
[docs] def create_temp_dir(prefix: str = "tmp") -> str: """ Creates a temporary directory and returns its path. Parameters ---------- prefix : str, optional The prefix for the temporary directory name. Default is `"tmp"`. Returns ------- dir_path : str The full path of the created temporary directory. Notes ----- This function is helpful for managing temporary directories in applications where short-term data storage is needed. Examples -------- >>> from geoprior.utils.sys_utils import create_temp_dir >>> temp_dir = create_temp_dir() >>> print(temp_dir) '/tmp/tmpabcd1234' See Also -------- create_temp_file : Creates a temporary file. """ return tempfile.mkdtemp(prefix=prefix)
[docs] def clean_temp_files(directory: str | None = None) -> None: """ Cleans up temporary files in a specified directory. Parameters ---------- directory : str, optional The directory to clean up. If None, cleans the default temporary directory. Returns ------- None Notes ----- This function is particularly useful for freeing up disk space in data-intensive applications. Examples -------- >>> from geoprior.utils.sys_utils import clean_temp_files >>> clean_temp_files("/path/to/temp/dir") """ dir_to_clean = directory or tempfile.gettempdir() for item in os.listdir(dir_to_clean): item_path = os.path.join(dir_to_clean, item) try: if os.path.isfile(item_path): os.unlink(item_path) elif os.path.isdir(item_path): shutil.rmtree(item_path) except Exception as e: print(f"Failed to delete {item_path}: {e}")
[docs] def is_package_installed(package_name: str) -> bool: """ Checks if a specific package is installed in the current Python environment. Parameters ---------- package_name : str The name of the package to check. Returns ------- bool True if the package is installed, otherwise False. Examples -------- >>> from geoprior.utils.sys_utils import is_package_installed >>> is_package_installed("numpy") True """ package_spec = importlib.util.find_spec(package_name) return package_spec is not None
[docs] def manage_temp( suffix: str = "", prefix: str = "tmp", action: str = "create_file", directory: str | None = None, clean_all: bool = False, ) -> str | None: """ Manages temporary files and directories by creating, accessing, or cleaning them as needed. Parameters ---------- suffix : str, optional Suffix for the temporary file or directory, used only if `action` is `"create_file"` or `"create_dir"`. Default is an empty string. prefix : str, optional Prefix for the temporary file or directory, used only if `action` is `"create_file"` or `"create_dir"`. Default is `"tmp"`. action : str, default="create_file" Specifies the operation to perform. Options include: - `"create_file"`: Creates a temporary file and returns its path. - `"create_dir"`: Creates a temporary directory and returns its path. - `"clean"`: Cleans temporary files in the specified directory or the system temp directory if none is provided. directory : str, optional Directory to clean when `action` is `"clean"`. If `None`, uses the system’s default temporary directory. Ignored for file or directory creation actions. clean_all : bool, default=False If `True`, removes all files and directories within the specified directory. If `False`, only deletes files or directories created by this process. Used only when `action` is `"clean"`. Returns ------- temp_path : str or None - For `"create_file"` and `"create_dir"` actions, returns the path of the created file or directory. - For `"clean"`, returns None. Raises ------ ValueError If an invalid action is specified. Notes ----- This function is useful for managing temporary resources in data processing tasks, where files or directories need to be created and cleaned up after use. Examples -------- >>> from geoprior.utils.sys_utils import manage_temp >>> temp_file = manage_temp(action="create_file") >>> print(temp_file) '/tmp/tmpabcd1234' >>> temp_dir = manage_temp(action="create_dir", prefix="data_") >>> print(temp_dir) '/tmp/data_abcd1234' >>> manage_temp(action="clean", directory="/path/to/temp", clean_all=True) See Also -------- tempfile : Module for creating temporary files and directories. shutil : High-level file operations. """ # Create a temporary file and return its path if action == "create_file": temp_file = tempfile.NamedTemporaryFile( suffix=suffix, prefix=prefix, delete=False ) temp_file.close() return temp_file.name # Create a temporary directory and return its path elif action == "create_dir": return tempfile.mkdtemp(suffix=suffix, prefix=prefix) # Clean up temporary files and directories elif action == "clean": dir_to_clean = directory or tempfile.gettempdir() for item in os.listdir(dir_to_clean): item_path = os.path.join(dir_to_clean, item) try: if os.path.isfile( item_path ) or os.path.islink(item_path): os.unlink(item_path) elif os.path.isdir(item_path) and clean_all: shutil.rmtree(item_path) except Exception as e: print(f"Failed to delete {item_path}: {e}") return None # Handle invalid action else: raise ValueError( f"Invalid action '{action}'. Expected 'create_file'," " 'create_dir', or 'clean'." )
[docs] def check_port_in_use(port: int) -> bool: """ Checks if a port is currently in use, which is useful for server-based applications. Parameters ---------- port : int The port number to check. Returns ------- bool True if the port is in use, otherwise False. Examples -------- >>> from geoprior.utils.sys_utils import check_port_in_use >>> check_port_in_use(8080) False """ with socket.socket( socket.AF_INET, socket.SOCK_STREAM ) as s: return s.connect_ex(("localhost", port)) == 0
[docs] def get_uptime() -> str: """ Returns the system uptime in a human-readable format. Returns ------- uptime : str The system uptime formatted as `"Xd:Yh:Zm:Ws"`, where X, Y, Z, and W are days, hours, minutes, and seconds respectively. Notes ----- This function is useful for monitoring or diagnosing long-running processes on the system. Examples -------- >>> from geoprior.utils.sys_utils import get_uptime >>> get_uptime() '2d:5h:34m:12s' """ uptime_seconds = int(psutil.boot_time()) days, remainder = divmod(uptime_seconds, 86400) hours, remainder = divmod(remainder, 3600) minutes, seconds = divmod(remainder, 60) return f"{days}d:{hours}h:{minutes}m:{seconds}s"
# ----
[docs] def parallelize_jobs( function: Callable, tasks: Sequence[dict[str, Any]] = (), n_jobs: int | None = None, executor_type: str = "process", ) -> list: """ Parallelize the execution of a callable across multiple processors, supporting both positional and keyword arguments. Parameters ---------- function : Callable[..., Any] The function to execute in parallel. This function must be picklable if using `executor_type='process'`. tasks : Sequence[Dict[str, Any]], optional A sequence of dictionaries, where each dictionary contains two keys: 'args' (a tuple) for positional arguments, and 'kwargs' (a dict) for keyword arguments, for one execution of `function`. Defaults to an empty sequence. n_jobs : Optional[int], optional The number of jobs to run in parallel. `None` or `1` uses a single processor, any positive integer specifies the exact number of processors to use, `-1` uses all available processors. Default is None (1 processor). executor_type : str, optional The type of executor to use. Can be 'process' for CPU-bound tasks or 'thread' for I/O-bound tasks. Default is 'process'. Returns ------- list A list of results from the function executions. Raises ------ ValueError If `function` is not picklable when using 'process' as `executor_type`. Examples -------- >>> from geoprior.utils.sys_utils import parallelize_jobs >>> def greet(name, greeting='Hello'): ... return f"{greeting}, {name}!" >>> tasks = [ ... {'args': ('John',), 'kwargs': {'greeting': 'Hi'}}, ... {'args': ('Jane',), 'kwargs': {}} ... ] >>> results = parallelize_jobs(greet, tasks, n_jobs=2) >>> print(results) ['Hi, John!', 'Hello, Jane!'] """ if executor_type == "process": import_optional_dependency("cloudpickle") import cloudpickle try: cloudpickle.dumps(function) except cloudpickle.PicklingError: raise ValueError( "The function to be parallelized must be " "picklable when using 'process' executor." ) num_workers = ( multiprocessing.cpu_count() if n_jobs == -1 else (1 if n_jobs is None else n_jobs) ) ExecutorClass = ( ProcessPoolExecutor if executor_type == "process" else ThreadPoolExecutor ) results = [] with ExecutorClass(max_workers=num_workers) as executor: futures = [ executor.submit( function, *task.get("args", ()), **task.get("kwargs", {}), ) for task in tasks ] for future in as_completed(futures): results.append(future.result()) return results
[docs] def find_by_regex(o, pattern, func=re.match, **kws): """Find pattern in object whatever an "iterable" or not. when we talk about iterable, a string value is not included. Parameters ----------- o: str or iterable, text litteral or an iterable object containing or not the specific object to match. pattern: str, default = '[_#&*@!_,;\s-]\s*' The base pattern to split the text into a columns func: re callable , default=re.match regular expression search function. Can be [re.match, re.findall, re.search ],or any other regular expression function. * ``re.match()``: function searches the regular expression pattern and return the first occurrence. The Python RegEx Match method checks for a match only at the beginning of the string. So, if a match is found in the first line, it returns the match object. But if a match is found in some other line, the Python RegEx Match function returns null. * ``re.search()``: function will search the regular expression pattern and return the first occurrence. Unlike Python re.match(), it will check all lines of the input string. The Python re.search() function returns a match object when the pattern is found and “null” if the pattern is not found * ``re.findall()`` module is used to search for 'all' occurrences that match a given pattern. In contrast, search() module will only return the first occurrence that matches the specified pattern. findall() will iterate over all the lines of the file and will return all non-overlapping matches of pattern in a single step. kws: dict, Additional keywords arguments passed to functions :func:`re.match` or :func:`re.search` or :func:`re.findall`. Returns ------- om: list matched object put is the list Example -------- >>> from geoprior.utils.sys_utils import find_by_regex >>> from geoprior.datasets import load_hlogs >>> X0, _= load_hlogs (as_frame =True ) >>> columns = X0.columns >>> str_columns =','.join (columns) >>> find_by_regex (str_columns , pattern='depth', func=re.search) ... ['depth'] >>> find_by_regex(columns, pattern ='depth', func=re.search) ... ['depth_top', 'depth_bottom'] """ om = [] if isinstance(o, str): om = func(pattern=pattern, string=o, **kws) if om: om = om.group() om = [om] elif is_iterable(o): o = list(o) for s in o: z = func(pattern=pattern, string=s, **kws) if z: om.append(s) if func.__name__ == "findall": om = list(itertools.chain(*om)) # keep None is nothing # fit the corresponding pattern if len(om) == 0 or om[0] is None: om = None return om
[docs] def find_similar_string( name: str, container: list[str] | tuple[str, ...] | dict[Any, Any], stripitems: str | list[str] | tuple[str, ...] = "_", deep: bool = False, ) -> str | None: """ Find the most similar string in a container to the provided name. This function searches for the most likely matching string in a container based on the provided `name`. It sanitizes the `name` by stripping specified characters and can perform a deep search to find partial matches. Parameters ---------- name : str The string to search for in the container. container : list, tuple, or dict The container with strings to search in. stripitems : str or list of str, optional Characters or strings to strip from `name` before searching. If a string, multiple items can be separated by ':', ',', or ';'. Default is ``'_'``. deep : bool, optional If ``True``, performs a deeper search by checking if `name` is a substring of any item in the container. Default is ``False``. Returns ------- result : str or None The most similar string from the container, or ``None`` if no match is found. Examples -------- >>> from geoprior.utils.sys_utils import find_similar_string >>> container = {'dipole': 1, 'quadrupole': 2} >>> find_similar_string('dipole_', container) 'dipole' >>> find_similar_string('dip', container, deep=True) 'dipole' >>> find_similar_string('+dipole__', container, stripitems='+;__', deep=True) 'dipole' Notes ----- This function is useful when trying to find the closest matching string in a container, especially when exact matches are not guaranteed due to formatting inconsistencies or typos. See Also -------- str.strip : Returns a copy of the string with leading and trailing characters removed. """ # Validate inputs if not isinstance(name, str): raise TypeError( "`name` must be a string, got {type(name).__name__}" ) if not isinstance(container, list | tuple | dict): raise TypeError( "`container` must be a list, tuple," f" or dict, got {type(container).__name__}" ) if not isinstance(stripitems, str | list | tuple): raise TypeError( f"`stripitems` must be a string or list/tuple of strings," f" got {type(stripitems).__name__}" ) # Process stripitems if isinstance(stripitems, str): for sep in (":", ",", ";"): if sep in stripitems: stripitems = stripitems.split(sep) break else: stripitems = [stripitems] else: stripitems = list(stripitems) # Sanitize name for s in stripitems: name = name.strip(s) # Prepare container keys if isinstance(container, dict): container_keys = [ key.lower() for key in container.keys() ] keys_list = list(container.keys()) else: container_keys = [ str(item).lower() for item in container ] keys_list = list(container) name_lower = name.lower() try: index = container_keys.index(name_lower) result = keys_list[index] return result except ValueError: pass # Not found, proceed if deep: for idx, item in enumerate(container_keys): if name_lower in item: result = keys_list[idx] return result return None
[docs] def represent_callable( obj: Callable, skip: str | list[str] | None = None, ) -> str: """ Represent callable objects by formatting their signatures. This function generates a string representation of a callable object's signature, including parameters and default values. It supports classes, functions, and instance methods. Parameters ---------- obj : callable The callable object to format. skip : str or list of str, optional Parameter names to skip in the representation. Useful for omitting certain attributes. Returns ------- representation : str A string representation of the callable object's signature. Raises ------ TypeError If `obj` is not a callable object. Examples -------- >>> from geoprior.utils.sys_utils import represent_callable >>> def example_function(a, b=2): ... pass >>> represent_callable(example_function) 'example_function(a, b=2)' >>> class ExampleClass: ... def __init__(self, x, y=10): ... self.x = x ... self.y = y >>> represent_callable(ExampleClass) 'ExampleClass(x, y=10)' >>> instance = ExampleClass(5) >>> represent_callable(instance) 'ExampleClass(x=5, y=10)' Notes ----- This function is useful for logging or displaying the parameters of callable objects in a readable format. See Also -------- inspect.signature : Get a signature object for the callable. """ if not callable(obj) and not hasattr(obj, "__dict__"): raise TypeError( f"Object '{obj}' is not callable or does not have attributes." ) if isinstance(skip, str): skip = [skip] elif skip is None: skip = [] else: skip = list(skip) obj_name = ( obj.__name__ if hasattr(obj, "__name__") else obj.__class__.__name__ ) try: sig = inspect.signature(obj) params = [ f"{name}={repr(param.default)}" if param.default is not inspect.Parameter.empty else name for name, param in sig.parameters.items() if name not in skip ] representation = f"{obj_name}({', '.join(params)})" except (TypeError, ValueError): # If obj is an instance, get its __dict__ attributes attrs = { k: v for k, v in vars(obj).items() if not k.startswith("_") and k not in skip } # Limit the number of attributes displayed if len(attrs) > 6: displayed_attrs = ( list(attrs.items())[:3] + [("...", "...")] + list(attrs.items())[-3:] ) else: displayed_attrs = attrs.items() params = [ f"{k}={repr(v)}" for k, v in displayed_attrs ] representation = f"{obj_name}({', '.join(params)})" return representation
[docs] def safe_getattr( obj: Any, name: str, default_value: Any | None = None ) -> Any: """ Safely get an attribute from an object, with a helpful error message. This function attempts to retrieve an attribute from the given object. If the attribute is not found, it can return a default value or raise an AttributeError with a suggestion for a similar attribute. Parameters ---------- obj : object The object from which to retrieve the attribute. name : str The name of the attribute to retrieve. default_value : any, optional A default value to return if the attribute is not found. If ``None``, an ``AttributeError`` will be raised. Returns ------- value : any The value of the retrieved attribute or the default value. Raises ------ AttributeError If the attribute is not found and no default value is provided. Examples -------- >>> from geoprior.utils.sys_utils import safe_getattr >>> class MyClass: ... def __init__(self, a, b): ... self.a = a ... self.b = b >>> obj = MyClass(1, 2) >>> safe_getattr(obj, 'a') 1 >>> safe_getattr(obj, 'c', default_value='default') 'default' >>> safe_getattr(obj, 'c') Traceback (most recent call last): ... AttributeError: 'MyClass' object has no attribute 'c'. Did you mean 'a'? Notes ----- This function enhances the built-in `getattr` by providing helpful suggestions when an attribute is not found. See Also -------- getattr : Built-in function to get an attribute from an object. """ if hasattr(obj, name): return getattr(obj, name) if default_value is not None: return default_value # Attempt to find a similar attribute name similar_attr = find_similar_string( name, vars(obj), deep=True ) suggestion = ( f". Did you mean '{similar_attr}'?" if similar_attr else "" ) raise AttributeError( f"'{obj.__class__.__name__}' object has no attribute '{name}'{suggestion}" )
class _SafeOptimize: def __init__( self, func: Callable | None = None, *, parallelize: bool = True, memory_cleanup: bool = False, log_level: int = logging.INFO, optimize_cpu: bool = True, num_processes: int | None = None, cpu_cores: list[int] | None = None, verbose: bool = True, mode: str = "strict", ): self.func = func self.parallelize = parallelize self.memory_cleanup = memory_cleanup self.log_level = log_level self.optimize_cpu = optimize_cpu self.num_processes = num_processes self.cpu_cores = cpu_cores self.verbose = verbose self.mode = mode def __call__(self, *args, **kwargs): if ( self.func is None and len(args) == 1 and callable(args[0]) ): # Decorator used without arguments self.func = args[0] return self._wrap_function(self.func) elif self.func and callable(self.func): # Function is already set, execute it return self._wrap_function(self.func)( *args, **kwargs ) else: # Decorator used with arguments def wrapper(func): self.func = func return self._wrap_function(func) return wrapper def _wrap_function(self, func): @functools.wraps(func) def wrapped_function(*args, **kwargs): return self._execute_function( func, *args, **kwargs ) return wrapped_function def _execute_function(self, func, *args, **kwargs): # Set up logging based on the specified log level logger.setLevel(self.log_level) # Record the start time start_time = time.time() if self.verbose: logger.info( f"Starting workflow optimization for '{func.__name__}'..." ) # Apply CPU optimization if requested if self.optimize_cpu and self.cpu_cores: try: _reset_cpu_affinity(self.cpu_cores) logger.info( f"Optimized CPU usage, restricted to cores " f"{self.cpu_cores}." ) except Exception as e: logger.error(f"CPU optimization failed: {e}") if self.mode == "strict": raise elif self.mode == "soft": logger.warning( "Falling back to default CPU settings." ) # Check if the function is picklable if self.parallelize: parallelize_flag = True if not _is_picklable(func, self.mode): if self.mode == "strict": raise pickle.PicklingError( f"Function '{func.__name__}' or its arguments are " "not picklable." ) elif self.mode == "soft": logger.warning( f"Function '{func.__name__}' or its arguments are " "not picklable. Falling back to sequential execution." ) parallelize_flag = False else: parallelize_flag = False # Apply parallelization strategy if enabled if parallelize_flag: try: results = _parallelize_flow( func, self.num_processes, *args, **kwargs ) except Exception as e: logger.error( f"Parallel execution failed: {e}" ) if self.mode == "strict": raise elif self.mode == "soft": logger.warning( "Falling back to sequential execution." ) results = func(*args, **kwargs) else: # Execute function normally if parallelization is disabled results = func(*args, **kwargs) # If memory cleanup is requested, clean up after execution if self.memory_cleanup: _clean_up_memory(verbose=self.verbose) logger.info("Memory cleanup completed.") # Log execution time elapsed_time = time.time() - start_time if self.verbose: logger.info( f"Workflow '{func.__name__}' completed in " f"{elapsed_time:.4f} seconds." ) return results
[docs] def safe_optimize( func: Callable | None = None, *, parallelize: bool = True, memory_cleanup: bool = False, log_level: int = logging.INFO, optimize_cpu: bool = True, num_processes: int | None = None, cpu_cores: list[int] | None = None, verbose: bool = True, mode: str = "strict", ) -> Callable: """ Optimizes the workflow by wrapping a function to measure execution time, enable parallelization, manage resources, and perform memory cleanup and acts similary like class-based decorator `WorflowOptimizer`. Class-based decorators can sometimes encounter issues when trying to pickle certain objects, especially in parallel execution contexts. This issue arises because certain objects (such as file handles, open network connections, or non-serializable class instances) cannot be passed between processes in multiprocessing environments. By ensuring compatibility with these contexts, `safe_optimize` helps mitigate such issues and optimize the execution of computationally intensive workflows. This decorator is particularly suitable for workflows involving large-scale computations, such as data processing pipelines, machine learning model training, or simulations, where parallel execution and resource optimization are crucial for performance improvement. Parameters ---------- parallelize : bool, optional Flag to enable or disable parallel processing (default is ``True``). memory_cleanup : bool, optional Whether to clean up system memory after execution (default is ``False``). log_level : int, optional Level of logging (default is ``logging.INFO``). Set to ``logging.DEBUG`` for more detailed logs. optimize_cpu : bool, optional Whether to optimize CPU core usage (default is ``True``). num_processes : Optional[int], optional The number of parallel processes for execution (default is ``None``). cpu_cores : Optional[List[int]], optional Specify a list of CPU cores to restrict the process (default is ``None``). verbose : bool, optional Whether to print detailed logs during execution (default is ``True``). mode : str, optional Mode for handling pickling issues: ``'strict'`` to raise errors, or ``'soft'`` to fallback to sequential execution with warnings (default is ``'strict'``). Returns ------- decorator : Callable The wrapped function that includes optimization strategies. Raises ------ ValueError If an unsupported mode is specified. Examples -------- >>> from geoprior.utils.sys_utils import safe_optimize >>> @safe_optimize( ... parallelize=True, ... memory_cleanup=True, ... log_level=logging.DEBUG, ... optimize_cpu=True, ... num_processes=4, ... cpu_cores=[0, 1, 2, 3], ... verbose=True, ... mode='soft' ... ) ... def process_data(data): ... # Your data processing logic here ... return [d * 2 for d in data] >>> data = [1, 2, 3, 4, 5] >>> results = process_data(data) >>> print(results) [2, 4, 6, 8, 10] Notes ----- - This decorator uses multiprocessing for parallel execution, which may not be suitable for all environments, especially those that do not support forking (e.g., some Windows configurations). - Ensure that the decorated function and its arguments are picklable when using parallelization. - The `mode` parameter allows handling non-picklable objects gracefully. See Also -------- multiprocessing.Pool : For parallel task execution. psutil : For system and process utilities. functools.wraps : For preserving metadata of decorated functions. """ return _SafeOptimize( func=func, parallelize=parallelize, memory_cleanup=memory_cleanup, log_level=log_level, optimize_cpu=optimize_cpu, num_processes=num_processes, cpu_cores=cpu_cores, verbose=verbose, mode=mode, )
def _is_picklable(func: Callable, mode: str) -> bool: """ Check whether the function and its arguments are picklable. Parameters ---------- func : Callable The function to check for picklability. mode : str The mode of operation: 'strict' or 'soft'. Returns ------- bool Returns ``True`` if the function and its arguments are picklable, ``False`` otherwise. Raises ------ PicklingError If the function or its arguments are not picklable and mode is 'strict'. """ try: # Attempt to pickle the function pickle.dumps(func) return True except pickle.PicklingError as e: if is_module_installed("cloudpickle"): import cloudpickle try: cloudpickle.dumps(func) return True except Exception as e: logger.error( f"Function '{func.__name__}' is not picklable: {e}" ) if mode == "strict": raise pickle.PicklingError( f"Function '{func.__name__}' is not picklable. {e}" ) elif mode == "soft": logger.warning( f"Function '{func.__name__}' is not picklable. " "Falling back to sequential execution." ) return False else: if mode == "strict": raise pickle.PicklingError( f"Function '{func.__name__}' is not picklable: {e}" ) elif mode == "soft": logger.warning( f"Function '{func.__name__}' is not picklable: {e}. " "Parallelization will be skipped." ) return False except Exception as e: logger.error( f"Unexpected error during pickling check: {e}" ) if mode == "strict": raise elif mode == "soft": logger.warning( "Falling back to sequential execution." ) return False def _parallelize_flow( func: Callable, num_processes: int | None, *args, **kwargs, ): """ Parallelize the execution of a function across multiple processes. Parameters ---------- func : Callable The function to execute in parallel. num_processes : Optional[int] The number of parallel processes to use. If ``None``, defaults to the number of CPU cores. *args : tuple Positional arguments to pass to the function. **kwargs : dict Keyword arguments to pass to the function. Returns ------- list A list of results from each parallel execution. Raises ------ TypeError If the 'data' keyword argument is not a list or tuple. """ if "data" in kwargs: data = kwargs["data"] elif args: data = args[0] else: logger.info( "No data provided for parallel processing." ) return func(*args, **kwargs) if not isinstance(data, list | tuple): raise TypeError( f"'data' parameter should be a list or tuple, got " f"{type(data).__name__!r} instead." ) if is_module_installed("joblib"): from joblib import Parallel, delayed try: # Run the function in parallel using joblib results = Parallel(n_jobs=num_processes)( delayed(func)(item, *args[1:], **kwargs) for item in data ) except Exception as e: logger.error(f"Parallel execution failed: {e}") logger.warning( "Falling back to sequential execution." ) results = func(*args, **kwargs) else: import multiprocessing num_processes_ = num_processes or min( multiprocessing.cpu_count(), len(data) ) logger.info( f"Parallelizing with {num_processes_} processes." ) # Use multiprocessing Pool to parallelize tasks with multiprocessing.Pool( processes=num_processes_ ) as pool: results = pool.map(func, data) return results def _reset_cpu_affinity(cpu_cores: list[int]): """ Restrict the process to specific CPU cores. Parameters ---------- cpu_cores : List[int] A list of CPU core indices to bind the process to. Raises ------ psutil.Error If setting CPU affinity fails. """ try: p = psutil.Process() p.cpu_affinity( cpu_cores ) # Set the process CPU affinity except psutil.Error as e: logger.error( f"Failed to set CPU affinity to cores {cpu_cores}: {e}" ) raise def _delete_temp_files(temp_dir: str, verbose: bool = True): """ Deletes temporary files or directories created during the workflow. Parameters ---------- temp_dir : str The path to the temporary directory to be deleted. verbose : bool, optional Whether to log the action. Default is ``True``. Notes ----- - Uses ``shutil.rmtree`` to remove directories and their contents. - Does nothing if the specified directory does not exist. """ path = Path(temp_dir) if path.exists() and path.is_dir(): shutil.rmtree(temp_dir) if verbose: print(f"Deleted temporary directory: {temp_dir}") logger.debug( f"Deleted temporary directory: {temp_dir}" ) else: if verbose: print( f"No temporary files found to delete in '{temp_dir}'." ) logger.debug( f"No temporary files found to delete in '{temp_dir}'." ) def _clear_unused_variables(verbose: bool = True): """ Deletes unused variables to free up memory using garbage collection. Parameters ---------- verbose : bool, optional Whether to log the action. Default is ``True``. Notes ----- - Invokes Python's garbage collector to clean up unreferenced objects. """ gc.collect() if verbose: print( "Cleared unused variables (Garbage Collection)." ) logger.debug( "Cleared unused variables (Garbage Collection)." ) def _clear_system_memory(verbose: bool = True): """ Frees up system memory by performing garbage collection and printing memory usage. Parameters ---------- verbose : bool, optional Whether to log the action. Default is ``True``. Notes ----- - Uses `psutil` to monitor memory usage before and after cleanup. """ process = psutil.Process(os.getpid()) memory_before = process.memory_info().rss / ( 1024**2 ) # Convert to MB if verbose: print( f"Current memory usage before cleanup: {memory_before:.2f} MB" ) logger.debug( f"Current memory usage before cleanup: {memory_before:.2f} MB" ) gc.collect() memory_after = process.memory_info().rss / ( 1024**2 ) # Convert to MB if verbose: print( f"Memory usage after cleanup: {memory_after:.2f} MB" ) logger.debug( f"Memory usage after cleanup: {memory_after:.2f} MB" ) def _clear_cuda_cache(verbose: bool = True): """ Clears CUDA memory cache if using PyTorch with CUDA. Parameters ---------- verbose : bool, optional Whether to log the action. Default is ``True``. Notes ----- - Requires PyTorch to be installed and CUDA to be available. """ try: import torch # For PyTorch GPU memory management if torch.cuda.is_available(): torch.cuda.empty_cache() if verbose: print("Cleared CUDA cache (PyTorch).") logger.debug("Cleared CUDA cache (PyTorch).") except ImportError: if verbose: print( "PyTorch is not installed; skipping CUDA cache clearing." ) logger.debug( "PyTorch is not installed; skipping CUDA cache clearing." ) def _clear_tensorflow_cache(verbose: bool = True): """ Clears TensorFlow GPU memory cache if applicable. Parameters ---------- verbose : bool, optional Whether to log the action. Default is ``True``. Notes ----- - Requires TensorFlow to be installed. """ try: import tensorflow as tf # For TensorFlow GPU memory management gpus = tf.config.list_physical_devices("GPU") if gpus: try: for gpu in gpus: tf.config.experimental.reset_memory_growth( gpu ) tf.keras.backend.clear_session() if verbose: print("Cleared GPU memory (TensorFlow).") logger.debug( "Cleared GPU memory (TensorFlow)." ) except RuntimeError as e: logger.error( f"Error clearing TensorFlow GPU memory: {e}" ) except ImportError: if verbose: print( "TensorFlow is not installed; skipping GPU cache clearing." ) logger.debug( "TensorFlow is not installed; skipping GPU cache clearing." ) def _clean_up_memory(verbose: bool = True): """ Cleans up memory by clearing caches, releasing unused resources. Parameters ---------- verbose : bool, optional Whether to log the action. Default is ``True``. Notes ----- - Clears CUDA caches for PyTorch and TensorFlow if available. - Performs garbage collection and system memory cleanup. """ logger.info("Starting memory cleanup...") if is_module_installed("torch"): # Clear CUDA memory if using PyTorch with CUDA _clear_cuda_cache(verbose) if is_module_installed("tensorflow"): # Clear TensorFlow GPU memory if applicable _clear_tensorflow_cache(verbose) # Clear unused variables in the Python environment _clear_unused_variables(verbose) # Attempt to free system memory _clear_system_memory(verbose) logger.info("Memory cleanup complete.")