# ---------------------------------------------------------------------------
# src/prx/precise_corrections/antex/antex_file_discovery.py
# (reconstructed from a whitespace-mangled `git diff`; patch scaffolding dropped)
# ---------------------------------------------------------------------------
"""
Logic overview:
- The code first extracts the GPS week from the provided RINEX observation file.
- It then identifies the most recent ANTEX file available both locally and remotely.
- If both local and remote ANTEX files are the same, the local version is reused,
  as the local database is considered up to date.
- Otherwise, the latest remote file is downloaded and used.
- Note: If the most recent ANTEX file has a GPS week that is earlier than the
  observation file's GPS week (e.g., the remote database has not yet been
  updated), a warning is displayed to indicate that the most recent ANTEX file
  was still used despite being older than the observation file.
"""

from ftplib import FTP
import ftplib
from pathlib import Path
import re
import urllib.request  # fix: `import urllib` alone does not guarantee urllib.request
import fnmatch

import georinex
import pandas as pd

from prx import converters
from prx import util
from prx.util import timestamp_to_mid_day

log = util.get_logger(__name__)

# Glob/fnmatch pattern matching IGS20 ANTEX file names (fix: was a pointless f-string)
atx_filename = "igs20_????.atx"


def date_to_gps_week(date: pd.Timestamp) -> int:
    """
    Convert a timezone-naive Timestamp to its GPS week number.

    GPS week 0 starts on 1980-01-06 (a Sunday).
    """
    gps_epoch = pd.Timestamp("1980-01-06T00:00:00Z")
    delta = date.tz_localize("UTC") - gps_epoch
    return delta.days // 7


def extract_gps_week(filename) -> int:
    """
    Extract the GPS week from file names like 'igsXX_WWWW.atx'.

    Accepts a str or a Path (generalized: the previous version raised a
    TypeError on Path inputs because re.search requires a string).
    Returns -1 when the name does not match.
    """
    match = re.search(r"igs...(\d{4})\.atx", str(filename))
    if match:
        return int(match.group(1))
    return -1


def atx_file_database_folder() -> Path:
    """
    Return (creating it if needed) the folder where ATX database files are stored.
    """
    db_folder = (
        util.prx_repository_root() / "src/prx/precise_corrections/antex/atx_files"
    )
    db_folder.mkdir(exist_ok=True)
    return db_folder


def find_latest_local_antex_file(db_folder=None):
    """
    Return the local ANTEX file (a Path) with the highest GPS week, or None.

    fix: the default folder is resolved at call time; the previous default
    argument called atx_file_database_folder() at import time, creating the
    folder as a module-import side effect.
    """
    if db_folder is None:
        db_folder = atx_file_database_folder()
    candidates = list(db_folder.glob(atx_filename))
    if not candidates:
        return None
    # fix: extract_gps_week now accepts Path objects, so this no longer raises.
    return max(candidates, key=extract_gps_week)


def list_ftp_directory(server, folder):
    """Return the entry names of `folder` on the given anonymous FTP server."""
    # fix: close the FTP connection instead of leaking it.
    with FTP(server) as ftp:
        ftp.login()
        ftp.cwd(folder)
        dir_list = []
        ftp.dir(dir_list.append)
    # `ftp.dir` yields `ls -l`-style lines; the name is the last column.
    return [c.split()[-1].strip() for c in dir_list]


def fetch_latest_remote_antex_file():
    """
    List the ANTEX files available online and return the latest one's file
    name (a str), or None if none is found.
    """
    server = "gssc.esa.int"
    remote_folder = "/igs/station/general"
    candidates = list_ftp_directory(server, remote_folder)
    candidates = [c for c in candidates if fnmatch.fnmatch(c, atx_filename)]
    if not candidates:
        return None
    return max(candidates, key=extract_gps_week)


def check_online_availability(file: str, folder: Path) -> Path | None:
    """
    Check that `file` exists on the remote server and return the local path it
    would be stored under, without downloading it.

    Need to keep the same inputs as try_downloading_sp3_ftp, in order to be
    able to use `unittest.mock.patch` in tests.
    """
    server = "gssc.esa.int"
    remote_folder = "/igs/station/general"
    # fix: close the FTP connection instead of leaking it.
    with FTP(server) as ftp:
        ftp.login()
        ftp.cwd(remote_folder)
        try:
            ftp.size(file)
            return folder.joinpath(file)
        except ftplib.error_perm:
            log.warning(f"{file} not available on {server}")
            return None


def try_downloading_atx_ftp(file: str, folder: Path):
    """
    Download the requested remote ANTEX file into `folder`.

    Returns the local Path on success, None on failure.
    """
    server = "gssc.esa.int"
    remote_folder = "/igs/station/general"
    # fix: remote_folder already starts with '/', avoid a double slash in the URL.
    ftp_file = f"ftp://{server}{remote_folder}/{file}"
    local_file = folder / file
    urllib.request.urlretrieve(ftp_file, local_file)
    if not local_file.exists():
        log.warning(f"Could not download {ftp_file}")
        return None
    log.info(f"Downloaded ANTEX file {ftp_file}")
    return local_file


def get_atx_file(date: pd.Timestamp, db_folder=None):
    """
    Return the most recent ANTEX file available, downloading it when the
    remote database holds a newer file than the local one.

    date: only used to warn when even the newest ANTEX file predates the
        observation epoch.
    db_folder: local database folder; defaults to atx_file_database_folder(),
        resolved at call time.

    Raises FileNotFoundError when no ANTEX file can be found locally or online.
    """
    if db_folder is None:
        db_folder = atx_file_database_folder()
    gps_week = date_to_gps_week(date)
    latest_atx_local = find_latest_local_antex_file(db_folder)
    latest_atx_remote = fetch_latest_remote_antex_file()

    # fix: the remote candidate is a file-name str while the local one is a
    # Path — the original compared them directly, so they never matched and the
    # file was re-downloaded every time (and None was returned silently when
    # both were missing).
    local_name = None if latest_atx_local is None else Path(latest_atx_local).name

    if latest_atx_remote is not None and latest_atx_remote == local_name:
        # Local database is up to date.
        return latest_atx_local

    if latest_atx_remote is not None and (
        local_name is None
        or extract_gps_week(latest_atx_remote) > extract_gps_week(local_name)
    ):
        # Download the latest remote file.
        atx_file = try_downloading_atx_ftp(latest_atx_remote, db_folder)
        if atx_file is not None:
            if gps_week > extract_gps_week(atx_file.name):
                log.warning(
                    f"No ANTEX file found for the target GPS week {gps_week} — using the most recent available instead."
                )
            return atx_file

    # fix: fall back to the local file (if any) when the remote lookup or the
    # download failed, instead of raising although a usable file exists.
    if latest_atx_local is not None:
        if gps_week > extract_gps_week(local_name):
            log.warning(
                f"No ANTEX file found for the target GPS week {gps_week} — using the most recent available instead."
            )
        return latest_atx_local

    raise FileNotFoundError("No ANTEX file found locally or online.")


def discover_or_download_atx_file(observation_file_path=Path()):
    """
    Return the path to a valid ANTEX file (local or downloaded) corresponding
    to the observation file.
    """
    log.info(f"Finding auxiliary files for {observation_file_path} ...")
    rinex_3_obs_file = converters.anything_to_rinex_3(observation_file_path)
    header = georinex.rinexheader(rinex_3_obs_file)

    # NOTE(review): the 200 ms offset presumably pushes an observation starting
    # exactly at midnight into the previous day before snapping to mid-day —
    # TODO confirm against the other discover_or_download_* helpers.
    t_start = timestamp_to_mid_day(
        util.rinex_header_time_string_2_timestamp_ns(header["TIME OF FIRST OBS"])
        - pd.Timedelta(200, unit="milliseconds")
    )

    return get_atx_file(t_start)


# ---------------------------------------------------------------------------
# src/prx/precise_corrections/antex/antex_parser.py (module header)
# (the commented-out @parquet_loading decorator draft was dead, commented-out
#  code and has been removed)
# ---------------------------------------------------------------------------
from pathlib import Path
import pandas as pd
import numpy as np

from prx.util import ecef_2_satellite
def parse_atx(filepath: Path) -> pd.DataFrame:
    """
    Parse a satellite ANTEX file into a DataFrame with one row per
    (antenna, frequency), phase-center offsets (PCO) converted to meters.

    ANTEX file format description is available at
    https://files.igs.org/pub/data/format/antex14.txt
    """
    columns = [
        "antenna_type",
        "satellite_or_serial_no",
        "valid_from",
        "valid_until",
        "constellation",
        "carrier_freq_id",
        "pco_north_m",
        "pco_east_m",
        "pco_up_m",
    ]

    def _read_epoch(line):
        """Decode the 5I6,F13.7 epoch of a VALID FROM / VALID UNTIL line."""
        return pd.Timestamp(
            year=int(line[0:6]),
            month=int(line[6:12]),
            day=int(line[12:18]),
            hour=int(line[18:24]),
            minute=int(line[24:30]),
            second=int(line[30:35]),
            # line[36:42] holds the first six fractional-second digits,
            # i.e. directly the microsecond count.
            microsecond=int(np.floor(float(line[36:42]))),
        )

    def read_antenna(file):
        """Parse one antenna section (after 'START OF ANTENNA'); return row dicts."""
        # TYPE / SERIAL NO line
        line = file.readline()
        antenna_type = line[0:20].strip()
        satellite = line[20:40].strip()

        # Skip METH/BY/#/DATE, DAZI, ZEN1/ZEN2/DZEN to reach '# OF FREQUENCIES'
        for _ in range(4):
            line = file.readline()
        nb_freq = int(line[0:6])

        # Optional VALID FROM line
        line = file.readline()
        if "VALID FROM" in line:
            valid_from = _read_epoch(line)
            line = file.readline()
        else:
            valid_from = pd.NaT

        # Optional VALID UNTIL line
        valid_until = _read_epoch(line) if "VALID UNTIL" in line else pd.NaT

        # Skip SINEX CODE / COMMENT lines
        while "START OF FREQUENCY" not in line:
            line = file.readline()

        rows = []
        for _ in range(nb_freq):
            # START OF FREQUENCY line, e.g. '   G01'
            constellation = line[3]
            carrier_freq_id = int(line[4:6])

            # NORTH / EAST / UP line; offsets are given in millimeters
            line = file.readline()
            rows.append(
                {
                    "antenna_type": antenna_type,
                    "satellite_or_serial_no": satellite,
                    "valid_from": valid_from,
                    "valid_until": valid_until,
                    "constellation": constellation,
                    "carrier_freq_id": carrier_freq_id,
                    "pco_north_m": float(line[0:10]) * 1e-3,
                    "pco_east_m": float(line[10:20]) * 1e-3,
                    "pco_up_m": float(line[20:30]) * 1e-3,
                }
            )

            # Skip phase-center-variation lines until END OF FREQUENCY
            while "END OF FREQUENCY" not in line:
                line = file.readline()
            line = file.readline()

        return rows

    # fix: collect plain dicts and build the DataFrame once, instead of the
    # original quadratic pattern of per-row `.loc[index] =` assignments plus a
    # pd.concat per antenna.
    rows = []
    with open(filepath) as file:
        while True:
            line = file.readline()
            if not line:  # end of file reached
                break
            if "START OF ANTENNA" in line:
                rows.extend(read_antenna(file))

    return pd.DataFrame(rows, columns=columns)


def compute_pco_sat(
    sat_id: np.ndarray,
    sat_pos: np.ndarray,
    rx_pos: np.ndarray,
    epochs: np.ndarray,
    atx_df,
):
    """
    Compute the satellite Phase Center Offset projected on the
    satellite-to-receiver line of sight, for every carrier frequency present
    in `atx_df` for the queried satellites.

    sat_id: (n,) satellite ids, e.g. "G01"
    sat_pos: (n, 3) satellite ECEF positions, meters
    rx_pos: (n, 3) receiver ECEF positions, meters
    epochs: (n,) pd.Timestamp epochs
    atx_df: DataFrame as returned by parse_atx

    Returns (pco_effect, freq_list): an (n, n_freq) array of projected PCOs in
    meters and the carrier frequency ids matching its columns.

    Reference: RTKLIB manual v2.4.2, p 173
    """
    query = pd.DataFrame(
        {
            "satellite": sat_id,
            "epoch": epochs,
        }
    )

    pco_columns = [
        "satellite_or_serial_no",
        "carrier_freq_id",
        "pco_north_m",
        "pco_east_m",
        "pco_up_m",
    ]
    # Select, per satellite, the ANTEX records valid over the queried time span.
    # fix: gather the selections and concatenate once (quadratic concat in a
    # loop before); also fixes the 'to_be_apended' typo.
    selected = []
    for sat, group in query.groupby("satellite"):
        epoch_min = group.epoch.min()
        epoch_max = group.epoch.max()
        selected.append(
            atx_df.loc[
                (atx_df.satellite_or_serial_no == sat)
                & (atx_df.valid_from < epoch_min)
                & ((atx_df.valid_until > epoch_max) | (atx_df.valid_until.isnull())),
                pco_columns,
            ]
        )
    # fix: guard the empty-query case, pd.concat([]) raises.
    pco_sat = (
        pd.concat(selected, ignore_index=True)
        if selected
        else pd.DataFrame(columns=pco_columns)
    )

    # The ANTEX "north/east/up" columns are the satellite-frame x/y/z axes.
    pco_sat = pco_sat.rename(
        columns={
            "satellite_or_serial_no": "satellite",
            "pco_north_m": "pco_x_m",
            "pco_east_m": "pco_y_m",
            "pco_up_m": "pco_z_m",
        }
    )

    freq_list = pco_sat.carrier_freq_id.unique()

    # Unstack carrier_freq_id into one column per (frequency, axis):
    # 'pco_x_m' + freq 1 -> 'pco_1_x_m'
    pco_sat = pco_sat.set_index(["satellite", "carrier_freq_id"]).unstack(1)
    pco_sat.columns = [
        cols[0][0:4] + str(cols[1]) + cols[0][3:] for cols in pco_sat.columns
    ]
    pco_sat = pco_sat.reindex(sorted(pco_sat.columns), axis=1)

    # Only the rotation matrices are needed; the position argument is unused here.
    _, rot_mat_ecef2sat = ecef_2_satellite(sat_pos, sat_pos, epochs)

    pco_effect_list = []
    for freq in freq_list:
        # Rotate each satellite's PCO vector from the satellite frame to ECEF
        # (transpose = inverse of the ecef->sat rotation).
        pco_ecef = np.array(
            [
                rot_mat_ecef2sat[idx, :, :].T
                @ pco_sat.loc[
                    sat,
                    [
                        f"pco_{freq}_x_m",
                        f"pco_{freq}_y_m",
                        f"pco_{freq}_z_m",
                    ],
                ]
                .to_numpy()
                .reshape(-1, 1)
                for idx, sat in enumerate(sat_id)
            ]
        ).reshape(len(sat_id), 3)

        # Project onto the satellite-to-receiver unit vector.
        # NOTE(review): np.vecdot requires NumPy >= 2.0 — confirm the project's
        # minimum NumPy version.
        pco_effect_list.append(
            np.vecdot(
                pco_ecef,
                (sat_pos - rx_pos)
                / np.linalg.norm(sat_pos - rx_pos, axis=1).reshape(-1, 1),
            )
        )

    pco_effect = np.stack(pco_effect_list, axis=1)
    return pco_effect, freq_list


# ---------------------------------------------------------------------------
# src/prx/precise_corrections/antex/test/datasets/test_antex_file_discovery.py
# (module header and fixture; a dead commented-out `local_database` line removed)
# ---------------------------------------------------------------------------
import os
from pathlib import Path
import shutil
import georinex
import pandas as pd
import pytest
from unittest.mock import patch

import prx
from prx import util
from prx.precise_corrections.antex import antex_file_discovery as atx


@pytest.fixture
def set_up_test():
    """Copy the test observation file into a throw-away working directory."""
    test_directory = Path(f"./tmp_test_directory_{__name__}").resolve()
    if test_directory.exists():
        # Make sure the expected files have not been generated before and are
        # still on disk due to e.g. a previous test run having crashed:
        shutil.rmtree(test_directory)
    os.makedirs(test_directory)
    test_obs_file = test_directory.joinpath("TLSE00FRA_R_20230010100_10S_01S_MO.crx.gz")

    shutil.copy(
        util.prx_repository_root()
        / f"src/prx/test/datasets/TLSE_2023001/{test_obs_file.name}",
        test_obs_file,
    )
    assert test_obs_file.exists()

    yield {"test_obs_file": test_obs_file}
    shutil.rmtree(test_directory)
def test_extract_gps_week():
    """extract_gps_week returns the 4-digit week for matching names, -1 otherwise."""
    cases = {
        "igs20_2134.atx": 2134,
        "igs_10.atx": -1,
        "igs14_abc.atx": -1,
    }
    for filename, expected in cases.items():
        assert atx.extract_gps_week(filename) == expected


def test_download_if_not_local(set_up_test):
    """
    Tests that the ANTEX file is downloaded when no local file is available,
    but a valid remote file exists.

    Scenario :
    - No local ANTEX file available
    - A remote ANTEX file exists
    -> The function should trigger the download of the remote file.
    """
    obs_file = set_up_test["test_obs_file"]
    header = georinex.rinexheader(obs_file)
    t_start = util.rinex_header_time_string_2_timestamp_ns(
        header["TIME OF FIRST OBS"]
    ) - pd.Timedelta(200, unit="milliseconds")

    db_folder = obs_file.parent
    downloaded_atx = atx.get_atx_file(t_start, db_folder)

    # The file must have been downloaded and must exist on disk.
    assert downloaded_atx is not None
    assert downloaded_atx.exists()


def test_download_if_remote_is_newer(set_up_test):
    """
    Tests that the ANTEX file is downloaded when the remote file has a newer
    GPS week than the local one.

    Scenario :
    - local file = igs20_2370.atx
    - remote file = igs20_2375.atx
    -> The function should download igs20_2375.atx
    """
    latest_local = "igs20_2370.atx"
    latest_remote = "igs20_2375.atx"
    date = pd.Timestamp("2025-01-01 12:00:00")
    db_folder = set_up_test["test_obs_file"].parent

    with (
        patch(  # replace the download function by the online availability check
            "prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp",
            new=prx.precise_corrections.antex.antex_file_discovery.check_online_availability,
        ),
        patch(  # simulate that the latest local file found is 'igs20_2370.atx'
            "prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file",
            return_value=latest_local,
        ),
        patch(  # simulate that the latest remote file found is 'igs20_2375.atx'
            "prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file",
            return_value=latest_remote,
        ),
    ):
        result = atx.get_atx_file(date, db_folder)
        assert result is not None
        assert result.name == latest_remote


def test_skip_download_if_same_week(set_up_test):
    """
    Tests that the ANTEX file is not downloaded when the remote and local
    files are from the same GPS week.

    Scenario:
    - local file = igs20_2375.atx
    - remote file = igs20_2375.atx
    -> The function should skip the download and return the local file
    """
    latest_local = "igs20_2375.atx"
    latest_remote = "igs20_2375.atx"
    date = pd.Timestamp("2025-01-01 12:00:00")
    db_folder = set_up_test["test_obs_file"].parent

    with (
        patch(  # mock download function
            "prx.precise_corrections.antex.antex_file_discovery.try_downloading_atx_ftp"
        ) as mock_download,
        patch(  # simulate that the latest local file found is 'igs20_2375.atx'
            "prx.precise_corrections.antex.antex_file_discovery.find_latest_local_antex_file",
            return_value=latest_local,
        ),
        patch(  # simulate that the latest remote file found is 'igs20_2375.atx'
            "prx.precise_corrections.antex.antex_file_discovery.fetch_latest_remote_antex_file",
            return_value=latest_remote,
        ),
    ):
        result = atx.get_atx_file(date, db_folder)

    assert result is not None
    # The download function must not have been called ...
    mock_download.assert_not_called()
    # ... and the returned file must be the local one.
    assert result == latest_local


# ---------------------------------------------------------------------------
# src/prx/precise_corrections/antex/test/test_antex_parser.py
# (module header and fixture)
# ---------------------------------------------------------------------------
import os
from pathlib import Path
import shutil
import numpy as np
import pandas as pd
import pytest

from prx import util


@pytest.fixture
def input_for_test():
    """Copy obs/nav/sp3 test fixtures into a throw-away working directory."""
    test_directory = Path(f"./tmp_test_directory_{__name__}").resolve()
    if test_directory.exists():
        # Make sure the expected file has not been generated before and is
        # still on disk due to e.g. a previous test run having crashed:
        shutil.rmtree(test_directory)
    os.makedirs(test_directory)

    compressed_compact_rinex_file = "TLSE00FRA_R_20230010100_10S_01S_MO.crx.gz"
    test_file = {"obs": test_directory.joinpath(compressed_compact_rinex_file)}
    shutil.copy(
        Path(__file__).parent
        / f"datasets/TLSE_2023001/{compressed_compact_rinex_file}",
        test_file["obs"],
    )
    assert test_file["obs"].exists()

    # Also provide ephemerides so the test does not have to download them:
    ephemerides_file = "BRDC00IGS_R_20230010000_01D_MN.rnx.zip"
    test_file["nav"] = test_directory.joinpath(ephemerides_file)
    shutil.copy(
        Path(__file__).parent / f"datasets/TLSE_2023001/{ephemerides_file}",
        test_file["nav"].parent.joinpath(ephemerides_file),
    )
    assert test_file["nav"].parent.joinpath(ephemerides_file).exists()

    # sp3 file
    sp3_file = "GFZ0MGXRAP_20230010000_01D_05M_ORB.SP3"
    test_file["sp3"] = test_directory.joinpath(sp3_file)
    shutil.copy(
        Path(__file__).parent / f"datasets/TLSE_2023001/{sp3_file}",
        test_file["sp3"].parent.joinpath(sp3_file),
    )
    assert test_file["sp3"].parent.joinpath(sp3_file).exists()

    yield test_file
    shutil.rmtree(test_directory)
# ---------------------------------------------------------------------------
# src/prx/precise_corrections/antex/test/test_antex_parser.py (continued)
# ---------------------------------------------------------------------------
# fix: compute_pco_sat is defined in antex_parser, not in prx.util — the test
# called util.compute_pco_sat, which does not exist in the visible util.py.
# NOTE(review): confirm util.py really has no compute_pco_sat of its own.
from prx.precise_corrections.antex.antex_parser import compute_pco_sat


def test_pco_sat():
    # sat frame: z pointing to the geocenter, y perpendicular to the plane
    # (geocenter, sun, sat), x = cross(e_y, e_z)
    pco_corr_list = []

    # First scenario:                                     (sun)
    # - sat, sun, geocenter at z_ecef = 0
    # - sun at [0, y_ecef = 1 AU, 0]                        ↑ y_ecef
    # - sat at [x_ecef = a_sat, 0, 0]       (earth)  (sat)  |--> x_ecef
    # ==> the 3 axes of the sat frame are approximately
    #     x_sat = +y_ecef, y_sat = -z_ecef, z_sat = -x_ecef
    # R_sat2ecef = [ 0  0 -1
    #               +1  0  0
    #                0 -1  0 ]
    # With a pco of [1,0,1] in the sat frame we expect [-1,1,0] in the ecef frame
    sun_pos_ecef = np.array([0, 149_597_870_700, 0])
    sat_pos_ecef = np.array([20_200 + 6_400, 0, 0])
    rx_pos_ecef = np.array([6_400, 0, 0])
    pco_sat = np.array([1, 0, 1]).reshape(-1, 1)

    e_z = -sat_pos_ecef / np.linalg.norm(sat_pos_ecef)
    e_s = (sun_pos_ecef - rx_pos_ecef) / np.linalg.norm(sun_pos_ecef - rx_pos_ecef)
    e_y = np.cross(e_z, e_s) / np.linalg.norm(np.cross(e_z, e_s))
    e_x = np.cross(e_y, e_z)

    rot_sat2mat = np.stack([e_x, e_y, e_z]).T
    assert (rot_sat2mat == np.array([[0, 0, -1], [1, 0, 0], [0, -1, 0]])).all()
    pco_ecef = (rot_sat2mat @ pco_sat).reshape(3)
    assert (pco_ecef == np.array([-1, 1, 0])).all()
    pco_corr_list.append(pco_ecef.dot(rx_pos_ecef) / np.linalg.norm(rx_pos_ecef))

    # Second scenario:                      (earth)  (sat)
    # - sat, sun, geocenter at z_ecef = 0
    # - sun at [0, y_ecef = -1 AU, 0]          ↑ y_ecef
    # - sat at [x_ecef = a_sat, 0, 0]    (sun) |--> x_ecef
    # ==> the 3 axes of the sat frame are approximately
    #     x_sat = -y_ecef, y_sat = +z_ecef, z_sat = -x_ecef
    # R_sat2ecef = [ 0  0 -1
    #               -1  0  0
    #                0 +1  0 ]
    # With a pco of [1,0,1] in the sat frame we expect [-1,-1,0] in the ecef frame
    sun_pos_ecef = np.array([0, -149_597_870_700, 0])
    sat_pos_ecef = np.array([20_200 + 6_400, 0, 0])
    rx_pos_ecef = np.array([6_400, 0, 0])
    pco_sat = np.array([1, 0, 1]).reshape(-1, 1)

    e_z = -sat_pos_ecef / np.linalg.norm(sat_pos_ecef)
    e_s = (sun_pos_ecef - rx_pos_ecef) / np.linalg.norm(sun_pos_ecef - rx_pos_ecef)
    e_y = np.cross(e_z, e_s) / np.linalg.norm(np.cross(e_z, e_s))
    e_x = np.cross(e_y, e_z)

    rot_sat2mat = np.stack([e_x, e_y, e_z]).T
    assert (rot_sat2mat == np.array([[0, 0, -1], [-1, 0, 0], [0, 1, 0]])).all()
    pco_ecef = (rot_sat2mat @ pco_sat).reshape(3)
    assert (pco_ecef == np.array([-1, -1, 0])).all()
    pco_corr_list.append(pco_ecef.dot(rx_pos_ecef) / np.linalg.norm(rx_pos_ecef))

    # pco correction computation with the function under test
    pco_corr_function, _ = compute_pco_sat(
        np.array(["G01"] * 2),
        np.array([sat_pos_ecef] * 2),
        np.array([rx_pos_ecef] * 2),
        np.array(
            [
                # approx time when the sun direction is "close" to [0, 1, 0]
                pd.Timestamp(year=2020, month=6, day=20, hour=6, minute=6),
                # approx time when the sun direction is "close" to [0, -1, 0]
                pd.Timestamp(year=2020, month=6, day=20, hour=18, minute=6),
            ]
        ),
        pd.DataFrame(
            data={
                "satellite_or_serial_no": ["G01"] * 5,
                "valid_from": [pd.Timestamp("2020-01-01")] * 5,
                "valid_until": [pd.Timestamp("2030-01-01")] * 5,
                "constellation": ["G"] * 5,
                "carrier_freq_id": [1, 2, 5, 6, 7],
                "pco_north_m": [1] * 5,
                "pco_east_m": [0] * 5,
                "pco_up_m": [1] * 5,
            },
        ),
    )
    # Columns are sorted by carrier frequency id; column 0 is frequency 1.
    assert pco_corr_list == pytest.approx(pco_corr_function[:, 0])


# ---------------------------------------------------------------------------
# src/prx/test/test_helpers.py (added tests)
# ---------------------------------------------------------------------------
def test_sun_pos():
    """compute_sun_ecef_position roughly reproduces the expected ECEF sun direction."""
    pos_sun = (
        util.compute_sun_ecef_position(
            np.array(
                [
                    # fix: the original comments were garbled ("(Equinox)pd.Timestamp(")
                    # and wrongly claimed an equinox — 2020-06-20 is the June
                    # solstice; the hours below simply place the sun's ECEF x/y
                    # direction near [0,1], [1,0] and [0,-1] respectively.
                    pd.Timestamp(year=2020, month=6, day=20, hour=6),
                    pd.Timestamp(year=2020, month=6, day=20, hour=12),
                    pd.Timestamp(year=2020, month=6, day=20, hour=18),
                ]
            )
        )
        / np.array([149_597_870_700] * 3)
    )
    # compute_sun_ecef_position returns shape (3, n): xyz along axis 0, one
    # column per epoch.
    assert pos_sun[0:2, 0] == pytest.approx(np.array([0, 1]), abs=0.1)
    assert pos_sun[0:2, 1] == pytest.approx(np.array([1, 0]), abs=0.1)
    assert pos_sun[0:2, 2] == pytest.approx(np.array([0, -1]), abs=0.1)


def test_sat_frame():
    """
    Some tests to validate the conversion between ECEF and satellite-fixed frame.

    In order to control the sun position, the function util.ecef_2_satellite is
    not used, but its content is replicated here.
    NOTE(review): unlike util.ecef_2_satellite, the replica below does not
    normalize j — harmless here because k is almost orthogonal to the sun
    direction, but the two are not strictly identical.
    """
    pos_sat_ecef = np.array([26_600_000, 0, 0]).reshape(1, -1)
    pos_ecef = np.array([6_400_000, 0, 0]).reshape(1, -1)

    k = -pos_sat_ecef / np.linalg.norm(pos_sat_ecef, axis=1).reshape(1, -1)
    pos_sun_ecef = np.array([0, 149_597_870_700, 0]).reshape(1, -1)  # 1 AU on y
    unit_vector_sun_ecef = (pos_sun_ecef - pos_sat_ecef) / np.linalg.norm(
        pos_sun_ecef - pos_sat_ecef, axis=1
    ).reshape(-1, 1)
    j = np.cross(k, unit_vector_sun_ecef)
    i = np.cross(j, k)

    rot_mat_ecef2sat = np.stack([i, j, k], axis=1)
    pos_sat_frame = np.stack(
        [
            rot_mat_ecef2sat[idx, :, :] @ (pos_ecef[idx, :] - pos_sat_ecef[idx, :])
            for idx in range(pos_sat_ecef.shape[0])
        ]
    )
    assert pos_sat_frame[0, :] == pytest.approx(np.array([0, 0, 20_200_000]))

    # other simple examples
    pos_sun = np.array([0, 149_597_870_700, 0])  # 1 AU on y
    pos_rx = np.array([6_400_000, 0, 0])  # on +x
    pos_sat = np.array([26_600_000, 0, 0])  # on +x

    k = -pos_sat / np.linalg.norm(pos_sat)
    unit_vector_sun_ecef = (pos_sun - pos_sat) / np.linalg.norm(pos_sun - pos_sat)
    j = np.cross(k, unit_vector_sun_ecef)
    i = np.cross(j, k)

    pos_rx_sat = np.stack([i, j, k], axis=0) @ (pos_rx - pos_sat)
    assert (pos_rx_sat == np.array([0, 0, 20_200_000])).all()

    pos_rx = np.array([0, 6_400_000, 0])  # on +y
    pos_rx_sat = np.stack([i, j, k], axis=0) @ (pos_rx - pos_sat)
    assert pos_rx_sat == pytest.approx(np.array([6_400_000, 0, 20_200_000 + 6_400_000]))


# ---------------------------------------------------------------------------
# src/prx/util.py (imports and functions added by the same patch)
# ---------------------------------------------------------------------------
from astropy.coordinates import get_sun, ITRS
import astropy.units


def ecef_2_satellite(pos_ecef: np.ndarray, pos_sat_ecef: np.ndarray, epoch: np.ndarray):
    """
    Convert ECEF positions to the satellite-fixed coordinate frame.

    The frame axes are: k from the satellite towards the geocenter,
    j = k x (unit vector towards the sun), normalized, and i = j x k.

    pos_ecef: shape (n, 3), positions to convert, meters
    pos_sat_ecef: shape (n, 3), satellite ECEF positions, meters
    epoch: shape (n,), epochs used to compute the sun position

    Returns (pos_sat_frame, rot_mat_ecef2sat) with shapes (n, 3) and (n, 3, 3).
    """
    k = -pos_sat_ecef / np.linalg.norm(pos_sat_ecef, axis=1).reshape(-1, 1)
    # compute_sun_ecef_position returns (3, n); transpose to (n, 3)
    pos_sun_ecef = compute_sun_ecef_position(epoch).T
    unit_vector_sun_ecef = (pos_sun_ecef - pos_sat_ecef) / np.linalg.norm(
        pos_sun_ecef - pos_sat_ecef, axis=1
    ).reshape(-1, 1)
    j = np.cross(k, unit_vector_sun_ecef)
    j = j / np.linalg.norm(j, axis=1).reshape(-1, 1)
    i = np.cross(j, k)

    rot_mat_ecef2sat = np.stack([i, j, k], axis=1)
    # fix: vectorized per-row matrix-vector product; the original Python loop
    # also shadowed the basis vector `i` with its loop index.
    pos_sat_frame = np.einsum("nij,nj->ni", rot_mat_ecef2sat, pos_ecef - pos_sat_ecef)
    return pos_sat_frame, rot_mat_ecef2sat


def compute_sun_ecef_position(epochs: np.ndarray) -> np.ndarray:
    """
    Compute the Sun's ECEF position using the Astropy library.

    epochs: shape (n,) of timestamps, interpreted as UTC.
    Returns shape (3, n): x, y, z in meters along the first axis.
    """
    time = astrotime.Time(epochs, scale="utc")
    sun_gcrs = get_sun(time)
    sun_ecef = sun_gcrs.transform_to(ITRS(obstime=time))
    return sun_ecef.cartesian.xyz.to(astropy.units.meter).value