From 4a078a608a13122c4dead72af17eb61055f84911 Mon Sep 17 00:00:00 2001 From: eulaliesa Date: Tue, 29 Jul 2025 11:29:14 +0200 Subject: [PATCH 1/6] created files to discover antex files and for the tests --- .../antex/antex_file_discovery.py | 143 ++++++++++++++++++ .../datasets/test_antex_file_discovery.py | 62 ++++++++ 2 files changed, 205 insertions(+) create mode 100644 src/prx/precise_corrections/antex/antex_file_discovery.py create mode 100644 src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py diff --git a/src/prx/precise_corrections/antex/antex_file_discovery.py b/src/prx/precise_corrections/antex/antex_file_discovery.py new file mode 100644 index 00000000..056df17c --- /dev/null +++ b/src/prx/precise_corrections/antex/antex_file_discovery.py @@ -0,0 +1,143 @@ +import argparse +import logging +from ftplib import FTP +import os +from pathlib import Path +import re + +import georinex +import pandas as pd +import urllib +import fnmatch + +from prx import converters +from prx import util +import prx +from prx.util import timestamp_to_mid_day + +log = util.get_logger(__name__) + +atx_filename = f"igs*.atx" + +def list_all_atx_files(): + server = 'gssc.esa.int' + folder = '/igs/station/general' + with FTP(server) as ftp: + ftp.login() + ftp.cwd(folder) + files = ftp.nlst() + return [f for f in files if f.lower().endswith('.atx')] + +def date_to_gps_week(date:pd.Timestamp): + """ + Convert a Timestamp to GPS week + + GPS week starts on 01/06/1980 (sunday) + """ + gps_epoch = pd.Timestamp("1980-01-06T00:00:00Z") + delta = date.tz_localize("UTC") - gps_epoch + gps_week = delta.days // 7 + return gps_week + +def extract_gps_week(filename: str) -> int: + """ + Extract GPS week from filenames like 'igsxx_WWWW.atx'. + """ + match = re.search(r"igs...(\d{4})\.atx", filename) + if match: + return int(match.group(1)) + return -1 + +def atx_file_database_folder(): + """ + Returns the path to the folder where ATX database files are stored. + """ + db_folder = util.prx_repository_root() / "src/prx/precise_corrections/antex/atx_files" + db_folder.mkdir(exist_ok=True) + return db_folder + +def get_local_atx(gps_week : int, db_folder=atx_file_database_folder()): + candidates = list( + db_folder.glob(atx_filename) + ) + candidates = [ + c for c in candidates if len(c) == 14 and extract_gps_week(c) != -1 + ] + if len(candidates) == 0: + return None + if len(candidates) > 1: + candidates = sorted( + candidates, + key=lambda c: extract_gps_week(c), + reverse=True + ) + log.warning( + f"Found more than one ANTEX file for {gps_week}: \n{candidates} \n Will use the first one." + ) + log.info(f"Found the ANTEX local file : {candidates[0]}") + return candidates[0] + +def list_ftp_directory(server, folder): + ftp = FTP(server) + ftp.login() + ftp.cwd(folder) + dir_list = [] + ftp.dir(dir_list.append) + return [c.split()[-1].strip() for c in dir_list] + +def try_downloading_atx_ftp(gps_week : int, folder : Path): + server = 'gssc.esa.int' + remote_folder = f"/igs/station/general" + candidates = list_ftp_directory(server, remote_folder) + candidates = [ + c + for c in candidates + if fnmatch.fnmatch(c, atx_filename) + ] + candidates = [ + c for c in candidates if len(c) == 14 and extract_gps_week(c) != -1 + ] + if len(candidates) == 0: + log.warning(f"Could not find ANTEX file for {gps_week}") + return None + + candidates = sorted( + candidates, + key=lambda c: extract_gps_week(c), + reverse=True + ) + file = candidates[0] + ftp_file = f"ftp://{server}/{remote_folder}/{file}" + local_file = folder / file + urllib.request.urlretrieve(ftp_file, local_file) + if not local_file.exists(): + log.warning(f"Could not download {ftp_file}") + return None + log.info(f"Downloaded ANTEX file {ftp_file}") + return local_file + +def get_atx_file(date: pd.Timestamp, db_folder = atx_file_database_folder()): + gps_week = date_to_gps_week(date) + atx_file = None + while atx_file is None: + atx_file = get_local_atx(gps_week, db_folder) + if atx_file is None : + atx_file = try_downloading_atx_ftp(gps_week, db_folder) + if atx_file is not None : + return atx_file + gps_week -=1 + return None + +def discover_or_download_atx_file(observation_file_path=Path()): + """ + Returns the path to a valid antes file (local or downloaded) corresponding to the observation file. + """ + log.info(f"Finding auxiliary files for {observation_file_path} ...") + rinex_3_obs_file = converters.anything_to_rinex_3(observation_file_path) + header = georinex.rinexheader(rinex_3_obs_file) + + t_start = timestamp_to_mid_day(util.rinex_header_time_string_2_timestamp_ns(header["TIME OF FIRST OBS"]) + - pd.Timedelta(200, unit="milliseconds")) + + atx_file = get_atx_file(t_start) + return atx_file \ No newline at end of file diff --git a/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py b/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py new file mode 100644 index 00000000..621eb0a5 --- /dev/null +++ b/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py @@ -0,0 +1,62 @@ + +import os +from pathlib import Path +import shutil +import georinex +import pandas as pd +import pytest +from unittest.mock import patch + +from prx import util +from prx.precise_corrections.antex import antex_file_discovery as atx + + +@pytest.fixture +def set_up_test(): + test_directory = Path(f"./tmp_test_directory_{__name__}").resolve() + if test_directory.exists(): + # Make sure the expected files has not been generated before and is still on disk due to e.g. a previous + # test run having crashed: + shutil.rmtree(test_directory) + os.makedirs(test_directory) + test_obs_file = test_directory.joinpath("TLSE00FRA_R_20230010100_10S_01S_MO.crx.gz") + # local_database = atx.atx_file_folder(pd.Timestamp('2023-01-01'), Path('src/prx/precise_corrections/atx/test/datasets')) + + shutil.copy( + util.prx_repository_root() + / f"src/prx/test/datasets/TLSE_2023001/{test_obs_file.name}", + test_obs_file, + ) + assert test_obs_file.exists() + + yield {"test_obs_file": test_obs_file} + shutil.rmtree(test_directory) + +def test_get_atx_file(set_up_test): + """ + Assert that prx will download the latest ATX file available if it is not present locally. + """ + obs_file = set_up_test["test_obs_file"] + header = georinex.rinexheader(obs_file) + t_start = util.rinex_header_time_string_2_timestamp_ns(header["TIME OF FIRST OBS"]) - pd.Timedelta(200, unit="milliseconds") + + db_folder = set_up_test["test_obs_file"].parent + downloaded_atx = atx.get_atx_file(t_start, db_folder) + + assert downloaded_atx is not None + assert downloaded_atx.exists() + +def test_fallback_to_latest_existing_atx(set_up_test): + """ + This test checks that if no ANTEX file is available for the target GPS week (e.g. the week is too recent), + the latest available ANTEX file from a previous GPS week is used instead. + """ + + db_folder = set_up_test["test_obs_file"].parent + now = pd.Timestamp.now() + gps_week_future = atx.date_to_gps_week(now) + 10 + atx_filename = f'igs*{str(gps_week_future)}.atx' + with patch('prx.precise_corrections.antex.antex_file_discovery.atx_filename', atx_filename): + downloaded_atx = atx.get_atx_file(now, db_folder) + + \ No newline at end of file From b86ec99f343bd7600cb78c2019cdf87fa0443eb6 Mon Sep 17 00:00:00 2001 From: eulaliesa Date: Tue, 29 Jul 2025 16:44:23 +0200 Subject: [PATCH 2/6] Add tests for ANTEX file download logic based on GPS week --- .../antex/antex_file_discovery.py | 97 +++++++++++-------- .../datasets/test_antex_file_discovery.py | 92 +++++++++++++++--- 2 files changed, 137 insertions(+), 52 deletions(-) diff --git a/src/prx/precise_corrections/antex/antex_file_discovery.py b/src/prx/precise_corrections/antex/antex_file_discovery.py index 056df17c..f5280ab1 100644 --- a/src/prx/precise_corrections/antex/antex_file_discovery.py +++ b/src/prx/precise_corrections/antex/antex_file_discovery.py @@ -17,16 +17,7 @@ log = util.get_logger(__name__) -atx_filename = f"igs*.atx" - -def list_all_atx_files(): - server = 'gssc.esa.int' - folder = '/igs/station/general' - with FTP(server) as ftp: - ftp.login() - ftp.cwd(folder) - files = ftp.nlst() - return [f for f in files if f.lower().endswith('.atx')] +atx_filename = f"igs[0-9][0-9]_????.atx" def date_to_gps_week(date:pd.Timestamp): """ @@ -56,26 +47,34 @@ def atx_file_database_folder(): db_folder.mkdir(exist_ok=True) return db_folder -def get_local_atx(gps_week : int, db_folder=atx_file_database_folder()): +def find_latest_local_antex_file(db_folder = atx_file_database_folder()): candidates = list( db_folder.glob(atx_filename) ) - candidates = [ - c for c in candidates if len(c) == 14 and extract_gps_week(c) != -1 - ] - if len(candidates) == 0: + if not candidates : return None - if len(candidates) > 1: - candidates = sorted( - candidates, - key=lambda c: extract_gps_week(c), - reverse=True - ) - log.warning( - f"Found more than one ANTEX file for {gps_week}: \n{candidates} \n Will use the first one." - ) - log.info(f"Found the ANTEX local file : {candidates[0]}") - return candidates[0] + return max(candidates, key=lambda c: extract_gps_week(c)) + +# def get_local_atx(gps_week : int, db_folder=atx_file_database_folder()): +# candidates = list( +# db_folder.glob(atx_filename) +# ) +# candidates = [ +# c for c in candidates if len(c) == 14 and extract_gps_week(c) != -1 +# ] +# if len(candidates) == 0: +# return None +# if len(candidates) > 1: +# candidates = sorted( +# candidates, +# key=lambda c: extract_gps_week(c), +# reverse=True +# ) +# log.warning( +# f"Found more than one ANTEX file for {gps_week}: \n{candidates} \n Will use the first one." +# ) +# log.info(f"Found the ANTEX local file : {candidates[0]}") +# return candidates[0] def list_ftp_directory(server, folder): ftp = FTP(server) @@ -85,26 +84,38 @@ def list_ftp_directory(server, folder): ftp.dir(dir_list.append) return [c.split()[-1].strip() for c in dir_list] -def try_downloading_atx_ftp(gps_week : int, folder : Path): +def fetch_latest_remote_antex_file(): + """ + List the ANTEX files available online and returns the latest + """ server = 'gssc.esa.int' remote_folder = f"/igs/station/general" candidates = list_ftp_directory(server, remote_folder) + candidates = list_ftp_directory(server, remote_folder) candidates = [ c for c in candidates if fnmatch.fnmatch(c, atx_filename) ] + if not candidates : + return None + return max(candidates, key=lambda c:extract_gps_week(c)) + +def try_downloading_atx_ftp(remote_file : str, folder : Path): + server = 'gssc.esa.int' + remote_folder = f"/igs/station/general" + candidates = list_ftp_directory(server, remote_folder) candidates = [ - c for c in candidates if len(c) == 14 and extract_gps_week(c) != -1 + c + for c in candidates + if fnmatch.fnmatch(c, remote_file) ] if len(candidates) == 0: - log.warning(f"Could not find ANTEX file for {gps_week}") + log.warning(f"Could not find ANTEX file named {remote_file}") return None - - candidates = sorted( - candidates, - key=lambda c: extract_gps_week(c), - reverse=True + if len(candidates) > 1 : + log.warning( + f"Found more than one ANTEX file named {remote_file}: \n{candidates} \n Will use the first one." ) file = candidates[0] ftp_file = f"ftp://{server}/{remote_folder}/{file}" @@ -120,13 +131,21 @@ def get_atx_file(date: pd.Timestamp, db_folder = atx_file_database_folder()): gps_week = date_to_gps_week(date) atx_file = None while atx_file is None: - atx_file = get_local_atx(gps_week, db_folder) - if atx_file is None : - atx_file = try_downloading_atx_ftp(gps_week, db_folder) + latest_atx_local = find_latest_local_antex_file(db_folder) + latest_atx_remote = fetch_latest_remote_antex_file() + if latest_atx_remote == latest_atx_local : + return latest_atx_local + elif latest_atx_remote and (not latest_atx_local or extract_gps_week(latest_atx_remote)> extract_gps_week(latest_atx_local)): + # Download the latest file + atx_file = try_downloading_atx_ftp(latest_atx_remote, db_folder) if atx_file is not None : + if gps_week > extract_gps_week(atx_file.name) : + log.warning( + f"No ANTEX file found for the target GPS week {gps_week} — using the most recent available instead." + ) return atx_file - gps_week -=1 - return None + + raise FileNotFoundError("No file ANTEX found locally or online.") def discover_or_download_atx_file(observation_file_path=Path()): """ diff --git a/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py b/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py index 621eb0a5..e6121d69 100644 --- a/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py +++ b/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py @@ -6,6 +6,7 @@ import pandas as pd import pytest from unittest.mock import patch +from types import SimpleNamespace from prx import util from prx.precise_corrections.antex import antex_file_discovery as atx @@ -32,9 +33,25 @@ def set_up_test(): yield {"test_obs_file": test_obs_file} shutil.rmtree(test_directory) -def test_get_atx_file(set_up_test): +def test_extract_gps_week(): + filenames = [ + 'igs20_2134.atx', + 'igs_10.atx', + 'igs14_abc.atx', + ] + expected_returns = [2134, -1, -1] + gps_week = [atx.extract_gps_week(f) for f in filenames] + assert gps_week == expected_returns + +def test_download_if_not_local(set_up_test): """ - Assert that prx will download the latest ATX file available if it is not present locally. + Tests that the ANTEX file is downloaded when no local file is available, + but a valid remote file exists. + + Scenario : + - No local ANTEX file available + - A remote ANTEX file exists + -> The function should trigger the download of the remote file. """ obs_file = set_up_test["test_obs_file"] header = georinex.rinexheader(obs_file) @@ -43,20 +60,69 @@ def test_get_atx_file(set_up_test): db_folder = set_up_test["test_obs_file"].parent downloaded_atx = atx.get_atx_file(t_start, db_folder) + # Ensure the file was downloaded and exists assert downloaded_atx is not None assert downloaded_atx.exists() -def test_fallback_to_latest_existing_atx(set_up_test): - """ - This test checks that if no ANTEX file is available for the target GPS week (e.g. the week is too recent), - the latest available ANTEX file from a previous GPS week is used instead. - """ +def test_download_if_remote_is_newer(set_up_test): + ''' + Tests that the ANTEX file is downloaded when the remote file has a newer GPS week + than the local one. + + Scenario : + - local file = igs20_2370.atx + - remote file = igs20_2375.atx + -> The function should download igs20_2375.atx + ''' + latest_local = SimpleNamespace(name="igs20_2370.atx") + latest_remote = SimpleNamespace(name="igs20_2375.atx") + date = pd.Timestamp('2025-01-01 12:00:00') + db_folder = set_up_test["test_obs_file"].parent + + # Mock the dependencies : download, local discovery, and remote fetch + # The first patch mock the function `try_downloading_atx_ftp` in order to create a fake downloaded file ('igs20_2375.atx') + # so that the files are not downloaded each time the tests are run + # The second and third patch are mocking the functions `find_latest_local_antex_file` and `fetch_latest_remote_antex_file` + # to be in the scenario where the latest local file is 'igs20_2370.atx', and the remote one is 'igs20_2375.atx' + with ( + patch("prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp") as mock_download, \ + patch('prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file', return_value=latest_local.name), \ + patch('prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file', return_value=latest_remote.name), \ + ): + mock_download.return_value = Path("fake/path/igs20_2375.atx") + + result = atx.get_atx_file(date,db_folder) + + # Ensure `try_downloading_atx_ftp` was triggered with correct arguments + mock_download.assert_called_once_with(latest_remote.name, db_folder) + assert result.name == "igs20_2375.atx" + +def test_skip_download_if_same_week(set_up_test): + ''' + Tests that the ANTEX file is not downloaded when the remote and local files + are from the same GPS week. + Scenario: + - local file = igs20_2375.atx + - remote file = igs20_2375.atx + → The function should skip download and return the local file + ''' + latest_local = SimpleNamespace(name="igs20_2375.atx") + latest_remote = SimpleNamespace(name="igs20_2375.atx") + date = pd.Timestamp('2025-01-01 12:00:00') db_folder = set_up_test["test_obs_file"].parent - now = pd.Timestamp.now() - gps_week_future = atx.date_to_gps_week(now) + 10 - atx_filename = f'igs*{str(gps_week_future)}.atx' - with patch('prx.precise_corrections.antex.antex_file_discovery.atx_filename', atx_filename): - downloaded_atx = atx.get_atx_file(now, db_folder) + # Mock the dependencies : download, local discovery, and remote fetch + # The first patch mock the function `try_downloading_atx_ftp` in order to check if it was used by `get_atx_file` + # The second and third patch are mocking the functions `find_latest_local_antex_file` and `fetch_latest_remote_antex_file` + # to be in the scenario where the latest local and remote files are both 'igs20_2375.atx'. + with ( + patch("prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp") as mock_download, \ + patch('prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file', return_value=latest_local.name), \ + patch('prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file', return_value=latest_remote.name), \ + ): + result = atx.get_atx_file(date,db_folder) - \ No newline at end of file + # Ensure `try_downloading_atx_ftp` was not triggered. + mock_download.assert_not_called() + # Ensure the returned file is the local one + assert result == latest_local.name From 4655518f717f8e978a8d2dba8940cb4077d291a1 Mon Sep 17 00:00:00 2001 From: eulaliesa Date: Wed, 30 Jul 2025 14:58:20 +0200 Subject: [PATCH 3/6] Add functions `ecef_2_satellite` and `compute_sun_ecef_position` --- src/prx/util.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/prx/util.py b/src/prx/util.py index c87f4458..9345fec2 100644 --- a/src/prx/util.py +++ b/src/prx/util.py @@ -15,6 +15,8 @@ from imohash import imohash from astropy.utils import iers from astropy import time as astrotime +from astropy.coordinates import get_sun, ITRS +import astropy.units from prx import constants from prx.rinex_obs.parser import parse as prx_obs_parse @@ -441,6 +443,31 @@ def ecef_2_geodetic(pos_ecef): ) return [latitude_rad, longitude_rad, altitude_m] +def ecef_2_satellite(pos_ecef: np.array, pos_sat_ecef: np.array, epoch: np.array): + """ + Converts an ecef position to the satellite-fixed coordinate frame + + pos_sat_ecef: shape (n,3) + pos_ecef: shape (n,3) + epoch: shape (n,) + """ + k = -pos_sat_ecef / np.linalg.norm(pos_sat_ecef, axis=1).reshape(-1, 1) + pos_sun_ecef = compute_sun_ecef_position(epoch).T + unit_vector_sun_ecef = (pos_sun_ecef - pos_sat_ecef) / np.linalg.norm( + pos_sun_ecef - pos_sat_ecef, axis=1 + ).reshape(-1, 1) + j = np.cross(k, unit_vector_sun_ecef) + j = j / np.linalg.norm(j, axis=1).reshape(-1, 1) + i = np.cross(j, k) + + rot_mat_ecef2sat = np.stack([i, j, k], axis=1) + pos_sat_frame = np.stack( + [ + rot_mat_ecef2sat[i, :, :] @ (pos_ecef[i, :] - pos_sat_ecef[i, :]) + for i in range(pos_sat_ecef.shape[0]) + ] + ) + return pos_sat_frame, rot_mat_ecef2sat def obs_dataset_to_obs_dataframe(ds: xarray.Dataset): # Flatten the xarray DataSet into a pandas DataFrame: @@ -518,3 +545,12 @@ def compute_gps_utc_leap_seconds(yyyy: int, doy: int): break assert ~np.isnan(ls), "GPS leap second could not be retrieved" return ls + +def compute_sun_ecef_position(epochs: np.array) -> np.array: + """ + Compute the Sun's ECEF position using the Astropy library + """ + time = astrotime.Time(epochs, scale="utc") + sun_gcrs = get_sun(time) + sun_ecef = sun_gcrs.transform_to(ITRS(obstime=time)) + return sun_ecef.cartesian.xyz.to(astropy.units.meter).value \ No newline at end of file From ac18a28a90ca970e5752092da74f569ee1147cff Mon Sep 17 00:00:00 2001 From: eulaliesa Date: Fri, 1 Aug 2025 14:50:53 +0200 Subject: [PATCH 4/6] Completed test coverage and refactored code --- .../antex/antex_file_discovery.py | 103 ++++++++---------- .../datasets/test_antex_file_discovery.py | 68 ++++++------ 2 files changed, 83 insertions(+), 88 deletions(-) diff --git a/src/prx/precise_corrections/antex/antex_file_discovery.py b/src/prx/precise_corrections/antex/antex_file_discovery.py index f5280ab1..e77d765f 100644 --- a/src/prx/precise_corrections/antex/antex_file_discovery.py +++ b/src/prx/precise_corrections/antex/antex_file_discovery.py @@ -1,7 +1,5 @@ -import argparse -import logging from ftplib import FTP -import os +import ftplib from pathlib import Path import re @@ -12,12 +10,22 @@ from prx import converters from prx import util -import prx from prx.util import timestamp_to_mid_day +""" +Logic overview: +- The code first extracts the GPS week from the provided RINEX observation file. +- It then identifies the most recent ANTEX file available both locally and remotely. +- If both local and remote ANTEX files are the same, the local version is reused, as the local database is considered up to date. +- Otherwise, the latest remote file is downloaded and used. +- Note: If the most recent ANTEX file has a GPS week that is earlier than the observation file's GPS week +(e.g., the remote database has not yet been updated), a warning is displayed to indicate that the most recent ANTEX file +was still used despite being older than the observation file. +""" + log = util.get_logger(__name__) -atx_filename = f"igs[0-9][0-9]_????.atx" +atx_filename = f"igs20_????.atx" def date_to_gps_week(date:pd.Timestamp): """ @@ -55,27 +63,6 @@ def find_latest_local_antex_file(db_folder = atx_file_database_folder()): return None return max(candidates, key=lambda c: extract_gps_week(c)) -# def get_local_atx(gps_week : int, db_folder=atx_file_database_folder()): -# candidates = list( -# db_folder.glob(atx_filename) -# ) -# candidates = [ -# c for c in candidates if len(c) == 14 and extract_gps_week(c) != -1 -# ] -# if len(candidates) == 0: -# return None -# if len(candidates) > 1: -# candidates = sorted( -# candidates, -# key=lambda c: extract_gps_week(c), -# reverse=True -# ) -# log.warning( -# f"Found more than one ANTEX file for {gps_week}: \n{candidates} \n Will use the first one." -# ) -# log.info(f"Found the ANTEX local file : {candidates[0]}") -# return candidates[0] - def list_ftp_directory(server, folder): ftp = FTP(server) ftp.login() @@ -91,7 +78,6 @@ def fetch_latest_remote_antex_file(): server = 'gssc.esa.int' remote_folder = f"/igs/station/general" candidates = list_ftp_directory(server, remote_folder) - candidates = list_ftp_directory(server, remote_folder) candidates = [ c for c in candidates @@ -101,23 +87,28 @@ def fetch_latest_remote_antex_file(): return None return max(candidates, key=lambda c:extract_gps_week(c)) -def try_downloading_atx_ftp(remote_file : str, folder : Path): - server = 'gssc.esa.int' +def check_online_availability(file: str, folder: Path) -> Path | None: + """ + Need to keep the same inputs as try_downloading_sp3_ftp, in order to be able to use `unittest.mock.patch` in tests + """ + server = "gssc.esa.int" remote_folder = f"/igs/station/general" - candidates = list_ftp_directory(server, remote_folder) - candidates = [ - c - for c in candidates - if fnmatch.fnmatch(c, remote_file) - ] - if len(candidates) == 0: - log.warning(f"Could not find ANTEX file named {remote_file}") + ftp = FTP(server) + ftp.login() + ftp.cwd(remote_folder) + try: + ftp.size(file) + return folder.joinpath(file) + except ftplib.error_perm: + log.warning(f"{file} not available on {server}") return None - if len(candidates) > 1 : - log.warning( - f"Found more than one ANTEX file named {remote_file}: \n{candidates} \n Will use the first one." - ) - file = candidates[0] + +def try_downloading_atx_ftp(file : str, folder : Path): + """ + Download the wanted remote file + """ + server = 'gssc.esa.int' + remote_folder = f"/igs/station/general" ftp_file = f"ftp://{server}/{remote_folder}/{file}" local_file = folder / file urllib.request.urlretrieve(ftp_file, local_file) @@ -129,21 +120,19 @@ def try_downloading_atx_ftp(remote_file : str, folder : Path): def get_atx_file(date: pd.Timestamp, db_folder = atx_file_database_folder()): gps_week = date_to_gps_week(date) - atx_file = None - while atx_file is None: - latest_atx_local = find_latest_local_antex_file(db_folder) - latest_atx_remote = fetch_latest_remote_antex_file() - if latest_atx_remote == latest_atx_local : - return latest_atx_local - elif latest_atx_remote and (not latest_atx_local or extract_gps_week(latest_atx_remote)> extract_gps_week(latest_atx_local)): - # Download the latest file - atx_file = try_downloading_atx_ftp(latest_atx_remote, db_folder) - if atx_file is not None : - if gps_week > extract_gps_week(atx_file.name) : - log.warning( - f"No ANTEX file found for the target GPS week {gps_week} — using the most recent available instead." - ) - return atx_file + latest_atx_local = find_latest_local_antex_file(db_folder) + latest_atx_remote = fetch_latest_remote_antex_file() + if latest_atx_remote == latest_atx_local : + return latest_atx_local + elif latest_atx_remote and (not latest_atx_local or extract_gps_week(latest_atx_remote)> extract_gps_week(latest_atx_local)): + # Download the latest file + atx_file = try_downloading_atx_ftp(latest_atx_remote, db_folder) + if atx_file is not None : + if gps_week > extract_gps_week(atx_file.name) : + log.warning( + f"No ANTEX file found for the target GPS week {gps_week} — using the most recent available instead." + ) + return atx_file raise FileNotFoundError("No file ANTEX found locally or online.") diff --git a/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py b/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py index e6121d69..d9fc92ef 100644 --- a/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py +++ b/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py @@ -1,4 +1,3 @@ - import os from pathlib import Path import shutil @@ -6,8 +5,8 @@ import pandas as pd import pytest from unittest.mock import patch -from types import SimpleNamespace +import prx from prx import util from prx.precise_corrections.antex import antex_file_discovery as atx @@ -74,28 +73,29 @@ def test_download_if_remote_is_newer(set_up_test): - remote file = igs20_2375.atx -> The function should download igs20_2375.atx ''' - latest_local = SimpleNamespace(name="igs20_2370.atx") - latest_remote = SimpleNamespace(name="igs20_2375.atx") + latest_local = "igs20_2370.atx" + latest_remote = "igs20_2375.atx" date = pd.Timestamp('2025-01-01 12:00:00') db_folder = set_up_test["test_obs_file"].parent - - # Mock the dependencies : download, local discovery, and remote fetch - # The first patch mock the function `try_downloading_atx_ftp` in order to create a fake downloaded file ('igs20_2375.atx') - # so that the files are not downloaded each time the tests are run - # The second and third patch are mocking the functions `find_latest_local_antex_file` and `fetch_latest_remote_antex_file` - # to be in the scenario where the latest local file is 'igs20_2370.atx', and the remote one is 'igs20_2375.atx' + with ( - patch("prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp") as mock_download, \ - patch('prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file', return_value=latest_local.name), \ - patch('prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file', return_value=latest_remote.name), \ + patch( # replace download function by online availability check + "prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp", + new=prx.precise_corrections.antex.antex_file_discovery.check_online_availability + ), + patch( # simulate that the latest local file found is 'igs20_2370.atx' + 'prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file', + return_value=latest_local + ), + patch( # simulate that the latest remote file found is 'igs20_2375.atx' + 'prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file', + return_value=latest_remote + ), ): - mock_download.return_value = Path("fake/path/igs20_2375.atx") - + result = atx.get_atx_file(date,db_folder) - - # Ensure `try_downloading_atx_ftp` was triggered with correct arguments - mock_download.assert_called_once_with(latest_remote.name, db_folder) - assert result.name == "igs20_2375.atx" + assert result is not None + assert result.name == latest_remote def test_skip_download_if_same_week(set_up_test): ''' @@ -107,22 +107,28 @@ def test_skip_download_if_same_week(set_up_test): - remote file = igs20_2375.atx → The function should skip download and return the local file ''' - latest_local = SimpleNamespace(name="igs20_2375.atx") - latest_remote = SimpleNamespace(name="igs20_2375.atx") + latest_local = "igs20_2375.atx" + latest_remote = "igs20_2375.atx" date = pd.Timestamp('2025-01-01 12:00:00') db_folder = set_up_test["test_obs_file"].parent - # Mock the dependencies : download, local discovery, and remote fetch - # The first patch mock the function `try_downloading_atx_ftp` in order to check if it was used by `get_atx_file` - # The second and third patch are mocking the functions `find_latest_local_antex_file` and `fetch_latest_remote_antex_file` - # to be in the scenario where the latest local and remote files are both 'igs20_2375.atx'. + with ( - patch("prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp") as mock_download, \ - patch('prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file', return_value=latest_local.name), \ - patch('prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file', return_value=latest_remote.name), \ + patch( # mock download function + "prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp" + ) as mock_download, + patch( # simulate that the latest local file found is 'igs20_2375.atx' + 'prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file', + return_value=latest_local + ), + patch( # simulate that the latest remote file found is 'igs20_2375.atx' + 'prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file', + return_value=latest_remote + ), ): result = atx.get_atx_file(date,db_folder) - - # Ensure `try_downloading_atx_ftp` was not triggered. + + assert result is not None + # Ensure that the download function was not called mock_download.assert_not_called() # Ensure the returned file is the local one - assert result == latest_local.name + assert result == latest_local From 31e4109bad433e5f297818f673cb21f3dfa4a140 Mon Sep 17 00:00:00 2001 From: eulaliesa Date: Fri, 1 Aug 2025 14:53:15 +0200 Subject: [PATCH 5/6] ruff format --- .../antex/antex_file_discovery.py | 73 ++++++++------- .../datasets/test_antex_file_discovery.py | 93 ++++++++++--------- 2 files changed, 91 insertions(+), 75 deletions(-) diff --git a/src/prx/precise_corrections/antex/antex_file_discovery.py b/src/prx/precise_corrections/antex/antex_file_discovery.py index e77d765f..aabbda17 100644 --- a/src/prx/precise_corrections/antex/antex_file_discovery.py +++ b/src/prx/precise_corrections/antex/antex_file_discovery.py @@ -27,7 +27,8 @@ atx_filename = f"igs20_????.atx" -def date_to_gps_week(date:pd.Timestamp): + +def date_to_gps_week(date: pd.Timestamp): """ Convert a Timestamp to GPS week @@ -38,6 +39,7 @@ def date_to_gps_week(date:pd.Timestamp): gps_week = delta.days // 7 return gps_week + def extract_gps_week(filename: str) -> int: """ Extract GPS week from filenames like 'igsxx_WWWW.atx'. @@ -47,22 +49,25 @@ def extract_gps_week(filename: str) -> int: return int(match.group(1)) return -1 + def atx_file_database_folder(): """ Returns the path to the folder where ATX database files are stored. """ - db_folder = util.prx_repository_root() / "src/prx/precise_corrections/antex/atx_files" + db_folder = ( + util.prx_repository_root() / "src/prx/precise_corrections/antex/atx_files" + ) db_folder.mkdir(exist_ok=True) return db_folder -def find_latest_local_antex_file(db_folder = atx_file_database_folder()): - candidates = list( - db_folder.glob(atx_filename) - ) - if not candidates : + +def find_latest_local_antex_file(db_folder=atx_file_database_folder()): + candidates = list(db_folder.glob(atx_filename)) + if not candidates: return None return max(candidates, key=lambda c: extract_gps_week(c)) + def list_ftp_directory(server, folder): ftp = FTP(server) ftp.login() @@ -71,21 +76,19 @@ def list_ftp_directory(server, folder): ftp.dir(dir_list.append) return [c.split()[-1].strip() for c in dir_list] + def fetch_latest_remote_antex_file(): """ List the ANTEX files available online and returns the latest """ - server = 'gssc.esa.int' + server = "gssc.esa.int" remote_folder = f"/igs/station/general" candidates = list_ftp_directory(server, remote_folder) - candidates = [ - c - for c in candidates - if fnmatch.fnmatch(c, atx_filename) - ] - if not candidates : + candidates = [c for c in candidates if fnmatch.fnmatch(c, atx_filename)] + if not candidates: return None - return max(candidates, key=lambda c:extract_gps_week(c)) + return max(candidates, key=lambda c: extract_gps_week(c)) + def check_online_availability(file: str, folder: Path) -> Path | None: """ @@ -103,11 +106,12 @@ def check_online_availability(file: str, folder: Path) -> Path | None: log.warning(f"{file} not available on {server}") return None -def try_downloading_atx_ftp(file : str, folder : Path): + +def try_downloading_atx_ftp(file: str, folder: Path): """ Download the wanted remote file """ - server = 'gssc.esa.int' + server = "gssc.esa.int" remote_folder = f"/igs/station/general" ftp_file = f"ftp://{server}/{remote_folder}/{file}" local_file = folder / file @@ -118,24 +122,29 @@ def try_downloading_atx_ftp(file : str, folder : Path): log.info(f"Downloaded ANTEX file {ftp_file}") return local_file -def get_atx_file(date: pd.Timestamp, db_folder = atx_file_database_folder()): + +def get_atx_file(date: pd.Timestamp, db_folder=atx_file_database_folder()): gps_week = date_to_gps_week(date) - latest_atx_local = find_latest_local_antex_file(db_folder) + latest_atx_local = find_latest_local_antex_file(db_folder) latest_atx_remote = fetch_latest_remote_antex_file() - if latest_atx_remote == latest_atx_local : + if latest_atx_remote == latest_atx_local: return latest_atx_local - elif latest_atx_remote and (not latest_atx_local or extract_gps_week(latest_atx_remote)> extract_gps_week(latest_atx_local)): - # Download the latest file + elif latest_atx_remote and ( + not latest_atx_local + or extract_gps_week(latest_atx_remote) > extract_gps_week(latest_atx_local) + ): + # Download the latest file atx_file = try_downloading_atx_ftp(latest_atx_remote, db_folder) - if atx_file is not None : - if gps_week > extract_gps_week(atx_file.name) : + if atx_file is not None: + if gps_week > extract_gps_week(atx_file.name): log.warning( - f"No ANTEX file found for the target GPS week {gps_week} — using the most recent available instead." - ) + f"No ANTEX file found for the target GPS week {gps_week} — using the most recent available instead." + ) return atx_file - + raise FileNotFoundError("No file ANTEX found locally or online.") + def discover_or_download_atx_file(observation_file_path=Path()): """ Returns the path to a valid antes file (local or downloaded) corresponding to the observation file. @@ -144,8 +153,10 @@ def discover_or_download_atx_file(observation_file_path=Path()): rinex_3_obs_file = converters.anything_to_rinex_3(observation_file_path) header = georinex.rinexheader(rinex_3_obs_file) - t_start = timestamp_to_mid_day(util.rinex_header_time_string_2_timestamp_ns(header["TIME OF FIRST OBS"]) - - pd.Timedelta(200, unit="milliseconds")) + t_start = timestamp_to_mid_day( + util.rinex_header_time_string_2_timestamp_ns(header["TIME OF FIRST OBS"]) + - pd.Timedelta(200, unit="milliseconds") + ) - atx_file = get_atx_file(t_start) - return atx_file \ No newline at end of file + atx_file = get_atx_file(t_start) + return atx_file diff --git a/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py b/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py index d9fc92ef..9535a0da 100644 --- a/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py +++ b/src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py @@ -4,9 +4,9 @@ import georinex import pandas as pd import pytest -from unittest.mock import patch +from unittest.mock import patch -import prx +import prx from prx import util from prx.precise_corrections.antex import antex_file_discovery as atx @@ -28,107 +28,112 @@ def set_up_test(): test_obs_file, ) assert test_obs_file.exists() - + yield {"test_obs_file": test_obs_file} shutil.rmtree(test_directory) + def test_extract_gps_week(): filenames = [ - 'igs20_2134.atx', - 'igs_10.atx', - 'igs14_abc.atx', + "igs20_2134.atx", + "igs_10.atx", + "igs14_abc.atx", ] expected_returns = [2134, -1, -1] gps_week = [atx.extract_gps_week(f) for f in filenames] assert gps_week == expected_returns - + + def test_download_if_not_local(set_up_test): """ - Tests that the ANTEX file is downloaded when no local file is available, + Tests that the ANTEX file is downloaded when no local file is available, but a valid remote file exists. - Scenario : + Scenario : - No local ANTEX file available - A remote ANTEX file exists - -> The function should trigger the download of the remote file. + -> The function should trigger the download of the remote file. """ obs_file = set_up_test["test_obs_file"] header = georinex.rinexheader(obs_file) - t_start = util.rinex_header_time_string_2_timestamp_ns(header["TIME OF FIRST OBS"]) - pd.Timedelta(200, unit="milliseconds") + t_start = util.rinex_header_time_string_2_timestamp_ns( + header["TIME OF FIRST OBS"] + ) - pd.Timedelta(200, unit="milliseconds") db_folder = set_up_test["test_obs_file"].parent downloaded_atx = atx.get_atx_file(t_start, db_folder) - + # Ensure the file was downloaded and exists assert downloaded_atx is not None assert downloaded_atx.exists() - + + def test_download_if_remote_is_newer(set_up_test): - ''' - Tests that the ANTEX file is downloaded when the remote file has a newer GPS week + """ + Tests that the ANTEX file is downloaded when the remote file has a newer GPS week than the local one. - Scenario : + Scenario : - local file = igs20_2370.atx - remote file = igs20_2375.atx -> The function should download igs20_2375.atx - ''' + """ latest_local = "igs20_2370.atx" latest_remote = "igs20_2375.atx" - date = pd.Timestamp('2025-01-01 12:00:00') + date = pd.Timestamp("2025-01-01 12:00:00") db_folder = set_up_test["test_obs_file"].parent - + with ( - patch( # replace download function by online availability check - "prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp", - new=prx.precise_corrections.antex.antex_file_discovery.check_online_availability + patch( # replace download function by online availability check + "prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp", + new=prx.precise_corrections.antex.antex_file_discovery.check_online_availability, ), - patch( # simulate that the latest local file found is 'igs20_2370.atx' - 'prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file', - return_value=latest_local + patch( # simulate that the latest local file found is 'igs20_2370.atx' + "prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file", + return_value=latest_local, ), - patch( # simulate that the latest remote file found is 'igs20_2375.atx' - 'prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file', - return_value=latest_remote + patch( # simulate that the latest remote file found is 'igs20_2375.atx' + "prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file", + return_value=latest_remote, ), ): - - result = atx.get_atx_file(date,db_folder) + result = atx.get_atx_file(date, db_folder) assert result is not None assert result.name == latest_remote + def test_skip_download_if_same_week(set_up_test): - ''' - Tests that the ANTEX file is not downloaded when the remote and local files + """ + Tests that the ANTEX file is not downloaded when the remote and local files are from the same GPS week. Scenario: - local file = igs20_2375.atx - remote file = igs20_2375.atx → The function should skip download and return the local file - ''' + """ latest_local = "igs20_2375.atx" latest_remote = "igs20_2375.atx" - date = pd.Timestamp('2025-01-01 12:00:00') + date = pd.Timestamp("2025-01-01 12:00:00") db_folder = set_up_test["test_obs_file"].parent with ( - patch( # mock download function + patch( # mock download function "prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp" ) as mock_download, - patch( # simulate that the latest local file found is 'igs20_2375.atx' - 'prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file', - return_value=latest_local + patch( # simulate that the latest local file found is 'igs20_2375.atx' + "prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file", + return_value=latest_local, ), - patch( # simulate that the latest remote file found is 'igs20_2375.atx' - 'prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file', - return_value=latest_remote + patch( # simulate that the latest remote file found is 'igs20_2375.atx' + "prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file", + return_value=latest_remote, ), ): - result = atx.get_atx_file(date,db_folder) - + result = atx.get_atx_file(date, db_folder) + assert result is not None - # Ensure that the download function was not called + # Ensure that the download function was not called mock_download.assert_not_called() # Ensure the returned file is the local one assert result == latest_local From a262a1e9cbcd12837f6b6097709d25314f42edf1 Mon Sep 17 00:00:00 2001 From: eulaliesa Date: Mon, 4 Aug 2025 16:03:00 +0200 Subject: [PATCH 6/6] Add antex files parser --- .../precise_corrections/antex/antex_parser.py | 240 ++++++++++++++++++ .../antex/test/test_antex_parser.py | 130 ++++++++++ src/prx/test/test_helpers.py | 69 +++++ src/prx/util.py | 5 +- 4 files changed, 443 insertions(+), 1 deletion(-) create mode 100644 src/prx/precise_corrections/antex/antex_parser.py create mode 100644 src/prx/precise_corrections/antex/test/test_antex_parser.py diff --git a/src/prx/precise_corrections/antex/antex_parser.py b/src/prx/precise_corrections/antex/antex_parser.py new file mode 100644 index 00000000..95061333 --- /dev/null +++ b/src/prx/precise_corrections/antex/antex_parser.py @@ -0,0 +1,240 @@ +from pathlib import Path +import pandas as pd +import numpy as np + +from prx.util import ecef_2_satellite + +# def parquet_loading(func): +# """ +# This is a decorator to check if a .parquet file exists and loads it, +# rather than parsing the original file. + +# NOTE: the parquet files must be manually erased when a modification +# is introduced in the parser. + +# To use it, use the @parquet_loading decorator before the parsing +# function definition. + +# Example: +# @parquet_loading +# def prx_to_pandas(path_csv: Path, observation_filter: dict = {}): +# ... +# """ + +# def wrapper(path, **kwargs): +# if path.with_suffix(".parquet").exists(): +# print(f"Reading '{path.with_suffix('.parquet')}'") +# df = pd.read_parquet(path.with_suffix(".parquet")) +# else: +# print(f"Reading '{path.resolve()}' file and saving it as .parquet") +# df = func(path, **kwargs) +# df.to_parquet(path.with_suffix(".parquet")) +# return df + +# return wrapper + + +# @parquet_loading +def parse_atx(filepath: Path): + """ + ANTEX file format description is available at https://files.igs.org/pub/data/format/antex14.txt + """ + + def read_antenna(file): + atx_df = pd.DataFrame( + columns=[ + "antenna_type", + "satellite_or_serial_no", + "valid_from", + "valid_until", + "constellation", + "carrier_freq_id", + "pco_north_m", + "pco_east_m", + "pco_up_m", + ] + ) + # line with TYPE / SERIAL NO + line = file.readline() + antenna_type = line[0:20].strip() + satellite = line[20:40].strip() + + # line with # of FREQUENCIES + for _ in range(4): + line = file.readline() + nb_freq = int(line[0:6]) + + # line with VALID FROM + line = file.readline() + if "VALID FROM" in line: + valid_from = pd.Timestamp( + year=int(line[0:6]), + month=int(line[6:12]), + day=int(line[12:18]), + hour=int(line[18:24]), + minute=int(line[24:30]), + second=int(line[30:35]), + microsecond=int(np.floor(float(line[36:42]))), + ) + line = file.readline() + else: + valid_from = pd.NaT + + # line with VALID UNTIL + if "VALID UNTIL" in line: + valid_until = pd.Timestamp( + year=int(line[0:6]), + month=int(line[6:12]), + day=int(line[12:18]), + hour=int(line[18:24]), + minute=int(line[24:30]), + second=int(line[30:35]), + microsecond=int(np.floor(float(line[36:42]))), + ) + else: + valid_until = pd.NaT + + # skip line with SINEX CODE or COMMENT + while "START OF FREQUENCY" not in line: + line = file.readline() + + for index in range(nb_freq): + # line with START OF FREQUENCY + constellation = line[3] + carrier_freq_id = int(line[4:6]) + + # line with NORTH / EAST / UP + line = file.readline() + north = float(line[0:10]) * 1e-3 + east = float(line[10:20]) * 1e-3 + up = float(line[20:30]) * 1e-3 + + atx_df.loc[index] = pd.Series( + { + "antenna_type": antenna_type, + "satellite_or_serial_no": satellite, + "valid_from": valid_from, + "valid_until": valid_until, + "constellation": constellation, + "carrier_freq_id": carrier_freq_id, + "pco_north_m": north, + "pco_east_m": east, + "pco_up_m": up, + } + ) + + # skip lines until line with END OF FREQUENCY + while "END OF FREQUENCY" not in line: + line = file.readline() + line = file.readline() + + return file, atx_df + + with open(filepath) as file: + atx_df = pd.DataFrame() + while True: + line = file.readline() + if not line: # check if the end of file has been reached + break + if "START OF ANTENNA" in line: + file, atx_df_ant = read_antenna(file) + atx_df = pd.concat([atx_df, atx_df_ant], ignore_index=True) + + return atx_df + + +def compute_pco_sat( + sat_id: np.array, sat_pos: np.array, rx_pos: np.array, epochs: np.array, atx_df +) -> np.array: + """ + Compute the satellite Phase Center Offset + + Reference: RTKLIB manual v2.4.2, p 173 + """ + query = pd.DataFrame( + { + "satellite": sat_id, + "epoch": epochs, + } + ) + + pco_sat = pd.DataFrame( + columns=[ + "satellite_or_serial_no", + "carrier_freq_id", + "pco_north_m", + "pco_east_m", + "pco_up_m", + ] + ) + for sat, group in query.groupby("satellite"): + epoch_min = group.epoch.min() + epoch_max = group.epoch.max() + to_be_apended = atx_df.loc[ + (atx_df.satellite_or_serial_no == sat) + & (atx_df.valid_from < epoch_min) + & ((atx_df.valid_until > epoch_max) | (atx_df.valid_until.isnull())), + [ + "satellite_or_serial_no", + "carrier_freq_id", + "pco_north_m", + "pco_east_m", + "pco_up_m", + ], + ] + if len(pco_sat) == 0: + pco_sat = to_be_apended + else: + pco_sat = pd.concat([pco_sat, to_be_apended], ignore_index=True) + # rename columns + pco_sat = pco_sat.rename( + columns={ + "satellite_or_serial_no": "satellite", + "pco_north_m": "pco_x_m", + "pco_east_m": "pco_y_m", + "pco_up_m": "pco_z_m", + } + ) + + freq_list = pco_sat.carrier_freq_id.unique() + + # unstack carrier_freq_id + pco_sat = pco_sat.set_index(["satellite", "carrier_freq_id"]).unstack(1) + # rename columns + pco_sat.columns = [ + cols[0][0:4] + str(cols[1]) + cols[0][3:] for cols in pco_sat.columns + ] + pco_sat = pco_sat.reindex(sorted(pco_sat.columns), axis=1) + + _, rot_mat_ecef2sat = ecef_2_satellite(sat_pos, sat_pos, epochs) + + pco_effect_list = [] + for freq in freq_list: + pco_ecef = np.array( + [ + rot_mat_ecef2sat[idx, :, :].T + @ pco_sat.loc[ + sat, + [ + "pco_" + str(freq) + "_x_m", + "pco_" + str(freq) + "_y_m", + "pco_" + str(freq) + "_z_m", + ], + ] + .to_numpy() + .reshape(-1, 1) + for idx, sat in enumerate(sat_id) + ] + ).reshape(len(sat_id), 3) + + pco_effect_list.append( + np.vecdot( + pco_ecef, + (sat_pos - rx_pos) + / np.linalg.norm(sat_pos - rx_pos, axis=1).reshape(-1, 1), + ) + ) + + pco_effect = np.stack(pco_effect_list, axis=1) + return pco_effect, freq_list + diff --git a/src/prx/precise_corrections/antex/test/test_antex_parser.py b/src/prx/precise_corrections/antex/test/test_antex_parser.py new file mode 100644 index 00000000..6c61023b --- /dev/null +++ b/src/prx/precise_corrections/antex/test/test_antex_parser.py @@ -0,0 +1,130 @@ +import os +from pathlib import Path +import shutil +import numpy as np +import pandas as pd +import pytest + +from prx import util + +@pytest.fixture +def input_for_test(): + test_directory = Path(f"./tmp_test_directory_{__name__}").resolve() + if test_directory.exists(): + # Make sure the expected file has not been generated before and is still on disk due to e.g. a previous + # test run having crashed: + shutil.rmtree(test_directory) + os.makedirs(test_directory) + compressed_compact_rinex_file = "TLSE00FRA_R_20230010100_10S_01S_MO.crx.gz" + test_file = {"obs": test_directory.joinpath(compressed_compact_rinex_file)} + shutil.copy( + Path(__file__).parent + / f"datasets/TLSE_2023001/{compressed_compact_rinex_file}", + test_file["obs"], + ) + assert test_file["obs"].exists() + + # Also provide ephemerides so the test does not have to download them: + ephemerides_file = "BRDC00IGS_R_20230010000_01D_MN.rnx.zip" + test_file["nav"] = test_directory.joinpath(ephemerides_file) + shutil.copy( + Path(__file__).parent / f"datasets/TLSE_2023001/{ephemerides_file}", + test_file["nav"].parent.joinpath(ephemerides_file), + ) + assert test_file["nav"].parent.joinpath(ephemerides_file).exists() + + # sp3 file + sp3_file = "GFZ0MGXRAP_20230010000_01D_05M_ORB.SP3" + test_file["sp3"] = test_directory.joinpath(sp3_file) + shutil.copy( + Path(__file__).parent / f"datasets/TLSE_2023001/{sp3_file}", + test_file["sp3"].parent.joinpath(sp3_file), + ) + assert test_file["sp3"].parent.joinpath(sp3_file).exists() + + yield test_file + shutil.rmtree(test_directory) + +def test_pco_sat(): + # sat frame: z pointing to the geocenter, y perpendicular to the plane (geocenter,sun,sat), x = cross(e_y,e_z) + pco_corr_list = [] + # First scenario: (sun) + # - sat, sun, geocenter at z_ecef = 0 + # - sun at [0, y_ecef = 1 AU, 0] ↑ y_ecef + # - sat at [x_ecef = a_sat, 0, 0] (earth) (sat) |--> x_ecef + # ==> the 3 axis of the sat frame are approximately x_sat = +y_ecef, y_sat = -z_ecef, z_sat = -x_ecef + # R_sat2ecef = [ 0 0 -1 + # +1 0 0 + # 0 -1 0] + # With a pco of [1,0,1] in sat frame, we should obtain a pco of [-1,1,0] in ecef frame + sun_pos_ecef = np.array([0, 149_597_870_700, 0]) + sat_pos_ecef = np.array([20_200 + 6_400, 0, 0]) + rx_pos_ecef = np.array([6_400, 0, 0]) + pco_sat = np.array([1, 0, 1]).reshape(-1, 1) + + e_z = -sat_pos_ecef / np.linalg.norm(sat_pos_ecef) + e_s = (sun_pos_ecef - rx_pos_ecef) / np.linalg.norm(sun_pos_ecef - rx_pos_ecef) + e_y = np.cross(e_z, e_s) / np.linalg.norm(np.cross(e_z, e_s)) + e_x = np.cross(e_y, e_z) + + rot_sat2mat = np.stack([e_x, e_y, e_z]).T + assert (rot_sat2mat == np.array([[0, 0, -1], [1, 0, 0], [0, -1, 0]])).all() + pco_ecef = (rot_sat2mat @ pco_sat).reshape(3) + assert (pco_ecef == np.array([-1, 1, 0])).all() + pco_corr_list.append(pco_ecef.dot(rx_pos_ecef) / np.linalg.norm(rx_pos_ecef)) + + # Second scenario: (earth) (sat) + # - sat, sun, geocenter at z_ecef = 0 + # - sun at [0, y_ecef = -1 AU, 0] ↑ y_ecef + # - sat at [x_ecef = a_sat, 0, 0] (sun) |--> x_ecef + # ==> the 3 axis of the sat frame are approximately x_sat = -y_ecef, y_sat = +z_ecef, z_sat = -x_ecef + # R_sat2ecef = [ 0 0 -1 + # -1 0 0 + # 0 +1 0] + # With a pco of [1,0,1] in sat frame, we should obtain a pco of [-1,-1,0] in ecef frame + sun_pos_ecef = np.array([0, -149_597_870_700, 0]) + sat_pos_ecef = np.array([20_200 + 6_400, 0, 0]) + rx_pos_ecef = np.array([6_400, 0, 0]) + pco_sat = np.array([1, 0, 1]).reshape(-1, 1) + + e_z = -sat_pos_ecef / np.linalg.norm(sat_pos_ecef) + e_s = (sun_pos_ecef - rx_pos_ecef) / np.linalg.norm(sun_pos_ecef - rx_pos_ecef) + e_y = np.cross(e_z, e_s) / np.linalg.norm(np.cross(e_z, e_s)) + e_x = np.cross(e_y, e_z) + + rot_sat2mat = np.stack([e_x, e_y, e_z]).T + assert (rot_sat2mat == np.array([[0, 0, -1], [-1, 0, 0], [0, 1, 0]])).all() + pco_ecef = (rot_sat2mat @ pco_sat).reshape(3) + assert (pco_ecef == np.array([-1, -1, 0])).all() + pco_corr_list.append(pco_ecef.dot(rx_pos_ecef) / np.linalg.norm(rx_pos_ecef)) + + # pco correction computation with function + pco_corr_function, _ = util.compute_pco_sat( + np.array(["G01"] * 2), + np.array([sat_pos_ecef] * 2), + np.array([rx_pos_ecef] * 2), + np.array( + [ + pd.Timestamp( + year=2020, month=6, day=20, hour=6, minute=6 + ), # approx time when the sun is "close" to [0,1,0] + pd.Timestamp( + year=2020, month=6, day=20, hour=18, minute=6 + ), # approx time when the sun is "close" to [0,-1,0] + ] + ), + pd.DataFrame( + data={ + "satellite_or_serial_no": ["G01"] * 5, + "valid_from": [pd.Timestamp("2020-01-01")] * 5, + "valid_until": [pd.Timestamp("2030-01-01")] * 5, + "constellation": ["G"] * 5, + "carrier_freq_id": [1, 2, 5, 6, 7], + "pco_north_m": [1] * 5, + "pco_east_m": [0] * 5, + "pco_up_m": [1] * 5, + }, + ), + ) + assert pco_corr_list == pytest.approx(pco_corr_function[:, 0]) + diff --git a/src/prx/test/test_helpers.py b/src/prx/test/test_helpers.py index c37bc1c8..ad7208b0 100644 --- a/src/prx/test/test_helpers.py +++ b/src/prx/test/test_helpers.py @@ -344,3 +344,72 @@ def test_timestamp_to_mid_day(): assert util.timestamp_to_mid_day( pd.Timestamp("2023-01-01T01:02:03") ) == pd.Timestamp("2023-01-01T12:00:00") + + +def test_sun_pos(): + pos_sun = ( + util.compute_sun_ecef_position( + np.array( + [ + pd.Timestamp( + year=2020, month=6, day=20, hour=6 + ), # approx time when the sun is "close" to [0,1,xx] AU (Equinox)pd.Timestamp( + pd.Timestamp( + year=2020, month=6, day=20, hour=12 + ), # approx time when the sun is "close" to [1,0,xx] AU (Equinox) + pd.Timestamp( + year=2020, month=6, day=20, hour=18 + ), # approx time when the sun is "close" to [0,-1,xx] AU + ] + ) + ) + / np.array([149_597_870_700] * 3) + ) + assert pos_sun[0:2, 0] == pytest.approx(np.array([0, 1]), abs=0.1) + assert pos_sun[0:2, 1] == pytest.approx(np.array([1, 0]), abs=0.1) + assert pos_sun[0:2, 2] == pytest.approx(np.array([0, -1]), abs=0.1) + + +def test_sat_frame(): + """ + Some tests to validate the conversion between ECEF and satellite-fixed frame. + + In order to control the sun position, the function util.ecef_2_satellite is not used, but its content is + replicated here. + """ + pos_sat_ecef = np.array([26_600_000, 0, 0]).reshape(1, -1) + pos_ecef = np.array([6_400_000, 0, 0]).reshape(1, -1) + + k = -pos_sat_ecef / np.linalg.norm(pos_sat_ecef, axis=1).reshape(1, -1) + pos_sun_ecef = np.array([0, 149_597_870_700, 0]).reshape(1, -1) # 1 AU on y + unit_vector_sun_ecef = (pos_sun_ecef - pos_sat_ecef) / np.linalg.norm( + pos_sun_ecef - pos_sat_ecef, axis=1 + ).reshape(-1, 1) + j = np.cross(k, unit_vector_sun_ecef) + i = np.cross(j, k) + + rot_mat_ecef2sat = np.stack([i, j, k], axis=1) + pos_sat_frame = np.stack( + [ + rot_mat_ecef2sat[i, :, :] @ (pos_ecef[i, :] - pos_sat_ecef[i, :]) + for i in range(pos_sat_ecef.shape[0]) + ] + ) + assert pos_sat_frame[0, :] == pytest.approx(np.array([0, 0, 20_200_000])) + + # other simple examples + pos_sun = np.array([0, 149_597_870_700, 0]) # 1 AU on y + pos_rx = np.array([6_400_000, 0, 0]) # on +x + pos_sat = np.array([26_600_000, 0, 0]) # on +x + + k = -pos_sat / np.linalg.norm(pos_sat) + unit_vector_sun_ecef = (pos_sun - pos_sat) / np.linalg.norm(pos_sun - pos_sat) + j = np.cross(k, unit_vector_sun_ecef) + i = np.cross(j, k) + + pos_rx_sat = np.stack([i, j, k], axis=0) @ (pos_rx - pos_sat) + assert (pos_rx_sat == np.array([0, 0, 20_200_000])).all() + + pos_rx = np.array([0, 6_400_000, 0]) # on +y + pos_rx_sat = np.stack([i, j, k], axis=0) @ (pos_rx - pos_sat) + assert pos_rx_sat == pytest.approx(np.array([6_400_000, 0, 20_200_000 + 6_400_000])) diff --git a/src/prx/util.py b/src/prx/util.py index 9345fec2..56f68265 100644 --- a/src/prx/util.py +++ b/src/prx/util.py @@ -443,6 +443,7 @@ def ecef_2_geodetic(pos_ecef): ) return [latitude_rad, longitude_rad, altitude_m] + def ecef_2_satellite(pos_ecef: np.array, pos_sat_ecef: np.array, epoch: np.array): """ Converts an ecef position to the satellite-fixed coordinate frame @@ -469,6 +470,7 @@ def ecef_2_satellite(pos_ecef: np.array, pos_sat_ecef: np.array, epoch: np.array ) return pos_sat_frame, rot_mat_ecef2sat + def obs_dataset_to_obs_dataframe(ds: xarray.Dataset): # Flatten the xarray DataSet into a pandas DataFrame: logger.info("Converting Dataset into flat Dataframe of observations") @@ -546,6 +548,7 @@ def compute_gps_utc_leap_seconds(yyyy: int, doy: int): assert ~np.isnan(ls), "GPS leap second could not be retrieved" return ls + def compute_sun_ecef_position(epochs: np.array) -> np.array: """ Compute the Sun's ECEF position using the Astropy library @@ -553,4 +556,4 @@ def compute_sun_ecef_position(epochs: np.array) -> np.array: time = astrotime.Time(epochs, scale="utc") sun_gcrs = get_sun(time) sun_ecef = sun_gcrs.transform_to(ITRS(obstime=time)) - return sun_ecef.cartesian.xyz.to(astropy.units.meter).value \ No newline at end of file + return sun_ecef.cartesian.xyz.to(astropy.units.meter).value