geoprior.utils.sys_utils#

System utilities module for managing system-level operations.

This module provides utilities essential for system-level tasks such as color management, regular expression searching, and projection validation, along with other miscellaneous system operations.

Functions

build_large_df(forecast_results, dt_col, tname)

Construct memory-optimized DataFrame from large forecast results using chunked processing.

check_port_in_use(port)

Checks if a port is currently in use, which is useful for server-based applications.

clean_temp_files([directory])

Cleans up temporary files in a specified directory.

create_temp_dir([prefix])

Creates a temporary directory and returns its path.

create_temp_file([suffix, prefix])

Creates a temporary file and returns its path.

environment_summary()

Provides a summary of the current computing environment, including information on Python version, OS, CPU, memory, available GPU(s), and a list of installed Python packages.

find_by_regex(o, pattern[, func])

Find a pattern in an object, whether or not it is an iterable.

find_similar_string(name, container[, ...])

Find the most similar string in a container to the provided name.

get_cpu_usage([per_cpu])

Returns the current CPU usage as a percentage, optionally providing per-core usage for systems with multiple cores.

get_disk_usage([path])

Returns disk usage statistics for a specified filesystem path, including total, used, and free disk space in gigabytes (GB).

get_gpu_info()

Provides detailed information about available GPUs, including device name, memory capacity, and CUDA version (if PyTorch is installed).

get_installed_packages()

Lists all installed packages along with their versions in the current Python environment.

get_memory_usage()

Retrieves system memory usage statistics, providing the total, used, and available memory in megabytes (MB).

get_python_version()

Returns the version of Python being used in the current environment.

get_system_info()

Retrieves basic system information including OS, Python version, CPU details, and GPU availability.

get_uptime()

Returns the system uptime in a human-readable format.

is_gpu_available()

Checks if a GPU is available for computation on the system, using the PyTorch library if it is installed.

is_package_installed(package_name)

Checks if a specific package is installed in the current Python environment.

is_path_accessible(path[, permissions])

Checks if a specified path is accessible with the given permissions.

is_port_open(port)

Checks whether a specified network port on the local machine is open, that is, not currently in use.

manage_env_variable(var_name[, value, ...])

Manages environment variables, allowing retrieval, setting, or loading from a .env file.

manage_file_lock(file_path[, action, ...])

Manages file locking and unlocking to prevent concurrent access.

manage_temp([suffix, prefix, action, ...])

Manages temporary files and directories by creating, accessing, or cleaning them as needed.

parallelize_jobs(function[, tasks, n_jobs, ...])

Parallelize the execution of a callable across multiple processors, supporting both positional and keyword arguments.

represent_callable(obj[, skip])

Represent callable objects by formatting their signatures.

run_command(command[, capture_output])

Runs a shell command and optionally captures its output.

safe_getattr(obj, name[, default_value])

Safely get an attribute from an object, with a helpful error message.

safe_optimize([func, parallelize, ...])

Optimizes the workflow by wrapping a function to measure execution time, enable parallelization, manage resources, and perform memory cleanup. It behaves similarly to the class-based decorator WorkflowOptimizer.

system_uptime()

Retrieves the system uptime, which is the duration the system has been running since the last boot, in a human-readable format.

Classes

BatchDataFrameBuilder([chunk_size, ...])

Manages incremental construction of a large DataFrame in controlled-size chunks.

WorkflowOptimizer([parallelize, ...])

WorkflowOptimizer is a decorator class designed to optimize the execution of computationally intensive functions by enabling parallelization, managing CPU and memory resources, and performing cleanup tasks.

class geoprior.utils.sys_utils.BatchDataFrameBuilder(chunk_size=100000, processor='auto', verbose=1)[source]#

Bases: object

Manages incremental construction of a large DataFrame in controlled-size chunks. This can reduce peak memory usage and allow GPU-accelerated libraries (e.g., cudf) if they are available and desired.

The approach can be expressed mathematically as a chunking process that partitions an incoming stream of \(N\) row-dictionaries into \(k\) subsets of size at most \(m = \text{chunk\_size}\):

(1)#\[k = \left\lceil \frac{N}{m} \right\rceil\]

Each subset is converted into a DataFrame chunk and stored, the row buffer is then released from memory, and the stored chunks are concatenated at finalization time.
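
As a rough illustration of the chunk count in (1), the sketch below buffers row dictionaries in plain Python lists and counts the chunks it produces. ChunkBuffer is a hypothetical stand-in written for this example, not the actual BatchDataFrameBuilder implementation; it stores lists of rows where the real class would build DataFrame chunks.

```python
import math


class ChunkBuffer:
    """Hypothetical stand-in that mimics chunked buffering with plain lists."""

    def __init__(self, chunk_size=100000):
        self.chunk_size = chunk_size
        self._rows = []    # rows waiting to be flushed
        self._chunks = []  # completed chunks

    def add_row(self, row):
        self._rows.append(row)
        # Flush as soon as the buffer holds chunk_size rows.
        if len(self._rows) >= self.chunk_size:
            self._flush()

    def _flush(self):
        if self._rows:
            self._chunks.append(self._rows)
            self._rows = []

    def finalize(self):
        # Flush the remainder, then merge all chunks into one list.
        self._flush()
        n_chunks = len(self._chunks)
        merged = [row for chunk in self._chunks for row in chunk]
        self._chunks = []
        return merged, n_chunks


buf = ChunkBuffer(chunk_size=4)
for i in range(10):
    buf.add_row({"colA": i, "colB": i ** 2})
rows, n_chunks = buf.finalize()
print(n_chunks, math.ceil(10 / 4))
```

With N = 10 rows and m = 4, both numbers printed are 3: two full chunks and one partial chunk.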

Parameters:
  • chunk_size (int, optional) – The maximum number of rows to hold in the internal buffer before converting them into a DataFrame chunk. Default is 100000.

  • processor ({'auto', 'cpu', 'gpu'}, optional) –

    Controls the engine used to build the DataFrame:

    • 'cpu' : Always use pandas.

    • 'gpu' : Attempt to use cudf (raise an error if not available).

    • 'auto' : Use cudf if a GPU is detected and cudf is installed; otherwise fallback to pandas.

  • verbose (int, optional) –

    Verbosity level. Default is 1:

    • 0 : Silent.

    • 1 : Basic information.

    • 2 : Debug / detailed printing.

Notes

This object is intended for situations where the total row count can be very large, potentially in the millions. By breaking data into chunks, you can avoid excessive memory usage and keep the system more responsive. If processor is 'auto' or 'gpu', the module calls check_processor to verify GPU availability, then uses cudf if appropriate.

Note

If the total data is larger than your available memory (whether RAM or GPU), consider writing out each chunk to disk as a partitioned file (e.g., Parquet or Feather) instead of storing them all in memory.

Examples

>>> from geoprior.utils.sys_utils import BatchDataFrameBuilder
>>> # Suppose we have a large list of dictionaries
>>> data = [
...     {'colA': i, 'colB': i**2} for i in range(10**6)
... ]
>>> with BatchDataFrameBuilder(chunk_size=50000,
...                            processor='auto',
...                            verbose=2) as builder:
...     builder.add_rows(data)
...
>>> # After exiting the context, the final DataFrame is
>>> # automatically built and stored in builder.final_df
>>> final_df = builder.final_df
>>> print(final_df.shape)
(1000000, 2)

See also

pandas.DataFrame

Core pandas DataFrame object.

cudf.DataFrame

GPU DataFrame object from RAPIDS.

check_processor

Utility for detecting GPU availability.

__init__(chunk_size=100000, processor='auto', verbose=1)[source]#

Initializes the builder, setting up chunk size, processor preference, and verbosity. Checks for GPU availability if requested.

__enter__()[source]#

Enters the context manager and returns self, so the builder can be bound with the as clause of a with statement.

add_row(row)[source]#

Adds a single row to the internal buffer.

This method appends the given dictionary row to the in-memory buffer. If the buffer reaches self.chunk_size, it is automatically flushed.

Parameters:

row (dict) – A row in dictionary form, where keys correspond to column names and values represent the row data.

Notes

Internally calls _flush() once the buffer has reached its maximum size.

add_rows(rows)[source]#

Adds multiple rows to the internal buffer.

This method iterates over the list of dictionaries rows. For each element, add_row() is called, which may trigger a flush if the buffer is full.

Parameters:

rows (list of dict) – Each dictionary should have the same structure as a typical row in the final DataFrame.

Notes

This method is merely a convenience layer over add_row().

finalize()[source]#

Flushes remaining rows and concatenates all chunks.

Once the remaining rows in _rows are processed into a chunk, this method concatenates all stored chunk DataFrames (either pandas or cudf) into one final DataFrame. The resulting DataFrame is returned.

Returns:

The final DataFrame, which may be a pandas DataFrame or a cudf DataFrame (if processor is set to allow GPU usage and cudf is available).

Return type:

DataFrame

Notes

After concatenation, all chunk DataFrames are cleared from memory. This method is called automatically upon exiting the context (i.e., in __exit__()).

__exit__(exc_type, exc_val, exc_tb)[source]#

Exits the context manager. Automatically finalizes the DataFrame by calling finalize(), storing the result in self.final_df.

class geoprior.utils.sys_utils.WorkflowOptimizer(parallelize=True, memory_cleanup=False, log_level=20, optimize_cpu=True, num_processes=None, cpu_cores=None, verbose=True)[source]#

Bases: object

WorkflowOptimizer is a decorator class designed to optimize the execution of computationally intensive functions by enabling parallelization, managing CPU and memory resources, and performing cleanup tasks. It provides flexibility through various parameters that allow users to customize optimization strategies according to their workflow requirements.

(2)#\[T_{\text{total}} = T_{\text{start}} + T_{\text{execution}} + T_{\text{cleanup}}\]

Here, \(T_{\text{total}}\) is the total workflow time, \(T_{\text{start}}\) is the initialization time, \(T_{\text{execution}}\) is the main execution time, and \(T_{\text{cleanup}}\) is the cleanup time.
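
The time decomposition above can be made concrete with a small timing decorator. This is only a sketch under stated assumptions: timed_phases and its timings attribute are hypothetical names, the start-up phase is left empty, and cleanup is reduced to a single gc.collect() call. It does not reproduce what WorkflowOptimizer does internally.

```python
import gc
import time
from functools import wraps


def timed_phases(func):
    """Hypothetical decorator recording start-up, execution, and cleanup times."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        # Start-up phase: a real optimizer might configure logging or
        # CPU affinity here; this sketch does nothing.
        t1 = time.perf_counter()
        result = func(*args, **kwargs)
        t2 = time.perf_counter()
        gc.collect()  # cleanup phase
        t3 = time.perf_counter()
        wrapper.timings = {
            "start": t1 - t0,
            "execution": t2 - t1,
            "cleanup": t3 - t2,
            "total": t3 - t0,
        }
        return result

    wrapper.timings = {}
    return wrapper


@timed_phases
def work(n):
    return sum(i * i for i in range(n))


value = work(10_000)
timings = work.timings
print(sorted(timings))
```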

Parameters:
  • parallelize (bool, optional) – Flag to enable or disable parallel processing. If set to True, the decorator will attempt to parallelize the execution of the decorated function using multiprocessing. Default is True.

  • memory_cleanup (bool, optional) – Whether to clean up system memory after the execution of the decorated function. This includes triggering garbage collection and clearing GPU caches if applicable. Default is False.

  • log_level (int, optional) – Level of logging verbosity. Accepts standard logging levels such as logging.INFO, logging.DEBUG, etc. Default is logging.INFO.

  • optimize_cpu (bool, optional) – Whether to optimize CPU usage by setting CPU affinity to restrict the process to specific CPU cores. If True, the decorator will bind the process to the cores specified in cpu_cores. Default is True.

  • num_processes (int, optional) – The number of parallel processes to use when parallelize is enabled. If not specified, it defaults to the minimum of the number of available CPU cores and the length of the data iterable passed to the function. Default is None.

  • cpu_cores (list or None, optional) – A list of specific CPU cores to bind the process to for optimized CPU usage. If None, the process is allowed to run on all available CPU cores. Example: [0, 1, 2, 3]. Default is None.

  • verbose (bool, optional) – Whether to print detailed logs during execution. If set to False, only essential information will be logged based on the log_level. Default is True.

Examples

>>> from geoprior.utils.sys_utils import WorkflowOptimizer
>>> import logging
>>> import time
>>>
>>> @WorkflowOptimizer(
...     parallelize=True,
...     memory_cleanup=True,
...     log_level=logging.DEBUG,
...     num_processes=4,
...     cpu_cores=[0, 1, 2, 3],
...     verbose=True
... )
>>> def process_data(data_chunk):
...     '''Simulate a time-consuming data processing function.'''
...     time.sleep(1)  # Simulate a time-consuming task
...     return f"Processed {data_chunk}"
...
>>> data_chunks = ['chunk1', 'chunk2', 'chunk3', 'chunk4']
>>> results = process_data(data=data_chunks)
>>> print(results)
['Processed chunk1', 'Processed chunk2', 'Processed chunk3',
'Processed chunk4']

Notes

The decorator checks for the presence of a data keyword argument to decide whether parallelization should be applied. When parallelize=True, the decorated function should be compatible with multiprocessing, meaning it should be picklable. Memory cleanup can be useful in long-running workflows, and CPU affinity may improve performance by reducing context switching and cache misses. Logging behavior follows the standard Python logging model.
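
The dispatch on the data keyword described in these notes can be sketched as follows. The names map_over_data and num_workers are hypothetical, and the sketch deliberately swaps multiprocessing for a thread pool (concurrent.futures.ThreadPoolExecutor) so that it stays self-contained and the picklability requirement does not apply. It shows the control flow only, not the library's implementation.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps


def map_over_data(num_workers=4):
    """Hypothetical decorator: fan out over the ``data`` keyword if present."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            data = kwargs.pop("data", None)
            if data is None:
                # No ``data`` keyword: call the function normally.
                return func(*args, **kwargs)
            items = list(data)
            workers = max(1, min(num_workers, len(items)))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                # Executor.map returns results in input order.
                return list(
                    pool.map(lambda item: func(item, *args, **kwargs), items)
                )

        return wrapper

    return decorator


@map_over_data(num_workers=2)
def process_data(data_chunk):
    return f"Processed {data_chunk}"


results = process_data(data=["chunk1", "chunk2", "chunk3"])
single = process_data("chunk4")
print(results, single)
```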

See also

multiprocessing.Pool

Provides a pool of worker processes.

psutil.Process

Allows manipulation of system processes.

__init__(parallelize=True, memory_cleanup=False, log_level=20, optimize_cpu=True, num_processes=None, cpu_cores=None, verbose=True)[source]#
Parameters:
  • parallelize (bool)

  • memory_cleanup (bool)

  • log_level (int)

  • optimize_cpu (bool)

  • num_processes (int | None)

  • cpu_cores (list[int] | None)

  • verbose (bool)

__call__(func)[source]#

Makes the class instance callable so it can be used as a decorator.

Parameters:

func (function) – The function to be decorated and optimized.

Returns:

wrapper – The wrapped function with optimization strategies applied.

Return type:

function

geoprior.utils.sys_utils.check_port_in_use(port)[source]#

Checks if a port is currently in use, which is useful for server-based applications.

Parameters:

port (int) – The port number to check.

Returns:

True if the port is in use, otherwise False.

Return type:

bool

Examples

>>> from geoprior.utils.sys_utils import check_port_in_use
>>> check_port_in_use(8080)
False
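
One common way to perform such a check with the standard library is to attempt a TCP connection to the port. The sketch below assumes that approach; port_in_use, host, and timeout are hypothetical names, and the source does not confirm that check_port_in_use works this way.

```python
import socket


def port_in_use(port, host="127.0.0.1", timeout=0.5):
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising an exception.
        return sock.connect_ex((host, port)) == 0


# Open a listening socket on an OS-assigned port to get a known busy port.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
busy_port = server.getsockname()[1]
busy = port_in_use(busy_port)
server.close()
print(busy)
```
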
geoprior.utils.sys_utils.clean_temp_files(directory=None)[source]#

Cleans up temporary files in a specified directory.

Parameters:

directory (str, optional) – The directory to clean up. If None, cleans the default temporary directory.

Return type:

None

Notes

This function is particularly useful for freeing up disk space in data-intensive applications.

Examples

>>> from geoprior.utils.sys_utils import clean_temp_files
>>> clean_temp_files("/path/to/temp/dir")
geoprior.utils.sys_utils.create_temp_dir(prefix='tmp')[source]#

Creates a temporary directory and returns its path.

Parameters:

prefix (str, optional) – The prefix for the temporary directory name. Default is “tmp”.

Returns:

dir_path – The full path of the created temporary directory.

Return type:

str

Notes

This function is helpful for managing temporary directories in applications where short-term data storage is needed.

Examples

>>> from geoprior.utils.sys_utils import create_temp_dir
>>> temp_dir = create_temp_dir()
>>> print(temp_dir)
/tmp/tmpabcd1234

See also

create_temp_file

Creates a temporary file.

geoprior.utils.sys_utils.create_temp_file(suffix='', prefix='tmp')[source]#

Creates a temporary file and returns its path.

Parameters:
  • suffix (str, optional) – The suffix for the temporary file. Default is an empty string.

  • prefix (str, optional) – The prefix for the temporary file. Default is “tmp”.

Returns:

file_path – The full path of the created temporary file.

Return type:

str

Notes

This function is useful for handling data temporarily in applications where files need to be stored and accessed for a short time.

Examples

>>> from geoprior.utils.sys_utils import create_temp_file
>>> temp_file = create_temp_file()
>>> print(temp_file)
/tmp/tmpabcd1234

See also

create_temp_dir

Creates a temporary directory.
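
For comparison, the standard library's tempfile module offers the same two operations. The sketch below uses tempfile.mkdtemp and tempfile.mkstemp directly; whether create_temp_dir and create_temp_file wrap these calls is an assumption, not something the documentation states.

```python
import os
import shutil
import tempfile

# mkdtemp creates the directory and returns its absolute path.
temp_dir = tempfile.mkdtemp(prefix="tmp")

# mkstemp returns an open OS-level file descriptor plus the file path.
fd, temp_file = tempfile.mkstemp(suffix=".txt", prefix="tmp", dir=temp_dir)
os.close(fd)  # close the descriptor; the file itself stays on disk

created = os.path.isdir(temp_dir) and os.path.isfile(temp_file)

# Neither call cleans up automatically, so remove the tree when done.
shutil.rmtree(temp_dir)
removed = not os.path.exists(temp_dir)
print(created, removed)
```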

geoprior.utils.sys_utils.environment_summary()[source]#

Provides a summary of the current computing environment, including information on Python version, OS, CPU, memory, available GPU(s), and a list of installed Python packages.

Returns:

env_info – Dictionary containing environment details, including:

  • python_version : The version of Python in use.

  • os : Operating system name.

  • os_version : Version of the operating system.

  • cpu_count : Number of logical CPU cores.

  • memory : Total system memory in GB.

  • device_count, device_name, memory_total, cuda_version (if available) : GPU details from detailed_gpu_info.

  • installed_packages : List of installed Python packages (first 10) in name==version format.

Return type:

dict

Notes

The function attempts to load installed packages using pkg_resources. If pkg_resources is not available, it defaults to “N/A” for installed packages.

Raises:
  • ImportError – If pkg_resources is not installed.

  • RuntimeError – If an error occurs while gathering environment information.

Return type:

dict[str, str]

Examples

>>> from geoprior.utils.sys_utils import environment_summary
>>> env_info = environment_summary()
>>> print(env_info)
{'python_version': '3.9.5', 'os': 'Linux', 'os_version': '5.4.0-80-generic',
 'cpu_count': '4', 'memory': '15.5 GB', 'device_count': '1',
 'device_name': 'NVIDIA Tesla T4', 'memory_total': '15.99 GB',
 'cuda_version': '11.1', 'installed_packages': 'numpy==1.21.0, pandas==1.3.0, ...'}
geoprior.utils.sys_utils.find_by_regex(o, pattern, func=<function match>, **kws)[source]#

Find a pattern in an object, whether or not it is an iterable.

Here, “iterable” excludes plain strings: a string is searched as a single piece of text.

Parameters:
  • o (str or iterable) – Text literal or an iterable object that may or may not contain the item to match.

  • pattern (str, default = '[_#&*@!_,;\s-]\s*') – The base pattern used to split the text into columns.

  • func (re callable, default re.match) –

    Regular expression search function. Can be re.match, re.search, re.findall, or any other regular expression function with the same calling convention.

    • re.match() checks for a match only at the beginning of the string.

      It returns a match object if the pattern matches there, and None otherwise, even if the pattern occurs later in the string.

    • re.search() scans the whole string for the pattern.

      It returns a match object for the first location where the pattern matches, or None if the pattern is not found.

    • re.findall() searches for all occurrences that match a given pattern.

      It returns all non-overlapping matches of the pattern as a list, whereas re.search() returns only the first occurrence.

  • kws (dict) – Additional keyword arguments passed to re.match(), re.search(), or re.findall().

Returns:

om – The matched objects, collected in a list.

Return type:

list

Examples

>>> import re
>>> from geoprior.utils.sys_utils import find_by_regex
>>> from geoprior.datasets import load_hlogs
>>> X0, _ = load_hlogs(as_frame=True)
>>> columns = X0.columns
>>> str_columns = ','.join(columns)
>>> find_by_regex(str_columns, pattern='depth', func=re.search)
['depth']
>>> find_by_regex(columns, pattern='depth', func=re.search)
['depth_top', 'depth_bottom']
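
The difference between searching a string and searching an iterable can be sketched with the re module alone. The helper regex_find below is hypothetical and only guesses at behaviour consistent with the example above (matched text for a string, matching items for an iterable); the column names are made up for illustration and are not taken from load_hlogs.

```python
import re


def regex_find(o, pattern, func=re.match, **kws):
    """Hypothetical helper mirroring the documented str/iterable behaviour."""
    if isinstance(o, str):
        found = func(pattern, o, **kws)
        if found is None:
            return []
        # findall returns a list; match and search return a match object.
        return list(found) if isinstance(found, list) else [found.group()]
    # For an iterable, keep every item in which the regex function finds something.
    return [item for item in o if func(pattern, item, **kws)]


columns = ["hole_id", "depth_top", "depth_bottom", "strata_name"]
from_string = regex_find(",".join(columns), "depth", func=re.search)
from_items = regex_find(columns, "depth", func=re.search)
# re.match anchors at the start of each item, so "top" matches nothing here.
anchored = regex_find(columns, "top", func=re.match)
print(from_string, from_items, anchored)
```
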
geoprior.utils.sys_utils.find_similar_string(name, container, stripitems='_', deep=False)[source]#

Find the most similar string in a container to the provided name.

This function searches for the most likely matching string in a container based on the provided name. It sanitizes the name by stripping specified characters and can perform a deep search to find partial matches.

Parameters:
  • name (str) – The string to search for in the container.

  • container (list, tuple, or dict) – The container with strings to search in.

  • stripitems (str or list of str, optional) – Characters or strings to strip from name before searching. If a string, multiple items can be separated by ‘:’, ‘,’, or ‘;’. Default is '_'.

  • deep (bool, optional) – If True, performs a deeper search by checking if name is a substring of any item in the container. Default is False.

Returns:

result – The most similar string from the container, or None if no match is found.

Return type:

str or None

Examples

>>> from geoprior.utils.sys_utils import find_similar_string
>>> container = {'dipole': 1, 'quadrupole': 2}
>>> find_similar_string('dipole_', container)
'dipole'
>>> find_similar_string('dip', container, deep=True)
'dipole'
>>> find_similar_string('+dipole__', container, stripitems='+;__', deep=True)
'dipole'

Notes

This function is useful when trying to find the closest matching string in a container, especially when exact matches are not guaranteed due to formatting inconsistencies or typos.

See also

str.strip

Returns a copy of the string with leading and trailing characters removed.
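
The strip-then-match procedure described above might look roughly like the following. similar_string is a hypothetical re-implementation written for illustration: it assumes case-insensitive comparison and a plain substring test for deep=True, neither of which is confirmed by the documentation.

```python
import re


def similar_string(name, container, stripitems="_", deep=False):
    """Hypothetical sketch: strip, then exact match, then optional substring match."""
    if isinstance(stripitems, str):
        # A string may hold several items separated by ':', ',' or ';'.
        stripitems = [s for s in re.split(r"[:,;]", stripitems) if s]
    cleaned = str(name)
    for item in stripitems:
        cleaned = cleaned.strip(item)
    cleaned = cleaned.lower()
    candidates = [str(c) for c in container]  # iterating a dict yields its keys
    for candidate in candidates:
        if candidate.lower() == cleaned:
            return candidate
    if deep and cleaned:
        for candidate in candidates:
            if cleaned in candidate.lower():
                return candidate
    return None


container = {"dipole": 1, "quadrupole": 2}
exact = similar_string("dipole_", container)
partial = similar_string("dip", container, deep=True)
missing = similar_string("dip", container)
print(exact, partial, missing)
```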

geoprior.utils.sys_utils.get_cpu_usage(per_cpu=False)[source]#

Returns the current CPU usage as a percentage, optionally providing per-core usage for systems with multiple cores.

Parameters:

per_cpu (bool, default False) – If True, returns a list with the CPU usage percentage for each core. If False, returns the overall CPU usage as a single percentage.

Returns:

usage – If per_cpu is False, returns the overall CPU usage as a float percentage. If per_cpu is True, returns a list with each entry corresponding to the usage percentage of an individual core.

Return type:

float or list of float, optional

Notes

This function uses the psutil library to retrieve CPU usage information. Usage is sampled over a 1-second interval, so each call takes about one second to return.

Examples

>>> from geoprior.utils.sys_utils import get_cpu_usage
>>> get_cpu_usage()
1.3
>>> get_cpu_usage(per_cpu=True)
[20.4, 25.1, 21.3, 24.5]
geoprior.utils.sys_utils.get_disk_usage(path='/')[source]#

Returns disk usage statistics for a specified filesystem path, including total, used, and free disk space in gigabytes (GB).

Parameters:

path (str, default '/') – The filesystem path for which to check disk usage statistics. By default, it uses the root directory (/).

Returns:

disk_usage – A tuple containing:

  • total_disk : Total disk space in GB.

  • used_disk : Used disk space in GB.

  • free_disk : Free disk space in GB.

Return type:

tuple of float, optional

Notes

Disk usage information is gathered using the psutil library. Disk space is converted to gigabytes (GB) by dividing the values by 1024^3.

Raises:
  • FileNotFoundError – If the specified path does not exist on the filesystem.

  • PermissionError – If the program does not have permission to access the specified path.

Parameters:

path (str)

Return type:

tuple[float, float, float] | None

Examples

>>> from geoprior.utils.sys_utils import get_disk_usage
>>> total, used, free = get_disk_usage(path="/")
>>> print(f"Total: {total} GB, Used: {used} GB, Free: {free} GB")
Total: 256 GB, Used: 128 GB, Free: 128 GB
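
The bytes-to-GB conversion mentioned in the notes can be reproduced without psutil. The sketch below swaps in the standard library's shutil.disk_usage, which also reports total, used, and free bytes; disk_usage_gb is a hypothetical name and the rounding to two decimals is an assumption.

```python
import os
import shutil

GB = 1024 ** 3  # bytes per gigabyte, as in the notes above


def disk_usage_gb(path=os.sep):
    """Return (total, used, free) for ``path`` in GB, rounded to two decimals."""
    usage = shutil.disk_usage(path)  # named tuple of byte counts
    return tuple(
        round(value / GB, 2) for value in (usage.total, usage.used, usage.free)
    )


total, used, free = disk_usage_gb(os.path.abspath(os.sep))
print(f"Total: {total} GB, Used: {used} GB, Free: {free} GB")
```
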
geoprior.utils.sys_utils.get_gpu_info()[source]#

Provides detailed information about available GPUs, including device name, memory capacity, and CUDA version (if PyTorch is installed).

Returns:

gpu_info – Dictionary containing GPU details, including:

  • device_count : Number of available GPU devices.

  • device_name : Name of the first GPU device.

  • memory_total : Total memory of the first GPU device in GB.

  • cuda_version : CUDA version, if available.

If no GPU is available or PyTorch is not installed, returns None.

Return type:

dict or None

Notes

This function requires PyTorch to check for GPU availability. If PyTorch is not installed, it logs a warning and returns None.

Raises:
  • ImportError – If PyTorch is not installed on the system.

  • RuntimeError – If there is an issue retrieving GPU properties.

Return type:

dict[str, str] | None

Examples

>>> from geoprior.utils.sys_utils import get_gpu_info
>>> gpu_info = get_gpu_info()
>>> print(gpu_info)
{'device_count': '1', 'device_name': 'NVIDIA Tesla T4',
 'memory_total': '15.99 GB', 'cuda_version': '11.1'}
geoprior.utils.sys_utils.get_installed_packages()[source]#

Lists all installed packages along with their versions in the current Python environment.

Returns:

installed_packages – A list of installed packages and their versions in the format package_name==version.

Return type:

list of str

Notes

This function is useful for dependency management and tracking installed packages, especially in data science and production environments.

Examples

>>> from geoprior.utils.sys_utils import get_installed_packages
>>> get_installed_packages()
['numpy==1.21.0', 'pandas==1.3.0', 'scikit-learn==0.24.2', ...]

See also

environment_summary

Summarizes the environment, including installed packages.
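
A list in the same name==version format can be produced with the standard library. This sketch uses importlib.metadata rather than pkg_resources, which is a substitution made for the example; installed_packages is a hypothetical name and the case-insensitive sort order is an assumption.

```python
from importlib import metadata


def installed_packages():
    """List installed distributions as sorted ``name==version`` strings."""
    entries = set()
    for dist in metadata.distributions():
        meta = dist.metadata
        name = meta.get("Name") if meta is not None else None
        version = meta.get("Version") if meta is not None else None
        if name and version:  # skip distributions with incomplete metadata
            entries.add(f"{name}=={version}")
    return sorted(entries, key=str.lower)


packages = installed_packages()
print(packages[:10])  # first ten entries only
```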

geoprior.utils.sys_utils.get_memory_usage()[source]#

Retrieves system memory usage statistics, providing the total, used, and available memory in megabytes (MB).

Returns:

memory – A tuple containing:

  • total_memory : Total memory in MB.

  • used_memory : Used memory in MB.

  • available_memory : Available memory in MB.

Return type:

tuple of float

Notes

This function leverages the psutil library for retrieving memory usage information. The conversion to MB is performed by dividing each value by 1024^2.

Examples

>>> from geoprior.utils.sys_utils import get_memory_usage
>>> total, used, available = get_memory_usage()
>>> print(f"Total: {total} MB, Used: {used} MB, Available: {available} MB")
Total: 8192 MB, Used: 4096 MB, Available: 4096 MB
geoprior.utils.sys_utils.get_python_version()[source]#

Returns the version of Python being used in the current environment.

Returns:

python_version – The version of Python currently in use.

Return type:

str

Examples

>>> from geoprior.utils.sys_utils import get_python_version
>>> get_python_version()
'3.8.5'

See also

get_system_info

Provides broader system information, including Python version.

geoprior.utils.sys_utils.get_system_info()[source]#

Retrieves basic system information including OS, Python version, CPU details, and GPU availability.

Returns:

system_info – A dictionary containing basic system information:

  • os_name : Name of the operating system.

  • os_version : Version of the operating system.

  • python_version : Python version.

  • cpu_count : Number of logical CPUs.

  • gpu_available : Whether a GPU is available (True or False).

Return type:

dict

Notes

This function checks for GPU availability via PyTorch if installed, otherwise it defaults to False.

Examples

>>> from geoprior.utils.sys_utils import get_system_info
>>> get_system_info()
{'os_name': 'Linux', 'os_version': '5.4.0-81-generic', 'python_version': '3.8.5',
 'cpu_count': '8', 'gpu_available': 'True'}

See also

get_python_version

Retrieves the current Python version.

geoprior.utils.sys_utils.get_uptime()[source]#

Returns the system uptime in a human-readable format.

Returns:

uptime – The system uptime formatted as “Xd:Yh:Zm:Ws”, where X, Y, Z, and W are days, hours, minutes, and seconds respectively.

Return type:

str

Notes

This function is useful for monitoring or diagnosing long-running processes on the system.

Examples

>>> from geoprior.utils.sys_utils import get_uptime
>>> get_uptime()
'2d:5h:34m:12s'
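
The “Xd:Yh:Zm:Ws” format can be derived from a duration in seconds with repeated divmod calls. format_uptime is a hypothetical helper; how get_uptime obtains the raw uptime is not covered here.

```python
def format_uptime(total_seconds):
    """Format a duration in seconds as 'Xd:Yh:Zm:Ws'."""
    total_seconds = int(total_seconds)
    days, remainder = divmod(total_seconds, 86400)  # 86400 s per day
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{days}d:{hours}h:{minutes}m:{seconds}s"


formatted = format_uptime(2 * 86400 + 5 * 3600 + 34 * 60 + 12)
print(formatted)  # 2d:5h:34m:12s
```
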
geoprior.utils.sys_utils.is_gpu_available()[source]#

Checks if a GPU is available for computation on the system, using the PyTorch library if it is installed.

Returns:

available – True if a GPU is available, False otherwise.

Return type:

bool

Notes

This function relies on the torch library (PyTorch) to detect GPU availability. If PyTorch is not installed, it logs a warning and returns False.

Raises:

ImportError – If PyTorch is not installed and thus the GPU availability check cannot be performed.

Examples

>>> from geoprior.utils.sys_utils import is_gpu_available
>>> is_gpu_available()
True
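
The optional-dependency pattern described in the notes can be sketched as below. gpu_available is a hypothetical name; the sketch follows the behaviour given in the Notes (log a warning and return False when PyTorch is missing) and relies only on torch.cuda.is_available().

```python
import logging

logger = logging.getLogger(__name__)


def gpu_available():
    """Return True if PyTorch is importable and reports a CUDA device."""
    try:
        import torch  # optional dependency, imported lazily
    except ImportError:
        logger.warning("PyTorch is not installed; assuming no GPU is available.")
        return False
    return bool(torch.cuda.is_available())


has_gpu = gpu_available()
print(has_gpu)
```
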
geoprior.utils.sys_utils.is_package_installed(package_name)[source]#

Checks if a specific package is installed in the current Python environment.

Parameters:

package_name (str) – The name of the package to check.

Returns:

True if the package is installed, otherwise False.

Return type:

bool

Examples

>>> from geoprior.utils.sys_utils import is_package_installed
>>> is_package_installed("numpy")
True
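
A minimal way to test for a package with the standard library is importlib.util.find_spec. The sketch assumes that approach and uses the hypothetical name package_installed. Note that find_spec takes the importable module name, which can differ from the distribution name (for example, sklearn versus scikit-learn).

```python
import importlib.util


def package_installed(package_name):
    """Return True if a top-level module named ``package_name`` can be found."""
    return importlib.util.find_spec(package_name) is not None


has_json = package_installed("json")
has_fake = package_installed("definitely_not_a_real_package_xyz")
print(has_json, has_fake)
```
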
geoprior.utils.sys_utils.is_path_accessible(path, permissions='r')[source]#

Checks if a specified path is accessible with the given permissions.

Parameters:
  • path (str) – The path to check for accessibility.

  • permissions (str, optional) – The permission types to check: ‘r’ for read, ‘w’ for write, ‘x’ for execute. Multiple permissions can be specified, e.g., “rw”. Default is “r”.

Returns:

accessible – True if the path is accessible with the specified permissions, otherwise False.

Return type:

bool

Notes

This function verifies file permissions in the current user context, ensuring flexibility for multi-user environments.

Examples

>>> from geoprior.utils.sys_utils import is_path_accessible
>>> is_path_accessible("/path/to/file", permissions="rw")
True
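
A permission string such as “rw” maps naturally onto the mode flags of os.access. The sketch below assumes that mapping; path_accessible is a hypothetical name, and raising ValueError for an unknown flag is a design choice of the example rather than documented behaviour.

```python
import os
import tempfile

_MODES = {"r": os.R_OK, "w": os.W_OK, "x": os.X_OK}


def path_accessible(path, permissions="r"):
    """Check ``path`` against a permission string such as 'r', 'rw' or 'rwx'."""
    mode = os.F_OK  # existence is always part of the check
    for flag in permissions:
        if flag not in _MODES:
            raise ValueError(f"Unknown permission flag: {flag!r}")
        mode |= _MODES[flag]
    return os.access(path, mode)


with tempfile.NamedTemporaryFile() as tmp:
    readable_writable = path_accessible(tmp.name, permissions="rw")

missing_path = os.path.join(tempfile.gettempdir(), "no_such_file_for_this_sketch")
missing = path_accessible(missing_path, permissions="r")
print(readable_writable, missing)
```
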
geoprior.utils.sys_utils.is_port_open(port)[source]#

Checks whether a specified network port on the local machine is open, that is, not currently in use.

Parameters:

port (int) – The port number to check for availability.

Returns:

Returns True if the port is open (not in use), otherwise False.

Return type:

bool

Notes

This function uses a socket connection to check if the specified port is open. It is helpful in applications where network services or applications need to bind to a specific port.

Raises:

ValueError – If an invalid port number is provided.

Examples

>>> from geoprior.utils.sys_utils import is_port_open
>>> is_port_open(8080)
False
geoprior.utils.sys_utils.manage_env_variable(var_name, value=None, default=None, action='get', file_path=None, overwrite=False)[source]#

Manages environment variables, allowing retrieval, setting, or loading from a .env file.

Parameters:
  • var_name (str) – The name of the environment variable to retrieve, set, or load.

  • value (str, optional) – The value to set for the environment variable. Only used if action is “set”. Default is None.

  • default (str, optional) – The default value to return if the environment variable var_name is not found when action is “get”. If None, returns None when the variable is not found. Default is None.

  • action (str, default "get") –

    The action to perform. Options are:

    • “get”: Retrieves the environment variable var_name.

    • “set”: Sets the environment variable var_name to value.

    • “load”: Loads environment variables from a .env file specified by file_path.

  • file_path (str, optional) – The path to the .env file to load variables from when action is “load”. Required if action is “load”.

  • overwrite (bool, default False) – If True, allows overwriting existing environment variables when action is “load” or “set”. If False, preserves the current value of existing environment variables.

Returns:

result

  • If action is “get”, returns the value of the environment variable var_name or default if the variable is not set.

  • If action is “set” or “load”, returns None.

Return type:

str or None

Notes

  • This function is useful for managing configuration data securely by utilizing environment variables.

  • Loading from a .env file allows you to define multiple variables in a single file, each defined in the KEY=VALUE format.

Raises:
  • ValueError – If action is “set” and value is not provided, or if action is “load” and file_path is not specified.

  • FileNotFoundError – If action is “load” and file_path does not exist.

Parameters:
  • var_name (str)

  • value (str | None)

  • default (str | None)

  • action (str)

  • file_path (str | None)

  • overwrite (bool)

Return type:

str | None

Examples

>>> from geoprior.utils.sys_utils import manage_env_variable
>>> manage_env_variable('HOME', action='get')
'/home/username'
>>> manage_env_variable('NEW_VAR', value='new_value', action='set')
>>> manage_env_variable('NEW_VAR', action='get')
'new_value'
>>> manage_env_variable('NON_EXISTENT_VAR', default='default_value', action='get')
'default_value'
>>> manage_env_variable(
...     var_name='', action='load', file_path='/path/to/.env', overwrite=True)

See also

os.getenv

Retrieves environment variables.

os.environ

Provides access to the environment variables.

geoprior.utils.sys_utils.manage_file_lock(file_path, action='lock', blocking=True, exclusive=True)[source]#

Manages file locking and unlocking to prevent concurrent access.

This function allows both locking and unlocking actions on a file to prevent or allow concurrent access. It opens the file and applies an exclusive lock or shared lock, depending on the parameters specified.

Parameters:
  • file_path (str) – Path to the file that needs to be locked or unlocked.

  • action (str, default "lock") – Specifies the action to perform: “lock” to acquire a lock, or “unlock” to release a previously acquired lock.

  • blocking (bool, default True) – If True, the lock will block until it can be acquired. If False, the lock will raise an exception if it cannot be acquired immediately.

  • exclusive (bool, default True) – If True, an exclusive lock is applied. If False, a shared lock is applied (other processes can read the file simultaneously).

Returns:

file_descriptor – The file descriptor if action is “lock” and the lock is acquired; None if action is “unlock” or if locking fails.

Return type:

int or None

Notes

This function uses the fcntl module for locking, which is only available on Unix-based systems. The lock is maintained as long as the file descriptor remains open.

  • For “lock”, the function opens the file and applies a lock.

  • For “unlock”, it removes the lock and closes the file descriptor.
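The locking pattern described in these notes can be sketched with os.open and fcntl.flock. This is an illustrative sketch for Unix-like systems only; the helper names lock_file and unlock_file are hypothetical and are not part of geoprior.

```python
import fcntl
import os


def lock_file(path, blocking=True, exclusive=True):
    """Hypothetical helper: open `path` and lock it, returning the descriptor."""
    flags = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
    if not blocking:
        # LOCK_NB makes flock raise instead of waiting for the lock.
        flags |= fcntl.LOCK_NB
    fd = os.open(path, os.O_RDWR | os.O_CREAT)
    try:
        fcntl.flock(fd, flags)
    except OSError:
        os.close(fd)  # do not leak the descriptor when locking fails
        raise
    return fd


def unlock_file(fd):
    """Hypothetical helper: release the lock and close the descriptor."""
    fcntl.flock(fd, fcntl.LOCK_UN)
    os.close(fd)
```

Because the lock is tied to the open descriptor, the caller has to keep the returned value and pass it back when unlocking.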

Raises:
  • ValueError – If the action parameter is not one of “lock” or “unlock”.

  • OSError – If locking or unlocking the file fails.

Return type:

int | None

Examples

>>> from geoprior.utils.sys_utils import manage_file_lock
>>> fd = manage_file_lock("/path/to/file", action="lock", blocking=True)
>>> if fd:
...     print("File is locked.")
...     manage_file_lock(fd, action="unlock")
...     print("File is unlocked.")

See also

os.open

Opens a file descriptor.

fcntl.flock

Applies or removes file locks.

geoprior.utils.sys_utils.manage_temp(suffix='', prefix='tmp', action='create_file', directory=None, clean_all=False)[source]#

Manages temporary files and directories by creating, accessing, or cleaning them as needed.

Parameters:
  • suffix (str, optional) – Suffix for the temporary file or directory, used only if action is “create_file” or “create_dir”. Default is an empty string.

  • prefix (str, optional) – Prefix for the temporary file or directory, used only if action is “create_file” or “create_dir”. Default is “tmp”.

  • action (str, default "create_file") –

    Specifies the operation to perform. Options include:
    • “create_file”: Creates a temporary file and returns its path.

    • “create_dir”: Creates a temporary directory and returns its path.

    • “clean”: Cleans temporary files in the specified directory or the system temp directory if none is provided.

  • directory (str, optional) – Directory to clean when action is “clean”. If None, uses the system’s default temporary directory. Ignored for file or directory creation actions.

  • clean_all (bool, default False) – If True, removes all files and directories within the specified directory. If False, only deletes files or directories created by this process. Used only when action is “clean”.

Returns:

temp_path

  • For “create_file” and “create_dir” actions, returns the path of the created file or directory.

  • For “clean”, returns None.

Return type:

str or None

Raises:

ValueError – If an invalid action is specified.

Notes

This function is useful for managing temporary resources in data processing tasks, where files or directories need to be created and cleaned up after use.
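The create-then-clean workflow can be sketched with tempfile and shutil directly. The helper names below are hypothetical and only illustrate the pattern; they are not the functions this module exposes.

```python
import os
import shutil
import tempfile


def make_temp_file(suffix="", prefix="tmp"):
    """Hypothetical helper: create an empty temporary file and return its path."""
    fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
    os.close(fd)  # mkstemp returns an open descriptor; close it right away
    return path


def make_temp_dir(suffix="", prefix="tmp"):
    """Hypothetical helper: create a temporary directory and return its path."""
    return tempfile.mkdtemp(suffix=suffix, prefix=prefix)


def remove_paths(paths):
    """Hypothetical helper: delete the given temporary files and directories."""
    for path in paths:
        if os.path.isdir(path):
            shutil.rmtree(path, ignore_errors=True)
        elif os.path.exists(path):
            os.remove(path)
```

Tracking the created paths explicitly, as remove_paths expects, is one way to clean up only what the current process created.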

Examples

>>> from geoprior.utils.sys_utils import manage_temp
>>> temp_file = manage_temp(action="create_file")
>>> print(temp_file)
/tmp/tmpabcd1234
>>> temp_dir = manage_temp(action="create_dir", prefix="data_")
>>> print(temp_dir)
/tmp/data_abcd1234
>>> manage_temp(action="clean", directory="/path/to/temp", clean_all=True)

See also

tempfile

Module for creating temporary files and directories.

shutil

High-level file operations.

geoprior.utils.sys_utils.parallelize_jobs(function, tasks=(), n_jobs=None, executor_type='process')[source]#

Parallelize the execution of a callable across multiple processors, supporting both positional and keyword arguments.

Parameters:
  • function (Callable[..., Any]) – The function to execute in parallel. This function must be picklable if using executor_type=’process’.

  • tasks (Sequence[Dict[str, Any]], optional) – A sequence of dictionaries, where each dictionary contains two keys: ‘args’ (a tuple) for positional arguments, and ‘kwargs’ (a dict) for keyword arguments, for one execution of function. Defaults to an empty sequence.

  • n_jobs (Optional[int], optional) – The number of jobs to run in parallel. None or 1 uses a single processor, any positive integer specifies the exact number of processors to use, -1 uses all available processors. Default is None (1 processor).

  • executor_type (str, optional) – The type of executor to use. Can be ‘process’ for CPU-bound tasks or ‘thread’ for I/O-bound tasks. Default is ‘process’.

Returns:

A list of results from the function executions.

Return type:

list

Raises:

ValueError – If function is not picklable when using ‘process’ as executor_type.
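The task format with ‘args’ and ‘kwargs’ keys maps naturally onto concurrent.futures. The sketch below uses a thread pool so that nothing has to be pickled; the name run_tasks is hypothetical and the real function may dispatch work differently.

```python
from concurrent.futures import ThreadPoolExecutor


def run_tasks(function, tasks, n_jobs=2):
    """Hypothetical helper: call `function` once per task and keep task order."""
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        futures = [
            pool.submit(function, *task.get("args", ()), **task.get("kwargs", {}))
            for task in tasks
        ]
        # Collect results in submission order, not completion order.
        return [future.result() for future in futures]


def greet(name, greeting="Hello"):
    return f"{greeting}, {name}!"


results = run_tasks(
    greet,
    [{"args": ("John",), "kwargs": {"greeting": "Hi"}}, {"args": ("Jane",)}],
)
```

Swapping in ProcessPoolExecutor would suit CPU-bound work, at which point the function and its arguments must be picklable.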

Examples

>>> from geoprior.utils.sys_utils import parallelize_jobs
>>> def greet(name, greeting='Hello'):
...     return f"{greeting}, {name}!"
>>> tasks = [
...     {'args': ('John',), 'kwargs': {'greeting': 'Hi'}},
...     {'args': ('Jane',), 'kwargs': {}}
... ]
>>> results = parallelize_jobs(greet, tasks, n_jobs=2)
>>> print(results)
['Hi, John!', 'Hello, Jane!']

geoprior.utils.sys_utils.represent_callable(obj, skip=None)[source]#

Represent callable objects by formatting their signatures.

This function generates a string representation of a callable object’s signature, including parameters and default values. It supports classes, functions, and instance methods.

Parameters:
  • obj (callable) – The callable object to format.

  • skip (str or list of str, optional) – Parameter names to skip in the representation. Useful for omitting certain attributes.

Returns:

representation – A string representation of the callable object’s signature.

Return type:

str

Raises:

TypeError – If obj is not a callable object.

Examples

>>> from geoprior.utils.sys_utils import represent_callable
>>> def example_function(a, b=2):
...     pass
>>> represent_callable(example_function)
'example_function(a, b=2)'
>>> class ExampleClass:
...     def __init__(self, x, y=10):
...         self.x = x
...         self.y = y
>>> represent_callable(ExampleClass)
'ExampleClass(x, y=10)'
>>> instance = ExampleClass(5)
>>> represent_callable(instance)
'ExampleClass(x=5, y=10)'

Notes

This function is useful for logging or displaying the parameters of callable objects in a readable format.
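For functions and classes, a representation of this kind can be derived from inspect.signature. The following sketch uses a hypothetical helper named describe_callable; it does not cover the instance case shown in the examples, and it ignores special parameter kinds such as *args and **kwargs.

```python
import inspect


def describe_callable(obj, skip=()):
    """Hypothetical helper: format a callable's name, parameters and defaults."""
    if not callable(obj):
        raise TypeError(f"Expected a callable, got {type(obj).__name__}")
    parts = []
    for name, param in inspect.signature(obj).parameters.items():
        if name in skip:
            continue
        if param.default is inspect.Parameter.empty:
            parts.append(name)  # required parameter: name only
        else:
            parts.append(f"{name}={param.default!r}")
    return f"{obj.__name__}({', '.join(parts)})"


def example_function(a, b=2):
    pass


class ExampleClass:
    def __init__(self, x, y=10):
        self.x = x
        self.y = y
```

For a class, inspect.signature reports the parameters of __init__ without self, so describe_callable(ExampleClass) yields the constructor form.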

See also

inspect.signature

Get a signature object for the callable.

geoprior.utils.sys_utils.run_command(command, capture_output=True)[source]#

Runs a shell command and optionally captures its output.

Parameters:
  • command (str) – The shell command to execute.

  • capture_output (bool, default True) – If True, captures and returns the command’s output. If False, runs the command without capturing output, which is useful for commands that produce a large output or run interactively.

Returns:

output – Returns the command output as a string if capture_output is True. If capture_output is False, returns None.

Return type:

str or None

Notes

This function uses subprocess.run to execute shell commands, which allows for error handling and logging. For example, run_command("echo Hello World") returns "Hello World\n" when capture_output=True.

Raises:

subprocess.CalledProcessError – If the command exits with a non-zero status and capture_output is True.
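A minimal sketch of this behaviour built on subprocess.run is shown below. The name run_shell is hypothetical. Note one assumption that differs from the description above: with check=True, this sketch raises on a non-zero exit status whether or not output is captured.

```python
import subprocess


def run_shell(command, capture_output=True):
    """Hypothetical helper: run a shell command and optionally return stdout."""
    completed = subprocess.run(
        command,
        shell=True,          # interpret `command` through the system shell
        check=True,          # raise CalledProcessError on a non-zero exit status
        capture_output=capture_output,
        text=True,           # decode output to str instead of bytes
    )
    return completed.stdout if capture_output else None
```

Because shell=True passes the string to the shell, commands assembled from untrusted input should be avoided with this pattern.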

Parameters:
  • command (str)

  • capture_output (bool)

Return type:

str | None

geoprior.utils.sys_utils.safe_getattr(obj, name, default_value=None)[source]#

Safely get an attribute from an object, with a helpful error message.

This function attempts to retrieve an attribute from the given object. If the attribute is not found, it can return a default value or raise an AttributeError with a suggestion for a similar attribute.

Parameters:
  • obj (object) – The object from which to retrieve the attribute.

  • name (str) – The name of the attribute to retrieve.

  • default_value (any, optional) – A default value to return if the attribute is not found. If None, an AttributeError will be raised.

Returns:

value – The value of the retrieved attribute or the default value.

Return type:

any

Raises:

AttributeError – If the attribute is not found and no default value is provided.

Examples

>>> from geoprior.utils.sys_utils import safe_getattr
>>> class MyClass:
...     def __init__(self, a, b):
...         self.a = a
...         self.b = b
>>> obj = MyClass(1, 2)
>>> safe_getattr(obj, 'a')
1
>>> safe_getattr(obj, 'c', default_value='default')
'default'
>>> safe_getattr(obj, 'c')
Traceback (most recent call last):
    ...
AttributeError: 'MyClass' object has no attribute 'c'. Did you mean 'a'?

Notes

This function enhances the built-in getattr by providing helpful suggestions when an attribute is not found.
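One way to produce such a suggestion is difflib.get_close_matches over the object's public attribute names. The sketch below is illustrative only: the name getattr_with_hint is hypothetical, and it uses a private sentinel rather than None to detect a missing default, which is a design choice of this sketch and not the documented behaviour.

```python
import difflib

_MISSING = object()  # sentinel so that None can be used as a real default


def getattr_with_hint(obj, name, default=_MISSING):
    """Hypothetical helper: getattr that suggests a similar attribute name."""
    try:
        return getattr(obj, name)
    except AttributeError:
        if default is not _MISSING:
            return default
        public = [attr for attr in dir(obj) if not attr.startswith("_")]
        close = difflib.get_close_matches(name, public, n=1)
        hint = f" Did you mean {close[0]!r}?" if close else ""
        raise AttributeError(
            f"{type(obj).__name__!r} object has no attribute {name!r}.{hint}"
        ) from None


class Circle:
    def __init__(self, radius):
        self.radius = radius
```

Calling getattr_with_hint(Circle(1.0), 'radios') raises an AttributeError whose message points to 'radius'.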

See also

getattr

Built-in function to get an attribute from an object.

geoprior.utils.sys_utils.safe_optimize(func=None, *, parallelize=True, memory_cleanup=False, log_level=20, optimize_cpu=True, num_processes=None, cpu_cores=None, verbose=True, mode='strict')[source]#

Optimizes a workflow by wrapping a function to measure execution time, enable parallelization, manage resources, and perform memory cleanup. It behaves similarly to the class-based decorator WorkflowOptimizer.

Class-based decorators can sometimes encounter issues when trying to pickle certain objects, especially in parallel execution contexts. This issue arises because certain objects (such as file handles, open network connections, or non-serializable class instances) cannot be passed between processes in multiprocessing environments. By ensuring compatibility with these contexts, safe_optimize helps mitigate such issues and optimize the execution of computationally intensive workflows.

This decorator is particularly suitable for workflows involving large-scale computations, such as data processing pipelines, machine learning model training, or simulations, where parallel execution and resource optimization are crucial for performance improvement.

Parameters:
  • parallelize (bool, optional) – Flag to enable or disable parallel processing (default is True).

  • memory_cleanup (bool, optional) – Whether to clean up system memory after execution (default is False).

  • log_level (int, optional) – Level of logging (default is logging.INFO). Set to logging.DEBUG for more detailed logs.

  • optimize_cpu (bool, optional) – Whether to optimize CPU core usage (default is True).

  • num_processes (Optional[int], optional) – The number of parallel processes for execution (default is None).

  • cpu_cores (Optional[List[int]], optional) – Specify a list of CPU cores to restrict the process (default is None).

  • verbose (bool, optional) – Whether to print detailed logs during execution (default is True).

  • mode (str, optional) – Mode for handling pickling issues: 'strict' to raise errors, or 'soft' to fallback to sequential execution with warnings (default is 'strict').

  • func (Callable | None)

Returns:

decorator – The wrapped function that includes optimization strategies.

Return type:

Callable

Raises:

ValueError – If an unsupported mode is specified.

Examples

>>> import logging
>>> from geoprior.utils.sys_utils import safe_optimize
>>> @safe_optimize(
...     parallelize=True,
...     memory_cleanup=True,
...     log_level=logging.DEBUG,
...     optimize_cpu=True,
...     num_processes=4,
...     cpu_cores=[0, 1, 2, 3],
...     verbose=True,
...     mode='soft'
... )
... def process_data(data):
...     # Your data processing logic here
...     return [d * 2 for d in data]
>>> data = [1, 2, 3, 4, 5]
>>> results = process_data(data)
>>> print(results)
[2, 4, 6, 8, 10]

Notes

  • This decorator uses multiprocessing for parallel execution, which may not be suitable for all environments, especially those that do not support forking (e.g., some Windows configurations).

  • Ensure that the decorated function and its arguments are picklable when using parallelization.

  • The mode parameter allows handling non-picklable objects gracefully.
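The difference between the 'strict' and 'soft' modes can be illustrated with a small picklability check. This is a sketch of the idea under stated assumptions, not the decorator's actual logic; the names is_picklable and choose_execution are hypothetical.

```python
import pickle
import warnings


def is_picklable(obj):
    """Return True if `obj` can be serialized with pickle."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


def choose_execution(args, mode="strict"):
    """Hypothetical helper: decide how to run a call given its arguments."""
    if mode not in ("strict", "soft"):
        raise ValueError(f"Unsupported mode: {mode!r}")
    if all(is_picklable(arg) for arg in args):
        return "parallel"
    if mode == "strict":
        raise ValueError("Arguments are not picklable; cannot parallelize.")
    # 'soft' mode: warn and fall back to running in the current process.
    warnings.warn("Non-picklable arguments; falling back to sequential execution.")
    return "sequential"
```

A lambda is a typical non-picklable argument: in 'soft' mode the sketch falls back to sequential execution, while 'strict' mode raises.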

See also

multiprocessing.Pool

For parallel task execution.

psutil

For system and process utilities.

functools.wraps

For preserving metadata of decorated functions.

geoprior.utils.sys_utils.system_uptime()[source]#

Retrieves the system uptime, which is the duration the system has been running since the last boot, in a human-readable format.

Returns:

uptime – System uptime in the format “Xd:Yh:Zm:Ws”, where X, Y, Z, and W represent days, hours, minutes, and seconds, respectively.

Return type:

str

Notes

This function is cross-platform and works on Windows, macOS (Darwin), and Linux. It uses different commands to retrieve uptime based on the operating system.
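The “Xd:Yh:Zm:Ws” format itself is plain integer arithmetic on a number of seconds. The helper below, format_uptime, is a hypothetical sketch of that formatting step and does not show how the uptime is obtained on each platform.

```python
def format_uptime(total_seconds):
    """Hypothetical helper: format a duration in seconds as 'Xd:Yh:Zm:Ws'."""
    remaining = int(total_seconds)
    days, remaining = divmod(remaining, 86400)   # 24 * 60 * 60 seconds per day
    hours, remaining = divmod(remaining, 3600)
    minutes, seconds = divmod(remaining, 60)
    return f"{days}d:{hours}h:{minutes}m:{seconds}s"


print(format_uptime(210792))  # prints 2d:10h:33m:12s
```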

Return type:

str

Examples

>>> from geoprior.utils.sys_utils import system_uptime
>>> system_uptime()
'2d:10h:33m:12s'

geoprior.utils.sys_utils.build_large_df(forecast_results, dt_col, tname, spatial_cols=None, chunk_size=None, verbose=0)[source]#

Construct memory-optimized DataFrame from large forecast results using chunked processing.

Implements dynamic chunk sizing and dtype optimization to handle datasets exceeding available memory. Uses temporary storage and parallel processing for efficient resource utilization.

If pyarrow is installed, the function uses parquet I/O; otherwise, CSV files are used as a fallback.

Parameters:
  • forecast_results (List[Dict]) – Input data as list of dictionary records. Each dictionary represents a row with column-value pairs. Minimum 1000 entries recommended for chunking benefits.

  • dt_col (str) – Name of temporal column. Accepts numeric years (e.g., 2023) or datetime strings. Automatic type detection with fallback to numpy.int32 for years >200000.

  • tname (str) – Target variable prefix for prediction columns. Quantile columns are expected in the form f"{tname}_q{quantile}" such as "subs_q10", while point predictions use f"{tname}_pred".

  • spatial_cols (List[str], optional) – Geographic columns (e.g., ['longitude', 'latitude']). Auto-detects categorical (<10% unique values) vs continuous spatial data, using the pandas category dtype or numpy.float32 respectively.

  • chunk_size (int, optional) –

    Maximum rows per chunk. Auto-calculated using

    \[C_{optimal} = \min\left(10^5, \frac{0.8M_{free}}{S_{row}}\right)\]

    where \(M_{free}\) is available memory in bytes and \(S_{row}\) is the estimated row size, assumed to be about 1 KB by default.

  • verbose (int, default 0) – Logging verbosity. Use 0 for silent mode, 1 for memory reports, 2 for chunk diagnostics, and 3 for per-chunk metrics.
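The chunk-size formula given for chunk_size can be evaluated as follows. This is a sketch under stated assumptions: the name optimal_chunk_size is hypothetical, free memory is passed in explicitly rather than read from psutil, integer arithmetic stands in for the 0.8 factor, and the lower bound of one row is an addition of this sketch.

```python
def optimal_chunk_size(free_bytes, row_bytes=1024, cap=100_000):
    """Hypothetical helper: min(cap, 0.8 * free_bytes / row_bytes), at least 1."""
    # (8 * free) // (10 * row) equals floor(0.8 * free / row) without float rounding.
    by_memory = (8 * free_bytes) // (10 * row_bytes)
    return max(1, min(cap, by_memory))


print(optimal_chunk_size(8 * 1024**3))   # 8 GiB free -> capped at 100000
print(optimal_chunk_size(50 * 1024**2))  # 50 MiB free -> 40960
```

With the default 1 KB row estimate, the 10^5 cap applies once more than about 128 MB of memory is free.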

Returns:

Combined DataFrame with optimized dtypes, preserving original column order. Memory footprint reduced by 40-60% compared to naive construction.

Return type:

pd.DataFrame

Examples

>>> from geoprior.utils.sys_utils import build_large_df
>>> import numpy as np

>>> # Basic usage with 1M rows
>>> data = [{'year': y, 'value_q50': np.random.randn()}
...         for y in range(2010, 2020) for _ in range(100000)]
>>> df = build_large_df(data, dt_col='year', tname='value')

>>> # With spatial columns
>>> geo_data = [{'lat': np.random.uniform(-90, 90),
...              'lon': np.random.uniform(-180, 180),
...              'pred': np.random.randn()}
...             for _ in range(500000)]
>>> df = build_large_df(geo_data, dt_col='date', tname='pred',
...                     spatial_cols=['lat', 'lon'], verbose=2)

Notes

Key implementation features include dynamic chunk adjustment using psutil.virtual_memory(), concurrent chunk reading with ThreadPoolExecutor when many chunks are present, dtype inference for temporal and spatial columns, and guaranteed tempfile cleanup via try...finally blocks.

See also

pd.DataFrame

Base DataFrame construction

pd.concat

Chunk aggregation method

geoprior.nn.utils.generate_forecast

Primary data source

geoprior.utils.memory_optimizer.reduce_mem_usage

Detailed dtype optimization