Source code for fileio

"""
Auxiliary functions for file operations
"""
import os
import json
import h5py
import warnings
from io import IOBase
from copy import deepcopy
from xml.etree import ElementTree
from xml.etree.ElementTree import Element, SubElement, tostring
from xml.dom import minidom

import numpy as np

from basta import stats, freq_fit, glitch_fit
from basta import utils_seismic as su
from basta import utils_general as util
from basta.constants import freqtypes


def _export_selectedmodels(selectedmodels):
    res = {}
    for trackno, ts in selectedmodels.items():
        index = ts.index.nonzero()[0].tolist()
        res[trackno] = {
            "chi2": ts.chi2.tolist(),
            "index": index,
            "n": len(ts.index),
            "logPDF": ts.logPDF.tolist(),
        }

    return res


def _import_selectedmodels(data):
    res = {}
    for trackno, ts in data.items():
        index = np.zeros(ts["n"], dtype=bool)
        index[ts["index"]] = True
        res[trackno] = stats.Trackstats(
            chi2=np.asarray(ts["chi2"]), index=index, logPDF=np.asarray(ts["logPDF"])
        )

    return res


def save_selectedmodels(fname, selectedmodels):
    s = json.dumps(_export_selectedmodels(selectedmodels))
    with open(fname, "w") as fp:
        fp.write(s)


def load_selectedmodels(fname):
    with open(fname) as fp:
        data = json.load(fp)

    return _import_selectedmodels(data)
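

# Illustrative usage sketch (not part of the original module): round-trip a
# small selectedmodels dictionary through the JSON helpers above. The
# Trackstats fields (chi2, index, logPDF) follow _import_selectedmodels; the
# track name, values and file name are made up for the example.
def _example_selectedmodels_roundtrip(fname="selectedmodels_demo.json"):
    demo = {
        "track0001": stats.Trackstats(
            chi2=np.array([1.2, 3.4, 0.8]),
            index=np.array([True, False, True]),
            logPDF=np.array([-0.5, -2.0, -0.1]),
        )
    }
    save_selectedmodels(fname, demo)
    restored = load_selectedmodels(fname)
    # The boolean selection mask is stored as nonzero positions plus a length
    assert np.array_equal(restored["track0001"].index, demo["track0001"].index)
    return restored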


def write_star_to_errfile(starid, inputparams, errormessage):
    """
    Write starid and error message to .err-file

    Parameters
    ----------
    starid : str
        Unique identifier for this target.
    inputparams : dict
        Dictionary of all controls and input.
    errormessage : str
        String explaining error which will be written to the .err-file
    """
    errfile = inputparams.get("erroutput")

    if isinstance(errfile, IOBase):
        errfile.write("{}\t{}\n".format(starid, errormessage))
    else:
        with open(errfile, "a") as ef:
            ef.write("{}\t{}\n".format(starid, errormessage))


def no_models(starid, inputparams, errormessage):
    """
    If no models are found in the grid, create an outputfile with nans (for
    consistency reasons).

    The approach mirrors process_output.compute_posterior()

    Parameters
    ----------
    starid : str
        Unique identifier for this target.
    inputparams : dict
        Dictionary of all controls and input.
    errormessage : str
        String explaining error which will be written to the .err-file
    """
    # Extract the output parameters
    asciifile = inputparams.get("asciioutput")
    asciifile_dist = inputparams.get("asciioutput_dist")
    params = deepcopy(inputparams["asciiparams"])

    # Init vectors and add the Star ID
    hout = []
    out = []
    hout_dist = []
    out_dist = []
    hout.append("starid")
    out.append(starid)
    hout_dist.append("starid")
    out_dist.append(starid)

    uncert = inputparams.get("uncert")

    # The distance parameters
    if "distance" in params:
        distanceparams = inputparams["distanceparams"]
        ms = list(distanceparams["filters"])
        for m in ms:
            hout_dist, out_dist = util.add_out(
                hout_dist, out_dist, "distance_" + m, np.nan, np.nan, np.nan, uncert
            )
            hout_dist, out_dist = util.add_out(
                hout_dist, out_dist, "A_" + m, np.nan, np.nan, np.nan, uncert
            )
            hout_dist, out_dist = util.add_out(
                hout_dist, out_dist, "M_" + m, np.nan, np.nan, np.nan, uncert
            )

        hout_dist, out_dist = util.add_out(
            hout_dist, out_dist, "distance", np.nan, np.nan, np.nan, uncert
        )
        hout_dist, out_dist = util.add_out(
            hout_dist, out_dist, "EBV_", np.nan, np.nan, np.nan, uncert
        )

        hout, out = util.add_out(hout, out, "distance", np.nan, np.nan, np.nan, uncert)
        params.remove("distance")

    # The normal parameters
    for param in params:
        hout, out = util.add_out(hout, out, param, np.nan, np.nan, np.nan, uncert)

    # Write to file
    if asciifile is not False:
        hline = b"# "
        for i in range(len(hout)):
            hline += hout[i].encode() + " ".encode()

        if isinstance(asciifile, IOBase):
            asciifile.seek(0)
            if b"#" not in asciifile.readline():
                asciifile.write(hline + b"\n")
            np.savetxt(
                asciifile, np.asarray(out).reshape(1, len(out)), fmt="%s", delimiter=" "
            )
            print("Saved results to " + asciifile.name + ".")
        elif asciifile is False:
            pass
        else:
            np.savetxt(
                asciifile,
                np.asarray(out).reshape(1, len(out)),
                fmt="%s",
                header=hline,
                delimiter=" ",
            )
            print("Saved results to " + asciifile + ".")

    # Write to file
    if asciifile_dist:
        hline = b"# "
        for i in range(len(hout_dist)):
            hline += hout_dist[i].encode() + " ".encode()

        if isinstance(asciifile_dist, IOBase):
            asciifile_dist.seek(0)
            if b"#" not in asciifile_dist.readline():
                asciifile_dist.write(hline + b"\n")
            np.savetxt(
                asciifile_dist,
                np.asarray(out_dist).reshape(1, len(out_dist)),
                fmt="%s",
                delimiter=" ",
            )
            print(
                "Saved distance results for different filters to "
                + asciifile_dist.name
                + "."
            )
        else:
            np.savetxt(
                asciifile_dist,
                np.asarray(out_dist).reshape(1, len(out_dist)),
                fmt="%s",
                header=hline,
                delimiter=" ",
            )
            print(
                "Saved distance results for different filters to "
                + asciifile_dist
                + "."
            )

    write_star_to_errfile(starid, inputparams, errormessage)


def read_freq_xml(filename):
    """
    Read frequencies from an xml file

    Parameters
    ----------
    filename : str
        Name of file to read

    Returns
    -------
    frequencies : array
        Individual frequencies
    errors : array
        Uncertainties in the frequencies
    orders : array
        Radial orders
    degrees : array
        Angular degree
    """
    # Parse the XML file:
    tree = ElementTree.parse(filename)
    root = tree.getroot()

    # Find a list of all the modes:
    freq_modes = root.findall("mode")

    # Simply get the orders, degrees, frequencies and errors as numpy arrays:
    orders = np.array(
        [mode.find("order").get("value") for mode in freq_modes], dtype=int
    )
    degrees = np.array(
        [mode.find("degree").get("value") for mode in freq_modes], dtype=int
    )
    errors = np.array(
        [mode.find("frequency").get("error") for mode in freq_modes], dtype="float64"
    )
    frequencies = np.array(
        [mode.find("frequency").get("value") for mode in freq_modes], dtype="float64"
    )

    return frequencies, errors, orders, degrees
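

# Illustrative sketch (not part of the original module): the minimal XML
# layout that read_freq_xml understands, inferred from the parsing above.
# Files written by freqs_ascii_to_xml (below) contain additional elements and
# attributes (flags, error_minus/error_plus, correlations, ...); values and
# the file name here are made up.
def _example_minimal_freq_xml(path="demo_freqs.xml"):
    xmlstr = (
        '<frequencies kic="demo">'
        '<mode id="mode1"><order value="20"/><degree value="0"/>'
        '<frequency value="2895.6" error="0.2" unit="uHz"/></mode>'
        '<mode id="mode2"><order value="21"/><degree value="0"/>'
        '<frequency value="3030.1" error="0.3" unit="uHz"/></mode>'
        "</frequencies>"
    )
    with open(path, "w") as xmlfile:
        xmlfile.write(xmlstr)
    return read_freq_xml(path)  # --> (frequencies, errors, orders, degrees)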


def _read_freq_cov_xml(filename, obskey):
    """
    Read frequency covariances from xml-file

    Parameters
    ----------
    filename : str
        Name of xml-file containing the frequencies
    obskey : array
        Observed modes after filtering

    Returns
    -------
    corr : array
        Correlation matrix of frequencies
    cov : array
        Covariance matrix of frequencies
    """
    # Parse the XML file:
    tree = ElementTree.parse(filename)
    root = tree.getroot()

    # Find a list of all the modes:
    freqs = root.findall("mode")

    # Set up the matrices for collection
    corr = np.identity(len(obskey[0, :]))
    cov = np.zeros((len(obskey[0, :]), len(obskey[0, :])))

    # Loop over all modes to collect input
    for i, mode1 in enumerate(freqs):
        id1 = mode1.get("id")
        column = root.findall("frequency_corr[@id1='%s']" % id1)
        if not len(column):
            errmsg = "Correlations in frequency fit requested, but not pr"
            errmsg += "ovided! If not available, set 'correlations=False'."
            raise KeyError(errmsg)

        # Loop over all modes again
        for j, mode2 in enumerate(freqs):
            id2 = mode2.get("id")
            # Loop to find matching entries in matrix
            for row in column:
                if row.get("id2") == id2:
                    n1 = int(mode1.find("order").get("value"))
                    l1 = int(mode1.find("degree").get("value"))
                    n2 = int(mode2.find("order").get("value"))
                    l2 = int(mode2.find("degree").get("value"))
                    i = np.where(np.logical_and(obskey[0, :] == l1, obskey[1, :] == n1))
                    j = np.where(np.logical_and(obskey[0, :] == l2, obskey[1, :] == n2))
                    corr[i, j] = corr[j, i] = row.find("correlation").get("value")
                    cov[i, j] = cov[j, i] = row.find("covariance").get("value")
                    break

    return corr, cov


def read_freq(filename, excludemodes=None, onlyradial=False, covarfre=False):
    """
    Routine to extract the frequencies in the desired n-range, and the
    corresponding covariance matrix

    Parameters
    ----------
    filename : str
        Name of file to read
    excludemodes : str or None, optional
        Name of file containing the (l, n) values of frequencies to be
        omitted in the fit. If None, no modes will be excluded.
    onlyradial : bool
        Flag to determine whether to fit only the l=0 modes
    covarfre : bool, optional
        Read also the covariances in the individual frequencies

    Returns
    -------
    obskey : array
        Array containing the angular degrees and radial orders of obs
    obs : array
        Individual frequencies
    covarfreq : array
        Array including covariances in the frequencies. If ``covarfre`` is
        ``False`` then a diagonal matrix is produced
    """
    # Read frequencies from file
    frecu, errors, norder, ldegree = read_freq_xml(filename)

    # Build osc and osckey in a sorted manner
    f = np.asarray([])
    n = np.asarray([])
    e = np.asarray([])
    l = np.asarray([])
    for li in [0, 1, 2]:
        given_l = ldegree == li
        incrn = np.argsort(norder[given_l], kind="mergesort")
        l = np.concatenate([l, ldegree[given_l][incrn]])
        n = np.concatenate([n, norder[given_l][incrn]])
        f = np.concatenate([f, frecu[given_l][incrn]])
        e = np.concatenate([e, errors[given_l][incrn]])
    assert len(f) == len(n) == len(e) == len(l)

    obskey = np.asarray([l, n], dtype=int)
    obs = np.array([f, e])

    # Remove untrusted frequencies
    if excludemodes not in [None, "", "None", "none", "False", "false"]:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            nottrustedobskey = np.genfromtxt(excludemodes, comments="#", encoding=None)
        # If there is only one not-trusted mode, it misses a dimension
        if nottrustedobskey.size == 0:
            print("File for not-trusted frequencies was empty")
        else:
            if nottrustedobskey.shape == (2,):
                nottrustedobskey = [nottrustedobskey]
            for l, n in nottrustedobskey:
                nottrustedmask = (obskey[0] == l) & (obskey[1] == n)
                print(
                    *(f"Removed mode at {x} µHz" for x in obs[:, nottrustedmask][0]),
                    sep="\n",
                )
                obskey = obskey[:, ~nottrustedmask]
                obs = obs[:, ~nottrustedmask]

    if onlyradial:
        for l in [1, 2, 3]:
            nottrustedmask = obskey[0] == l
            print(
                *(f"Removed mode at {x} µHz" for x in obs[:, nottrustedmask][0]),
                sep="\n",
            )
            obskey = obskey[:, ~nottrustedmask]
            obs = obs[:, ~nottrustedmask]

    if covarfre:
        corrfre, covarfreq = _read_freq_cov_xml(filename, obskey)
    else:
        covarfreq = np.diag(obs[1, :]) ** 2

    return obskey, obs, covarfreq
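

# Illustrative sketch (not part of the original module): the not-trusted-modes
# file parsed by read_freq above is a plain two-column (l, n) table, one mode
# per line, with optional '#' comments. File name and values are made up.
def _example_excludemodes_file(path="demo_nottrusted.txt"):
    with open(path, "w") as fh:
        fh.write("# l  n\n")
        fh.write("0  20\n")
        fh.write("2  18\n")
    return path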


def _read_precomputed_glitches(
    filename: str, type: str = "glitches"
) -> tuple[np.ndarray, np.ndarray]:
    """
    Read glitch parameters. If fitted together with ratios, these must be
    provided in this file as well, for covariance between them.

    Parameters
    ----------
    filename : str
        Name of file to read
    type : str
        Parameter combination to be read: glitches, gr02, gr01, gr10,
        gr010, gr012, gr102.

    Returns
    -------
    gdata : array
        Array of median glitch parameters (and ratios)
    gcov : array
        Covariance matrix, for glitch parameters or glitch parameters and
        ratios
    """
    # Read datafile
    try:
        datfile = h5py.File(filename, "r")
    except OSError:
        raise FileNotFoundError(f"Could not find {filename}")

    # Read ratio type in file, and check that it matches requested fittype
    if type != "glitches":
        rtype = datfile["rto/rtype"][()].decode("utf-8")
        if rtype != type[1:]:
            raise KeyError(f"Requested ratio type {type[1:]} not found in {filename}")

    # Read data and covariance matrix
    gdata = datfile["cov/params"][()]
    gcov = datfile["cov/cov"][()]

    return gdata, gcov


def _read_precomputed_ratios_xml(
    filename, ratiotype, obskey, obs, excludemodes=None, correlations=True
):
    """
    Read the precomputed ratios and covariance matrix from xml-file.

    Parameters
    ----------
    filename : str
        Path to xml-file to be read
    ratiotype : str
        Ratio sequence to be read, see constants.freqtypes.rtypes for
        possible sequences.
    obskey : array
        Array containing the angular degrees and radial orders of obs
    obs : array
        Individual frequencies and uncertainties.
    excludemodes : str or None, optional
        Name of file containing the (l, n) values of frequencies to be
        omitted in the fit. Does however only trigger a warning for
        precomputed ratios.
    correlations : bool
        True for reading covariance matrix from xml, False to assume no
        correlations, and simply use individual errors on ratios.

    Returns
    -------
    ratios : array
        Contains frequency ratios, their identifying integers of sequence
        and radial order, and their frequency location (matching l=0
        frequency).
    cov : array
        Covariance matrix matching the ratios.
    """
    # Print warning to user
    if excludemodes is not None:
        wstr = "Warning: Removing precomputed ratios based on "
        wstr += "not-trusted-file is not yet supported!"
        print(wstr)

    # Read in xml tree/root
    tree = ElementTree.parse(filename)
    root = tree.getroot()

    # Get all ratios available in xml
    all_ratios = root.findall("frequency_ratio")

    # Make numpy arrays of all ratios
    # Developers note: Standard examples also contain unused "error_minus" and "error_plus"
    orders = np.array([ratio.get("order") for ratio in all_ratios], dtype=int)
    ratval = np.array([ratio.get("value") for ratio in all_ratios], dtype=float)
    types = np.array([ratio.get("type") for ratio in all_ratios], dtype="U3")
    errors = np.array([ratio.get("error") for ratio in all_ratios], dtype=float)

    # Make sorting mask for the desired ratio sequence
    if ratiotype == "r012":
        mask = np.where(np.logical_or(types == "r01", types == "r02"))[0]
    elif ratiotype == "r102":
        mask = np.where(np.logical_or(types == "r10", types == "r02"))[0]
    elif ratiotype == "r010":
        mask = np.where(np.logical_or(types == "r01", types == "r10"))[0]
    else:
        mask = np.where(types == ratiotype)[0]

    # Pack into data structure
    ratios = np.zeros((4, len(mask)))
    ratios[0, :] = ratval[mask]
    ratios[3, :] = orders[mask]
    ratios[2, :] = [int(r[1:]) for r in types[mask]]

    # Get frequency location from obs and obskey
    try:
        for i, nn in enumerate(ratios[3, :]):
            l0mask = obskey[0, :] == 0
            ratios[1, i] = obs[0, l0mask][obskey[1, l0mask] == nn]
    except ValueError as e:
        wstr = "Could not find l=0, n={0:d} frequency to match {1}(n={0:d})!"
        raise KeyError(wstr.format(int(nn), types[mask][i])) from e

    # Sort n-before-l
    sorting = np.argsort(ratios[3, :] + 0.01 * ratios[2, :])
    ratios = ratios[:, sorting]

    # Either read covariance matrix or assume uncorrelated
    if correlations:
        cov = _read_ratios_cov_xml(root, types[mask][sorting], orders[mask][sorting])
    else:
        cov = np.diag(errors[mask][sorting])

    return ratios, cov


def _read_ratios_cov_xml(xmlroot, types, order):
    """
    Read the precomputed covariance matrix of the read precomputed ratios.

    Parameters
    ----------
    xmlroot : Element
        Element tree of xml-file.
    types : array
        Ratio types of all ratios to be read
    order : array
        Radial order of all ratios to be read

    Returns
    -------
    cov : array
        Covariance matrix matching the ratios.
    """
    # Empty matrix to fill out and format string to look for
    cov = np.zeros((len(types), len(types)))
    fstr = (
        "frequency_ratio_corr[@type1='{0}'][@order1='{1}'][@type2='{2}'][@order2='{3}']"
    )

    # Loop for each index in matrix and find it in xml
    for ind1, (type1, order1) in enumerate(zip(types, order)):
        for ind2, (type2, order2) in enumerate(zip(types, order)):
            try:
                element = xmlroot.findall(fstr.format(type1, order1, type2, order2))[0]
            except IndexError as e:
                wstr = (
                    "Could not find covariance between {0}(n={1}) and {2}(n={3}) in xml"
                )
                raise KeyError(wstr.format(type1, order1, type2, order2)) from e
            cov[ind1, ind2] = float(element.find("covariance").get("value"))

    return cov


def _make_obsfreqs(obskey, obs, obscov, allfits, freqplots, numax, debug=False):
    """
    Make a dictionary of frequency-dependent data

    Parameters
    ----------
    obskey : array
        Array containing the angular degrees and radial orders of obs
    obs : array
        Individual frequencies and uncertainties.
    obscov : array
        Covariance matrix of the individual frequencies
    allfits : list
        Type of fits available for individual frequencies
    freqplots : list
        List of frequency-dependent fits
    numax : scalar
        Frequency of maximum oscillation power, used when deriving the large
        frequency separation from the individual frequencies
    debug : bool, optional
        Activate additional output for debugging (for developers)

    Returns
    -------
    obsfreqdata : dict
        Requested frequency-dependent data such as glitches, ratios, and
        epsilon difference. It also contains the covariance matrix and its
        inverse of the individual frequency modes.
        The keys correspond to the science case, e.g. `r01`, `glitch`, or
        `e012`. Inside each case, you find the data (`data`), the covariance
        matrix (`cov`), and its inverse (`covinv`).
    obsfreqmeta : dict
        The requested information about which frequency products to fit or
        plot, unpacked for easier access later.
    """
    obsfreqdata = {}
    obsfreqmeta = {}

    allfits = np.asarray(list(allfits))
    allplots = np.asarray(list(freqplots))

    fitratiotypes = []
    plotratiotypes = []
    fitglitchtypes = []
    plotglitchtypes = []
    fitepsdifftypes = []
    plotepsdifftypes = []

    getratios = False
    getglitch = False
    getepsdiff = False

    obscovinv = np.linalg.pinv(obscov, rcond=1e-8)
    obsls = np.unique(obskey[0, :])

    # Large frequency separation from individual frequencies
    dnudata, dnudata_err = freq_fit.compute_dnu_wfit(obskey, obs, numax)

    for fit in allfits:
        obsfreqdata[fit] = {}
        # Look for ratios
        if fit in freqtypes.rtypes:
            getratios = True
            fitratiotypes.append(fit)
            if not "ratios" in obsfreqmeta.keys():
                obsfreqmeta["ratios"] = {}
        # Look for glitches
        elif fit in freqtypes.glitches:
            getglitch = True
            fitglitchtypes.append(fit)
            if not "glitch" in obsfreqmeta.keys():
                obsfreqmeta["glitch"] = {}
        # Look for epsdiff
        elif fit in freqtypes.epsdiff:
            getepsdiff = True
            fitepsdifftypes.append(fit)
            if not "epsdiff" in obsfreqmeta.keys():
                obsfreqmeta["epsdiff"] = {}
        elif fit not in freqtypes.freqs:
            print(f"Fittype {fit} not recognised")
            raise ValueError

    obsfreqdata["freqs"] = {
        "cov": obscov,
        "covinv": obscovinv,
        "dnudata": dnudata,
        "dnudata_err": dnudata_err,
    }

    # If all frequency plots enabled, turn on defaults
    if len(freqplots) and freqplots[0] == True:
        getratios = True
        getepsdiff = True
        plotratiotypes = list(set(freqtypes.defaultrtypes) | set(fitratiotypes))
        plotepsdifftypes = list(set(freqtypes.defaultepstypes) | set(fitepsdifftypes))
        # Only turn on glitches if they are fitted (expensive)
        if getglitch:
            plotglitchtypes = list(set(fitglitchtypes))
    elif len(freqplots):
        for plot in allplots:
            # Look for ratios
            if plot in ["ratios", *freqtypes.rtypes]:
                getratios = True
                if plot in [
                    "ratios",
                ]:
                    for rtype in freqtypes.defaultrtypes:
                        if rtype not in plotratiotypes:
                            plotratiotypes.append(rtype)
                else:
                    if plot not in plotratiotypes:
                        plotratiotypes.append(plot)
            # Look for glitches
            if plot in freqtypes.glitches:
                getglitch = True
                if plot not in plotglitchtypes:
                    plotglitchtypes.append(plot)
            # Look for epsdiff
            if plot in ["epsdiff", *freqtypes.epsdiff]:
                getepsdiff = True
                if plot in ["epsdiff"]:
                    for etype in freqtypes.defaultepstypes:
                        if etype not in plotepsdifftypes:
                            plotepsdifftypes.append(etype)
                else:
                    if plot not in plotepsdifftypes:
                        plotepsdifftypes.append(plot)

    # Check that there is observational data available for fits and plots
    if getratios or getepsdiff:
        for fittype in set(fitratiotypes) | set(fitepsdifftypes) | set(fitglitchtypes):
            if not all(x in obsls.astype(str) for x in fittype if x.isdigit()):
                for l in fittype[1:]:
                    if not l in obsls.astype(str):
                        print(f"* No l={l} modes were found in the observations")
                print(f"* It is not possible to fit {fittype}")
                raise ValueError
        for fittype in (
            set(plotratiotypes) | set(plotepsdifftypes) | set(plotglitchtypes)
        ):
            if not all(x in obsls.astype(str) for x in fittype if x.isdigit()):
                if debug:
                    print(f"*BASTA {fittype} cannot be plotted")
                if fittype in plotratiotypes:
                    plotratiotypes.remove(fittype)
                if fittype in plotepsdifftypes:
                    plotepsdifftypes.remove(fittype)
                if fittype in plotglitchtypes:
                    plotglitchtypes.remove(fittype)

    if getratios and ((len(fitratiotypes) == 0) & (len(plotratiotypes) == 0)):
        getratios = False
    if getepsdiff and ((len(fitepsdifftypes) == 0) & (len(plotepsdifftypes) == 0)):
        getepsdiff = False

    if getratios:
        obsfreqmeta["ratios"] = {}
        obsfreqmeta["ratios"]["fit"] = fitratiotypes
        obsfreqmeta["ratios"]["plot"] = plotratiotypes
    if getglitch:
        obsfreqmeta["glitch"] = {}
        obsfreqmeta["glitch"]["fit"] = fitglitchtypes
        obsfreqmeta["glitch"]["plot"] = plotglitchtypes
    if getepsdiff:
        obsfreqmeta["epsdiff"] = {}
        obsfreqmeta["epsdiff"]["fit"] = fitepsdifftypes
        obsfreqmeta["epsdiff"]["plot"] = plotepsdifftypes

    obsfreqmeta["getratios"] = getratios
    obsfreqmeta["getglitch"] = getglitch
    obsfreqmeta["getepsdiff"] = getepsdiff

    return obsfreqdata, obsfreqmeta
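

# Illustrative sketch (not part of the original module): the HDF5 layout read
# by _read_precomputed_glitches above, i.e. the datasets cov/params, cov/cov
# and (when glitches are combined with ratios) rto/rtype. Array sizes and
# values are made up; real files come from the external glitchpy output
# referenced in read_allseismic below.
def _example_precomputed_glitch_file(path="demo_glitches.hdf5"):
    params = np.array([0.8, 1.1e-3, 4.2])  # placeholder glitch parameters
    with h5py.File(path, "w") as hf:
        hf["cov/params"] = params
        hf["cov/cov"] = np.diag(np.full(len(params), 0.01))
        hf["rto/rtype"] = b"02"  # only checked for the gr* combinations
    return _read_precomputed_glitches(path, type="glitches")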


def read_allseismic(
    fitfreqs,
    freqplots,
    verbose=False,
    debug=False,
):
    """
    Routine to read all necessary data from individual frequencies for the
    desired fit

    Parameters
    ----------
    fitfreqs : dict
        Contains all frequency related input needed for reading.
    freqplots : list
        List of frequency-dependent fits
    verbose : bool, optional
        If True, extra text will be printed to log (for developers).
    debug : bool, optional
        Activate additional output for debugging (for developers)

    Returns
    -------
    obskey : array
        Array containing the angular degrees and radial orders of obs
    obs : array
        Individual frequencies and uncertainties.
    obsfreqdata : dict
        Requested frequency-dependent data such as glitches, ratios, and
        epsilon difference. It also contains the covariance matrix and its
        inverse of the individual frequency modes.
        The keys correspond to the science case, e.g. `r01`, `glitch`, or
        `e012`. Inside each case, you find the data (`data`), the covariance
        matrix (`cov`), and its inverse (`covinv`). The `freqs` entry also
        stores `dnudata` and `dnudata_err`: the large frequency separation
        (and its uncertainty) obtained by fitting the radial mode observed
        frequencies. Similar to dnufit, but from data and not from the
        theoretical frequencies in the grid of models.
    obsfreqmeta : dict
        The requested information about which frequency products to fit or
        plot, unpacked for easier access later.
    """
    if "freqs" in fitfreqs["fittypes"] and fitfreqs["correlations"]:
        obskey, obs, obscov = read_freq(
            fitfreqs["freqfile"],
            excludemodes=fitfreqs["excludemodes"],
            onlyradial=fitfreqs["onlyradial"],
            covarfre=True,
        )
    else:
        obskey, obs, obscov = read_freq(
            fitfreqs["freqfile"],
            excludemodes=fitfreqs["excludemodes"],
            onlyradial=fitfreqs["onlyradial"],
            covarfre=False,
        )

    # Construct data and metadata dictionaries
    obsfreqdata, obsfreqmeta = _make_obsfreqs(
        obskey,
        obs,
        obscov,
        fitfreqs["fittypes"],
        freqplots,
        numax=fitfreqs["numax"],
        debug=debug,
    )

    # Add large frequency separation bias (default is 0)
    if fitfreqs["dnubias"]:
        print(
            f"Added {fitfreqs['dnubias']}muHz bias/systematic to dnu error, from {obsfreqdata['freqs']['dnudata_err']:.3f}",
            end=" ",
        )
        obsfreqdata["freqs"]["dnudata_err"] = np.sqrt(
            obsfreqdata["freqs"]["dnudata_err"] ** 2.0 + fitfreqs["dnubias"] ** 2.0
        )
        print(f"to {obsfreqdata['freqs']['dnudata_err']:.3f}")

    # Compute or read in required ratios
    if obsfreqmeta["getratios"]:
        if fitfreqs["readratios"]:
            # Read all requested ratio sequences
            for ratiotype in set(obsfreqmeta["ratios"]["fit"]) | set(
                obsfreqmeta["ratios"]["plot"]
            ):
                datos = _read_precomputed_ratios_xml(
                    fitfreqs["freqfile"],
                    ratiotype,
                    obskey,
                    obs,
                    excludemodes=fitfreqs["excludemodes"],
                    correlations=fitfreqs["correlations"],
                )
                obsfreqdata[ratiotype] = {}
                obsfreqdata[ratiotype]["data"] = datos[0]
                obsfreqdata[ratiotype]["cov"] = datos[1]
        else:
            for ratiotype in set(obsfreqmeta["ratios"]["fit"]) | set(
                obsfreqmeta["ratios"]["plot"]
            ):
                obsfreqdata[ratiotype] = {}
                datos = freq_fit.compute_ratios(
                    obskey, obs, ratiotype, threepoint=fitfreqs["threepoint"]
                )
                if datos is not None:
                    obsfreqdata[ratiotype]["data"] = datos[0]
                    obsfreqdata[ratiotype]["cov"] = datos[1]
                elif ratiotype in obsfreqmeta["ratios"]["fit"]:
                    # Fail
                    raise ValueError(
                        f"Fitting parameter {ratiotype} could not be computed."
                    )
                else:
                    # Do not fail as much
                    print(f"Ratio {ratiotype} could not be computed.")
                    obsfreqdata[ratiotype]["data"] = None
                    obsfreqdata[ratiotype]["cov"] = None
                    obsfreqdata[ratiotype]["covinv"] = None

    # Get glitches
    if obsfreqmeta["getglitch"]:
        for glitchtype in set(obsfreqmeta["glitch"]["fit"]) | set(
            obsfreqmeta["glitch"]["plot"]
        ):
            obsfreqdata[glitchtype] = {}
            if fitfreqs["readglitchfile"]:
                datos = _read_precomputed_glitches(fitfreqs["glitchfile"], glitchtype)
                # Precomputed from glitchpy lacks the data structure, so sample once to obtain that
                obsseq = glitch_fit.compute_glitchseqs(
                    obskey, obs, glitchtype, obsfreqdata["freqs"]["dnudata"], fitfreqs
                )
                # Store data in new structure, overwrite old
                obsseq[0] = datos[0]
                datos = (obsseq, datos[1])
            else:
                datos = glitch_fit.compute_observed_glitches(
                    obskey,
                    obs,
                    glitchtype,
                    obsfreqdata["freqs"]["dnudata"],
                    fitfreqs,
                    debug=debug,
                )
            if datos is not None:
                obsfreqdata[glitchtype]["data"] = datos[0]
                obsfreqdata[glitchtype]["cov"] = datos[1]
            elif glitchtype in obsfreqmeta["glitch"]["fit"]:
                # Fail
                raise ValueError(
                    f"Fitting parameter {glitchtype} could not be computed."
                )
            else:
                # Do not fail as much
                print(f"Glitch type {glitchtype} could not be computed.")
                obsfreqdata[glitchtype]["data"] = None
                obsfreqdata[glitchtype]["cov"] = None
                obsfreqdata[glitchtype]["covinv"] = None

    # Get epsilon differences
    if obsfreqmeta["getepsdiff"]:
        for epsdifffit in set(obsfreqmeta["epsdiff"]["fit"]) | set(
            obsfreqmeta["epsdiff"]["plot"]
        ):
            obsfreqdata[epsdifffit] = {}
            if epsdifffit in obsfreqmeta["epsdiff"]["fit"]:
                datos = freq_fit.compute_epsilondiff(
                    obskey,
                    obs,
                    obsfreqdata["freqs"]["dnudata"],
                    sequence=epsdifffit,
                    nsorting=fitfreqs["nsorting"],
                    debug=debug,
                )
                obsfreqdata[epsdifffit]["data"] = datos[0]
                obsfreqdata[epsdifffit]["cov"] = datos[1]
            elif epsdifffit in obsfreqmeta["epsdiff"]["plot"]:
                datos = freq_fit.compute_epsilondiff(
                    obskey,
                    obs,
                    obsfreqdata["freqs"]["dnudata"],
                    sequence=epsdifffit,
                    nsorting=fitfreqs["nsorting"],
                    nrealisations=2000,
                    debug=debug,
                )
                obsfreqdata[epsdifffit]["data"] = datos[0]
                obsfreqdata[epsdifffit]["cov"] = datos[1]

    # As the inverse covariance matrix is actually what is used, compute it once
    # Diagonalise covariance matrices if correlations is set to False
    for key in obsfreqdata.keys():
        if not fitfreqs["correlations"]:
            obsfreqdata[key]["cov"] = (
                np.identity(obsfreqdata[key]["cov"].shape[0]) * obsfreqdata[key]["cov"]
            )
        if "covinv" not in obsfreqdata[key].keys():
            obsfreqdata[key]["covinv"] = np.linalg.pinv(
                obsfreqdata[key]["cov"], rcond=1e-8
            )

    return obskey, obs, obsfreqdata, obsfreqmeta
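

# Illustrative usage sketch (not part of the original module): a minimal call
# to read_allseismic for a plain individual-frequency fit. The fitfreqs keys
# shown are the ones accessed on this code path; real dictionaries are
# assembled by BASTA's input machinery and carry more entries. The file name
# and values are made up.
def _example_read_allseismic(freqfile="demo_star.xml"):
    fitfreqs = {
        "fittypes": ["freqs"],
        "freqfile": freqfile,
        "excludemodes": None,
        "onlyradial": False,
        "correlations": False,
        "numax": 3090.0,
        "dnubias": 0,
    }
    return read_allseismic(fitfreqs, freqplots=[])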


##############################################################
# Routines related to reading ascii-files and convert to xml #
##############################################################


def freqs_ascii_to_xml(
    directory,
    starid,
    freqsfile: str | None = None,
    covfile: str | None = None,
    ratiosfile: str | None = None,
    cov010file: str | None = None,
    cov02file: str | None = None,
    symmetric_errors=True,
    check_radial_orders=False,
    quiet=False,
):
    """
    Creates frequency xml-file based on ascii-files.

    Parameters
    ----------
    directory : str
        Absolute path of the location of the ascii-files. This is also the
        location where the xml-file will be created. The directory must
        contain the ascii-file with the extension '.fre' (containing
        frequencies), and may contain the ascii-files with the extensions
        '.cov' (frequency covariances), '.ratios', '.cov010', and '.cov02'
        (ratios and their covariances).
    starid : str
        id of the star. The ascii files must be named 'starid.xxx' and the
        generated xml-file will be named 'starid.xml'
    symmetric_errors : bool, optional
        If True, the ascii files are assumed to only include symmetric
        errors. Otherwise, asymmetric errors are assumed. Default is True.
    check_radial_orders : bool or float, optional
        If True, the routine will correct the radial order printed in the
        xml, based on the calculated epsilon value, with its own dnufit.
        If float, does the same, but uses the inputted float as dnufit.
    quiet : bool, optional
        Toggle to silence the output (useful for running batches)
    """
    # (Potential) filepaths
    if freqsfile is None:
        freqsfile = os.path.join(directory, starid + ".fre")
    if covfile is None:
        covfile = os.path.join(directory, starid + ".cov")
    if ratiosfile is None:
        ratiosfile = os.path.join(directory, starid + ".ratios")
    if cov010file is None:
        cov010file = os.path.join(directory, starid + ".cov010")
    if cov02file is None:
        cov02file = os.path.join(directory, starid + ".cov02")

    # Flags for existence of ratios and covariances
    cov_flag, ratios_flag, cov010_flag, cov02_flag = 1, 1, 1, 1

    #####################################
    # Flags which are redundant for now #
    #####################################
    cov01_flag, cov10_flag, cov012_flag, cov102_flag = 0, 0, 0, 0

    # Make sure that the frequency file exists, and read the frequencies
    if os.path.exists(freqsfile):
        freqs = _read_freq_ascii(freqsfile, symmetric_errors)
        # If covariances are available, read them
        if os.path.exists(covfile):
            cov = _read_freq_cov_ascii(covfile)
        else:
            cov_flag = 0
    else:
        raise RuntimeError("Frequency file not found")

    # Check the value of epsilon, to estimate if the radial orders
    # are correctly identified. Correct them, if user allows to.
    ncorrection = su.check_epsilon_of_freqs(
        freqs, starid, check_radial_orders, quiet=quiet
    )
    if check_radial_orders:
        freqs["order"] += ncorrection
        if not quiet:
            print("The proposed correction has been implemented.\n")
    elif not quiet:
        print("No correction made.\n")

    # Look for ratios and their covariances, read if available
    if os.path.exists(ratiosfile):
        ratios = _read_ratios_ascii(ratiosfile, symmetric_errors)
        if os.path.exists(cov010file):
            cov010 = _read_ratios_cov_ascii(cov010file)
        else:
            cov010_flag = 0
        if os.path.exists(cov02file):
            cov02 = _read_ratios_cov_ascii(cov02file)
        else:
            cov02_flag = 0
    else:
        ratios_flag, cov010_flag, cov02_flag = 0, 0, 0

    # Main xml element
    main = Element("frequencies", {"kic": starid})
    flags = SubElement(
        main,
        "flags",
        {
            "cov": str(cov_flag),
            "ratios": str(ratios_flag),
            "cov010": str(cov010_flag),
            "cov02": str(cov02_flag),
            "cov01": str(cov01_flag),
            "cov10": str(cov10_flag),
            "cov012": str(cov012_flag),
            "cov102": str(cov102_flag),
        },
    )

    # If needed for covariance, store mode id
    if cov_flag:
        mode_ids = np.zeros((len(freqs)), dtype=[("f_id", "int"), ("modeid", "8U")])

    # Frequency elements
    for i in range(len(freqs)):
        modeid = "mode" + str(i + 1)
        order = freqs["order"][i]
        degree = freqs["degree"][i]
        frequency = freqs["frequency"][i]
        error = freqs["error"][i]

        mode_element = SubElement(main, "mode", {"id": modeid})
        SubElement(mode_element, "order", {"value": str(order)})
        SubElement(mode_element, "degree", {"value": str(degree)})
        SubElement(
            mode_element,
            "frequency",
            {"error": str(error), "unit": "uHz", "value": str(frequency)},
        )

        if cov_flag:
            mode_ids["f_id"][i] = freqs["order"][i] * 10 + freqs["degree"][i]
            mode_ids["modeid"][i] = modeid

    # Frequency correlation elements
    if cov_flag:
        # Prepare sorting arrays, order before degree
        fsort = [freqs["order"][i] * 10 + freqs["degree"][i] for i in range(len(freqs))]
        csort = np.array(
            [
                [cov["n1"][i] * 10 + cov["l1"][i] for i in range(len(cov))],
                [cov["n2"][i] * 10 + cov["l2"][i] for i in range(len(cov))],
            ]
        )

        # Double loop over covariance matrix rows and columns
        for f1_id in fsort:
            # Get modeid and create subsection of full covariance list
            f1_modeid = mode_ids["modeid"][np.where(mode_ids["f_id"] == f1_id)[0]][0]
            f1_mask = np.where(csort[0] == f1_id)[0]
            covf1 = cov[f1_mask]
            covs1 = csort[1][f1_mask]

            for f2_id in fsort:
                f2_modeid = mode_ids["modeid"][np.where(mode_ids["f_id"] == f2_id)[0]][
                    0
                ]
                f2_index = np.where(covs1 == f2_id)[0][0]

                order1 = covf1["n1"][f2_index]
                order2 = covf1["n2"][f2_index]
                deg1 = covf1["l1"][f2_index]
                deg2 = covf1["l2"][f2_index]
                covariance = covf1["covariance"][f2_index]
                correlation = covf1["correlation"][f2_index]

                freq_corr_element = SubElement(
                    main,
                    "frequency_corr",
                    {
                        "id1": f1_modeid,
                        "id2": f2_modeid,
                        "order1": str(order1),
                        "order2": str(order2),
                        "degree1": str(deg1),
                        "degree2": str(deg2),
                    },
                )
                SubElement(freq_corr_element, "covariance", {"value": str(covariance)})
                SubElement(
                    freq_corr_element, "correlation", {"value": str(correlation)}
                )

    # Ratio elements
    if ratios_flag:
        n02 = 0
        for i in range(len(ratios)):
            ratioid = "ratio" + str(i + 1)
            order = ratios["n"][i]
            rtype = ratios["ratio_type"][i].decode("utf-8")
            ratio = ratios["value"][i]
            error = ratios["error"][i]

            # Count the number of r02 ratios
            if rtype == "r02":
                n02 += 1

            # Note: setting error_minus = error_plus = error
            SubElement(
                main,
                "frequency_ratio",
                {
                    "error": str(error),
                    "error_minus": str(error),
                    "error_plus": str(error),
                    "id": ratioid,
                    "order": str(order),
                    "type": rtype,
                    "value": str(ratio),
                },
            )

    if cov010_flag:
        # Number of r010 ratios
        n010 = len(ratios) - n02

        # Ratio 010 correlation elements
        ratio1_id = n02
        for i in range(len(cov010)):
            id2_index = np.mod(i, n010)
            if id2_index == 0:
                ratio1_id += 1
            ratio2_id = n02 + id2_index + 1

            order1 = cov010["n1"][i]
            order2 = cov010["n2"][i]
            rtype1 = cov010["ratio_type1"][i].decode("utf-8")
            rtype2 = cov010["ratio_type2"][i].decode("utf-8")
            covariance = cov010["covariance"][i]
            correlation = cov010["correlation"][i]

            for rtype in ("01", "10"):
                if rtype in rtype1:
                    rtype1 = "r" + rtype
                if rtype in rtype2:
                    rtype2 = "r" + rtype

            ratio_corr_element = SubElement(
                main,
                "frequency_ratio_corr",
                {
                    "id1": "ratio" + str(ratio1_id),
                    "id2": "ratio" + str(ratio2_id),
                    "order1": str(order1),
                    "order2": str(order2),
                    "type1": rtype1,
                    "type2": rtype2,
                },
            )
            SubElement(ratio_corr_element, "covariance", {"value": str(covariance)})
            SubElement(ratio_corr_element, "correlation", {"value": str(correlation)})

    if cov02_flag:
        # Ratio 02 correlation elements
        ratio1_id = 0
        for i in range(len(cov02)):
            id2_index = np.mod(i, n02)
            if id2_index == 0:
                ratio1_id += 1
            ratio2_id = id2_index + 1

            order1 = cov02["n1"][i]
            order2 = cov02["n2"][i]
            rtype1 = "r02"
            rtype2 = "r02"
            covariance = cov02["covariance"][i]
            correlation = cov02["correlation"][i]

            ratio_corr_element = SubElement(
                main,
                "frequency_ratio_corr",
                {
                    "id1": "ratio" + str(ratio1_id),
                    "id2": "ratio" + str(ratio2_id),
                    "order1": str(order1),
                    "order2": str(order2),
                    "type1": rtype1,
                    "type2": rtype2,
                },
            )
            SubElement(ratio_corr_element, "covariance", {"value": str(covariance)})
            SubElement(ratio_corr_element, "correlation", {"value": str(correlation)})

    # Create xml string and make it pretty
    xml = tostring(main)
    reparsed = minidom.parseString(xml)
    pretty_xml = reparsed.toprettyxml()

    # Write output to file starid.xml
    with open(os.path.join(directory, starid + ".xml"), "w") as xmlfile:
        print(pretty_xml, file=xmlfile)
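

# Illustrative usage sketch (not part of the original module): convert ascii
# tables for one star into <directory>/<starid>.xml. The directory must at
# least contain "<starid>.fre"; covariance and ratio files are picked up
# automatically if present. The path and star id below are made up.
def _example_freqs_ascii_to_xml():
    freqs_ascii_to_xml(
        "/path/to/star",  # directory holding 12345678.fre etc.
        "12345678",  # star id --> output 12345678.xml
        check_radial_orders=True,  # re-label radial orders from epsilon if needed
    )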


def _read_freq_ascii(
    filename: str,
    symmetric_errors: bool = True,
):
    """
    Read individual frequencies from an ascii file

    Parameters
    ----------
    filename : str
        Name of file to read
    symmetric_errors : bool, optional
        If True, assumes the ascii file to contain only one column with
        frequency errors. Otherwise, the file must include asymmetric
        errors (error_plus and error_minus). Default is True.

    Returns
    -------
    freqs : array
        Contains the radial order, angular degree, frequency, uncertainty,
        and flag
    """
    cols = []
    data = np.genfromtxt(filename, dtype=None, encoding=None, names=True)

    rdict = {
        "frequency": {
            "dtype": "float",
            "recognisednames": [
                "f",
                "freq",
                "freqs",
                "frequency",
                "modefrequency",
            ],
        },
        "order": {
            "dtype": "int",
            "recognisednames": [
                "n",
                "ns",
                "order",
                "radialorder",
            ],
        },
        "degree": {
            "dtype": "int",
            "recognisednames": [
                "l",
                "ls",
                "ell",
                "ells",
                "degree",
                "angulardegree",
            ],
        },
        "error": {
            "dtype": "float",
            "recognisednames": [
                "error",
                "err",
                "e",
                "uncertainty",
                "uncert",
            ],
        },
        "error_plus": {
            "dtype": "float",
            "recognisednames": [
                "error_plus",
                "err_plus",
                "error_upper",
                "upper_error",
            ],
        },
        "error_minus": {
            "dtype": "float",
            "recognisednames": [
                "error_minus",
                "err_minus",
                "error_lower",
                "lower_error",
            ],
        },
        "flag": {
            "dtype": "int",
            "recognisednames": [
                "flag",
            ],
        },
        "frequency_mode": {
            "dtype": "float",
            "recognisednames": [
                "frequency_mode",
            ],
        },
    }

    for colname in data.dtype.names:
        param = [p for p in rdict if colname.lower() in rdict[p]["recognisednames"]]
        if not param:
            raise ValueError(f"BASTA cannot recognize {colname}")
        assert len(param) == 1, (colname, param)
        cols.append((param[0], rdict[param[0]]["dtype"]))

    sym = np.any([col[0] == "error" for col in cols])
    asym = all(
        [
            item in col[0]
            for col in cols
            for item in rdict["error_plus"]["recognisednames"]
        ]
    )
    if not sym ^ asym:
        if sym | asym:
            raise ValueError(
                "BASTA found too many uncertainties, please specify which to use."
            )
        else:
            raise ValueError("BASTA is missing frequency uncertainties.")
    if not symmetric_errors and not asym:
        raise ValueError(
            "BASTA is looking for asymmetric frequency uncertainties, but did not find them"
        )

    freqs = np.genfromtxt(
        filename,
        dtype=cols,
        encoding=None,
    )

    if not np.any(["order" in col[0] for col in cols]):
        print("BASTA did not find column with radial orders, they will be generated")
        freqcol = [
            col
            for col in data.dtype.names
            if col in rdict["frequency"]["recognisednames"]
        ]
        lcol = [
            col for col in data.dtype.names if col in rdict["degree"]["recognisednames"]
        ]
        assert len(freqcol) == 1, freqcol
        assert len(lcol) == 1, lcol
        mask = data[lcol[0]] == 0
        l0s = data[freqcol[0]][mask]
        assert len(l0s) > 1, "More than 1 l=0 mode is required for estimation of dnu"
        dnu = np.median(np.diff(l0s))
        ns = np.asarray(data[freqcol[0]]) // dnu - 1

        from numpy.lib.recfunctions import append_fields

        freqs = append_fields(freqs, "order", data=ns.astype(int))

    return freqs


def _read_freq_cov_ascii(filename: str):
    """
    Read covariance and correlations for individual frequencies from an
    ascii file

    Parameters
    ----------
    filename : str
        Name of file to read

    Returns
    -------
    cov : array
        Contains the angular degree, radial order, covariances and
        correlations between the individual frequencies
    """
    cov = np.genfromtxt(
        filename,
        dtype=[
            ("l1", "int"),
            ("n1", "int"),
            ("l2", "int"),
            ("n2", "int"),
            ("covariance", "float"),
            ("correlation", "float"),
        ],
        encoding=None,
    )
    return cov


def _read_ratios_ascii(filename, symmetric_errors=True):
    """
    Read frequency ratios from an ascii file

    Parameters
    ----------
    filename : str
        Name of file to read
    symmetric_errors : bool, optional
        If True, assumes the ascii file to contain only one column with
        frequency errors. Otherwise, the file must include asymmetric
        errors (error_plus and error_minus). Default is True.

    Returns
    -------
    ratios : array
        Contains the frequency ratio type, radial order, value, uncertainty
        added in quadrature, upper uncertainty, and lower uncertainty
    """
    if symmetric_errors:
        ratios = np.genfromtxt(
            filename,
            dtype=[
                ("ratio_type", "S3"),
                ("n", "int"),
                ("value", "float"),
                ("error", "float"),
            ],
            encoding=None,
        )
    else:
        ratios = np.genfromtxt(
            filename,
            dtype=[
                ("ratio_type", "S3"),
                ("n", "int"),
                ("value", "float"),
                ("error", "float"),
                ("error_plus", "float"),
                ("error_minus", "float"),
            ],
            encoding=None,
        )

    return ratios


def _read_ratios_cov_ascii(filename):
    """
    Read covariance and correlations for frequency ratios from an ascii file

    Parameters
    ----------
    filename : str
        Name of file to read

    Returns
    -------
    cov : array
        Contains the ratio type, radial order, covariances and correlations
        between the frequency ratios
    """
    cov = np.genfromtxt(
        filename,
        dtype=[
            ("ratio_type1", "S4"),
            ("n1", "int"),
            ("ratio_type2", "S4"),
            ("n2", "int"),
            ("covariance", "float"),
            ("correlation", "float"),
        ],
        encoding=None,
    )
    return cov
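

# Illustrative sketch (not part of the original module): a small frequency
# ascii table accepted by _read_freq_ascii above. The header names map onto
# the recognised column names in rdict; column order is free. Values and the
# file name are made up.
def _example_freq_ascii_file(path="demo.fre"):
    with open(path, "w") as fh:
        fh.write("# order degree frequency error\n")
        fh.write("20 0 2895.6 0.2\n")
        fh.write("21 0 3030.1 0.3\n")
    return _read_freq_ascii(path)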