diff --git a/src/pint/polycos.py b/src/pint/polycos.py index dd569a9cae..c608ab8252 100644 --- a/src/pint/polycos.py +++ b/src/pint/polycos.py @@ -33,13 +33,16 @@ http://tempo.sourceforge.net/ref_man_sections/tz-polyco.txt """ -from collections import OrderedDict - +from __future__ import annotations import astropy.table as table import astropy.units as u import numpy as np from astropy.io import registry from astropy.time import Time +from collections import OrderedDict +import pathlib +from typing import Optional, Callable + from loguru import logger as log try: @@ -94,17 +97,28 @@ class PolycoEntry: Middle point of the time span in mjd mjdspan : int Time span in minutes - rphase : float - Reference phase + rph_int : int + Integer part of reference phase + rph_frac : float + Fractional part of reference phase f0 : float Reference spin frequency ncoeff : int Number of coefficients - coeff : numpy.ndarray + coeffs : numpy.ndarray Polynomial coefficents """ - def __init__(self, tmid, mjdspan, rph_int, rph_frac, f0, ncoeff, coeffs): + def __init__( + self, + tmid: float, + mjdspan: int, + rph_int: int, + rph_frac: float, + f0: float, + ncoeff: int, + coeffs: np.ndarray, + ): self.tmid = data2longdouble(tmid) * u.day self.mjdspan = data2longdouble(mjdspan / MIN_PER_DAY) * u.day self.tstart = self.tmid - (self.mjdspan / 2) @@ -141,7 +155,7 @@ def __str__(self): + repr(self.coeffs) ) - def evalabsphase(self, t): + def evalabsphase(self, t: float | np.ndarray) -> Phase: """Return the phase at time t, computed with this polyco entry Parameters @@ -168,7 +182,7 @@ def evalabsphase(self, t): phase += self.rphase + Phase(dt * 60.0 * self.f0) return phase - def evalphase(self, t): + def evalphase(self, t: float | np.ndarray) -> Phase: """Return the phase at time t, computed with this polyco entry Parameters @@ -182,7 +196,7 @@ def evalphase(self, t): """ return self.evalabsphase(t).frac - def evalfreq(self, t): + def evalfreq(self, t: float | np.ndarray) -> np.ndarray: """Return the freq at time t, computed with this polyco entry Parameters @@ -201,7 +215,7 @@ def evalfreq(self, t): s += data2longdouble(i) * self.coeffs[i] * dt ** (i - 1) return self.f0 + s / 60.0 - def evalfreqderiv(self, t): + def evalfreqderiv(self, t: float | np.ndarray) -> np.ndarray: """Return the frequency derivative at time t. Parameters @@ -228,7 +242,7 @@ def evalfreqderiv(self, t): # Read polycos file data to table -def tempo_polyco_table_reader(filename): +def tempo_polyco_table_reader(filename: str) -> table.Table: """Read tempo style polyco file to an astropy table. Tempo style: The polynomial ephemerides are written to file 'polyco.dat'. @@ -356,7 +370,9 @@ def tempo_polyco_table_reader(filename): return table.Table(entries, meta={"name": "Polyco Data Table"}) -def tempo_polyco_table_writer(polycoTable, filename="polyco.dat"): +def tempo_polyco_table_writer( + polycoTable: table.Table, filename: str | pathlib.Path = "polyco.dat" +): """Write tempo style polyco file from an astropy table. Tempo style polyco file: @@ -487,7 +503,7 @@ class Polycos: polycoFormats = [] @classmethod - def _register(cls, formatlist=_polycoFormats): + def _register(cls, formatlist: list = _polycoFormats): """ Register the table built-in reading and writing format @@ -511,7 +527,9 @@ def _register(cls, formatlist=_polycoFormats): fmt["format"], "rw", fmt["read_method"], fmt["write_method"] ) - def __init__(self, filename=None, format="tempo"): + def __init__( + self, filename: Optional[str | pathlib.Path] = None, format: str = "tempo" + ): """Initialize polycos from a file Parameters @@ -542,7 +560,7 @@ def __init__(self, filename=None, format="tempo"): raise ValueError("Zero polycos found for table") @classmethod - def read(cls, filename, format="tempo"): + def read(cls, filename: str | pathlib.Path, format: str = "tempo") -> Polycos: """Read polyco file to a table. Parameters @@ -561,7 +579,11 @@ def read(cls, filename, format="tempo"): @classmethod def add_polyco_file_format( - cls, formatName, methodMood, readMethod=None, writeMethod=None + cls, + formatName: str, + methodMood: str, + readMethod: Optional[Callable] = None, + writeMethod: Optional[Callable] = None, ): """ Add a polyco file format and its reading/writing method to the class. @@ -632,7 +654,9 @@ def add_polyco_file_format( cls.polycoFormats.append(pFormat) - def read_polyco_file(self, filename, format="tempo"): + def read_polyco_file( + self, filename: str | pathlib.Path, format: str = "tempo" + ) -> Polycos: """Read polyco file to a table. Included for backward compatibility. It is better to use :meth:`pint.polycos.Polycos.read`. @@ -677,18 +701,18 @@ def __getitem__(self, item): @classmethod def generate_polycos( cls, - model, - mjdStart, - mjdEnd, - obs, - segLength, - ncoeff, - obsFreq, - maxha=12.0, - method="TEMPO", - numNodes=20, - progress=True, - ): + model: pint.models.TimingModel, + mjdStart: float | np.longdouble, + mjdEnd: float | np.longdouble, + obs: str, + segLength: int, + ncoeff: int, + obsFreq: float, + maxha: float = 12.0, + method: str = "TEMPO", + numNodes: int = 20, + progress: bool = True, + ) -> Polycos: """ Generate the polyco data. @@ -713,7 +737,7 @@ def generate_polycos( method : str, optional Method to generate polycos. Only the ``TEMPO`` method is supported for now. numNodes : int, optional - Number of nodes for fitting. It cannot be less then the number of + Number of nodes (sample points within each segment) for fitting. It cannot be less then the number of coefficents. Default: 20 progress : bool, optional Whether or not to show the progress bar during calculation @@ -752,35 +776,21 @@ def generate_polycos( obsFreq = float(obsFreq) # Using tempo1 method to create polycos - # If you want to disable the progress bar, add disable=True to the tqdm() call. for tmid in tqdm(tmids, disable=not progress): tStart = tmid - mjdSpan / 2 tStop = tmid + mjdSpan / 2 nodes = np.linspace(tStart, tStop, numNodes) + # midpoint TOA and reference phase for each segment toaMid = toa.get_TOAs_array( (np.modf(tmid)[1], np.modf(tmid)[0]), obs=obs, freqs=obsFreq, ephem=ephem, ) - # toaMid = toa.get_TOAs_list( - # [toa.TOA()], - # ) - refPhase = model.phase(toaMid, abs_phase=True) # Create node toas(Time sample using TOA class) - # toaList = [ - # toa.TOA( - # (np.modf(toaNode)[1], np.modf(toaNode)[0]), - # obs=obs, - # freq=obsFreq, - # ) - # for toaNode in nodes - # ] - - # toas = toa.get_TOAs_list(toaList, ephem=ephem) toas = toa.get_TOAs_array( (np.modf(nodes)[0], np.modf(nodes)[1]), obs=obs, @@ -794,13 +804,18 @@ def generate_polycos( rdcPhase = rdcPhase.int - (dt * model.F0.value * 60.0) + rdcPhase.frac dtd = dt.astype(float) # Truncate to double rdcPhased = rdcPhase.astype(float) - coeffs = np.polyfit(dtd, rdcPhased, ncoeff - 1)[::-1] + rawcoeffs = np.polyfit(dtd, rdcPhased, ncoeff - 1) + coeffs = rawcoeffs[::-1] date, hms = Time(tmid, format="mjd", scale="utc").iso.split() yy, mm, dd = date.split("-") date = f"{dd}-{MONTHS[int(mm) - 1]}-{yy[-2:]}" hms = float(hms.replace(":", "")) + ssbfreq = model.barycentric_radio_freq(toaMid) + doppler = float( + 1e4 * ((ssbfreq - toaMid["freq"]) / toaMid["freq"]).decompose() + ) entry = PolycoEntry( tmid, segLength, @@ -810,6 +825,8 @@ def generate_polycos( ncoeff, coeffs, ) + ph_polyco = np.polyval(rawcoeffs, dtd) + rms = np.sqrt(((ph_polyco - rdcPhased) ** 2).sum() / (len(dtd) - 1)) entry_dict = OrderedDict() entry_dict["psr"] = model.PSR.value @@ -817,8 +834,9 @@ def generate_polycos( entry_dict["utc"] = hms entry_dict["tmid"] = tmid entry_dict["dm"] = model.DM.value - entry_dict["doppler"] = 0.0 - entry_dict["logrms"] = 0.0 + entry_dict["doppler"] = doppler + # truncate so write/read work + entry_dict["logrms"] = max(np.log10(rms), -9.999) entry_dict["mjd_span"] = segLength entry_dict["t_start"] = entry.tstart entry_dict["t_stop"] = entry.tstop @@ -841,7 +859,7 @@ def generate_polycos( raise ValueError("Zero polycos found for table") return out - def write_polyco_file(self, filename="polyco.dat", format="tempo"): + def write_polyco_file(self, filename: str = "polyco.dat", format: str = "tempo"): """Write Polyco table to a file. Parameters @@ -863,7 +881,7 @@ def write_polyco_file(self, filename="polyco.dat", format="tempo"): self.polycoTable.write(filename, format=format) - def find_entry(self, t): + def find_entry(self, t: float | np.ndarray) -> np.ndarray: """Find the right entry for the input time. Parameters @@ -897,7 +915,7 @@ def find_entry(self, t): raise ValueError("Some input times not covered by Polyco entries.") return start_idx - def eval_phase(self, t): + def eval_phase(self, t: float | np.ndarray) -> np.ndarray: """Polyco evaluation of fractional phase Parameters @@ -918,7 +936,7 @@ def eval_phase(self, t): t = np.array([t]) return self.eval_abs_phase(t).frac - def eval_abs_phase(self, t): + def eval_abs_phase(self, t: float | np.ndarray) -> Phase: """ Polyco evaluate absolute phase for a time array. @@ -963,7 +981,7 @@ def eval_abs_phase(self, t): phaseFrac = np.hstack(phaseFrac).value return Phase(phaseInt, phaseFrac) - def eval_spin_freq(self, t): + def eval_spin_freq(self, t: float | np.ndarray) -> np.ndarray: """ Polyco evaluate spin frequency for a time array. @@ -998,7 +1016,7 @@ def eval_spin_freq(self, t): return spinFreq - def eval_spin_freq_derivative(self, t): + def eval_spin_freq_derivative(self, t: float | np.ndarray) -> np.ndarray: """ Polyco evaluate spin frequency derivative for a time array.