Source code for mufasa.spec_models.meta_model
"""
Defines the MetaModel class for managing spectral model parameters, constraints, and fitting functions.
"""
__author__ = 'mcychen'
import numpy as np
from astropy import units as u
from concurrent.futures import ThreadPoolExecutor
from ..exceptions import FitTypeError
from .. import moment_guess as momgue
[docs]
class MetaModel(object):
"""
A class to store spectral model-specific information relevant to spectral modeling tasks, such as fitting.
Attributes
----------
fittype : str
The identifying name for the spectral model (i.e., the type of model to fit).
ncomp : int
The number of components within the model.
adoptive_snr_masking : bool
Whether to use adaptive SNR masking during fitting (default is True).
window_hwidth : int
The default half-width of the window for calculating moments.
central_win_hwidth : float or None
The half-width of the intermediate window for filtering out satellite hyperfine lines. Default is None.
Texmin : float
Lower excitation temperature limit in Kelvin. Default is 3.0 K.
Texmax : float
Upper excitation temperature limit in Kelvin. Default is 40 K.
sigmin : float
Lower velocity dispersion limit in km/s. Default is 0.07 km/s.
sigmax : float
Upper velocity dispersion limit in km/s. Default is 2.5 km/s.
taumin : float
Lower optical depth limit. Default is 0.1.
taumax : float
Upper optical depth limit. Default is 30.0.
eps : float
A small perturbation that can be used in initial guesses. Default is 0.001.
main_hf_moments : function
Function for calculating moments for the main hyperfine component.
linetype : str
The type of spectral line (e.g., 'nh3', 'n2hp'). Set based on `fittype`.
linenames : list of str
Names of the spectral lines being modeled. Set based on `fittype`.
linename : str
Name of the main spectral line being modeled. Set based on `fittype`.
fitter : function
The model generator function for fitting spectral components. Set based on `fittype`.
freq_dict : dict
Dictionary containing rest frequencies for the spectral lines. Set based on `fittype`.
rest_value : astropy.units.Quantity
Rest frequency of the spectral line in Hz. Set based on `fittype`.
voffs : list of float
Velocity offsets for the hyperfine lines in km/s. Set based on `fittype`.
tau_wts : list of float
Weights for the optical depth of hyperfine lines. Set based on `fittype`.
voff_at_peak : float
Velocity offset at the brightest hyperfine line in km/s. Set based on `fittype`.
model_func : function
The specific spectral model function used for fitting. Set based on `fittype`.
"""
def __init__(self, fittype, ncomp):
"""
Initialize a MetaModel object.
Parameters
----------
fittype : str
The identifying name for the spectral model (e.g., 'nh3_multi_v', 'n2hp_multi_v').
ncomp : int
The number of components within the model to be fitted.
Raises
------
FitTypeError
If the provided `fittype` is not recognized.
"""
self.fittype = fittype
self.adoptive_snr_masking = True
# the default windows for calculating moments
# final window for evaluating moments (center differs from pixel to pixel)
self.window_hwidth = 5
# intermediate window for filtering out satellite hyperfine lines. The window is fixed and the same
# throughout all pixels
self.central_win_hwidth = None
# set default parameter limits
# (primarily based on NH3 (1,1) in the galactic molecular cloud conditions using RADEX)
self.Texmin = 3.0 # K; (assume T_kin=5K, density = 1e3 cm^-3, column=1e13 cm^-2, sigv=3km/s)
self.Texmax = 40 # K; T_k for Orion A is < 35 K. (T_k=40 at 1e5 cm^-3, 1e15 cm^-2, and 0.1 km/s --> Tex=37K)
self.sigmin = 0.07 # km/s
self.sigmax = 2.5 # km/s; (Larson's law for a 10pc cloud has sigma = 2.6 km/s)
self.taumax = 30.0 # thick enough for NH3 (1,1) satellite hyperfines to have the same intensity as the main
self.taumin = 0.1 # (note: NH3 (1,1) at 1e3 cm^-3, 1e13 cm^-2, 1 km/s linewidth, 40 K -> 0.15)
self.eps = 0.001 # a small perturbation that can be used in guesses
# a placeholder for if the window becomes line specific
self.main_hf_moments = momgue.window_moments
# for the NH3 (1,1) multicomponent model
if self.fittype == 'nh3_multi_v':
from .SpecModels import AmmoniaModel
self.model = AmmoniaModel()
self.linetype = 'nh3'
# the current implementation only fits the 1-1 lines
self.linename = 'oneone'
# for the N2H+ (1-0) multi-component model
elif self.fittype == 'n2hp_multi_v':
from .SpecModels import N2HplusModel
self.model = N2HplusModel()
self.linetype = 'n2hp'
# the current implementation only fits the 1-0 lines
self.linename = 'onezero'
# change the parameter limits from the default to better reflects N2H+ (1-0)
self.taumax = 40.0 # when the satellite hyperfine lines becomes optically thick
# overwrite default windows, since N2H+ (1-0) have a satellite line that could be fairly bright
self.window_hwidth = 4
self.central_win_hwidth = 3.5
else:
raise FitTypeError("\'{}\' is an invalid fittype".format(fittype))
freq_dict = self.model._molecular_constants['freq_dict']
voff_lines_dict = self.model._molecular_constants['voff_lines_dict']
tau_wts_dict = self.model._molecular_constants['tau_wts_dict']
self.fitter = self.model.multi_v_model_generator(n_comp=ncomp)
# the function to compute the spectral model
self.model_func = self.model.multi_v_spectrum
self.linenames = [self.linename]
self.rest_value = freq_dict[self.linename] * u.Hz
self.voffs = voff_lines_dict[self.linename]
self.tau_wts = tau_wts_dict[self.linename]
self.voff_at_peak = self.voffs[np.argmax(self.tau_wts)] # velocity offset at the brightest hyperfine line
[docs]
def model_function(self, xarr, parameters, planemask=None, multithreaded=False):
"""
Compute the spectral model for a given spectral axis and model parameters.
Parameters
----------
xarr : pyspeckit.spectrum.units.SpectroscopicAxis
The spectral axis along which to evaluate the model function.
parameters : array-like
The model parameters for evaluation. Can be:
- A 1D array with shape `(l,)`, representing a single set of parameters.
- A 2D array with shape `(l, n)`, where `l` is the number of parameters,
and `n` is the number of spatial positions.
- A 3D array with shape `(l, m, n)`, where `l` is the number of parameters
per pixel, and `m` and `n` represent the spatial dimensions. In this
case, the function will evaluate the model at each `(m, n)` pixel
specified by `planemask`.
planemask : array-like, optional
A 2D mask array with shape `(m, n)` indicating the pixels to evaluate when
`parameters` is 3D. Pixels with `True` in the mask will be evaluated. If
None, a default mask is generated where all non-zero, finite values in
`parameters` are considered valid.
multithreaded : bool, optional
Whether to enable multithreading to evaluate pixels in parallel when
`parameters` is 2D or 3D. Default is False.
Returns
-------
np.ndarray
The evaluated spectral model values. The output shape matches the input
shape of `parameters`:
- If `parameters` is 1D, returns a 1D array of computed values.
- If `parameters` is 2D, returns a 1D array with length `n`, containing
computed values for each position.
- If `parameters` is 3D, returns a 2D array with shape `(m, n)`, containing
computed values for each evaluated pixel in the masked region. Masked
pixels are filled with NaN.
"""
if parameters.ndim == 1:
# Direct evaluation for 1D input
return self.model_func(xarr, *parameters)
elif parameters.ndim == 2:
# For 2D input, transpose for easier iteration
pars = parameters.T
# Initialize results array for 2D output
results = np.full(pars.shape[0], np.nan, dtype=object)
# Define evaluation function for each position
def evaluate_position(i):
par = pars[i]
return self.model_func(xarr, *par)
if multithreaded:
with ThreadPoolExecutor() as executor:
evaluated_results = list(executor.map(evaluate_position, range(len(pars))))
results[:] = evaluated_results
else:
for i, par in enumerate(pars):
results[i] = self.model_func(xarr, *par)
return results
elif parameters.ndim == 3:
# Generate mask if not provided
if planemask is None:
planemask = np.all(np.isfinite(parameters), axis=0) & np.all(parameters != 0, axis=0)
# Mask parameters for efficient evaluation
pars = parameters[:, planemask].T
# Initialize results array to hold model outputs for 3D input
results = np.full((parameters.shape[1], parameters.shape[2]), np.nan, dtype=object)
# Define evaluation function for each pixel
def evaluate_pixel(i):
par = pars[i]
return self.model_func(xarr, *par)
if multithreaded:
with ThreadPoolExecutor() as executor:
evaluated_results = list(executor.map(evaluate_pixel, range(len(pars))))
results[planemask] = evaluated_results
else:
for i, par in enumerate(pars):
results[planemask][i] = self.model_func(xarr, *par)
return results
[docs]
def peakT(self, parameters, index_v=0, planemask=None, multithreaded=False):
"""
Estimate the peak brightness temperature of the spectral model by evaluating
the model at the spectral location (i.e., the velocity) of the brightest
hyperfine line.
Note
----
This method only works for a single-component model, where the peak
brightness is expected to be located near the brightest hyperfine line.
Parameters
----------
parameters : array-like
The model parameters to evaluate, where each element corresponds to a
parameter used in the spectral model. Can be:
- A 1D array with shape `(l,)`, representing a single set of parameters.
- A 2D array with shape `(l, n)`, where `l` is the number of parameters and
`n` is the number of spatial positions.
- A 3D array with shape `(l, m, n)`, where `l` is the number of parameters
per pixel, and `m` and `n` represent the spatial dimensions.
index_v : int, optional
Index of the parameter corresponding to the velocity centroid, which is 0
for all spectral models implemented in MUFASA. Default is 0.
planemask : array-like, optional
A 2D mask array with shape `(m, n)` indicating the pixels to evaluate when
`parameters` is 3D. Pixels with `True` in the mask will be evaluated. If
None, a mask is generated based on finite, non-zero values in
`parameters`.
multithreaded : bool, optional
Whether to enable multithreading to evaluate pixels in parallel when
`parameters` is 2D or 3D. Default is False.
Returns
-------
np.ndarray
An array with the peak brightness temperature for each evaluated pixel:
- If `parameters` is 1D, returns a scalar representing the peak
brightness temperature.
- If `parameters` is 2D, returns a 1D array with length `n`, containing
the peak brightness temperature for each position.
- If `parameters` is 3D, returns a 2D array with shape `(m, n)`, containing
the peak brightness temperature for each evaluated pixel in the masked
region. Masked pixels are filled with NaN.
"""
# Function implementation
# Get the velocity at the brightest hyperfine line
v0 = np.array([parameters[index_v] + self.voff_at_peak]) * u.km / u.s
# Convert the velocity to frequency for evaluation
nu0 = v0.to(u.GHz, equivalencies=u.doppler_radio(self.rest_value))
if parameters.ndim == 1:
# Direct evaluation for 1D input
return self.model_func(nu0, *parameters)[0]
else:
if parameters.ndim == 2:
pars = parameters.T
nu0_array = nu0.T
if parameters.ndim == 3:
# Generate mask if not provided
if planemask is None:
planemask = np.all(np.isfinite(parameters), axis=0) & np.all(parameters != 0, axis=0)
# Mask parameters and frequencies for efficient evaluation
pars = parameters[:, planemask].T
nu0_array = nu0[:, planemask].T
# Initialize results array
pT = np.full(nu0_array.shape[0], np.nan)
# Define a function for multithreading
def evaluate_pixel(i):
nu0_i = nu0_array[i]
par = pars[i]
return self.model_func(nu0_i, *par)[0]
if multithreaded:
# Use ThreadPoolExecutor for parallel processing
with ThreadPoolExecutor() as executor:
results = list(executor.map(evaluate_pixel, range(len(pars))))
pT[:] = results
else:
# Single-threaded loop if multicore is False
for i, par in enumerate(pars):
nu0_i = nu0_array[i]
pT[i] = self.model_func(nu0_i, *par)[0]
if parameters.ndim == 3:
# Reconstruct the 2D output
pT2D = np.full(planemask.shape, np.nan)
pT2D[planemask] = pT
return pT2D
else:
return pT