Source code for mufasa.utils.memory

"""
This module provides tools to monitor memory usage and
estimate the memory allocations needed

"""

import psutil  # To detect system memory
import os
import threading
import functools
import time

from spectral_cube import DaskSpectralCube
from dask.array import Array as daArray

from .mufasa_log import get_logger
logger = get_logger(__name__)

def monitor_peak_memory(output_attr=None, key=None, unit='GB'):
    """
    Decorator to monitor and record the peak memory usage of a method.

    The resident set size (RSS) of the current process is sampled from a
    background thread roughly every 100 ms while the decorated method runs.
    One sample is also taken right before the call and one right after, so
    a peak is reported even for calls shorter than the sampling interval.
    Spikes that rise and fall entirely between two samples are not captured.

    The decorated callable must take the instance (``self``) as its first
    positional argument, because the result may be stored on it.

    Parameters
    ----------
    output_attr : str or None, optional
        Name of the instance attribute that receives the peak memory usage.
        If `None`, the peak is printed instead. Default is `None`.
    key : hashable or None, optional
        If given, the attribute named by `output_attr` is treated as a
        dictionary and the peak is stored under this key (the dictionary is
        created when the attribute does not exist yet). If `None`, the peak
        is stored directly in the attribute. Default is `None`.
    unit : {'KB', 'MB', 'GB', 'TB'}, default='GB'
        Unit for the reported value, using powers of 1024. If an
        unrecognized unit is given, a warning is logged and 'GB' is used.

    Returns
    -------
    callable
        A decorator that monitors peak memory usage of the decorated method.

    Raises
    ------
    ValueError
        Raised when the decorated method is called, if `key` is given but
        the existing attribute named by `output_attr` is not a dictionary.

    Examples
    --------
    Monitor memory usage and print the result:

    >>> @monitor_peak_memory()
    ... def my_function(self):
    ...     pass

    Store memory usage in a dictionary attribute of the instance:

    >>> @monitor_peak_memory(output_attr='memory_usage', key='task1')
    ... def my_function(self):
    ...     pass
    >>> self.memory_usage['task1']  # Access the stored peak memory

    Report in a different unit:

    >>> @monitor_peak_memory(unit='MB')
    ... def my_function(self):
    ...     pass
    """
    unit_powers = {'KB': 1, 'MB': 2, 'GB': 3, 'TB': 4}
    if unit not in unit_powers:
        logger.warning(f"unit {unit} is not reconized, defaulting to GB")
        unit = 'GB'
    bytes_per_unit = 1024 ** unit_powers[unit]

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            process = psutil.Process(os.getpid())
            peak = [0.0]  # one-element list so the nested functions can update it
            stop_event = threading.Event()

            def sample():
                """Record the current RSS; return False if the process is gone."""
                try:
                    current = process.memory_info().rss / bytes_per_unit
                except psutil.NoSuchProcess:
                    return False
                peak[0] = max(peak[0], current)
                return True

            def monitor():
                """Sample until asked to stop (checked every 100 ms)."""
                # Event.wait returns True as soon as the event is set, so the
                # thread exits promptly instead of finishing a full sleep.
                while sample() and not stop_event.wait(0.1):
                    pass

            # Baseline sample: guarantees a non-zero reading even if the
            # function returns before the monitor thread gets scheduled.
            sample()
            monitor_thread = threading.Thread(target=monitor)
            monitor_thread.start()
            try:
                result = func(self, *args, **kwargs)
            finally:
                # Always stop the monitor, even when func raises.
                stop_event.set()
                monitor_thread.join()
            # Closing sample: catches memory still held right after the call.
            sample()

            if output_attr is None:
                print(f"Peak memory usage for '{func.__name__}': {peak[0]:.2f} {unit}")
                return result

            if not hasattr(self, output_attr):
                # Compare against None so falsy keys (0, '') still get a dict.
                setattr(self, output_attr, {} if key is not None else None)

            if key is None:
                setattr(self, output_attr, peak[0])
            else:
                attr_value = getattr(self, output_attr)
                if not isinstance(attr_value, dict):
                    raise ValueError(f"{output_attr} must be a dictionary when key is specified.")
                attr_value[key] = peak[0]
            return result

        return wrapper

    return decorator
def peak_memory(output_container=None):
    """
    Decorator to monitor and report the peak memory usage of a function.

    The resident set size (RSS) of the current process is sampled from a
    background thread roughly every 100 ms while the decorated function
    runs. One sample is also taken right before the call and one right
    after, so a peak is reported even for calls shorter than the sampling
    interval. Spikes that rise and fall entirely between two samples are
    not captured.

    Args:
        output_container (list or dict, optional): A mutable object that
            receives the peak memory usage in MB (1 MB = 1024**2 bytes).
            For a list, the value is written to index 0 (appended when the
            list is empty). For a dict, it is stored under the key
            ``'peak_memory'``. If None, the peak is printed instead.

    Returns:
        function: A decorator producing a wrapped function that reports
        its peak memory usage.

    Raises:
        ValueError: Raised when the wrapped function is called, if
            `output_container` is neither None, a list, nor a dict.
    """
    bytes_per_mb = 1024 ** 2

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            process = psutil.Process(os.getpid())
            peak = [0.0]  # one-element list so the nested functions can update it
            stop_event = threading.Event()

            def sample():
                """Record the current RSS in MB; return False if the process is gone."""
                try:
                    current = process.memory_info().rss / bytes_per_mb
                except psutil.NoSuchProcess:
                    return False
                peak[0] = max(peak[0], current)
                return True

            def monitor():
                """Sample until asked to stop (checked every 100 ms)."""
                # Event.wait returns True as soon as the event is set, so the
                # thread exits promptly instead of finishing a full sleep.
                while sample() and not stop_event.wait(0.1):
                    pass

            # Baseline sample: guarantees a non-zero reading even if the
            # function returns before the monitor thread gets scheduled.
            sample()
            monitor_thread = threading.Thread(target=monitor)
            monitor_thread.start()
            try:
                result = func(*args, **kwargs)
            finally:
                # Always stop the monitor, even when func raises.
                stop_event.set()
                monitor_thread.join()
            # Closing sample: catches memory still held right after the call.
            sample()

            if output_container is None:
                print(f"Peak memory usage for '{func.__name__}': {peak[0]:.2f} MB")
            elif isinstance(output_container, list):
                if output_container:
                    output_container[0] = peak[0]
                else:
                    # An empty list is still a valid container; fill slot 0.
                    output_container.append(peak[0])
            elif isinstance(output_container, dict):
                output_container['peak_memory'] = peak[0]
            else:
                raise ValueError("output_container must be a list or dict.")

            return result

        return wrapper

    return decorator
def calculate_target_memory(multicore, use_total=False, max_usable_fraction=0.85):
    """
    Calculate target memory per core for chunked computations.

    Parameters
    ----------
    multicore : int
        Number of cores available for computation. Memory is divided evenly
        among these cores. Must be greater than 0.
    use_total : bool, optional, default=False
        If `True`, calculate based on the total system memory.
        If `False`, use the currently available memory instead.
    max_usable_fraction : float, optional, default=0.85
        Maximum fraction of memory to allocate for computation. For example,
        `0.85` means 85% of the memory is considered usable.

    Returns
    -------
    target_memory_mb : float
        Target memory per core, in megabytes (1 MB = 1024 * 1024 bytes).

    Raises
    ------
    ValueError
        If `multicore` is not greater than 0.

    Examples
    --------
    Calculate target memory per core using available system memory:

    >>> calculate_target_memory(multicore=4, use_total=False, max_usable_fraction=0.9)
    4096.0  # Example value, system-dependent

    Notes
    -----
    - If `use_total=True`, the calculation includes memory currently in use
      by other processes.
    - The calculated memory assumes even distribution across all cores.
    """
    # Fail early with a clear message instead of a ZeroDivisionError (0) or a
    # meaningless negative budget (< 0).
    if multicore <= 0:
        raise ValueError("multicore must be greater than 0.")

    memory_info = psutil.virtual_memory()
    memory_to_use = memory_info.total if use_total else memory_info.available
    usable_memory = memory_to_use * max_usable_fraction

    # Split the usable bytes evenly across cores, then convert bytes -> MB.
    target_memory_mb = usable_memory / multicore / (1024 * 1024)
    return target_memory_mb
def calculate_dask_memory_limit(n_workers):
    """
    Estimate a per-worker memory limit in the manner of Dask's default.

    The total system memory reported by psutil is split evenly among the
    requested number of workers.

    Parameters
    ----------
    n_workers : int
        Number of workers to divide the memory among.

    Returns
    -------
    float
        Memory limit per worker, in bytes.

    Raises
    ------
    ValueError
        If `n_workers` is not greater than 0.
    """
    if n_workers <= 0:
        raise ValueError("Number of workers must be greater than 0.")

    # Total physical memory in bytes, shared equally by every worker.
    system_bytes = psutil.virtual_memory().total
    return system_bytes / n_workers
def get_system_free_memory():
    """
    Return the memory currently available on the system, in GB.

    Returns
    -------
    float
        Available memory as reported by psutil, in decimal gigabytes
        (bytes divided by 1e9).
    """
    available_bytes = psutil.virtual_memory().available
    return available_bytes / 1e9
def get_size_mb(array):
    """
    Return the in-memory size of an array in MB.

    Parameters
    ----------
    array : array-like
        Any object exposing `size` (element count) and `itemsize`
        (bytes per element), e.g. a numpy ndarray.

    Returns
    -------
    float
        Size in megabytes, where 1 MB = 1024 ** 2 bytes.
    """
    bytes_per_mb = 1024 ** 2
    total_bytes = array.size * array.itemsize
    return total_bytes / bytes_per_mb
def tmp_save_gauge(cube, factor=20, max_mem_gb=0.3):
    """
    Advise whether DaskSpectralCube results are worth saving temporarily.

    The advice is based on the size of the cube's data and on how much
    free system memory is currently left.

    Note: when the data is chunked properly, mufasa shouldn't need more
    memory than about 20 times the cube size (hence the default `factor`).

    Parameters
    ----------
    cube : DaskSpectralCube or dask.array.Array
        The cube (or its underlying dask array) to gauge.
    factor : float, optional, default=20
        Multiple of the data size assumed to be needed as working memory.
    max_mem_gb : float, optional, default=0.3
        Data size, in GB, above which saving temporarily is always advised.

    Returns
    -------
    bool
        True if the data is larger than `max_mem_gb`, or if `factor` times
        the data size exceeds the currently free memory.

    Raises
    ------
    TypeError
        If `cube` is neither a DaskSpectralCube nor a dask array.
    """
    if isinstance(cube, DaskSpectralCube):
        data = cube._data
    elif isinstance(cube, daArray):
        data = cube
    else:
        raise TypeError(f"cube type {type(cube)} is invalid")

    data_size_mb = get_size_mb(data)  # in MB
    mem_free_mb = get_system_free_memory() * 1e3  # GB -> MB

    # Convert MB -> GB by dividing. The previous code multiplied by 1e3,
    # which made this test true for virtually any array.
    exceeds_size_cap = data_size_mb / 1024 > max_mem_gb
    exceeds_free_memory = data_size_mb * factor > mem_free_mb

    # Advise saving results temporarily if either limit is exceeded.
    return exceeds_size_cap or exceeds_free_memory