"""
Key statistics functions
"""
import os
import copy
import math
import collections
import numpy as np
from numpy.lib.histograms import _hist_bin_fd
from scipy.interpolate import interp1d, CubicSpline
from scipy.ndimage.filters import gaussian_filter1d
from basta import freq_fit, glitch_fit
from basta import utils_seismic as su
from basta.constants import sydsun as sydc
from basta.constants import freqtypes, statdata
# Define named tuple used in selectedmodels
Trackstats = collections.namedtuple("Trackstats", "index logPDF chi2")
priorlogPDF = collections.namedtuple("Trackstats", "index logPDF chi2 bayw magw IMFw")
Trackdnusurf = collections.namedtuple("Trackdnusurf", "dnusurf")
Trackglitchpar = collections.namedtuple("Trackglitchpar", "AHe dHe tauHe")
def _weight(N, seisw):
"""
Determine weighting scheme dependent on given method
Parameters
----------
N : int
Number of data points
seisw : dict
User defined control of the weights, by "1/1", "1/N" or
"1/N-dof", and user defined "dof" and "N" if given
Returns
-------
w : int
Normalising factor
"""
# Overwrite N if specified by the user
if seisw["N"]:
N = int(seisw["N"])
# Select weight case
if seisw["weight"] == "1/1":
w = 1
elif seisw["weight"] == "1/N":
w = N
elif seisw["weight"] == "1/N-dof":
if not seisw["dof"]:
dof = 0
else:
dof = int(seisw["dof"])
w = N - dof
return w
[docs]
def chi2_astero(
obskey,
obs,
obsfreqmeta,
obsfreqdata,
obsintervals,
libitem,
ind,
fitfreqs,
warnings=True,
shapewarn=0,
debug=False,
verbose=False,
):
"""
`chi2_astero` calculates the chi2 contributions from the asteroseismic
fitting e.g., the frequency, ratio, or glitch fitting.
Parameters
----------
obskey : array
Array containing the angular degrees and radial orders of obs
obs : array
Array containing the modes in the observed data
obsfreqmeta : dict
The requested information about which frequency products to fit or
plot, unpacked for easier access later.
obsfreqdata : dict
Combinations of frequencies read from the frequency input files.
Contains ratios, glitches, epsilon differences and covariance matrices.
obsintervals : array
Array containing the endpoints of the intervals used in the frequency
fitting routine in :func:'freq_fit.calc_join'.
As it is the same in all iterations for the observed frequencies,
this is computed in su.prepare_obs once and given as an argument
in order to save time and memory.
libitem : hdf5 group
Contains the entire track of models being processed.
ind : int
The index in libitem corresponding to the current model being
processed.
fitfreqs : dict
Contains all user inputted frequency fitting options.
warnings : bool
If True, print something when it fails.
shapewarn : int
If a mismatch in array dimensions of the fitted parameters, or range
of frequencies is encountered, this is set to corresponding integer
in order to warn the user at the end of the run.
debug : str
Flag for print control.
verbose : str
Flag for print control.
Returns
-------
chi2rut : float
The calculated chi2 value for the given model to the observed data.
If the calculated value is less than zero, chi2rut will be np.inf.
warnings : bool
See 'warnings' above.
"""
# Additional parameters calculated during fitting
addpars = {"dnusurf": None, "glitchparams": None}
# Unpack model frequencies
rawmod = libitem["osc"][ind]
rawmodkey = libitem["osckey"][ind]
mod = su.transform_obj_array(rawmod)
modkey = su.transform_obj_array(rawmodkey)
# Determine which model modes correspond to observed modes
joins = freq_fit.calc_join(
mod=mod, modkey=modkey, obs=obs, obskey=obskey, obsintervals=obsintervals
)
if joins is None:
chi2rut = np.inf
return chi2rut, warnings, shapewarn, 0
else:
joinkeys, join = joins
nmodes = joinkeys[:, joinkeys[0, :] < 3].shape[1]
# Apply surface correction
if fitfreqs["fcor"] == "None":
corjoin = join
elif fitfreqs["fcor"] == "HK08":
corjoin, _ = freq_fit.HK08(
joinkeys=joinkeys,
join=join,
nuref=fitfreqs["numax"],
bcor=fitfreqs["bexp"],
)
elif fitfreqs["fcor"] == "BG14":
corjoin, _ = freq_fit.BG14(
joinkeys=joinkeys, join=join, scalnu=fitfreqs["numax"]
)
elif fitfreqs["fcor"] == "cubicBG14":
corjoin, _ = freq_fit.cubicBG14(
joinkeys=joinkeys, join=join, scalnu=fitfreqs["numax"]
)
else:
print(f'ERROR: fcor must be either "None" or in {freqtypes.surfeffcorrs}')
return
# Initialize chi2 value
chi2rut = 0.0
if any(x in freqtypes.freqs for x in fitfreqs["fittypes"]):
# The frequency correction moved up before the ratios fitting!
# --> If fitting frequencies, just add the already calculated things
x = corjoin[0, :] - corjoin[2, :]
w = _weight(len(corjoin[0, :]), fitfreqs["seismicweights"])
covinv = obsfreqdata["freqs"]["covinv"]
if x.shape[0] == covinv.shape[0]:
chi2rut += (x.T.dot(covinv).dot(x)) / w
else:
shapewarn = 1
chi2rut = np.inf
if ~np.isfinite(chi2rut):
chi2rut = np.inf
if debug and verbose:
print("DEBUG: Computed non-finite chi2, setting chi2 to inf")
elif chi2rut < 0:
chi2rut = np.inf
if debug and verbose:
print("DEBUG: chi2 less than zero, setting chi2 to inf")
# Add large frequency separation term (using corrected frequencies!)
# --> Equivalent to 'dnufit', but using the frequencies *after*
# applying the surface correction.
# --> Compared to the observed value, which is 'dnudata'.
if fitfreqs["dnufit_in_ratios"]:
# Read observed dnu
dnudata = obsfreqdata["freqs"]["dnudata"]
dnudata_err = obsfreqdata["freqs"]["dnudata_err"]
# Compute surface corrected dnu
dnusurf, _ = freq_fit.compute_dnu_wfit(joinkeys, corjoin, fitfreqs["numax"])
chi2rut += ((dnudata - dnusurf) / dnudata_err) ** 2
# Add the chi-square terms for ratios
if any(x in freqtypes.rtypes for x in fitfreqs["fittypes"]):
if not all(joinkeys[1, joinkeys[0, :] < 3] == joinkeys[2, joinkeys[0, :] < 3]):
chi2rut = np.inf
return chi2rut, warnings, shapewarn, 0
# Add frequency ratios terms
ratiotype = obsfreqmeta["ratios"]["fit"][0]
# Interpolate model ratios to observed frequencies
if fitfreqs["interp_ratios"]:
# Get all available model ratios
broadratio = freq_fit.compute_ratioseqs(
modkey,
mod,
ratiotype,
threepoint=fitfreqs["threepoint"],
)
modratio = copy.deepcopy(obsfreqdata[ratiotype]["data"])
# Seperate and interpolate within the separate r01, r10 and r02 sequences
for rtype in set(modratio[2, :]):
obsmask = modratio[2, :] == rtype
modmask = broadratio[2, :] == rtype
# Check we have the range to interpolate
if (
modratio[1, obsmask][0] < broadratio[1, modmask][0]
or modratio[1, obsmask][-1] > broadratio[1, modmask][-1]
):
chi2rut = np.inf
shapewarn = 2
return chi2rut, warnings, shapewarn, 0
intfunc = interp1d(
broadratio[1, modmask], broadratio[0, modmask], kind="linear"
)
modratio[0, obsmask] = intfunc(modratio[1, obsmask])
else:
modratio = freq_fit.compute_ratioseqs(
joinkeys, join, ratiotype, threepoint=fitfreqs["threepoint"]
)
# Calculate chi square with chosen asteroseismic weight
x = obsfreqdata[ratiotype]["data"][0, :] - modratio[0, :]
w = _weight(len(x), fitfreqs["seismicweights"])
covinv = obsfreqdata[ratiotype]["covinv"]
if x.shape[0] == covinv.shape[0]:
chi2rut += (x.T.dot(covinv).dot(x)) / w
else:
shapewarn = 1
chi2rut = np.inf
# Add contribution from glitches
if any(x in freqtypes.glitches for x in fitfreqs["fittypes"]):
# Obtain glitch sequence to be fitted
glitchtype = obsfreqmeta["glitch"]["fit"][0]
# Compute surface corrected dnu, if not already computed
if not fitfreqs["dnufit_in_ratios"]:
dnusurf, _ = freq_fit.compute_dnu_wfit(joinkeys, corjoin, fitfreqs["numax"])
# Assign acoustic depts for glitch search
ac_depths = {
"tauHe": libitem["tauhe"][ind],
"dtauHe": 100.0,
"tauCZ": libitem["taubcz"][ind],
"dtauCZ": 200.0,
}
# If interpolating in ratios (default), we need to do an extra step
if fitfreqs["interp_ratios"] and "r" in glitchtype:
# Get all ratios of model
broadglitches = glitch_fit.compute_glitchseqs(
joinkeys,
join,
glitchtype,
dnusurf,
fitfreqs,
ac_depths,
debug,
)
# Construct output array
modglitches = copy.deepcopy(obsfreqdata[glitchtype]["data"])
modglitches[0, -3:] = broadglitches[0, -3:]
# Separate and interpolate within the separate r01, r10 and r02 sequences
# rtype = {7, 8, 9} is glitch parameters, can't interpolate those
for rtype in set(modglitches[2, :]) - {7.0, 8.0, 9.0}:
joinmask = modglitches[2, :] == rtype
broadmask = broadglitches[2, :] == rtype
# Check we are in range
if (
modglitches[1, joinmask][0] < broadglitches[1, broadmask][0]
or modglitches[1, joinmask][-1] > broadglitches[1, broadmask][-1]
):
chi2rut = np.inf
shapewarn = 2
return chi2rut, warnings, shapewarn, addpars
intfunc = interp1d(
broadglitches[1, broadmask],
broadglitches[0, broadmask],
kind="linear",
)
modglitches[0, joinmask] = intfunc(modglitches[1, joinmask])
else:
# Get model glitch sequence
modglitches = glitch_fit.compute_glitchseqs(
joinkeys,
join,
glitchtype,
dnusurf,
fitfreqs,
ac_depths,
debug,
)
# Calculate chi square difference
x = obsfreqdata[glitchtype]["data"][0, :] - modglitches[0, :]
w = _weight(len(x), fitfreqs["seismicweights"])
covinv = obsfreqdata[glitchtype]["covinv"]
if x.shape[0] == covinv.shape[0] and not any(np.isnan(x)):
chi2rut += (x.T.dot(covinv).dot(x)) / w
else:
shapewarn = 1
chi2rut = np.inf
# Store the determined glitch parameters for outputting
addpars["glitchparams"] = modglitches[0, -3:]
if any([x in freqtypes.epsdiff for x in fitfreqs["fittypes"]]):
epsdifftype = list(set(fitfreqs["fittypes"]).intersection(freqtypes.epsdiff))[0]
obsepsdiff = obsfreqdata[epsdifftype]["data"]
# Purge model freqs of unused modes
l_available = [int(ll) for ll in obsepsdiff[2]]
index = np.zeros(mod.shape[1], dtype=bool)
for ll in [0, *l_available]:
index |= modkey[0] == ll
mod = mod[:, index]
modkey = modkey[:, index]
# Compute epsilon differences of the model
# --> (0: deps, 1: nu, 2: l, 3: n)
modepsdiff = freq_fit.compute_epsilondiffseqs(
modkey,
mod,
libitem["dnufit"][ind],
sequence=epsdifftype,
nsorting=fitfreqs["nsorting"],
)
# Mixed modes results in negative differences. Flag using nans
mask = np.where(modepsdiff[0, :] < 0)[0]
modepsdiff[0, mask] = np.nan
# Interpolate model epsdiff to the frequencies of the observations
evalepsdiff = np.zeros(obsepsdiff.shape[1])
evalepsdiff[:] = np.nan
for ll in l_available:
indobs = obsepsdiff[2] == ll
indmod = modepsdiff[2] == ll
nans = np.isnan(modepsdiff[0][indmod])
if sum(~nans) > 1:
spline = CubicSpline(
modepsdiff[1][indmod][~nans],
modepsdiff[0][indmod][~nans],
extrapolate=False,
)
evalepsdiff[indobs] = spline(obsepsdiff[1][indobs])
# Compute chi^2 of epsilon contribution
chi2rut = 0.0
x = obsepsdiff[0] - evalepsdiff
w = _weight(len(evalepsdiff), fitfreqs["seismicweights"])
covinv = obsfreqdata[epsdifftype]["covinv"]
chi2rut += (x.T.dot(covinv).dot(x)) / w
# Check extreme values
if any(np.isnan(evalepsdiff)):
chi2rut = np.inf
shapewarn = 3
elif ~np.isfinite(chi2rut) or chi2rut < 0:
chi2rut = np.inf
# Store dnusurf for output
if fitfreqs["dnufit_in_ratios"]:
addpars["dnusurf"] = dnusurf
return chi2rut, warnings, shapewarn, addpars
[docs]
def most_likely(selectedmodels):
"""
Find the index of the model with the highest probability
Parameters
----------
selectedmodels : dict
Contains information on all models with a non-zero likelihood.
Returns
-------
maxPDF_path : str
The path to the model with the highest probability.
maxPDF_ind : int
Index of the model with the highest probability.
"""
maxPDF = -np.inf
for path, trackstats in selectedmodels.items():
i = np.argmax(trackstats.logPDF)
if trackstats.logPDF[i] > maxPDF:
maxPDF = trackstats.logPDF[i]
maxPDF_path = path
maxPDF_ind = trackstats.index.nonzero()[0][i]
if maxPDF == -np.inf:
print("The logPDF are all -np.inf")
return maxPDF_path, maxPDF_ind
[docs]
def lowest_chi2(selectedmodels):
"""
Find the index of the model with the lowest chi2 (only gaussian fitting parameters)
Parameters
----------
selectedmodels : dict
Contains information on all models with a non-zero likelihood.
Returns
-------
minchi2_path : str
Path to the model with the lowest chi2.
minchi2_ind : int
Index of the model with the lowest chi2.
"""
minchi2 = np.inf
for path, trackstats in selectedmodels.items():
i = np.argmin(trackstats.chi2)
if trackstats.chi2[i] < minchi2:
minchi2 = trackstats.chi2[i]
minchi2_path = path
minchi2_ind = trackstats.index.nonzero()[0][i]
return minchi2_path, minchi2_ind
[docs]
def chi_for_plot(selectedmodels):
"""
FOR VALIDATION PLOTTING!
Could this be combined with the functions above in a wrapper?
Return chi2 value of HLM and LCM.
"""
# Get highest likelihood model (HLM)
maxPDF = -np.inf
minchi2 = np.inf
for _, trackstats in selectedmodels.items():
i = np.argmax(trackstats.logPDF)
j = np.argmin(trackstats.chi2)
if trackstats.logPDF[i] > maxPDF:
maxPDF = trackstats.logPDF[i]
maxPDFchi2 = trackstats.chi2[i]
if trackstats.chi2[j] < minchi2:
minchi2 = trackstats.chi2[j]
return maxPDFchi2, minchi2
[docs]
def get_highest_likelihood(Grid, selectedmodels, inputparams):
"""
Find highest likelihood model and print info.
Parameters
----------
Grid : hdf5 object
The already loaded grid, containing the tracks/isochrones.
selectedmodels : dict
Contains information on all models with a non-zero likelihood.
inputparams : dict
The standard bundle of all fitting information
Returns
-------
maxPDF_path : str
Path to model with maximum likelihood.
maxPDF_ind : int
Index of model with maximum likelihood.
"""
print("\nHighest likelihood model:")
maxPDF_path, maxPDF_ind = most_likely(selectedmodels)
print(
"* Weighted, non-normalized log-probability:",
np.max(selectedmodels[maxPDF_path].logPDF),
)
print("* Grid-index: {0}[{1}], with parameters:".format(maxPDF_path, maxPDF_ind))
# Print name if it exists
if "name" in Grid[maxPDF_path]:
print(" - Name:", Grid[maxPDF_path + "/name"][maxPDF_ind].decode("utf-8"))
# Print parameters
outparams = inputparams["asciiparams"]
dnu_scales = inputparams.get("dnu_scales", {})
for param in outparams:
if param == "distance":
continue
paramval = Grid[os.path.join(maxPDF_path, param)][maxPDF_ind]
# Handle the scaled asteroseismic parameters
if param.startswith("dnu") and param not in ["dnufit", "dnufitMos12"]:
dnu_rescal = dnu_scales.get(param, 1.00)
scaleval = paramval * inputparams.get("dnusun", sydc.SUNdnu) / dnu_rescal
elif param.startswith("numax"):
scaleval = paramval * inputparams.get("numsun", sydc.SUNnumax)
elif param in ["dnufit", "dnufitMos12"]:
scaleval = paramval / dnu_scales.get(param, 1.00)
else:
scaleval = None
if scaleval:
scaleprt = f"(after rescaling: {scaleval:12.6f})"
else:
scaleprt = ""
print(" - {0:10}: {1:12.6f} {2}".format(param, paramval, scaleprt))
return maxPDF_path, maxPDF_ind
[docs]
def get_lowest_chi2(Grid, selectedmodels, inputparams):
"""
Find model with lowest chi2 value and print info.
Parameters
----------
Grid : hdf5 object
The already loaded grid, containing the tracks/isochrones.
selectedmodels : dict
Contains information on all models with a non-zero likelihood.
inputparams : dict
The standard bundle of all fitting information
Returns
-------
minchi2_path : str
Path to model with lowest chi2
minchi2_ind : int
Index of model with lowest chi2
"""
print("\nLowest chi2 model:")
minchi2_path, minchi2_ind = lowest_chi2(selectedmodels)
print("* chi2:", np.min(selectedmodels[minchi2_path].chi2))
print("* Grid-index: {0}[{1}], with parameters:".format(minchi2_path, minchi2_ind))
# Print name if it exists
if "name" in Grid[minchi2_path]:
print(" - Name:", Grid[minchi2_path + "/name"][minchi2_ind].decode("utf-8"))
# Print parameters
outparams = inputparams["asciiparams"]
dnu_scales = inputparams.get("dnu_scales", {})
for param in outparams:
if param == "distance":
continue
paramval = Grid[os.path.join(minchi2_path, param)][minchi2_ind]
# Handle the scaled asteroseismic parameters
if param.startswith("dnu") and param not in ["dnufit", "dnufitMos12"]:
dnu_rescal = dnu_scales.get(param, 1.00)
scaleval = paramval * inputparams.get("dnusun", sydc.SUNdnu) / dnu_rescal
elif param.startswith("numax"):
scaleval = paramval * inputparams.get("numsun", sydc.SUNnumax)
elif param in ["dnufit", "dnufitMos12"]:
scaleval = paramval / dnu_scales.get(param, 1.00)
else:
scaleval = None
if scaleval:
scaleprt = f"(after rescaling: {scaleval:12.6f})"
else:
scaleprt = ""
print(" - {0:10}: {1:12.6f} {2}".format(param, paramval, scaleprt))
return minchi2_path, minchi2_ind
[docs]
def quantile_1D(data, weights, quantile):
"""
Compute the weighted quantile of a 1D numpy array.
The function is borrowed from the Python package wquantiles
Parameters
----------
data : ndarray
Input array (one dimension).
weights : ndarray
Array with the weights of the same size of `data`.
quantile : float
Quantile to compute. It must have a value between 0 and 1.
Returns
-------
result : float
The output value.
"""
# Sort the data
ind_sorted = np.argsort(data)
sorted_data = data[ind_sorted]
sorted_weights = weights[ind_sorted]
# Compute the auxiliary arrays
Sn = np.cumsum(sorted_weights)
Pn = (Sn - 0.5 * sorted_weights) / np.sum(sorted_weights)
result = np.interp(quantile, Pn, sorted_data)
assert not np.any(np.isnan(result)), "NaN encounted in quantile_1D."
return result
[docs]
def posterior(x, nonzeroprop, sampled_indices, nsigma=0.25):
"""
Compute posterior of x as a weighted histogram of x with weights y.
The bin width is determined by the Freedman Diaconis Estimator
The histogram is smoothed using a Gaussian kernel with a width
of nsigma*sigma
Parameters
----------
x : array
samples
y : array
likelihood of samples
nonzeroprop : array
indices of x with non-zero likelihood
sampled_indices :
indices of weighted draws from y, used for bin estimation
nsigma : float
fractional standard deviation used for smoothing
Returns
-------
like : function
interpolated posterior
"""
samples = x[nonzeroprop][sampled_indices]
xmin, xmax = np.nanmin(samples), np.nanmax(samples)
# Check if all entries of x are equal
if np.all(x == x[0]) or math.isclose(xmin, xmax, rel_tol=1e-5):
xvalues = np.array([x[0], x[0]])
like = np.array([1.0, 1.0])
like = interp1d(xvalues, like, fill_value=0, bounds_error=False)
return like
# Compute bin width and number of bins
N = len(samples)
bin_width = _hist_bin_fd(samples, None)
if math.isclose(bin_width, 0, rel_tol=1e-5):
nbins = int(np.ceil(np.sqrt(N)))
else:
nbins = int(np.ceil((xmax - xmin) / bin_width)) + 1
if nbins > N:
nbins = int(np.ceil(np.sqrt(N)))
bin_edges = np.linspace(xmin, xmax, nbins)
xvalues = bin_edges[:-1] + np.diff(bin_edges) / 2.0
sigma = np.std(samples)
like = np.histogram(samples, bins=bin_edges, density=True)[0]
if sigma > 0 and bin_width > 0:
like = gaussian_filter1d(like, (nsigma * sigma / bin_width))
like = interp1d(xvalues, like, fill_value=0, bounds_error=False)
return like
[docs]
def calc_key_stats(x, centroid, uncert, weights=None):
"""
Calculate and report the wanted format of centroid value and
uncertainties.
Parameters
----------
x : list
Parameter values of the models with non-zero likelihood.
centroid : str
Options for centroid value definition, 'median' or 'mean'.
uncert : str
Options for reported uncertainty format, 'quantiles' or
'std' for standard deviation.
weights : list
Weights needed to be applied to x before extracting statistics.
Returns
-------
xcen : float
Centroid value
xm : float
16'th percentile if quantile selected, 1 sigma for standard
deviation.
xp : float, None
84'th percentile if quantile selected, None for standard
deviation.
"""
xp = None
# Handling af all different combinations of input
if uncert == "quantiles" and not type(weights) == type(None):
xcen, xm, xp = quantile_1D(x, weights, statdata.quantiles)
elif uncert == "quantiles":
xcen, xm, xp = np.quantile(x, statdata.quantiles)
else:
xm = np.std(x)
if centroid == "mean" and not type(weights) == type(None):
xcen = np.average(x, weights=weights)
elif centroid == "mean":
xcen = np.mean(x)
elif uncert != "quantiles":
xcen = np.median(x)
return xcen, xm, xp