"""
The `mufasa.signals` module provides robust tools for preprocessing spectral data, including RMS estimation,
signal masking, spatial trimming, and moment calculations.
"""
__author__ = "Mike Chen"
import gc
#=======================================================================================================================
import numpy as np
from spectral_cube.masks import LazyComparisonMask
from skimage.morphology import binary_dilation, remove_small_objects, binary_erosion, disk, binary_opening, ball
import astropy.units as u
#=======================================================================================================================
# Import in-house functions
from .utils.mufasa_log import get_logger
logger = get_logger(__name__)
#=======================================================================================================================
[docs]
def get_snr(cube, rms=None, **kwargs):
"""
Calculate the peak signal-to-noise ratio of the cube.
Parameters
----------
cube : SpectralCube
SpectralCube object.
rms : float or None, optional
Root mean square of the map. If None, calculate using `get_rms_robust()`.
**kwargs
Additional arguments for RMS calculation.
Returns
-------
Quantity
Peak signal-to-noise ratio.
"""
if rms is None:
rms = get_rms_robust(cube, **kwargs)
return cube.max(axis=0) / rms #, how='ray'
[docs]
def get_rms_robust(cube, sigma_cut=2.5, expand=20, trim=None, method='robust', return_sigmask=False, **kwargs):
"""
Make a robust RMS estimate.
Parameters
----------
cube : SpectralCube
SpectralCube object.
sigma_cut : float, optional
Signal-to-noise ratio to mask out regions containing signal.
expand : int, optional
Number of spectral channels to expand the signal mask.
trim : int or None, optional
Number of pixels to trim from the edges of the cube.
method : {'robust', 'signal_mask'}, optional
Method to calculate RMS.
return_sigmask : bool, optional
Whether to return the 3D signal mask along with the RMS.
**kwargs
Additional arguments.
Returns
-------
Quantity or tuple of Quantity
RMS value, and optionally the signal mask.
"""
if trim is not None:
cube = trim_cube_edge(cube, trim)
if method == 'robust':
rms = cube.mad_std(axis=0) #how='ray'
# Refine RMS by masking signal regions
rms, sig_mask = refine_rms(cube, rms, sigma_cut=sigma_cut, expand=expand)
elif method == 'signal_mask':
# Use signal mask method to calculate RMS
sig_mask = get_signal_mask(cube, sigma_cut=sigma_cut, **kwargs)
noise_cube = cube.with_mask(~sig_mask & cube.mask.include())
rms = noise_cube.mad_std(axis=0) #how='ray'
if return_sigmask:
return rms, sig_mask
else:
return rms
[docs]
def refine_rms(cube, rms, sigma_cut=3, expand=20):
"""
Refine the RMS estimate by masking out signal regions.
Parameters
----------
cube : SpectralCube
SpectralCube object.
rms : Quantity
Initial RMS estimate.
sigma_cut : float, optional
Signal-to-noise threshold to mask signals.
expand : int, optional
Number of spectral channels to expand the signal mask.
Returns
-------
Quantity
Refined RMS.
ndarray
Signal mask.
"""
# Mask above the cut threshold
sig_mask = (cube > sigma_cut * rms)
sig_mask = binary_dilation(sig_mask.include(), np.ones(shape=(expand, 1, 1), dtype=bool))
noise_cube = cube.with_mask(~sig_mask & cube.mask.include())
rms = noise_cube.mad_std(axis=0) #how='ray'
return rms, sig_mask
[docs]
def get_signal_mask(cube, snr_cut=3, minsize=2, expand=20, clean=True, return_rms=False):
"""
Provide a 3D mask indicating signal regions based on RMS and SNR threshold.
Parameters
----------
cube : SpectralCube
SpectralCube object.
snr_cut : float, optional
Signal-to-noise threshold to define signals.
minsize : int, optional
Minimum size of connected components in spectral direction.
expand : int, optional
Number of spectral channels to expand signal regions.
clean : bool, optional
Remove spatially isolated pixels.
return_rms : bool, optional
Whether to return RMS along with the signal mask.
Returns
-------
ndarray
Signal mask.
Quantity, optional
RMS value if `return_rms` is True.
"""
rms = cube.mad_std(axis=0) #how='ray'
# Mask above the cut threshold
sig_mask = (cube > snr_cut * rms)
sig_mask = refine_signal_mask(sig_mask, minsize=minsize, expand=expand, clean=clean)
if return_rms:
return sig_mask, rms
else:
return sig_mask
[docs]
def refine_signal_mask(sig_mask, minsize=2, expand=20, clean=True):
"""
Refine a signal mask by removing noisy features and expanding the mask.
Parameters
----------
sig_mask : LazyComparisonMask or ndarray
Initial signal mask.
minsize : int, optional
Minimum number of continuous spectral channels for signal mask.
expand : int, optional
Number of spectral channels to expand signal regions.
clean : bool, optional
Remove spatially isolated pixels.
Returns
-------
ndarray
Refined signal mask.
"""
if isinstance(sig_mask, LazyComparisonMask):
sig_mask = sig_mask.include()
# Remove voxels that are likely just statistical noise
sig_mask = binary_erosion(sig_mask, np.ones(shape=(minsize, 1, 1), dtype=bool))
sig_mask = remove_small_objects(sig_mask, 1)
# Expand the "signal" regions a bit
struct = np.ones(shape=(expand, 1, 1), dtype=bool)
sig_mask = binary_dilation(sig_mask, struct)
sig_mask = binary_dilation(sig_mask, disk(3)[np.newaxis, :])
if clean:
# Remove isolated pixels
sig_mask *= remove_small_objects(binary_erosion(np.any(sig_mask, axis=0)), 1)
return sig_mask
[docs]
def trim_cube_edge(cube, trim=3, min_size=1000):
"""
Remove spatial edges from a cube.
Parameters
----------
cube : SpectralCube
SpectralCube object.
trim : int, optional
Number of pixels to trim from edges.
min_size : int, optional
Minimum size of cube for trimming to be applied.
Returns
-------
SpectralCube
Trimmed cube.
"""
mask = cube.mask.include()
planemask = np.any(mask, axis=0)
if np.sum(planemask) > min_size:
# Trim edges using a 2D mask
planemask = trim_edge(planemask, trim=trim)
mask[:, ~planemask] = False
return cube.with_mask(mask)
else:
return cube
[docs]
def trim_edge(mask, trim=3):
"""
Trim edges using a 2D mask.
Parameters
----------
mask : ndarray
2D mask.
trim : int, optional
Number of pixels to trim.
Returns
-------
ndarray
Trimmed mask.
"""
# Set all edge pixels to False
mask[[0, -1], :] = False
mask[:, [0, -1]] = False
return binary_erosion(mask, disk(trim))
[docs]
def v_estimate(cube, rms, snr_cut=3):
"""
Estimate the velocity centroid based on peak emission.
Parameters
----------
cube : SpectralCube
SpectralCube object.
rms : Quantity
Root mean square of the map.
snr_cut : float, optional
Signal-to-noise threshold for quality control.
Returns
-------
Quantity
Velocity centroid estimate.
"""
sig_mask = cube > rms * snr_cut
# Clean the mask to remove noisy features
sig_mask = binary_erosion(sig_mask.include(), ball(2))
return get_v_at_peak(cube.with_mask(sig_mask))
[docs]
def get_v_at_peak(cube_masked):
"""
Find the velocity corresponding to the peak emission.
Parameters
----------
cube_masked : SpectralCube
SpectralCube object with mask.
Returns
-------
ndarray
Velocity at peak emission.
"""
idx_max = cube_masked.argmax(axis=0)#, how='slice')
v_est = cube_masked.spectral_axis[idx_max]
# Set velocity to NaN where no valid data is present
v_est[~np.any(cube_masked.get_mask_array(), axis=0)] = np.nan
return v_est.value
[docs]
def get_v_mask(cube_masked, v_center, rms=None, window_hwidth=5):
"""
Return a mask centered on a reference velocity with a spectral window.
Parameters
----------
cube_masked : SpectralCube
SpectralCube object with mask.
v_center : Quantity
Reference velocity at center of spectral window.
rms : Quantity or None, optional
Root mean square of the map.
window_hwidth : float, optional
Half-width of spectral window in km/s.
Returns
-------
ndarray
Velocity mask.
"""
snr_cut = 3
# Get window centered on v_center with a half-width of window_hwidth
window_mask = (cube_masked.spectral_axis[:, np.newaxis, np.newaxis] < (v_center + window_hwidth) * u.km / u.s) & \
(cube_masked.spectral_axis[:, np.newaxis, np.newaxis] > (v_center - window_hwidth) * u.km / u.s)
if rms is not None:
# Remove voxels that are likely noise
mask_new = binary_opening((cube_masked > rms * snr_cut).include())
return mask_new & window_mask
else:
return cube_masked.get_mask_array() & window_mask
[docs]
def get_moments(cube, window_hwidth=5, linewidth_sigma=True, trim=3, return_rms=False, central_win_hwidth=None,
**kwargs):
"""
Calculate moments of the signals in a cube.
Parameters
----------
cube : SpectralCube
SpectralCube object.
window_hwidth : float, optional
Half-width of spectral window for moment calculation.
linewidth_sigma : bool, optional
Whether to calculate linewidth sigma instead of moment 2 (see spectral_cube documentation for further details).
trim : int, optional
Number of pixels to trim from edges of the cube.
return_rms : bool, optional
Whether to return RMS along with moments.
central_win_hwidth : float, optional
If provided, re-evulate velocity at peak using a window centered on the mode of the initial velocity estimate, with a
half-wdith of central_win_hwidth. Note that window is the same for all pixels, but only applied as an intermediate step.
This should only used if hyperfine lines far from the main hyperfines have a comprable brightness.
**kwargs
Additional arguments.
Returns
-------
tuple of Quantity
Tuple of moment maps (moment 0, moment 1, linewidth or moment 2, optionally RMS).
"""
snr_cut = 3
noise_mask_cut = 2.5
# Trim cube edges
if trim is not None:
cube = trim_cube_edge(cube, trim=trim)
rms = get_rms_robust(cube, sigma_cut=noise_mask_cut, **kwargs)
v_peak = v_estimate(cube, rms, snr_cut=snr_cut)
if central_win_hwidth is not None:
# impose a strick window based on the esitmated mode velocity to re-evaulate v_peak
v_mode = estimate_mode(v_peak, bins=25)
signal_mask_tmp = get_v_mask(cube, v_center=v_mode, rms=rms, window_hwidth=central_win_hwidth)
cube_signal_tmp = cube.with_mask(signal_mask_tmp)
v_peak = v_estimate(cube_signal_tmp, rms, snr_cut=snr_cut)
del signal_mask_tmp
del cube_signal_tmp
gc.collect()
# Get velocity mask centered on v_peak with window_hwidth
signal_mask = get_v_mask(cube, v_peak, rms=rms, window_hwidth=window_hwidth)
# Expand the footprint a bit
signal_mask = binary_dilation(signal_mask, disk(3)[np.newaxis, :])
cube_signal = cube.with_mask(signal_mask)
mom0 = cube_signal.moment0()
mom1 = cube_signal.moment1()
output = (mom0, mom1)
if linewidth_sigma:
output += (cube_signal.linewidth_sigma(),)
else:
output += (cube_signal.moment2(),)
if return_rms:
output += (rms,)
return output
[docs]
def estimate_mode(data, bins=25):
"""
Estimate the mode of the data using a histogram.
Parameters
----------
data : ndarray
The data for which to estimate the mode. Non-finite values will be ignored.
bins : int, optional
Number of bins to use for the histogram. Default is 25.
Returns
-------
float
The estimated mode of the data.
"""
data=data[np.isfinite(data)]
counts, bins = np.histogram(data, bins=bins)
# Find the bin with the maximum frequency
max_bin_index = np.argmax(counts)
return (bins[max_bin_index] + bins[max_bin_index + 1]) / 2