Source code for mufasa.moment_guess

"""
The `mufasa.moment_guess` module provides utilities for generating initial guesses, refining moment maps,
and handling physical parameter calculations such as excitation temperature and
optical depth.
"""

from __future__ import print_function
import numpy as np
import copy

from astropy import units as u
from spectral_cube import SpectralCube

import pyspeckit
from pyspeckit.spectrum.models.ammonia_constants import (ckms, h, kb)
from astropy.stats import mad_std

from .utils import map_divide
import multiprocessing
from .utils.multicore import validate_n_cores
#=======================================================================================================================
from .utils.mufasa_log import get_logger
logger = get_logger(__name__)
#=======================================================================================================================


[docs] class LineSetup(object): def __init__(self, linetype='nh3', new_recipe=True): if linetype == 'nh3': # define max and min values of tex and tau to use for the test # a spectrum with tex and tau values both below the specified minima has an intensity below the expected GAS rms if new_recipe: self.tex_max = 8.0 self.tau_max = 5.0 self.tex_min = 3.0 self.tau_min = 0.1 else: self.tex_max = 8.0 self.tau_max = 1.0 self.tex_min = 3.1 self.tau_min = 0.3 elif linetype == 'n2hp': # define max and min values of tex and tau to use for the test if new_recipe: self.tex_max = 8.0 self.tau_max = 5.0 self.tex_min = 3.0 self.tau_min = 0.1 else: self.tex_max = 8.0 self.tau_max = 1.0 self.tex_min = 3.1 self.tau_min = 0.3 else: raise Exception("{} is an invalid linetype".format(linetype))
#=======================================================================================================================
[docs] def master_guess(spectrum, ncomp, sigmin=0.07, v_peak_hwidth=3.0, v_atpeak=None, widewVSep=False, snr_cut=3, signal_mask=None, linetype='nh3'): m0, m1, m2 = window_moments(spectrum, window_hwidth=v_peak_hwidth, v_atpeak=v_atpeak, signal_mask=signal_mask) # estimate the rms level, and pass to the spectrum (probably should've been performed in another function) rms = get_rms_prefit(spectrum, window_hwidth=v_peak_hwidth, v_atpeak=m1) if m0 < snr_cut*rms: gg = np.zeros((ncomp * 4,) + np.array([m1], dtype=object).shape) gg[:] = np.nan return gg if ncomp == 2 and widewVSep: # use recipe that recovers two-slab spectra (warning, may not be ideal if more than 3 slabs are present) m0_b, m1_b, m2_b = noisemask_moment(spectrum, m1, m2, mask_sigma=4, noise_rms=rms, window_hwidth=v_peak_hwidth) if m0_b > snr_cut*rms: # if the residual spectrum has m0 that is 3 sigma above the rms noise, treat both moment as individual # one component parts gg_a = moment_guesses(np.array([m1]), np.array([m2]), ncomp=1, sigmin=sigmin, moment0=np.array([m0]), linetype=linetype) gg_b = moment_guesses(np.array([m1_b]), np.array([m2_b]), ncomp=1, sigmin=sigmin, moment0=np.array([m0_b]), linetype=linetype) gg = np.zeros((ncomp * 4,) + np.array([m1]).shape) gg[:4,:] = gg_a[:] gg[4:,:] = gg_b[:] else: gg = moment_guesses(np.array([m1]), np.array([m2]), ncomp, sigmin=sigmin, moment0=np.array([m0]), linetype=linetype) else: # get the guesses based on moment maps based on "traditional" recipe gg = moment_guesses(np.array([m1]), np.array([m2]), ncomp, sigmin=sigmin, moment0=np.array([m0]), linetype=linetype) return gg
# THIS FUNCTION IS NEVER CALLED
[docs] def get_window_slab(maskcube, window_hwidth=3.0, v_atpeak=None): if v_atpeak is None: # find the peak of the integrated spectrum if v_atpeak isn't provided tot_spec = np.nansum(maskcube._data[:,]*maskcube.get_mask_array(), axis=(1,2)) idx_peak = np.nanargmax(tot_spec) print("peak T_B: {0}".format(np.nanmax(tot_spec))) v_atpeak = maskcube.spectral_axis[idx_peak].to(u.km/u.s).value print("v_atpeak: {0}".format(v_atpeak)) vmax = v_atpeak + window_hwidth vmin = v_atpeak - window_hwidth # Extract the spectrum within the window defined around the main hyperfine components and take moments slab = maskcube.spectral_slab(vmin*u.km/u.s, vmax*u.km/u.s) return slab
[docs] def vmask_moments(cube, vmap, window_hwidth=3.0): # obtain moments with windows centered around the vlsr specified in the provided map cubemasked = vmask_cube(cube, vmap, window_hwidth) m0 = cubemasked.moment0(axis=0).value m1 = cubemasked.moment1(axis=0).to(u.km/u.s).value m2 = (np.abs(cubemasked.moment2(axis=0))**0.5).to(u.km/u.s).value return m0, m1, m2
[docs] def vmask_cube(cube, vmap, window_hwidth=3.0): spax = cube.spectral_axis.value spax_cube = np.ones(cube.shape) * spax[:, None, None] v_up = vmap + window_hwidth v_down = vmap - window_hwidth mask = np.logical_and(spax_cube > v_down, spax_cube < v_up) cubemasked = cube.with_mask(mask) return cubemasked
[docs] def window_moments(spec, window_hwidth=4.0, v_atpeak=None, signal_mask=None): """ Calculate the zeroth, first, and second moments of a spectrum or cube within a specified velocity window. Parameters ---------- spec : pyspeckit.Cube, pyspeckit.spectrum.classes.Spectrum, or SpectralCube The spectrum or cube for which moments are calculated. Can be a `pyspeckit.Cube`, `pyspeckit.spectrum.classes.Spectrum`, or `SpectralCube` object. window_hwidth : float, optional The half-width of the spectral window in km/s used to calculate moments. This is useful for isolating hyperfine lines. Default is 4.0. v_atpeak : float, numpy.ndarray, or None, optional The velocity or velocity map (in km/s) around which to center the moment calculation. If None, it will be estimated from the spectrum or cube. Default is None. signal_mask : numpy.ndarray or None, optional An optional mask to apply when estimating the velocity peak (`v_atpeak`). Useful for suppressing noise when determining the peak. Default is None. Returns ------- tuple of numpy.ndarray A tuple containing: - Zeroth moment (`m0`) as a numpy array. - First moment (`m1`) as a numpy array in km/s. - Second moment (`m2`) as a numpy array in km/s. Raises ------ Exception If the input `spec` is not a supported type. Notes ----- - This function acts as a wrapper to calculate moments for different types of inputs (`SpectralCube`, `pyspeckit.Cube`, or `pyspeckit.spectrum`). - For `SpectralCube`, `v_atpeak` as a map is not currently supported. Examples -------- For a `SpectralCube` object: >>> from spectral_cube import SpectralCube >>> cube = SpectralCube.read("example_cube.fits") >>> m0, m1, m2 = window_moments(cube, window_hwidth=3.0, v_atpeak=5.0) For a `pyspeckit.spectrum.classes.Spectrum`: >>> import pyspeckit >>> spec = pyspeckit.Spectrum("example_spectrum.fits") >>> m0, m1, m2 = window_moments(spec, window_hwidth=2.0) """ def moments_pys_spectrum(spectrum, window_hwidth=4.0, v_atpeak=None, iter_refine=False): ''' find moments within a given window (e.g., around the main hyperfine lines) # note: iter_refine has not proven to be very effective in our tests :param spectrum: <pyspeckit.spectrum.classes.Spectrum> the spectrum to take the momentw of :param window_hwidth: float half-width of the window (in km/s) to be used to isolate the main hyperfine lines from the rest of the spectrum ''' if v_atpeak is None: moments = spectrum.moments(unit=u.km/u.s) v_atpeak = moments[2] vmax = v_atpeak + window_hwidth vmin = v_atpeak - window_hwidth # Extract the spectrum within the window defined around the main hyperfine components and take moments slice = spectrum.slice(vmin, vmax, unit=u.km/u.s) moments = slice.moments(unit=u.km/u.s) if iter_refine: # for low snr- this method really doesn't work well m0, m1, m2 = moments[1], moments[2], moments[3] # make the window smaller by making out channels outside a specific width around moment 1 # create a window 2 times the second moment new_window_hw = m2*3.0 if new_window_hw > window_hwidth: new_window_hw = window_hwidth vmax = m1 + new_window_hw vmin = m1 - new_window_hw slice = spectrum.slice(vmin, vmax, unit=u.km / u.s) moments = slice.moments(unit=u.km / u.s) return moments[1], moments[2], moments[3] def moments_pys_cube(pcube, window_hwidth=4.0, v_atpeak=None, iter_refine=False, multicore=None): """ Calculate moments of a pyspeckit spectral cube within a specified velocity window. Parameters ---------- pcube : pyspeckit.cubes.SpectralCube.Cube The spectral cube for which moments are calculated. window_hwidth : float, optional The half-width of the velocity window (in km/s) used to isolate spectral lines. Default is 4.0 km/s. v_atpeak : float, numpy.ndarray, or None, optional The velocity or velocity map (in km/s) around which to center the moment calculations. If None, moments are calculated over the entire cube and refined iteratively. Default is None. iter_refine : bool, optional If True, refines the velocity window iteratively based on the first moment. This is useful for low signal-to-noise spectra. Default is False. multicore : int or None, optional The number of CPU cores to use for parallel processing. If None, it will use all available cores. Default is None. Returns ------- tuple of numpy.ndarray A tuple containing: - Zeroth moment (`m0`) as a numpy array. - First moment (`m1`) as a numpy array in km/s. - Second moment (`m2`) as a numpy array in km/s. Notes ----- - This function uses pyspeckit's `momenteach` method for calculating moments. - If `v_atpeak` is provided, the function will restrict calculations to the specified velocity window. Otherwise, an initial pass over the cube will estimate the velocity map, which is used to define the window. - Iterative refinement of moments can be computationally expensive and is not recommended for large datasets unless necessary. Examples -------- >>> from pyspeckit import Cube >>> pcube = Cube("example_cube.fits") >>> m0, m1, m2 = moments_pys_cube(pcube, window_hwidth=3.0) >>> vmap = pcube.moment1(unit='km/s').value >>> m0, m1, m2 = moments_pys_cube(pcube, window_hwidth=2.0, v_atpeak=vmap, iter_refine=True) Raises ------ AttributeError If parallel processing with `momenteach` fails, the function will fall back to single-core execution. """ multicore = validate_n_cores(multicore) def get_win_moms(pcube, v_atpeak): # get window moments when v_atpeak is given pcube_masked = window_mask_pcube(pcube, v_atpeak, win_hwidth=window_hwidth) try: pcube_masked.momenteach(unit='km/s', verbose=False, multicore=multicore) except AttributeError: # if multicore fails, only use one core pcube_masked.momenteach(unit='km/s', verbose=False, multicore=1) moments = pcube_masked.momentcube return moments # note: the current implementation may be memory intensitive if v_atpeak is None: # note the moment 1 estimate of pcube.momenteach seem to be pretty good at ignoring hyperfine structures try: pcube.momenteach(unit='km/s', vheight=False, verbose=False, multicore=multicore) except AttributeError: # if multicore fails, only use one core pcube.momenteach(unit='km/s', vheight=False, verbose=False, multicore=1) moments = pcube.momentcube # redo again with the hyperfine lines masked out moments = get_win_moms(pcube, v_atpeak=moments[1]) else: moments = get_win_moms(pcube, v_atpeak) if iter_refine: # use the moment 1 estimated velocity to define new windows # for low snr this method may not work well moments = get_win_moms(pcube, v_atpeak=moments[1]) return moments[0], moments[1], moments[2] def moments_spectralcube(maskcube, window_hwidth, v_atpeak=None, signal_mask=None): # signal_mask is to provide additional masking specifically and only for v_atpeak estimate if v_atpeak is None: # find the peak of the integrated spectrum if v_atpeak isn't provided mask = maskcube.get_mask_array() if signal_mask is not None: mask = mask*signal_mask tot_spec = np.nansum(maskcube._data[:,]*mask, axis=(1,2)) idx_peak = np.nanargmax(tot_spec) logger.debug("Getting window moments of SpectralCube") logger.debug("peak T_B: {0}".format(np.nanmax(tot_spec))) v_atpeak = maskcube.spectral_axis[idx_peak].to(u.km/u.s).value logger.debug("v_atpeak: {0}".format(v_atpeak)) vmax = v_atpeak + window_hwidth vmin = v_atpeak - window_hwidth # Extract the spectrum within the window defined around the main hyperfine components and take moments slab = maskcube.spectral_slab(vmin*u.km/u.s, vmax*u.km/u.s) m0 = slab.moment0(axis=0).value m1 = slab.moment1(axis=0).to(u.km/u.s).value m2 = (np.abs(slab.moment2(axis=0))**0.5).to(u.km/u.s).value return m0, m1, m2 # wrapper to find moments for different types of inputs if isinstance(spec, pyspeckit.Cube): # this method is much slower than using SpectralCube, but also seems more robust at spectral peaks return moments_pys_cube(spec, window_hwidth, v_atpeak) elif isinstance(spec, pyspeckit.spectrum.classes.Spectrum): return moments_pys_spectrum(spec, window_hwidth, v_atpeak) elif isinstance(spec, SpectralCube): # currently cannot handle v_atpeak as a map if hasattr(v_atpeak, 'ndim') and v_atpeak.ndim>0: # numpy floats have ndim=0 logger.error("the method that handles SpectralCube cannot currently handle v_atpeak as a map, please use single value v_atpeak instead") return moments_spectralcube(spec, window_hwidth, v_atpeak, signal_mask) else: raise Exception("the spec provided is of invalid type") return None
[docs] def noisemask_moment(sp, m1, m2, mask_sigma=4, noise_rms = None, **kwargs): # mask out the 'main' component based on moment map and replace them with fake noise # and rerun window_mements to find additional components sp_m = copy.copy(sp) if 'v_atpeak' not in kwargs: kwargs['v_atpeak'] = m1 if noise_rms is None: noise_rms = get_rms_prefit(sp, **kwargs) mask = np.logical_and(sp_m.xarr.value < m1 + mask_sigma * m2, sp_m.xarr.value > m1 - mask_sigma * m2) sp_m.data[mask] = np.random.randn(np.sum(mask)) * noise_rms return window_moments(sp_m, **kwargs)
[docs] def moment_guesses(moment1, moment2, ncomp, sigmin=0.07, tex_guess=3.2, tau_guess=0.5, moment0=None, linetype='nh3', mom0_floor=None): """ Generate reasonable initial guesses for multiple component fits based on moment maps. Parameters ---------- moment1 : numpy.ndarray The first moment (velocity centroid) map. moment2 : numpy.ndarray The second moment (velocity dispersion) map. ncomp : int Number of components for the fit. sigmin : float, optional Minimum velocity dispersion (in km/s). Default is 0.07 km/s, the spectral resolution of the GAS channels. tex_guess : float, optional Initial guess for the excitation temperature (T_ex). Default is 3.2 K. tau_guess : float, optional Initial guess for the optical depth (tau). Default is 0.5. moment0 : numpy.ndarray, optional Zeroth moment (integrated intensity) map. If provided, this will be used to modify the guesses for T_ex and tau. Default is None. linetype : str, optional Line type, either 'nh3' or 'n2hp'. Determines parameter limits. Default is 'nh3'. mom0_floor : float, optional Minimum floor value for moment0 when normalizing. If not provided, a default normalization is applied. Default is None. Returns ------- numpy.ndarray A 2D array of shape `(ncomp * 4, moment1.shape)` containing the guesses for each parameter: - Component velocity centroids. - Velocity dispersions. - Excitation temperatures. - Optical depths. Notes ----- - For `ncomp == 1`, the method uses a single-component guess based on the provided moments. - For `ncomp == 2`, the method applies a recipe for a bright and faint component with offsets in velocity and reduced T_ex and tau for the faint component. - For `ncomp > 2`, components are evenly spaced in velocity, and parameter guesses are scaled accordingly. - If `moment0` is provided, the guesses for T_ex and tau are scaled based on the normalized moment0 values. Examples -------- >>> import numpy as np >>> moment1 = np.array([[1.0, 2.0], [3.0, 4.0]]) >>> moment2 = np.array([[0.1, 0.2], [0.3, 0.4]]) >>> ncomp = 2 >>> guesses = moment_guesses(moment1, moment2, ncomp, tex_guess=4.0, tau_guess=0.8) >>> moment0 = np.array([[1.5, 2.5], [3.5, 4.5]]) >>> guesses_with_mom0 = moment_guesses(moment1, moment2, ncomp, moment0=moment0, mom0_floor=0.1) Raises ------ ValueError If `ncomp` is less than 1 or other parameters are invalid. """ # setup different limits based on the line specie line_setup = LineSetup(linetype) tex_max = line_setup.tex_max tau_max = line_setup.tau_max tex_min = line_setup.tex_min tau_min = line_setup.tau_min if moment0 is not None: #print "[WARNING]: moment0 map is provided, thus the user-provided tex and tau will not be used" # normalize the moment 0 map with respect to the norm_ref percentile value # e.g., 95 percentile value being normalized to have a value of 0.95 norm_ref = 99.73 mom0high = np.percentile(moment0[np.isfinite(moment0)], norm_ref) # may want to modify this normalization to be something a little simpler or physical (i.e., 99.73/100 ~ 1) if mom0_floor is None: m0Norm = moment0.copy()*norm_ref/100.0/mom0high tex_guess = m0Norm*tex_max tau_guess = m0Norm*tau_max else: m0Norm = moment0.copy() * norm_ref / 100.0 / (mom0high - mom0_floor) tex_guess = m0Norm * (tex_max - tex_min) + tex_min tau_guess = m0Norm * (tau_max - tau_min) + tau_min m1 = moment1 m2 = moment2 # Guess linewidth (the current recipe works okay, but potential improvements can be made. gs_sig = m2/ncomp gs_sig[gs_sig < sigmin] = sigmin # note 0.08 k is narrow enough to be purely thermal @ ~10 K # there are 4 parameters for each v-component gg = np.zeros((ncomp*4,)+m1.shape) if ncomp == 1: gg[0,:] = m1 # v0 centriod gg[1,:] = gs_sig # v0 width gg[2,:] = tex_guess # v0 T_ex gg[3,:] = tau_guess # v0 tau # using a working recipe (assuming a bright and a faint componet) if ncomp == 2: sigmaoff = 0.4 tau2_frac = 0.25 # the tau weight of the second component relative to the total fraction gg[0,:] = m1 - sigmaoff*m2 # v0 centriod gg[1,:] = gs_sig # v0 width gg[2,:] = tex_guess # v0 T_ex gg[3,:] = tau_guess*(1-tau2_frac) # v0 tau gg[4,:] = m1 + sigmaoff*m2 # v1 centriod gg[5,:] = gs_sig # v1 width gg[6,:] = tex_guess*0.8 # v1 T_ex gg[7,:] = tau_guess*tau2_frac # v1 tau # using a generalized receipe that have not been tested (lots of room for improvement!) if ncomp > 2: for i in range (0, ncomp): gg[i, :] = m1+(-1.0+i*1.0/ncomp)*0.5*m2 # v0 centriod (step through a range fo velocities within sigma_v) gg[i+1,:] = gs_sig # v0 width gg[i+2,:] = tex_guess*0.8 # v0 T_ex gg[i+3,:] = tau_guess/ncomp*0.25 # v0 tau # ensure the tex and tau guesses falls within the guessing limits tex_guess[tex_guess < tex_min] = tex_min tex_guess[tex_guess > tex_max] = tex_max tau_guess[tau_guess < tau_min] = tau_min tau_guess[tau_guess > tau_max] = tau_max return gg
#======================================================================================================================= # additional recipe
[docs] def moment_guesses_1c(m0, m1, m2): # make guesses based on the moment maps # note: it does not impose limits on parameters such as tau and tex # m0 from pyspeckit is actually amplitube proxy, so to calculate gs_sig = m2 ** 0.5 mom0 = m0 * gs_sig * np.sqrt(2 * np.pi) mom0_thres = 3.0 # the regime boundary between fix-tau- or fix-tex-based calculation mom0_min = 0.03 # Ta ~ 0.25 at Tex=3, tau=0.1 tau_fx = 2.5 #1.5 #2.5 tex_fx = 6.0 #7.0 #6.0 # K # divide the tau tex guess into two regimes by integrated flux # if flux is greater than mom0_thres, assume a fixed tau value and caculate the corrosponding tex assume Gaussian # otherwise, assume a fixed tex value and calculate tau instead if isinstance(m0, float): # for 0D if mom0 < mom0_min: mom0 = mom0_min if mom0 > mom0_thres: tau_guess = tau_fx tex_guess = get_tex(mom0, tau_fx) else: tex_guess = tex_fx tau_guess = get_tau(mom0, tex_guess) elif m0.ndim == 2: # aassume all mom0 lower than mom0 to be mom0_min mom0[mom0 < mom0_min] = mom0_min # for 2D tau_guess = np.zeros(m0.shape) tex_guess = np.zeros(m0.shape) mask = mom0 > mom0_thres tau_guess[mask] = tau_fx tex_guess[mask] = get_tex(mom0[mask], tau_fx) tau_guess[~mask] = get_tau(mom0[~mask], tex_fx) tex_guess[~mask] = tex_fx else: raise Exception("the moment 0 input has the wrong dimension ({})".format(m0.ndim)) return None return np.asarray([m1, gs_sig, tex_guess, tau_guess])
[docs] def mom_guess_wide_sep(spec, vpeak=None, rms=None, planemask=None, multicore=None): # for two components win_hwidth = 4.0 # the window for the second component recovery (though the 1st win_hwidth should mask out the hyperfines already) win_hwidth2 = 10.0 # the amount of moment 2 estimated sigma to maskout for "residual moment maps" f_sig_mask = 2.0 rms_thres = 2.0 # the amplitude threshold to count as the signal f_tau = 0.5 # weight of the tau for the "equal-weight" guesses f_sig = 0.5 # weight of the linewidth for all guesses multicore = validate_n_cores(multicore) if isinstance(spec, pyspeckit.spectrum.classes.Spectrum): spec.xarr.velocity_convention = 'radio' spec.xarr = spec.xarr.as_unit('km/s') if rms is None: # we only need a crude estimate rms = mad_std(spec.data, axis=None, ignore_nan=True) if vpeak is None: # find the v_peak pased on smoothed spectrum sp_smooth = spec.copy() sp_smooth.smooth(3) idx = np.argmax(sp_smooth.data) vpeak = sp_smooth.xarr[idx] vpeak = vpeak.value logger.info("vpeak for widely separated moment guesses: {}".format(vpeak)) # get the moments m0, m1, m2 = window_moments(spec, window_hwidth=win_hwidth, v_atpeak=vpeak) # assume the emission is dominated by the brighter component, and use moment maps to # create guesses for the first component gg1 = moment_guesses_1c(m0, m1, m2) # convert moment 2 to sigma sig = m2 ** 0.5 # mask emission found by moment masks, and try to find a secondary peak using a broader window sp4 = spec.copy() mask = np.logical_and(sp4.xarr.value > m1 - sig * f_sig_mask, sp4.xarr.value < m1 + sig * f_sig_mask) #mask2 = np.logical_and(sp4.xarr.value > vpeak - win_hwidth, sp4.xarr.value < vpeak + win_hwidth) mask2 = np.logical_and(sp4.xarr.value > m1 - win_hwidth, sp4.xarr.value < m1 + win_hwidth) mask = np.logical_or(mask, ~mask2) sp4.data[mask] = 0.0 #sp4.data[~mask2] = 0.0 m0n, m1n, m2n = window_moments(sp4, window_hwidth=win_hwidth2, v_atpeak=vpeak) gg2 = moment_guesses_1c(m0n, m1n, m2n) gg = np.concatenate((gg1, gg2)) if m0n < rms * rms_thres: gg[4:8] = gg1[:] gg[0] += sig gg[4] -= sig # set tau of each component to half of what the 1-component moment guess is gg[3] *= f_tau gg[7] *= f_tau gg[1] *= f_sig gg[5] *= f_sig elif isinstance(spec, SpectralCube): if rms is None: # we only need a crude estimate rms = mad_std(spec._data, axis=0, ignore_nan=True) spec = spec.with_spectral_unit("km/s", velocity_convention="radio") pcube = pyspeckit.Cube(cube=spec, maskmap=planemask, velocity_convention = 'radio') # get the moments using the pyspeckit method m0, m1, m2 = window_moments(pcube, window_hwidth=win_hwidth, v_atpeak=vpeak) # assume the emission is dominated by the brighter component, and use moment maps to # create guesses for the first component gg1 = moment_guesses_1c(m0, m1, m2) # convert moment 2 to sigma sig = m2 ** 0.5 # maskout emission found by moment masks, and try to find a secondary peak spax = spec.spectral_axis.value spax_cube = np.broadcast_to(spax[:, np.newaxis, np.newaxis], (len(spax), m1.shape[0], m1.shape[1])) smask = np.logical_and(spax_cube > m1 - sig * f_sig_mask, spax_cube < m1 + sig * f_sig_mask) smask2 = np.logical_and(spax_cube > m1 - win_hwidth, spax_cube < m1 + win_hwidth) smask = np.logical_or(smask, ~smask2) maskcube = spec.with_mask(~smask) # fill value needs to be zero for moment estimates to work as expected maskcube = maskcube.with_fill_value(0.0) # convert it to pyspeckit to take advantage of pyspeckit's moment methods pcube = pyspeckit.Cube(cube=maskcube, maskmap=planemask) pcube.momenteach(verbose=False, multicore=multicore) m0n, m1n, m2n = pcube.momentcube gg2 = moment_guesses_1c(m0n, m1n, m2n) # use "equal weight guesses" if the remaining spectrum is too faint mask = m0n < rms * rms_thres gg2[:,mask] = gg1[:,mask].copy() gg = np.concatenate((gg1, gg2), axis=0) #gg[4:8, mask] = gg1[:, mask] gg[0, mask] += sig[mask] gg[4, mask] -= sig[mask] # set tau of each component to half of what the 1-component moment guess is gg[3,mask] *= f_tau gg[7,mask] *= f_tau # set the guessing linewidth to be half of the moment guesses, for both components gg[1,:] *= f_sig gg[5,:] *= f_sig return gg
#======================================================================================================================= # utility functions
[docs] def get_rms_prefit(spectrum, window_hwidth, v_atpeak, linetype='nh3'): s = spectrum line_setup = LineSetup(linetype) vsys = v_atpeak*u.km/u.s throw = window_hwidth*u.km/u.s voff = line_setup.voff mask = np.ones(s.shape[0], dtype=np.bool) for deltav in voff: mask *= (np.abs(s.xarr - (deltav * u.km / u.s + vsys)) > throw) d_rms = s.data.copy() return mad_std(d_rms[mask])
[docs] def adaptive_moment_maps(maskcube, seeds, window_hwidth, weights=None, signal_mask=None): # split the cube up into different regions and make mosaic moment maps from moments of each individual regions labels, n_labs = map_divide.dist_divide(seeds, weights=weights, return_nmarkers=True) if signal_mask is None: signal_mask = seeds m0 = np.zeros(labels.shape) m0[:] = np.nan m1 = m0.copy() m2 = m0.copy() moments = [m0, m1, m2] # make moment map for each region for i in range(n_labs): mask = labels == i + 1 new_cube_mask = maskcube.get_mask_array() new_cube_mask = new_cube_mask * mask moms = window_moments(maskcube.with_mask(new_cube_mask), window_hwidth, signal_mask=signal_mask*mask) for m, m_p in zip(moments, moms): m[mask] = m_p[mask] return m0, m1, m2
[docs] def window_mask_pcube(pcube, vmid, win_hwidth=4.0): # returns a copy of the pucbe (pyspeckit.cubes.SpectralCube.Cube) with spectra outside of the window zeroed shape = pcube.cube.shape spax = pcube.xarr.value spax_cube = np.broadcast_to(spax[:, np.newaxis, np.newaxis], (len(spax), shape[1], shape[2])) smask = np.logical_and(spax_cube > vmid - win_hwidth, spax_cube < vmid + win_hwidth) pcube = pcube.copy() #pcube.cube[~smask] = 0.0 pcube.cube[~smask] = np.nan pcube.maskmap = np.any(np.isfinite(pcube.cube), axis=0) return pcube
#======================================================================================================================= # physics functions
[docs] def get_tex(Ta, tau=0.5, nu=23.722634): # calculate the excitation temperature given tau background_tb = 2.7315 T0 = (h * nu * 1e9 / kb) term1 = Ta / T0 / (1 - np.exp(-tau)) term2 = 1.0 / (np.exp(T0 / background_tb) - 1) term3 = 1.0 / (term1 + term2) + 1 term4 = T0 / np.log(term3) return term4
[docs] def get_tau(Ta, tex=6.0, nu=23.722634): background_tb = 2.7315 T0 = (h * nu * 1e9 / kb) term1 = 1 / (np.exp(T0 / tex) - 1) - 1 / (np.exp(T0 / background_tb) - 1) term2 = -Ta / T0 / term1 + 1 return -np.log(term2)
[docs] def peakT(tex, tau, nu=23.722634): background_tb = 2.7315 tauprof = tau T0 = (h * nu * 1e9 / kb) return (T0 / (np.exp(T0 / tex) - 1) - T0 / (np.exp(T0 / background_tb) - 1)) * (1 - np.exp(-tauprof))