Source code for astra.frames

from typing import Any
import os  # Moved from inside hot function bodies to module level.
# astra/frames.py
"""ASTRA Core Coordinate Frame Transformations.
High-performance Numba-accelerated kernels for orbital frame rotations (VNB, RTN).
This module acts as a base for both propagator and maneuver logic,
resolving circular dependencies.
"""
import numpy as np  # noqa: E402
# DEF-003: Graceful Numba fallback — mirrors the pattern in propagator.py.
# Without this, importing frames.py would hard-crash in environments where
# Numba is not installed, breaking the propagator's own graceful fallback.
from astra._numba_compat import njit  # noqa: E402
@njit(cache=True)
def _build_vnb_matrix_njit(pos: np.ndarray, vel: np.ndarray) -> np.ndarray:
    """Build the 3x3 rotation matrix from ECI to VNB frame.
    V (Velocity): Along the velocity vector.
    N (Normal): Along the angular momentum vector (R x V).
    B (Binormal): Completes the right-handed triad (V x N).
    """
    v_unit = vel / np.linalg.norm(vel)
    h = np.cross(pos, vel)
    n_unit = h / np.linalg.norm(h)
    b_unit = np.cross(v_unit, n_unit)
    # Matrix for ECI -> VNB: rows are basis vectors
    mat = np.zeros((3, 3))
    mat[0, 0], mat[0, 1], mat[0, 2] = v_unit[0], v_unit[1], v_unit[2]
    mat[1, 0], mat[1, 1], mat[1, 2] = n_unit[0], n_unit[1], n_unit[2]
    mat[2, 0], mat[2, 1], mat[2, 2] = b_unit[0], b_unit[1], b_unit[2]
    return mat  # type: ignore[no-any-return]
@njit(cache=True)
def _build_rtn_matrix_njit(pos: np.ndarray, vel: np.ndarray) -> np.ndarray:
    """Build the 3x3 rotation matrix from ECI to RTN (RIC) frame.
    R (Radial): Along the geocentric position vector.
    T (Transverse/In-track): In the orbital plane, orthogonal to R.
    N (Normal): Along the angular momentum vector (R x V).
    """
    r_unit = pos / np.linalg.norm(pos)
    h = np.cross(pos, vel)
    n_unit = h / np.linalg.norm(h)
    t_unit = np.cross(n_unit, r_unit)
    # Matrix for ECI -> RTN: rows are basis vectors
    mat = np.zeros((3, 3))
    mat[0, 0], mat[0, 1], mat[0, 2] = r_unit[0], r_unit[1], r_unit[2]
    mat[1, 0], mat[1, 1], mat[1, 2] = t_unit[0], t_unit[1], t_unit[2]
    mat[2, 0], mat[2, 1], mat[2, 2] = n_unit[0], n_unit[1], n_unit[2]
    return mat  # type: ignore[no-any-return]
[docs] def get_eop_correction( times_jd: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray] | tuple[float, float, float]: """Fetch Spacebook Earth Orientation Parameters for a set of epochs. DEF-006: Batches calls by calendar day rather than per-timestep. For a 24-hour propagation at 5-minute steps (288 points), this reduces Spacebook API calls from 288 to ~1, with intra-day values taken from the representative midday fetch (EOP variation within a day is sub-ms). Returns: (xp_arcsec, yp_arcsec, dut1_s) arrays matching times_jd shape. If Spacebook is disabled or offline, returns arrays of zeros. """ is_scalar_input = np.isscalar(times_jd) or ( hasattr(times_jd, "ndim") and np.asarray(times_jd).ndim == 0 ) times_arr = np.atleast_1d(np.asarray(times_jd, dtype=float)) zeros = np.zeros(len(times_arr)) from astra.config import SPACEBOOK_ENABLED if not SPACEBOOK_ENABLED: if is_scalar_input: return 0.0, 0.0, 0.0 return zeros.copy(), zeros.copy(), zeros.copy() # type: ignore[no-any-return] try: from astra.spacebook import get_eop_sb # --- DEF-006: Group by calendar date, fetch once per unique day --- # Floor each JD to midnight of that day (integer part). # EOP values change by at most ~0.3 ms/day (dUT1), well below # the 1-second timestep of most propagations. day_keys = np.floor(times_arr).astype(int) unique_days = np.unique(day_keys) eop_cache: dict[int, Any] = {} for day_jd in unique_days: # Query at midday of that calendar date (more representative than midnight) eop_cache[int(day_jd)] = get_eop_sb(float(day_jd) + 0.5) xps = np.array([eop_cache[int(dk)][0] for dk in day_keys]) yps = np.array([eop_cache[int(dk)][1] for dk in day_keys]) dut1s = np.array([eop_cache[int(dk)][2] for dk in day_keys]) # Return scalars when a scalar JD was passed in if is_scalar_input: return float(xps[0]), float(yps[0]), float(dut1s[0]) # type: ignore[no-any-return] return xps, yps, dut1s # type: ignore[no-any-return] except Exception as e: import logging from astra import config from astra.errors import EphemerisError _log = logging.getLogger(__name__) if config.ASTRA_STRICT_MODE: raise EphemerisError( f"[ASTRA STRICT] EOP fetch failed: {e}. " "Cannot continue with degraded accuracy. " "Check network connectivity or set ASTRA_STRICT_MODE=False for relaxed mode." ) from e _log.error( "EOP fetch failed (%s); defaulting to zero correction. " "This may introduce up to ~35 km ground track error in HEO geometries. " "Enable strict mode to prevent silent degradation.", e ) if is_scalar_input: return 0.0, 0.0, 0.0 return (zeros.copy(), zeros.copy(), zeros.copy()) # type: ignore[no-any-return]
[docs] def teme_to_ecef( r_teme: np.ndarray, times_jd: np.ndarray, use_spacebook_eop: bool = True ) -> np.ndarray: """Convert TEME position vectors to ECEF (ITRS) coordinates. If `use_spacebook_eop` is True and Spacebook is enabled, precise COMSPOC EOP metrics are fetched and mathematically integrated into the GCRS-ITRS rotation differential. Otherwise, it natively uses Skyfield's fallback IERS tables. Args: r_teme: (N, 3) matrix of TEME Cartesian coordinates (km) times_jd: (N,) array of Julian Dates use_spacebook_eop: Boolean toggle to inject precise parameters. Returns: (N, 3) matrix of ECEF (ITRS) coordinates. """ from astra import data_pipeline as _dp from skyfield.sgp4lib import TEME from skyfield.framelib import itrs _dp._ensure_skyfield() ts = _dp._skyfield_ts # 1. TEME -> GCRS (Precession / Nutation is native and identical to both) _dp._ensure_skyfield() ts = _dp._skyfield_ts assert ts is not None, "_ensure_skyfield() failed to initialise Skyfield timescale" # times_jd in ASTRA is universally passed around as UTC. # The previous code forcibly cast them to TT (t = ts.tt_jd(times_jd)), # which created a ~69s (32.184s + 37s leap) offset in Earth rotation phase, # causing ~35km ground-track errors in HEO geometries. t = ts._utc_jd(times_jd, 0.0) R_teme_gcrs = ( np.transpose(TEME.rotation_at(t), axes=(1, 0, 2)) if hasattr(t, "shape") else np.transpose(TEME.rotation_at(t)) ) r_gcrs = ( np.einsum("ij...,j...->i...", R_teme_gcrs, r_teme.T).T if hasattr(t, "shape") else R_teme_gcrs.dot(r_teme.T).T ) # 2. Base GCRS -> ITRS using Skyfield cached baseline R_gcrs_itrs = ( np.transpose(itrs.rotation_at(t), axes=(1, 0, 2)) if hasattr(t, "shape") else np.transpose(itrs.rotation_at(t)) ) r_itrs = ( np.einsum("ij...,j...->i...", R_gcrs_itrs, r_gcrs.T).T if hasattr(t, "shape") else R_gcrs_itrs.dot(r_gcrs.T).T ) from astra.config import SPACEBOOK_ENABLED as _sb_enabled spacebook_enabled = _sb_enabled if not use_spacebook_eop or not spacebook_enabled: return r_itrs # type: ignore[no-any-return] # 3. Apply Spacebook EOP Differential (Correction Matrix) xp_sb, yp_sb, dut1_sb = get_eop_correction(times_jd) # Only correct if we got actual data (non-zero combined norms) if ( np.sum(np.abs(xp_sb)) == 0 and np.sum(np.abs(yp_sb)) == 0 and np.sum(np.abs(dut1_sb)) == 0 ): return r_itrs # type: ignore[no-any-return] # Skyfield base parameters # AUDIT-C-02 Verification: Skyfield's Time.polar_motion_angles() returns # (xp, yp, era_rad) where xp and yp are in ARCSECONDS (the IERS Bulletin A # storage convention). This matches the unit of xp_sb / yp_sb from # Spacebook, so the delta below is correctly arc-second-valued before the # (pi/648000) arcsec→radians conversion factor. ERA (the third element) is # in radians but is not used here. dut1_skyfield = t.dut1 xp_sky, yp_sky, _ = t.polar_motion_angles() # both xp_sky, yp_sky in arcsec # Deltas d_dut1 = dut1_sb - dut1_skyfield d_xp_rc = (xp_sb - xp_sky) * (np.pi / (180.0 * 3600.0)) # arcsec to rad d_yp_rc = (yp_sb - yp_sky) * (np.pi / (180.0 * 3600.0)) # EOP sanity clamp: reject physically implausible deltas that # indicate stale, corrupt, or unit-mismatched Spacebook EOP data. # Limits: |ΔUT1| ≤ 1.0 s (IERS spec bound), |Δxp|/|Δyp| ≤ 0.01 rad (~2000 arcsec). import logging as _frames_log _flog = _frames_log.getLogger(__name__) # Define omega_earth BEFORE it is referenced in _MAX_THETA below. # causing an UnboundLocalError whenever the EOP correction branch executed. omega_earth = 7.292115146706979e-5 # rad/s — IAU/IERS 2010 Earth rotation rate _MAX_DUT1 = 1.0 # seconds — IERS hard limit for UT1-UTC _MAX_PM_RAD = 0.01 # radians ≈ 2062 arcsec — well beyond any observed value if np.any(np.abs(d_dut1) > _MAX_DUT1): _flog.warning( "EOP anomaly: |Δ-DUT1|=%.4f s exceeds %.1f s limit — clamping. " "Check Spacebook EOP data freshness.", float(np.max(np.abs(d_dut1))), _MAX_DUT1, ) d_dut1 = np.clip(d_dut1, -_MAX_DUT1, _MAX_DUT1) if np.any(np.abs(d_xp_rc) > _MAX_PM_RAD): _flog.warning( "EOP anomaly: |Δxp|=%.4e rad exceeds %.4e rad limit — clamping.", float(np.max(np.abs(d_xp_rc))), _MAX_PM_RAD, ) d_xp_rc = np.clip(d_xp_rc, -_MAX_PM_RAD, _MAX_PM_RAD) if np.any(np.abs(d_yp_rc) > _MAX_PM_RAD): _flog.warning( "EOP anomaly: |Δyp|=%.4e rad exceeds %.4e rad limit — clamping.", float(np.max(np.abs(d_yp_rc))), _MAX_PM_RAD, ) d_yp_rc = np.clip(d_yp_rc, -_MAX_PM_RAD, _MAX_PM_RAD) # Earth rotation phase delta — omega_earth already defined above. d_theta = d_dut1 * omega_earth # Apply correcting rotational approximations # R_diff = R_y(d_xp) @ R_x(d_yp) @ R_z(d_theta) cos_t = np.cos(d_theta) sin_t = np.sin(d_theta) # Pre-allocate array for corrected ITRS vectors r_itrs_corrected = np.empty_like(r_itrs) # Vectorized application if r_itrs.ndim == 2: for i in range(len(r_itrs)): x, y, z = r_itrs[i] # R_z Earth rotation adjustment x_rot = x * cos_t[i] + y * sin_t[i] y_rot = -x * sin_t[i] + y * cos_t[i] z_rot = z # R_x, R_y Polar motion tilt adjustment (small-angle approximation) # R_y(dx) @ R_x(dy) = [[1, 0, dx], [0, 1, -dy], [-dx, dy, 1]] dx = d_xp_rc[i] dy = d_yp_rc[i] x_f = x_rot + dx * z_rot y_f = y_rot - dy * z_rot z_f = -dx * x_rot + dy * y_rot + z_rot r_itrs_corrected[i] = [x_f, y_f, z_f] else: x, y, z = r_itrs x_rot = x * cos_t + y * sin_t y_rot = -x * sin_t + y * cos_t z_rot = z dx = d_xp_rc dy = d_yp_rc x_f = x_rot + dx * z_rot y_f = y_rot - dy * z_rot z_f = -dx * x_rot + dy * y_rot + z_rot r_itrs_corrected = np.array([x_f, y_f, z_f]) return r_itrs_corrected # type: ignore[no-any-return]
[docs] @njit(cache=True) def ecef_to_geodetic_wgs84( x: np.ndarray, y: np.ndarray, z: np.ndarray ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Convert ECEF Cartesian coordinates (km) to WGS84 Geodetic coordinates. Uses the Bowring (1976) single-pass iterative formula, which achieves ~0.1 mm accuracy at sea level and better at orbital altitudes. COORD-01 Fix: Handles the polar singularity (cos(lat)→0 at |lat|≥80°) by using the z-based altitude formula for high latitudes: alt = |z|/sin(lat) - N*(1-e²) instead of the equatorial formula (alt = p/cos(lat) - N) which produces infinite values when the satellite passes directly over a polar station. Args: x, y, z: 1D arrays of cartesian coordinates in km. Returns: tuple of (latitude_deg, longitude_deg, altitude_km). """ a = 6378.137 # WGS84 Semi-major axis (km) f = 1.0 / 298.257223563 # Flattening b = a * (1.0 - f) e2 = 1.0 - (b**2 / a**2) ep2 = (a**2 - b**2) / b**2 p = np.sqrt(x**2 + y**2) th = np.arctan2(a * z, b * p) lon = np.arctan2(y, x) lat = np.arctan2(z + ep2 * b * np.sin(th) ** 3, p - e2 * a * np.cos(th) ** 3) N = a / np.sqrt(1.0 - e2 * np.sin(lat) ** 2) # COORD-01: Use z-based altitude formula at high latitudes where cos(lat)→0. # Threshold |lat| > 80° (~1.396 rad) matches the standard geodetic convention # used by IERS, Bowring, and Vermeille altitude calculators. lat_abs = np.abs(lat) polar_mask = lat_abs > (80.0 * np.pi / 180.0) # Guard the equatorial branch against cos(lat)→0 at poles. # Numba with fastmath=True evaluates BOTH branches of np.where before # selecting, so p/cos(lat) would produce inf at the poles even though the # polar_mask would select the other branch. An inf intermediate can poison # downstream computations under fastmath associativity relaxation. cos_lat = np.cos(lat) safe_cos = np.where(np.abs(cos_lat) < 1e-10, 1e-10, cos_lat) alt_equatorial = p / safe_cos - N # Polar formula (safe for |lat| > 0, avoids 1/cos(lat) singularity) sin_lat = np.sin(lat) # Guard against |lat|=0 in the polar branch (shouldn't happen when polar_mask is True, # but Numba requires scalar-safe expressions) safe_sin = np.where(np.abs(sin_lat) < 1e-15, 1e-15, sin_lat) alt_polar = np.abs(z) / np.abs(safe_sin) - N * (1.0 - e2) alt = np.where(polar_mask, alt_polar, alt_equatorial) # Radians to degrees lat_deg = lat * (180.0 / np.pi) lon_deg = lon * (180.0 / np.pi) return lat_deg, lon_deg, alt # type: ignore[no-any-return]
[docs] def geodetic_to_ecef_wgs84( lat_deg: np.ndarray, lon_deg: np.ndarray, alt_km: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Convert WGS84 Geodetic coordinates to ECEF Cartesian coordinates. Inverse of :func:`ecef_to_geodetic_wgs84`. Converts latitude, longitude, and altitude to Earth-Centered Earth-Fixed (ECEF) Cartesian coordinates. Uses the WGS84 ellipsoid parameters: a = 6378.137 km (semi-major axis) f = 1/298.257223563 (flattening) Args: lat_deg: Latitude in degrees (WGS84), shape (N,). lon_deg: Longitude in degrees (WGS84), shape (N,). alt_km: Altitude above WGS84 ellipsoid in km, shape (N,). Returns: Tuple of (x_km, y_km, z_km) ECEF coordinates in km. Reference: Borkowski, K. M. (1989). "Accurate algorithms for transforming geocentric to geodetic coordinates". Bulletin Géodésique. """ a = 6378.137 # WGS84 Semi-major axis (km) f = 1.0 / 298.257223563 # Flattening e2 = 2.0 * f - f * f # First eccentricity squared lat = np.radians(lat_deg) lon = np.radians(lon_deg) sin_lat = np.sin(lat) cos_lat = np.cos(lat) sin_lon = np.sin(lon) cos_lon = np.cos(lon) # Radius of curvature in the prime vertical N = a / np.sqrt(1.0 - e2 * sin_lat * sin_lat) # ECEF coordinates x_km = (N + alt_km) * cos_lat * cos_lon y_km = (N + alt_km) * cos_lat * sin_lon z_km = (N * (1.0 - e2) + alt_km) * sin_lat return x_km, y_km, z_km # type: ignore[no-any-return]