Source code for distances

"""
Parallax fitting and computation of distances
"""

import os
import warnings
import collections
from bisect import bisect_left
from typing import Callable

import h5py
import numpy as np
import scipy.stats
from scipy.interpolate import interp1d
from astropy.coordinates import SkyCoord
from healpy import ang2pix
from dustmaps.sfd import SFDQuery
from dustmaps.bayestar import BayestarWebQuery
from astropy.utils.exceptions import AstropyWarning

import basta.utils_distances as udist
import basta.constants as cnsts
import basta.stats as stats

import matplotlib

matplotlib.use("Agg")
import matplotlib.pyplot as plt

# Don't print Astropy warnings (catch error caused by mock'ing astropy in Sphinx)
try:
    warnings.filterwarnings("ignore", category=AstropyWarning)
except AssertionError:
    pass

try:
    from basta._dustpath import __dustpath__
except ModuleNotFoundError:
    print("\nCannot find path to dustmaps. Did you run 'setup.py'?\n")
    raise


[docs] def get_EBV_along_LOS(distanceparams: dict) -> Callable[[np.array], np.array]: """ Returns color excess E(B-V) for a line of sight, mainly using a pre-downloaded 3D extinction map provided by Green et al. 2015/2018 - see http://argonaut.skymaps.info/. The extinction map is only computed for distance modulus between :math:`4 < m-M < 19` in units of magnitude. Parameters ---------- distanceparams : dict Dictionary with parameters required for constructing likelihood for absolute magnitude constraint, i.e. coordinates, given dustframe, etc.. Returns ------- EBV_along_LOS : function Reddening or excess color function along the given line-of-sight """ if "EBV" in distanceparams: return lambda x: np.ones(len(x)) * distanceparams["EBV"][1] # Convert to galactic coordinates frame = distanceparams["dustframe"] if frame == "icrs": ra = distanceparams["RA"] dec = distanceparams["DEC"] c = SkyCoord(ra=ra, dec=dec, frame="icrs", unit="deg") elif frame == "galactic": lon = distanceparams["lon"] lat = distanceparams["lat"] c = SkyCoord(l=lon, b=lat, frame="galactic", unit="deg") else: raise ValueError("Unknown dust map frame for computing reddening!") # Load extinction data cube pathmap = os.path.join(__dustpath__, "bayestar/bayestar2019.h5") dcube = h5py.File(pathmap, "r") # Distance modulus bins bin_edges = dcube["/pixel_info"].attrs["DM_bin_edges"] dmbin = bin_edges + (bin_edges[1] - bin_edges[0]) / 2.0 # If webquery fails use local copy of dustmap try: bayestar = BayestarWebQuery(version="bayestar2019") Egr_samples = bayestar(c, mode="samples") except Exception: # contains positional info pinfo = dcube["/pixel_info"][:] nsides = np.unique(dcube["/pixel_info"][:]["nside"]) # Convert coordinates to galactic frame lon = c.galactic.l.deg lat = c.galactic.b.deg # Convert l,b[deg] to theta,phi[rad] theta = np.pi / 2.0 - lat * np.pi / 180.0 phi = lon * np.pi / 180.0 # To check if we are within the maps coordinates Egr_samples = np.array([np.nan]) # Run through nsides for nside in reversed(nsides): healpixNside = ang2pix(nside, theta, phi, nest=True) # Find the one with the correct nside and the correct healpix indNside = [ i for i, x in enumerate(pinfo) if x[0] == nside and x[1] == healpixNside ] if indNside: index = indNside[0] Egr_samples = dcube["/samples"][index] break # If coordinates outside dust map, use Schegel if np.isnan(Egr_samples).any(): print("WARNING: Coordinates outside dust map boundaries!") print("Default to Schegel 1998 dust map") sfd = SFDQuery() EBV_along_LOS = lambda x: np.full_like(x, sfd(c)) return EBV_along_LOS Egr_med, Egr_err = [], [] for i in range(len(dmbin)): Egr_med.append(np.nanmedian(Egr_samples[:, i])) Egr_err.append(np.nanstd(Egr_samples[:, i])) Egr_med_fun = interp1d( dmbin, Egr_med, bounds_error=False, fill_value=(0, np.max(Egr_med)) ) Egr_err_fun = interp1d( dmbin, Egr_err, bounds_error=False, fill_value=np.max(Egr_err) ) dcube.close() def EBV_along_LOS(dm): Egr = np.asarray(np.random.normal(Egr_med_fun(dm), Egr_err_fun(dm))) EBV = cnsts.extinction.Conv_Bayestar * Egr return EBV return EBV_along_LOS
[docs] def get_EBV( dist: np.array, EBV_along_LOS: Callable[[np.array], np.array], debug: bool = False, debug_dirpath: str = "", ) -> np.array: """ Estimate E(B-V) by drawing distances from a normal parallax distribution with EDSD prior. Parameters ----- dist : array The drawn distances EBV_along_LOS : func EBV function. debug : bool, optional Debug flag. If True, this function outputs two plots, one of distance modulus vs. E(B-V) and a histogram of the E(B-V). debug_dirpath : str, optional Name of directory of where to put plots outputted if debug is True. Returns ------- EBVs : array E(B-V) at distances specified in `dist` along the line-of-sight """ dmod = 5 * np.log10(dist / 10) EBVs = EBV_along_LOS(dmod) if debug: plt.figure() plt.plot(dmod, EBVs, ".") plt.xlabel("dmod") plt.ylabel("E(B-V)") plt.savefig(debug_dirpath + "_DEBUG_dmod_EBVs.png") plt.close() return EBVs
[docs] def get_absorption(EBV: np.array, fitparams: dict, filt: str) -> np.array: """ Compute extinction coefficient Rzeta for band zeta. Using parameterized law from Casagrande & VandenBerg 2014. Valid for: logg = 4.1 Teff = 5250 - 7000K Fe/H = -2.0 - 0.25 a/Fe = -0.4 - 0.4 Assume nominal reddening law with RV=3.1. In a band zeta, Azeta = Rzeta*E(B-V). Parameters ---------- EBV : array E(B-V) values fitparams : dict The fitting params in inputparams. filt : str Name of the given filter Returns ------- R*EBV : array Extinction coefficient times E(B-V) """ N = len(EBV) table = cnsts.extinction.R i_filter = table["Filter"] == filt if not any(i_filter) or table["RZ_mean"][i_filter] == 0: print("WARNING: Unknown extinction coefficient for filter: " + filt) print(" Using reddening law coefficient R = 0.") return np.zeros(N) metal = "MeH" if "MeH" in fitparams else "FeH" if "Teff" not in fitparams or metal not in fitparams: R = np.ones_like(EBV) * table["RZ_mean"][i_filter].item() else: Teff_val, Teff_err = fitparams["Teff"] metal_val, metal_err = fitparams[metal] Teff = np.random.normal(Teff_val, Teff_err, size=N) FeH = np.random.normal(metal_val, metal_err, size=N) a0 = table["a0"][i_filter].item() a1 = table["a1"][i_filter].item() a2 = table["a2"][i_filter].item() a3 = table["a3"][i_filter].item() T4 = 1e-4 * Teff R = a0 + T4 * (a1 + a2 * T4) + a3 * FeH return R * EBV
[docs] def add_absolute_magnitudes( inputparams: dict, n: int = 1000, k: int = 1000, use_gaussian_priors: bool = False, debug=False, debug_dirpath="", ) -> dict: """ Convert apparent magnitudes to absolute magnitudes using the distance and add it to `inputparams`. Extinction E(B-V) is estimated based on Green et al. (2015) dust map. Extinction is converted to reddening using Casagrande & VandenBerg 2014. The converted colors and magnitudes are added to fitsparams. Parameters ---------- inputparams : dict Inputparams used in BASTA run. n : int Number of samples from parallax range k : int Number of samples from apparent magnitude range. debug_dirpath : str, optional Name of directory of where to put plots outputted if debug is True. debug : bool, optional Debug flag. If True, debugging plots will be outputted. use_gaussian_priors : bool, optional If True, gaussian priors will be used for apparent magnitude in the distance computation. Returns ------- inputparams : dict Modified version of inputparams including absolute magnitudes. """ if "parallax" not in inputparams["fitparams"]: return inputparams print("\nPreparing distance/parallax/magnitude input ...", flush=True) fitparams = inputparams["fitparams"] distanceparams = inputparams["distanceparams"] if use_gaussian_priors: inputparams["fitparams"][filt] = [val, std] return inputparams # Get apparent magnitudes from input data mobs = distanceparams["m"] mobs_err = distanceparams["m_err"] if len(mobs.keys()) == 0: raise ValueError("No filters were given") # Convert the inputted parallax in mas to as plxobs = fitparams["parallax"][0] * 1e-3 plxobs_err = fitparams["parallax"][1] * 1e-3 L = udist.EDSD(None, None) * 1e3 fitparams.pop("parallax") # Sample distances more densely around the mode of the distance distribution # See Bailer-Jones 2015, Eq 19 coeffs = [1 / L, -2, plxobs / (plxobs_err**2), -1 / (plxobs_err**2)] roots = np.roots(coeffs) if np.sum((np.isreal(roots))) == 1: (mode,) = np.real(roots[np.isreal(roots)]) else: assert np.sum((np.isreal(roots))) == 3 if plxobs >= 0: mode = np.amin(np.real(roots[np.isreal(roots)])) else: (mode,) = np.real(roots[roots > 0]) # By sampling linearly in quantiles, the probablity mass is equal for the samples bla = scipy.stats.norm.cdf(0, loc=mode, scale=1000) + 0.01 dist = scipy.stats.norm.ppf( np.linspace(bla, 0.96, n - n // 2), loc=mode, scale=1000 ) # We also want to sample across the entire range. lindist = 10 ** np.linspace(-0.4, 4.4, n // 2) assert np.all(np.isfinite(dist)) assert np.all(dist > 0) dist = np.concatenate([dist, lindist]) dist = np.sort(dist) lldist = udist.compute_distlikelihoods( dist, plxobs, plxobs_err, L, debug=debug, debug_dirpath=debug_dirpath, ) dists = np.repeat(dist, k) lldists = np.repeat(lldist, k) # Get EBV values EBV_along_LOS = get_EBV_along_LOS(distanceparams=distanceparams) EBV = get_EBV( dist=dist, EBV_along_LOS=EBV_along_LOS, debug=debug, debug_dirpath=debug_dirpath ) EBVs = np.repeat(EBV, k) distanceparams["As"] = {} llabsms_joined = np.zeros(n * k) for filt in mobs.keys(): # Sample apparent magnitudes over the entire parameter range if filt in cnsts.distanceranges.filters: m = np.linspace( cnsts.distanceranges.filters[filt]["min"], cnsts.distanceranges.filters[filt]["max"], k - k // 2, ) else: m = np.linspace(-10, 25, k - k // 2) m = np.concatenate( [ m, scipy.stats.norm.ppf( np.linspace(0.04, 0.96, k // 2), loc=mobs[filt], scale=mobs_err[filt], ), ] ) m = np.sort(m) llm = udist.compute_mslikelihoods(m, mobs[filt], mobs_err[filt]) ms = np.tile(m, n) llms = np.tile(llm, n) assert len(dists) == len(ms) == n * k A = get_absorption(EBV, fitparams, filt) As = np.repeat(A, k) absms = udist.compute_absmag(ms, dists, As) # Construct likelihood distribution llabsms = llms + lldists llabsms_joined += llabsms llabsms -= np.amax(llabsms) labsms = np.exp(llabsms - np.log(np.sum(np.exp(llabsms)))) # Create prior by interpolating a histogram and add it to inputparams # Use bins from non-weighted histogram in the weighted histogram bin_edges = np.histogram_bin_edges(absms, bins="auto") like, bins = np.histogram(absms, bins=bin_edges, weights=labsms, density=True) M_prior = interp1d( bins[:-1] + np.diff(bins) / 2.0, like, fill_value=0, bounds_error=False ) if debug: plt.figure() plt.hist(absms, bins=150, weights=labsms) plt.xlabel("Abs %s" % filt) plt.savefig(debug_dirpath + "_DEBUG_absms_%s.png" % filt) plt.close() absms_qs = stats.quantile_1D(absms, labsms, cnsts.statdata.quantiles) distanceparams["As"][filt] = list( stats.quantile_1D(As, labsms, cnsts.statdata.quantiles) ) # Extended dictionary for use in Kiel diagram, and clarifying it as a prior inputparams["magnitudes"][filt] = { "prior": M_prior, "median": absms_qs[0], "errp": np.abs(absms_qs[2] - absms_qs[0]), "errm": np.abs(absms_qs[0] - absms_qs[1]), } # Get an estimate from all filters labsms_joined = np.exp(llabsms_joined - np.log(np.sum(np.exp(llabsms_joined)))) distanceparams["priorEBV"] = list( stats.quantile_1D(EBVs, labsms_joined, cnsts.statdata.quantiles) ) distanceparams["priordistance"] = list( stats.quantile_1D(dists, labsms_joined, cnsts.statdata.quantiles) ) # Constrain metallicity within the limits of the color transformations metal = "MeH" if "MeH" in inputparams["limits"] else "FeH" metal_limit = inputparams["limits"].get(metal) if not metal_limit: inputparams["limits"][metal] = [ cnsts.metallicityranges.values["metallicity"]["min"], cnsts.metallicityranges.values["metallicity"]["max"], ] else: if metal_limit[0] < cnsts.metallicityranges.values["metallicity"]["min"]: inputparams["limits"][metal][0] = cnsts.metallicityranges.values[ "metallicity" ]["min"] if metal_limit[1] > cnsts.metallicityranges.values["metallicity"]["max"]: inputparams["limits"][metal][1] = cnsts.metallicityranges.values[ "metallicity" ]["max"] print("Done!") return inputparams