"""
The `mufasa.guess_refine` module provides tools for cleaning, normalizing, and
interpolating fitted parameter maps and generating refined guesses for fitting.
"""
__author__ = 'mcychen'
#=======================================================================================================================
import numpy as np
import warnings
from astropy.stats import mad_std
from astropy.wcs import WCS
from skimage.morphology import remove_small_objects, binary_dilation, disk, remove_small_holes
from scipy.ndimage.filters import median_filter
from scipy.interpolate import CloughTocher2DInterpolator as intp
from scipy.interpolate import griddata
from .utils.fits_utils import get_pixel_mapping
from astropy.convolution import Gaussian2DKernel, convolve
from scipy.spatial.qhull import QhullError
from pyspeckit.spectrum.models.ammonia_constants import freq_dict
from astropy import units as u
nu0_nh3 = freq_dict['oneone'] * u.Hz
nu0_nh3 = nu0_nh3.to("GHz").value
from .utils import interpolate
from . import moment_guess as mmg
#=======================================================================================================================
from .utils.mufasa_log import get_logger
logger = get_logger(__name__)
#=======================================================================================================================
[docs]
def quick_2comp_sort(data_cnv, filtsize=2, method="error_v", nu=nu0_nh3, f_tau=0.5):
# use median filtered vlsr & sigma maps as a velocity reference to sort the two components
# f_tau is the factor to down scale tau to minimic the effective tau of the main hyperfines
if method == "error_v":
# arange the maps so the component with the least vlsr errors is the first component
swapmask = data_cnv[8] > data_cnv[12]
data_cnv = mask_swap_2comp(data_cnv, swapmask)
elif method == "Tpeak":
# sort by the peak brigthness temperature using the tau & tax parameter
# the brigther component is placed as the first component (the further away from the observer)
Tb0_a = mmg.peakT(data_cnv[3], data_cnv[4]*f_tau, nu=nu)
Tb0_b = mmg.peakT(data_cnv[6], data_cnv[7]*f_tau, nu=nu)
swapmask = Tb0_b > Tb0_a
data_cnv = mask_swap_2comp(data_cnv, swapmask)
elif method == "tautex":
# sort by the relative emission brightness proxy
# the brigther component is placed as the first component (the further away from the observer)
Tb0_a = data_cnv[3]*data_cnv[4]
Tb0_b = data_cnv[6]*data_cnv[7]
swapmask = Tb0_b > Tb0_a
data_cnv = mask_swap_2comp(data_cnv, swapmask)
elif method == "tau":
# placing the optically thicker component in the back (1st component)
# would only recommend if the degeneracy between tau and tex has been somewhat addressed already
swapmask = data_cnv[7] > data_cnv[4]
data_cnv = mask_swap_2comp(data_cnv, swapmask)
elif method == "chen2020":
# this is the method used by Chen+ 2020 ApJ
# arange the maps so the component with the least vlsr errors is the first component
swapmask = data_cnv[8] > data_cnv[12]
data_cnv = mask_swap_2comp(data_cnv, swapmask)
# the use the vlsr error in the first component as the reference and sort the component based on their similarities
# to this reference (similary bright structures should have similar errors)
ref = median_filter(data_cnv[8], size=(filtsize, filtsize))
swapmask = np.abs(data_cnv[8] - ref) > np.abs(data_cnv[12] - ref)
data_cnv = mask_swap_2comp(data_cnv, swapmask)
def dist_metric(p1, p2):
# use the first map (the one that should have the smallest error, hense more reliable) to compute
# distance metric based on their similarities to the median filtered quantity
p_refa = median_filter(p1, size=(filtsize, filtsize))
#p_refb = median_filter(p2, size=(filtsize, filtsize))
# distance of the current arangment to the median
del_pa = np.abs(p1 - p_refa)
# distance of the swapped arangment to the median
del_pb = np.abs(p2 - p_refa)
return del_pa, del_pb
dist_va, dist_vb = dist_metric(data_cnv[0], data_cnv[4])
dist_siga, dist_sigb = dist_metric(data_cnv[1], data_cnv[5])
# use both the vlsr and the sigma as a distance metric
swapmask = np.hypot(dist_va, dist_siga) > np.hypot(dist_vb, dist_sigb)
data_cnv= mask_swap_2comp(data_cnv, swapmask)
return data_cnv
[docs]
def mask_swap_2comp(data_cnv, swapmask):
# swap data over the mask
data_cnv= data_cnv.copy()
data_cnv[0:4,swapmask], data_cnv[4:8,swapmask] = data_cnv[4:8,swapmask], data_cnv[0:4,swapmask]
data_cnv[8:12,swapmask], data_cnv[12:16,swapmask] = data_cnv[12:16,swapmask], data_cnv[8:12,swapmask]
return data_cnv
[docs]
def guess_from_cnvpara(data_cnv, header_cnv, header_target, mask=None, tau_thresh=1, clean_map=True):
# a wrapper to make guesses based on the parameters fitted to the convolved data
npara = 4
ncomp = int(data_cnv.shape[0]/npara/2)
data_cnv = data_cnv.copy()
data_cnv[data_cnv == 0] = np.nan
# clean up the maps based on vlsr errors
if ncomp == 1:
std_thres = 3
else:
std_thres = 1
if ncomp == 2:
data_cnv = quick_2comp_sort(data_cnv, filtsize=2, method="error_v")
if clean_map:
data_cnv = simple_para_clean(data_cnv, ncomp, npara=npara, std_thres=std_thres)
# remove the error component
data_cnv = data_cnv[0:npara*ncomp]
for i in range (0, ncomp):
data_cnv[i*npara:i*npara+npara] = refine_each_comp(data_cnv[i*npara:i*npara+npara], mask, tau_thresh=tau_thresh)
# regrid the guess back to that of the original data
hdr_conv = get_celestial_hdr(header_cnv)
hdr_final = get_celestial_hdr(header_target)
guesses_final = []
newmask = np.any(np.isfinite(data_cnv), axis=0)
newmask = remove_small_holes(newmask, 25)
# regrid the guesses
for gss in data_cnv:
if False:
newmask = np.isfinite(gss)
# removal holes with areas that smaller than a 5 by 5 square
newmask = remove_small_holes(newmask, 25)
# create a mask to regrid over
newmask = regrid(newmask, hdr_conv, hdr_final, dmask=None, method='nearest')
newmask = newmask.astype('bool')
new_guess = regrid(gss, hdr_conv, hdr_final, dmask=newmask)
# expand the interpolation a bit, since regridding can often miss some pixels due to aliasing
newmask_l = binary_dilation(newmask)
newmask_l = binary_dilation(newmask_l)
kernel = Gaussian2DKernel(1)
new_guess_cnv = convolve(new_guess, kernel, boundary='extend')
new_guess_cnv[~newmask_l] = np.nan
# retrain the originally interpolataed values within the original mask
mask_finite = np.isfinite(new_guess)
new_guess_cnv[mask_finite] = new_guess[mask_finite]
else:
new_guess_cnv = interpolate.iter_expand(gss, mask=newmask)
new_guess_cnv = regrid(new_guess_cnv, hdr_conv, hdr_final, dmask=None)
# insure the footprint wasn't made smaller due to regriding
mask_l = np.isfinite(new_guess_cnv)
mask_l = binary_dilation(mask_l, disk(2))
new_guess_cnv = interpolate.iter_expand(new_guess_cnv, mask=mask_l)
guesses_final.append(new_guess_cnv)
return np.array(guesses_final)
[docs]
def tautex_renorm(taumap, texmap, tau_thresh = 0.21, tex_thresh = 15.0, nu=nu0_nh3):
# attempt to re-normalize the tau & text values at the optically thin regime (where the two are degenerate)
# note, the latest recipe also works for the optically thick regime in principle
# only emission with lower amplitude than TA_ltau_thres and tau < tau_thin will have tex > tex_thresh recalculated
# (i.e., expected to be optically thin
f_tau = 0.5 # a factor to minic the effecitve tau of the main hyperfines
isthin = np.logical_and(taumap < tau_thresh, np.isfinite(taumap))
TA_lowtau = mmg.peakT(texmap[isthin], taumap[isthin]*f_tau, nu=nu)
TA_ltau_thres = 0.5 # where tau ~1 for Tex = 3.5; tau with Ta above this diverges quickly
# assume a fixed Tex for low TA
tex_thin = 3.5 # note: at Tk = 30K, n = 1e3, N = 1e13, & sig = 0.2 km.s --> Tex = 3.49 K, tau = 0.8
tau_thin = tau_thresh#1.0 # where the main hyperfines of NH3 (1,1) starts to get optically thick
# for when tau is less than tau_thresh
texmap[isthin] = mmg.get_tex(TA_lowtau, tau=tau_thresh*f_tau) #note: tex can be higher than at 40K at Ta~7K
taumap[isthin] = tau_thresh
# optically thin gas are also unlikely to have high spatial density and thus high Tex
hightex = np.logical_and(texmap > tex_thresh, np.isfinite(texmap))
TA_hightex = mmg.peakT(texmap[hightex], taumap[hightex]*f_tau, nu=nu)
mask = TA_hightex < TA_ltau_thres # only renormalize high tex when Ta is less than the threshold
mask = np.logical_and(mask, taumap[hightex] < tau_thin)
texmap[hightex][mask] = tex_thin
taumap[hightex][mask] = mmg.get_tau(TA_hightex[mask], tex=tex_thin, nu=nu)
# note, tau values that are too low will be taken care of by refine_each_comp()
return taumap, texmap
[docs]
def refine_each_comp(guess_comp, mask=None, v_range=None, sig_range=None, tau_thresh=0.1):
# refine guesses for each component, with values outside ranges specified below removed
Tex_min = 3.0
Tex_max = 8.0
Tau_min = 0.2
Tau_max = 8.0
disksize = 1.0
if mask is None:
mask = master_mask(guess_comp)
if v_range is None:
vmin = None
vmax = None
else:
vmin, vmax =v_range
if sig_range is None:
sigmin = None
sigmax = None
else:
sigmin, sigmax = sig_range
guess_comp[0] = refine_guess(guess_comp[0], min=vmin, max=vmax, mask=mask, disksize=disksize)
guess_comp[1] = refine_guess(guess_comp[1], min=sigmin, max=sigmax, mask=mask, disksize=disksize)
# re-normalize the degenerated tau & text for the purpose of estimate guesses
guess_comp[3], guess_comp[2] = tautex_renorm(guess_comp[3], guess_comp[2], tau_thresh = tau_thresh)
# place a more "strict" limits for Tex and Tau guessing than the fitting itself
guess_comp[2] = refine_guess(guess_comp[2], min=Tex_min, max=Tex_max, mask=mask, disksize=disksize)
guess_comp[3] = refine_guess(guess_comp[3], min=Tau_min, max=Tau_max, mask=mask, disksize=disksize)
return guess_comp
[docs]
def simple_para_clean(pmaps, ncomp, npara=4, std_thres = 2):
# clean parameter maps based on their error values
pmaps=pmaps.copy()
# remove component with vlsrErr that is number of sigma off from the median as specified below
pmaps[pmaps == 0] = np.nan
# loop through each component
for i in range (0, ncomp):
# get the STD and Medians of the vlsr errors
std_vErr = mad_std(pmaps[(i+ncomp)*npara][np.isfinite(pmaps[(i+ncomp)*npara])])
median_vErr = np.median(pmaps[(i+ncomp)*npara][np.isfinite(pmaps[(i+ncomp)*npara])])
# remove outlier pixels
mask = pmaps[(i+ncomp)*npara] > median_vErr + std_vErr*std_thres
pmaps[i*npara:(i+1)*npara, mask] = np.nan
pmaps[(i+ncomp)*npara:(i+ncomp+1)*npara, mask] = np.nan
return pmaps
[docs]
def get_celestial_hdr(header):
# make a new header that only contains celestial (i.e., on-sky) information
new_hdr = WCS(header).celestial.to_header()
new_hdr['NAXIS1'] = header['NAXIS1']
new_hdr['NAXIS2'] = header['NAXIS2']
return new_hdr
[docs]
def master_mask(pcube):
# create a 2D mask over where any of the paramater map has finite values
mask = np.any(np.isfinite(pcube), axis=0)
mask = mask_cleaning(mask)
return mask
[docs]
def mask_cleaning(mask):
# designed to clean a noisy map, with a footprint that is likely slightly larger
#mask = remove_small_objects(mask, min_size=9) #pending investigation before removed permantly
mask = binary_dilation(mask, disk(1))
mask = remove_small_holes(mask, 9)
return mask
[docs]
def regrid(image, header1, header2, dmask=None, method='cubic'):
# similar to hcongrid from FITS_tools, but uses scipy.interpolate.griddata to interpolate over nan values
grid1 = get_pixel_mapping(header1, header2)
xline = np.arange(image.shape[1])
yline = np.arange(image.shape[0])
X,Y = np.meshgrid(xline, yline)
mask = np.isfinite(image)
if dmask is None:
dmask = np.ones(grid1[0].shape, dtype=bool)
return griddata((X[mask],Y[mask]), image[mask], (grid1[1]*dmask, grid1[0]*dmask), method=method, fill_value=np.nan)
[docs]
def refine_guess(map, min=None, max=None, mask=None, disksize=1, scipy_interpolate=False):
# refine parameter maps by outlier-fitering, masking, and interpolating
map = map.copy()
if min is not None:
map[map<min] = np.nan
if max is not None:
map[map>max] = np.nan
# check the number of finite pixels in the provided map
mask_finite = np.isfinite(map)
n_valid = np.sum(mask_finite)
# in case there are too few valid pixels in the provided map
if n_valid < 2:
# if there's a single pixel
if n_valid == 1:
map[:] = map[mask_finite][0]
# if there are no valid pixel in the guesses, set it to one of the limits or zero
elif min is not None:
if max is None:
map[:] = min
else:
map[:] = (max + min) / 2
elif max is not None:
map[:] = max
else:
map[:] = 0.0
return map
if mask is None:
mask = mask_finite
mask = mask_cleaning(mask)
def interpolate_scipy(map, mask):
# interpolate over the dmask footprint
xline = np.arange(map.shape[1])
yline = np.arange(map.shape[0])
X,Y = np.meshgrid(xline, yline)
itpmask = np.isfinite(map)
C = intp((X[itpmask],Y[itpmask]), map[itpmask])
# interpolate over the dmask footprint
zi = C(X*mask,Y*mask)
return zi
def interpolate_via_cnv(map, mask):
mask_finite = np.isfinite(map)
kernel = Gaussian2DKernel(2.5/2.355)
zi = convolve(map, kernel, boundary='extend')
# only populate pixels where the original map was finite
zi[mask_finite] = map[mask_finite]
# interpolate further to fill the mask if the mask is much larger than the kernel
zi = interpolate.iter_expand(zi, mask=mask)
return zi
if scipy_interpolate:
warn_msg = "The usage of scipy.interpolate for guess refine is deprecated and will be removed in a future version." \
"The default going forward will be astropy's convovle method."
warnings.warn(warn_msg, DeprecationWarning, stacklevel=2)
logger.warning(warn_msg)
try:
# interpolate the mask
zi = interpolate_scipy(map, mask)
except QhullError as e:
logger.warning("qhull input error found; astropy convolve will be used instead")
zi = interpolate_via_cnv(map, mask) # use astropy convolve as a proxy for interpolation
except ValueError as e:
logger.error("ValueError found (no points given); astropy convolve will be used instead")
zi = interpolate_via_cnv(map, mask)
else:
zi = interpolate_via_cnv(map, mask)
return zi
[docs]
def refine_2c_guess(guesses, f_sigv = 0.5):
# f_sigv is the fraction of sigma_v to increase the velocity seperation by
v1, v2 = guesses[0], guesses[4]
s1, s2 = guesses[1], guesses[5]
vdiff = v1 - v2
vdiff_sign = vdiff/np.abs(vdiff)
sigvsum = s1 + s2
# increase their v seperation by half of their total linewidth scaled by f_sigv
v1 = v1 + sigvsum/2 * f_sigv * vdiff_sign
v2 = v2 - sigvsum/2 * f_sigv * vdiff_sign
# reduce the linewidth guesses, since they tend to be overestimated
s1 = s1/2
s2 = s2/2
guesses[0], guesses[4] = v1, v2
guesses[1], guesses[5] = s1, s2
return guesses
[docs]
def save_guesses(paracube, header, savename, ncomp=2):
# a method to save the fitted parameter cube with relavent header information
import copy
from astropy.io import fits
npara = 4
hdr_new = copy.deepcopy(header)
# write the header information for each plane (i.e., map)
for i in range (0, ncomp):
hdr_new['PLANE{0}'.format(i*npara+0)] = 'VELOCITY_{0}'.format(i+1)
hdr_new['PLANE{0}'.format(i*npara+1)] = 'SIGMA_{0}'.format(i+1)
hdr_new['PLANE{0}'.format(i*npara+2)] = 'TEX_{0}'.format(i+1)
hdr_new['PLANE{0}'.format(i*npara+3)] = 'TAU_{0}'.format(i+1)
hdr_new['CDELT3']= 1
hdr_new['CTYPE3']= 'FITPAR'
hdr_new['CRVAL3']= 0
hdr_new['CRPIX3']= 1
fitcubefile = fits.PrimaryHDU(data=paracube, header=hdr_new)
fitcubefile.writeto(savename, overwrite=True)