# astra/orbit.py
"""ASTRA Core orbit propagation module.
Responsible for SGP4-based orbit propagation. Generates position and velocity
vectors at specified times. Uses `sgp4` for actual propagation and `skyfield`
for coordinate frame conversions. No manual orbital equations allowed.
Supports both legacy ``SatelliteTLE`` objects (via ``Satrec.twoline2rv``)
and modern ``SatelliteOMM`` objects (via ``Satrec.sgp4init``). Use the
``_build_satrec()`` helper which dispatches automatically based on type.
"""
from __future__ import annotations
import numpy as np
from typing import Generator
from sgp4.api import Satrec, WGS84, SatrecArray
from astra.errors import AstraError
from astra.models import (
OrbitalState,
SatelliteTLE,
SatelliteOMM,
SatelliteState,
TrajectoryMap,
VelocityMap,
)
from astra.log import get_logger
logger = get_logger(__name__)
def _split_jd(jd: float | np.ndarray) -> tuple[float | np.ndarray, float | np.ndarray]:
"""Split Julian Date into whole and fractional parts for SGP4 precision.
SGP4 propagation must use the requested catalog/UTC epoch. UT1 belongs in
Earth-rotation transforms such as TEME -> ECEF, not in the propagation
epoch supplied to the SGP4 model.
"""
jd_whole = np.floor(jd)
return jd_whole, jd - jd_whole
# ---------------------------------------------------------------------------
# Internal Satrec Factory
# ---------------------------------------------------------------------------
def _build_satrec(satellite: SatelliteState) -> Satrec:
"""Build a ``Satrec`` SGP4 object from either a TLE or an OMM source.
This is the polymorphic dispatch point for the physics engine.
All propagation functions route through here so the math core
never needs to inspect the format of incoming data directly.
Known Limitations:
SGP4 returns raw TEME states. Apply EOP/UT1 only when converting into
Earth-fixed frames.
Args:
satellite: Either a ``SatelliteTLE`` or a ``SatelliteOMM`` instance.
Returns:
A configured ``Satrec`` object ready for ``sgp4()`` calls.
"""
if isinstance(satellite, SatelliteTLE):
# Legacy path: fast C++ string parsing.
return Satrec.twoline2rv(satellite.line1, satellite.line2)
if isinstance(satellite, SatelliteOMM):
# Modern path: explicit sgp4init with pre-converted radian fields.
satrec = Satrec()
# epoch days since 1949-12-31 00:00 UT (sgp4 epoch reference)
# JD of 1949-12-31 00:00 UT = 2433281.5
epoch_days = satellite.epoch_jd - 2433281.5
satrec.sgp4init(
WGS84,
"i", # afspc_mode
int(satellite.norad_id) if satellite.norad_id.isdigit() else 0,
epoch_days,
satellite.bstar,
satellite.mean_motion_dot,
satellite.mean_motion_ddot,
satellite.eccentricity,
satellite.argpo_rad,
satellite.inclination_rad,
satellite.mo_rad,
satellite.mean_motion_rad_min,
satellite.raan_rad,
)
return satrec
raise AstraError(
f"Unsupported satellite type: {type(satellite).__name__}. "
"Expected SatelliteTLE or SatelliteOMM."
)
[docs]
def propagate_orbit(
satellite: SatelliteState, epoch_jd: float, t_since_minutes: float
) -> OrbitalState:
"""Propagate a single satellite to a single point in time using SGP4.
Data formats: ✓ SatelliteTLE ✓ SatelliteOMM
Args:
satellite: Parsed SatelliteTLE or SatelliteOMM object to propagate.
epoch_jd: Reference epoch as Julian Date (typically satellite.epoch_jd).
t_since_minutes: Minutes elapsed since epoch.
Returns:
OrbitalState containing position (km) and velocity (km/s).
Frame:
TEME (True Equator, Mean Equinox) — raw SGP4 output frame.
Do NOT feed directly into ECEF-expecting functions without conversion.
"""
satrec = _build_satrec(satellite)
t_jd = epoch_jd + (t_since_minutes / 1440.0)
# Verify epoch staleness (SE-J)
from astra.tle import check_tle_staleness
check_tle_staleness(satellite, t_jd)
jd_whole, fraction = _split_jd(t_jd)
error_code, position, velocity = satrec.sgp4(float(jd_whole), float(fraction))
return OrbitalState(
norad_id=satellite.norad_id,
t_jd=t_jd,
position_km=np.array(position, dtype=np.float64),
velocity_km_s=np.array(velocity, dtype=np.float64),
error_code=error_code,
)
[docs]
def propagate_many(
satellites: list[SatelliteState], times_jd: np.ndarray
) -> tuple[TrajectoryMap, "VelocityMap"]:
"""Vectorized batch propagation of multiple satellites across a time array.
Data formats: ✓ SatelliteTLE ✓ SatelliteOMM
Args:
satellites: List of SatelliteState objects to propagate.
times_jd: 1D NumPy array of T absolute Julian Dates. (Shape: (T,), dtype: float64)
Returns:
Tuple ``(traj_map, vel_map)``. Each value is a dict keyed by NORAD ID with
arrays of shape ``(T, 3)`` in TEME: positions in km and velocities in km/s.
Satellites with propagation errors at any timestep store ``nan`` in that row.
**Frame:** TEME (True Equator, Mean Equinox), i.e. raw SGP4 output.
Do not feed positions directly into ECEF-expecting functions without conversion.
**Note:** Unpack as ``traj_map, vel_map = propagate_many(...)``.
"""
results: TrajectoryMap = {}
velocities: dict[str, np.ndarray] = {}
N = len(satellites)
T = len(times_jd)
if N == 0 or T == 0:
return results, velocities
logger.info(
f"Vector-propagating {N} orbits across {T} discrete time steps using SatrecArray..."
)
satrecs = [_build_satrec(sat) for sat in satellites]
satrec_array = SatrecArray(satrecs)
# 0. Verify epoch staleness for all satellites in batch (SE-J)
from astra.tle import check_tle_staleness
for sat in satellites:
check_tle_staleness(sat, times_jd)
# SGP4 uses the requested catalog/UTC epoch. UT1/EOP corrections belong in
# Earth-fixed frame conversion, not in the TEME propagation epoch.
jd_array, jd_fraction_array = _split_jd(times_jd)
# 2. Vectorized SGP4 call
e, r, v = satrec_array.sgp4(jd_array, jd_fraction_array)
r[e > 0] = np.nan
v[e > 0] = np.nan
for i, sat in enumerate(satellites):
results[sat.norad_id] = r[i]
velocities[sat.norad_id] = v[i]
return results, velocities
[docs]
def propagate_many_generator(
satellites: list[SatelliteState], times_jd: np.ndarray, chunk_size: int = 1440
) -> Generator[tuple[np.ndarray, TrajectoryMap, "VelocityMap"], None, None]:
"""Memory-efficient batch propagation yielding time-chunked results.
Data formats: ✓ SatelliteTLE ✓ SatelliteOMM
Prevents Out-Of-Memory (OOM) fatal kills during massive all-vs-all STM queries
by yielding rolling spatial windows.
"""
N = len(satellites)
T = len(times_jd)
if N == 0 or T == 0:
return
satrecs = [_build_satrec(sat) for sat in satellites]
satrec_array = SatrecArray(satrecs)
# Verify epoch staleness for all satellites ONCE over the full time bounds.
# and obliterating batch throughput for long multi-month ephemeris queries.
from astra.tle import check_tle_staleness
for sat in satellites:
check_tle_staleness(sat, times_jd)
for start_idx in range(0, T, chunk_size):
end_idx = min(start_idx + chunk_size, T)
jd_chunk = times_jd[start_idx:end_idx]
jd_whole_chunk, frac_chunk = _split_jd(jd_chunk)
e, r, v = satrec_array.sgp4(jd_whole_chunk, frac_chunk)
r[e > 0] = np.nan
v[e > 0] = np.nan
chunk_map = {sat.norad_id: r[i] for i, sat in enumerate(satellites)}
vel_map = {sat.norad_id: v[i] for i, sat in enumerate(satellites)}
yield jd_chunk, chunk_map, vel_map
[docs]
def propagate_trajectory(
satellite: SatelliteState,
t_start_jd: float,
t_end_jd: float,
step_minutes: float = 5.0,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Propagate a single satellite over a defined time window at a fixed step.
Data formats: ✓ SatelliteTLE ✓ SatelliteOMM
Args:
satellite: Source SatelliteTLE or SatelliteOMM to propagate.
t_start_jd: Window start as Julian Date.
t_end_jd: Window end as Julian Date (must be strictly > t_start_jd).
step_minutes: Step size in minutes (must be positive; default 5.0).
Returns:
Tuple of (time_array, position_array, velocity_array):
time_array: shape (T,), Julian Dates for each step.
position_array: shape (T, 3), TEME positions in km.
velocity_array: shape (T, 3), TEME velocities in km/s.
Frame:
TEME (True Equator, Mean Equinox) — raw SGP4 output frame.
Do NOT feed directly into ECEF-expecting functions without conversion.
Raises:
ValueError: If t_end_jd <= t_start_jd or step_minutes <= 0.
"""
if t_end_jd <= t_start_jd:
raise ValueError(
f"t_end_jd ({t_end_jd}) must be strictly greater than t_start_jd ({t_start_jd}). "
"Check argument order."
)
if step_minutes <= 0:
raise ValueError(f"step_minutes must be positive, got {step_minutes}.")
start_offset_mins = (t_start_jd - satellite.epoch_jd) * 1440.0
end_offset_mins = (t_end_jd - satellite.epoch_jd) * 1440.0
# Adding a tiny epsilon to end_offset_mins to ensure inclusive bound if it aligns exactly
time_steps = np.arange(start_offset_mins, end_offset_mins + 1e-9, step_minutes)
times_jd = satellite.epoch_jd + (time_steps / 1440.0)
trajectory_map, vel_map = propagate_many([satellite], times_jd)
positions = trajectory_map[satellite.norad_id]
velocities = vel_map[satellite.norad_id]
return times_jd, positions, velocities
[docs]
def ground_track(
positions_teme: np.ndarray, times_jd: np.ndarray
) -> list[tuple[float, float, float]]:
"""Convert TEME Cartesian positions into geodetic coordinates for ground track.
Args:
positions_teme: TEME position array from propagation. Shape: (T, 3)
times_jd: Corresponding Julian Date array. Shape: (T,)
Returns:
List of (latitude_deg, longitude_deg, altitude_km) tuples, length T.
Frame:
Input must be in TEME (True Equator, Mean Equinox) frame — the native
SGP4 output. This function internally converts TEME → GCRS → WGS84.
"""
if len(times_jd) == 0:
return []
from astra.frames import teme_to_ecef, ecef_to_geodetic_wgs84
r_itrs_km = teme_to_ecef(positions_teme, times_jd, use_spacebook_eop=True)
x, y, z = r_itrs_km.T
lat_deg, lon_deg, alt_km = ecef_to_geodetic_wgs84(x, y, z)
if np.isscalar(lat_deg) or getattr(lat_deg, "ndim", 0) == 0:
return [(float(lat_deg), float(lon_deg), float(alt_km))]
return list(zip(lat_deg.tolist(), lon_deg.tolist(), alt_km.tolist()))