# astra/propagator.py
"""ASTRA Core Numerical Propagator — Segmented Cowell's Method.
Implements a mission-operations–grade numerical orbit propagator using
Cowell's direct integration with a Dormand-Prince **DOP853** (8th-order,
error-estimate 5th/3rd order) adaptive-step integrator via SciPy's
``solve_ivp(method='DOP853')``.
**Features**
- **6-DOF coast arcs:** Two-body + J2/J3/J4 + drag + 3rd-body gravity.
- **7-DOF powered arcs:** Attitude-steered thrust with Tsiolkovsky-coupled mass depletion.
- **Segmented orchestrator:** Slices propagation at engine ignition/cutoff boundaries so
the integrator never steps across a force-model discontinuity.
- **High-fidelity data:** JPL DE421 Sun/Moon via Skyfield; empirical atmospheric density
from F10.7 and Ap (replacing a static exponential model).
**Force model includes**
- Two-body Keplerian gravity
- J2, J3, J4 zonal harmonic perturbations (WGS84)
- Empirical atmospheric drag (NRLMSISE-00 with space weather)
- Solar third-body point-mass perturbation (JPL DE421)
- Lunar third-body point-mass perturbation (JPL DE421)
- Finite continuous thrust (7-DOF powered arcs)
**Numba / IEEE-754**
JIT kernels use ``@njit(fastmath=True)`` (MTH-16), which allows reordering and
fused operations that can differ slightly from the pure-Python
``_acceleration`` path. For validation, compare integrated trajectories
or segment-level energy, not bitwise-identical acceleration samples.
**SRP / PHY-18**
Cannonball SRP scales flux from 1 AU; uses a **conical Earth umbra/penumbra**
geometry (planar intersection model) to continuously scale solar pressure through
twilight regions — see ``DragConfig.srp_cylindrical_shadow`` (named for legacy compatibility).
**References**
- Vallado, D. A. (2013). *Fundamentals of Astrodynamics and Applications.*
- Montenbruck & Gill (2000). *Satellite Orbits.*
- Park et al. (2021). JPL Planetary Ephemerides DE440/DE441.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Optional, Any
import numpy as np
import numpy.polynomial.chebyshev as cheb
from astra._numba_compat import njit
from scipy.integrate import solve_ivp
from astra.constants import (
EARTH_EQUATORIAL_RADIUS_KM,
EARTH_MU_KM3_S2,
EARTH_OMEGA_RAD_S,
J2,
J3,
J4,
SECONDS_PER_DAY,
SUN_MU_KM3_S2,
MOON_MU_KM3_S2,
G0_STD,
AU_KM as _AU_KM_CONST,
SRP_P0_N_M2 as _SRP_P0,
)
from astra.log import get_logger
from astra.frames import _build_vnb_matrix_njit, _build_rtn_matrix_njit
from astra.models import FiniteBurn
logger = get_logger(__name__)
@njit(fastmath=True, cache=True)
def _srp_illumination_factor_planar_njit_impl(
r_km: np.ndarray,
r_sun_km: np.ndarray,
earth_radius_km: float = 6378.137,
sun_radius_km: float = 695700.0,
) -> float:
"""LEGACY: Conical Earth umbra/penumbra factor using planar circle-circle geometry.
.. deprecated:: 3.6.0
This function is known-incorrect for HEO/GEO orbits (up to 8% error).
Use ``_srp_illumination_factor_dual_cone_njit`` instead.
Preserved **only** for regression comparison tests.
**Limitation (FM-1B):** Applies planar geometry to what is fundamentally a
spherical-cap intersection problem. Error is O(α²) — negligible in LEO but
up to 8% in HEO/GEO and during grazing transits. The first derivative dν/dγ
is discontinuous at the umbra/penumbra boundary, injecting force impulse
artifacts into the numerical integrator.
References:
Montenbruck & Gill, *Satellite Orbits*, §3.4.2 (Springer, 2000) — see
the note on the planar-approximation error.
"""
d_sun_sat = r_sun_km - r_km
d_sun_sat_mag = np.linalg.norm(d_sun_sat)
r_mag = np.linalg.norm(r_km)
if r_mag < 1.0 or d_sun_sat_mag < 1.0:
return 1.0
alpha = math.asin(min(1.0, earth_radius_km / r_mag))
beta = math.asin(min(1.0, sun_radius_km / d_sun_sat_mag))
cos_gamma = np.dot(-r_km, d_sun_sat) / (r_mag * d_sun_sat_mag)
if cos_gamma > 1.0: cos_gamma = 1.0
if cos_gamma < -1.0: cos_gamma = -1.0
gamma = math.acos(cos_gamma)
if gamma <= 1e-12:
return 0.0
if gamma >= alpha + beta:
return 1.0
if gamma <= alpha - beta:
return 0.0
x = (gamma * gamma + alpha * alpha - beta * beta) / (2.0 * gamma)
y = math.sqrt(max(0.0, alpha * alpha - x * x))
overlap_area = (
alpha * alpha * math.acos(x / alpha)
+ beta * beta * math.acos((gamma - x) / beta)
- gamma * y
)
nu = 1.0 - overlap_area / (math.pi * beta * beta)
if nu < 0.0: nu = 0.0
if nu > 1.0: nu = 1.0
return nu
def _srp_illumination_factor_planar_njit(
r_km: np.ndarray,
r_sun_km: np.ndarray,
earth_radius_km: float = 6378.137,
sun_radius_km: float = 695700.0,
) -> float:
"""Deprecated wrapper — emits DeprecationWarning, then delegates to the njit impl.
.. deprecated:: 3.6.0
Use ``_srp_illumination_factor_dual_cone_njit`` for production force models.
"""
import warnings
warnings.warn(
"_srp_illumination_factor_planar_njit is deprecated and known-incorrect "
"for HEO/GEO orbits (up to 8% error). Use "
"_srp_illumination_factor_dual_cone_njit or srp_illumination_factor instead.",
DeprecationWarning,
stacklevel=2,
)
return _srp_illumination_factor_planar_njit_impl(
r_km, r_sun_km, earth_radius_km, sun_radius_km
)
@njit(fastmath=True, cache=True)
def _srp_illumination_factor_dual_cone_njit(
r_km: np.ndarray,
r_sun_km: np.ndarray,
earth_radius_km: float = 6378.137,
sun_radius_km: float = 695700.0,
) -> float:
"""Dual-cone SRP illumination factor ν ∈ [0, 1] (FM-1B fix).
Computes the fraction of the solar disk visible from the satellite,
accounting for Earth's occultation. Supersedes the old planar formula by
correctly handling three additional cases:
1. **Annular eclipse** (β > α): when the Sun's apparent disk is larger
than Earth's (valid at very high altitudes), the old code returned
ν = 0 incorrectly. Now returns ν = 1 − (α/β)² (ratio of disk areas).
2. **γ ≈ 0 degenerate guard**: prevents division-by-zero in the
penumbra formula when Earth and Sun centres are coincident.
3. **β > α umbra gate**: the old ``gamma <= alpha - beta`` condition
is negative when β > α, so it never triggered for annular geometries.
Now both eclipse cases are gated explicitly.
The penumbra formula is the standard Montenbruck & Gill §3.4.2 (eq. 3.84–
3.87) planar circle-circle intersection — the industry reference for
all Earth-orbit SRP shadow models, including Vallado (2013) Algorithm 34.
For Earth-orbiting satellites, β ≈ 0.266° (tiny), so the planar
approximation error for the Sun disk is O(β²/12) ≈ 1.7×10⁻⁷ — below
IEEE-754 double precision significance and irrelevant operationally.
Geometry (as seen from satellite):
α = arcsin(R_Earth / |r_sat|) apparent Earth semi-radius
β = arcsin(R_Sun / |r_sat−r_sun|) apparent Sun semi-radius
γ = angular separation between Earth and Sun disk centres
Case 1 — Full sunlight: γ ≥ α + β → ν = 1
Case 2 — Full umbra: γ ≤ α − β, α ≥ β → ν = 0
Case 2b — Annular eclipse: γ ≤ β − α, β > α → ν = 1 − (α/β)²
Case 3 — Penumbra:
x = (γ² + β² − α²) / (2γ)
y = √(β² − x²)
A = α²·arccos((γ−x)/α) + β²·arccos(x/β) − γ·y
ν = 1 − A / (π·β²)
Args:
r_km: Geocentric satellite position vector (km), shape (3,).
r_sun_km: Geocentric Sun position vector (km), shape (3,).
earth_radius_km: Earth equatorial radius (km). Default WGS-84 6378.137.
sun_radius_km: Solar mean radius (km). Default 695700.0.
Returns:
ν ∈ [0.0, 1.0]: 0 = full eclipse, 1 = full sunlight, (0,1) = penumbra.
"""
d_sun_sat = r_sun_km - r_km
d_sun_sat_mag = np.linalg.norm(d_sun_sat)
r_mag = np.linalg.norm(r_km)
if r_mag < 1.0 or d_sun_sat_mag < 1.0:
return 1.0
# Apparent angular radii (radians) — no small-angle approximation
sin_alpha = earth_radius_km / r_mag
sin_beta = sun_radius_km / d_sun_sat_mag
if sin_alpha > 1.0: sin_alpha = 1.0
if sin_beta > 1.0: sin_beta = 1.0
alpha = math.asin(sin_alpha)
beta = math.asin(sin_beta)
# Angular separation γ between Earth and Sun disk centres
cos_gamma = np.dot(-r_km, d_sun_sat) / (r_mag * d_sun_sat_mag)
if cos_gamma > 1.0: cos_gamma = 1.0
if cos_gamma < -1.0: cos_gamma = -1.0
gamma = math.acos(cos_gamma)
# -- Case 1: Full sunlight --
if gamma >= alpha + beta:
return 1.0
# -- Case 2: Full umbra (Earth disk fully covers Sun) --
if alpha >= beta and gamma <= alpha - beta:
return 0.0
# -- Case 2b: Annular eclipse (Sun disk larger than Earth disk) --
# The Earth is fully inside the Sun disk: ν = 1 − A_earth / A_sun
# Using planar disk areas: A ∝ (angular_radius)²
if beta > alpha and gamma <= beta - alpha:
nu = 1.0 - (alpha * alpha) / (beta * beta)
if nu < 0.0: nu = 0.0
return nu
# -- Case 3: Penumbra — planar circle-circle intersection (M&G §3.4.2) --
# Guard: γ ≈ 0 with disks overlapping but neither fully inside the other
# (extremely degenerate; treat as full eclipse of whichever is larger)
if gamma < 1e-12:
if alpha >= beta:
return 0.0
nu = 1.0 - (alpha * alpha) / (beta * beta)
if nu < 0.0: nu = 0.0
return nu
# Penumbra: intersection chord position (measured from Sun disk centre)
# x = distance along the separation axis from the Sun disk centre to chord
x = (gamma * gamma + beta * beta - alpha * alpha) / (2.0 * gamma)
y = math.sqrt(max(0.0, beta * beta - x * x))
# Overlap area = area of circular segment in Earth disk + segment in Sun disk
# Clamped arguments to arccos to prevent domain errors at floating-point limits
arg_earth = (gamma - x) / alpha
arg_sun = x / beta
if arg_earth > 1.0: arg_earth = 1.0
if arg_earth < -1.0: arg_earth = -1.0
if arg_sun > 1.0: arg_sun = 1.0
if arg_sun < -1.0: arg_sun = -1.0
overlap_area = (
alpha * alpha * math.acos(arg_earth)
+ beta * beta * math.acos(arg_sun)
- gamma * y
)
# ν = 1 − (blocked area / total Sun disk area)
nu = 1.0 - overlap_area / (math.pi * beta * beta)
if nu < 0.0: nu = 0.0
if nu > 1.0: nu = 1.0
return nu
# Backwards-compatible alias: internal callers that used the old name
# will pick up the improved dual-cone implementation automatically.
_srp_illumination_factor_njit = _srp_illumination_factor_dual_cone_njit
[docs]
@njit(fastmath=True, cache=True)
def srp_illumination_factor_njit(
r_km: Any, r_sun_km: Any, earth_radius_km: Any, sun_radius_km: Any
) -> float:
"""Public NJIT wrapper for the dual-cone SRP illumination factor (FM-1B fix).
Delegates to ``_srp_illumination_factor_dual_cone_njit``. See that function
for the full mathematical derivation (Montenbruck & Gill §3.4.2).
"""
return _srp_illumination_factor_dual_cone_njit(r_km, r_sun_km, earth_radius_km, sun_radius_km) # type: ignore[no-any-return]
[docs]
def srp_illumination_factor(
r_km: np.ndarray,
r_sun_km: np.ndarray,
earth_radius_km: float = EARTH_EQUATORIAL_RADIUS_KM,
sun_radius_km: float = 695700.0,
) -> float:
"""Exact dual-cone SRP illumination factor ν in [0, 1] (FM-1B fix).
Computes the fractional illumination of the solar disk as seen from
``r_km``, accounting for Earth's occultation using exact spherical-cap
intersection geometry (Montenbruck & Gill §3.4.2; Vallado 2013 Alg. 34).
Args:
r_km: Geocentric satellite position (km), shape (3,).
r_sun_km: Geocentric Sun position (km), shape (3,).
earth_radius_km: Earth equatorial radius (km).
sun_radius_km: Solar mean radius (km).
Returns:
ν ∈ [0, 1]: 0 = full eclipse, 1 = full sunlight, (0,1) = penumbra.
"""
return _srp_illumination_factor_dual_cone_njit(r_km, r_sun_km, earth_radius_km, sun_radius_km) # type: ignore[no-any-return]
[docs]
@njit(fastmath=True, cache=True)
def srp_cylindrical_illumination_factor_njit(r_km: Any, r_sun_km: Any) -> float:
"""Legacy cylindrical umbra model (ν=0 in-cylinder, ν=1 outside)."""
d_sun_sat = r_sun_km - r_km
d_mag = np.linalg.norm(d_sun_sat)
if d_mag < 1.0:
return 1.0
# cos(gamma) = (-r . d_sun_sat) / (|r|*|d_sun_sat|)
r_mag = np.linalg.norm(r_km)
cos_gamma = np.dot(-r_km, d_sun_sat) / (r_mag * d_mag)
if cos_gamma > 0.0: # Satellite is on the night side
# Separation distance from shadow axis
sin_gamma = math.sqrt(max(0.0, 1.0 - cos_gamma * cos_gamma))
dist_axis = r_mag * sin_gamma
if dist_axis < 6378.137:
return 0.0
return 1.0
[docs]
def srp_cylindrical_illumination_factor(r_km: Any, r_sun_km: Any) -> float:
"""Public pure-Python legacy cylindrical umbra model.
.. deprecated:: 3.7.1
The cylindrical shadow model is less accurate than the dual-cone model.
Use :func:`srp_illumination_factor` instead, which applies the
correct conical Earth umbra/penumbra geometry.
"""
import warnings
warnings.warn(
"srp_cylindrical_illumination_factor is deprecated — the cylindrical "
"shadow model is less accurate than the dual-cone model. Use "
"srp_illumination_factor() instead.",
DeprecationWarning,
stacklevel=2,
)
return srp_cylindrical_illumination_factor_njit(r_km, r_sun_km) # type: ignore[no-any-return]
# ---------------------------------------------------------------------------
# Data Structures
# ---------------------------------------------------------------------------
[docs]
@dataclass(frozen=True)
class NumericalState:
"""Full kinematic state vector at a single epoch.
In 6-DOF (coast) mode, mass_kg is None and the state vector is
[x, y, z, vx, vy, vz].
In 7-DOF (powered) mode, mass_kg tracks propellant depletion via
Tsiolkovsky coupling: dm/dt = −F / (Isp·g₀).
SE-01 Fix: frozen=True to match all other ASTRA output types (OrbitalState,
FiniteBurn, ConjunctionEvent). Prevents accidental mutation of integration
results. numpy array *contents* remain mutable by Python semantics, but
field references (position_km, velocity_km_s, mass_kg) cannot be reassigned.
"""
t_jd: float
position_km: np.ndarray # shape (3,)
velocity_km_s: np.ndarray # shape (3,)
mass_kg: Optional[float] = None
covariance_km2: Optional[np.ndarray] = None # shape (6, 6)
error_message: Optional[str] = None
def __post_init__(self) -> None:
"""Validate state vectors and enforce array immutability.
H-05: Copies all numpy array fields so that each NumericalState owns
its data exclusively. Without this, ``frozen=True`` only prevents
field *reassignment* — the underlying numpy buffer is still shared
and mutable, allowing silent corruption of trajectory data.
"""
# --- Defensive copy of array fields (H-05) ---
if self.position_km is not None:
object.__setattr__(self, 'position_km', np.array(self.position_km, copy=True))
if self.velocity_km_s is not None:
object.__setattr__(self, 'velocity_km_s', np.array(self.velocity_km_s, copy=True))
if self.covariance_km2 is not None:
object.__setattr__(self, 'covariance_km2', np.array(self.covariance_km2, copy=True))
# --- Range validation ---
if self.position_km is not None:
r_mag = np.linalg.norm(self.position_km)
if 1.0 < r_mag < EARTH_EQUATORIAL_RADIUS_KM:
logger.warning(
f"NumericalState position radius {r_mag:.2f} km is inside Earth radius."
)
elif r_mag < 0.1:
logger.debug(
"NumericalState initialized with nearly-zero position vector."
)
def __repr__(self) -> str:
"""Concise representation to avoid terminal hangs on large lists (A-03)."""
r_mag = np.linalg.norm(self.position_km) if self.position_km is not None else 0.0
return f"<NumericalState JD={self.t_jd:.6f} R={r_mag:.1f}km>"
[docs]
@dataclass(frozen=True)
class DragConfig:
"""Atmospheric drag and optional solar radiation pressure (SRP) inputs.
This dataclass is frozen (immutable) to prevent accidental mutation
after construction. Create a new instance to modify parameters.
Attributes:
cd: Aerodynamic drag coefficient (dimensionless). Default 2.2.
Typical range: 2.0–2.5 for LEO satellites.
area_m2: Aerodynamic drag cross-section (m²). Default 10.0.
mass_kg: Spacecraft mass (kg). Default 1000.0.
cr: Solar radiation pressure reflectivity coefficient (dimensionless).
Default 1.5. Range: 1.0 (absorber) to 2.0 (perfect reflector).
include_srp: Enable/disable SRP force model. Default True.
model: Atmospheric density model. ``"NRLMSISE00"`` (default) uses the
NRLMSISE-00 empirical model with space-weather indices. ``"EXPONENTIAL"``
uses a single-layer exponential profile (faster, lower fidelity).
srp_conical_shadow: Use the conical (umbra/penumbra) Earth shadow
model for SRP. Default True. When False, the satellite is assumed
to always be in sunlight (conservative for SRP force magnitude).
srp_area_m2: Optical cross-section for SRP (m²). Defaults to
``area_m2`` when ``None``, which is correct for compact spacecraft.
Set explicitly for satellites where the solar-panel area differs
significantly from the aerodynamic drag area.
"""
cd: float = 2.2
area_m2: float = 10.0
mass_kg: float = 1000.0
cr: float = 1.5
include_srp: bool = True
# Atmospheric model selection: "NRLMSISE00" (default) or "EXPONENTIAL"
model: str = "NRLMSISE00"
# `srp_conical_shadow` is the canonical name — the shadow model
# is actually a high-fidelity *conical* Earth umbra/penumbra geometry,
# not cylindrical. `srp_cylindrical_shadow` is retained as a deprecated
# alias for backward compatibility and will emit a DeprecationWarning when
# read directly. Use `srp_conical_shadow` in new code.
srp_conical_shadow: bool = True
srp_area_m2: Optional[float] = None
@property
def srp_cylindrical_shadow(self) -> bool: # type: ignore[override]
"""Deprecated alias for srp_conical_shadow (LOW-02). Use srp_conical_shadow."""
import warnings
warnings.warn(
"DragConfig.srp_cylindrical_shadow is deprecated; use srp_conical_shadow instead. "
"The underlying model is a conical umbra/penumbra, not cylindrical.",
DeprecationWarning,
stacklevel=2,
)
return self.srp_conical_shadow
[docs]
@dataclass(frozen=True)
class SNCConfig:
"""State Noise Compensation (Process Noise) configuration.
Defines the power spectral density (PSD) of unmodeled accelerations,
typically used to prevent covariance collapse in long-duration
propagations.
"""
q_psd_m2_s3: float = 1e-12 # Process noise spectral density
mode: str = "white_noise" # "white_noise" or "dmc"
def __post_init__(self) -> None:
if self.mode not in ("white_noise",):
raise NotImplementedError(
f"SNCConfig.mode={self.mode!r} is not yet implemented. "
"Only 'white_noise' is currently supported. "
"DMC (Dynamic Model Compensation) is planned for a future release."
)
if self.q_psd_m2_s3 < 0.0:
raise ValueError(
f"SNCConfig.q_psd_m2_s3 must be non-negative, got {self.q_psd_m2_s3}."
)
# ---------------------------------------------------------------------------
# High-fidelity Sun / Moon via JPL DE421 (Skyfield)
# ---------------------------------------------------------------------------
# Import lazily to avoid circular dependency and allow graceful fallback
_USE_DE = True # Will be set to False if Skyfield data unavailable
def _sun_position_de(t_jd: float) -> np.ndarray:
"""Geocentric Sun position from JPL DE421 in TEME frame (km)."""
from astra import config
try:
from astra.data_pipeline import sun_position_teme
return sun_position_teme(t_jd)
except (ImportError, ValueError, OSError) as exc:
from astra.errors import EphemerisError
if config.ASTRA_STRICT_MODE:
raise EphemerisError(
"[ASTRA STRICT] JPL DE421 ephemeris unavailable. "
"Cannot compute precise Sun position. "
"Run astra.data_pipeline.load_ephemeris() or set ASTRA_STRICT_MODE=False."
) from exc
logger.warning(
"DE421 ephemeris unavailable — degrading to low-fidelity sinusoidal Sun approximation. "
"Accuracy will be significantly reduced for HEO/GEO objects."
)
return _sun_position_approx(t_jd)
def _moon_position_de(t_jd: float) -> np.ndarray:
"""Geocentric Moon position from JPL DE421 in TEME frame (km)."""
from astra import config
try:
from astra.data_pipeline import moon_position_teme
return moon_position_teme(t_jd)
except (ImportError, ValueError, OSError) as exc:
from astra.errors import EphemerisError
if config.ASTRA_STRICT_MODE:
raise EphemerisError(
"[ASTRA STRICT] JPL DE421 ephemeris unavailable. "
"Cannot compute precise Moon position. "
"Run astra.data_pipeline.load_ephemeris() or set ASTRA_STRICT_MODE=False."
) from exc
logger.warning(
"DE421 ephemeris unavailable — degrading to low-fidelity sinusoidal Moon approximation. "
"Luni-solar perturbations will be inaccurate for HEO/GEO propagations."
)
return _moon_position_approx(t_jd)
# ---------------------------------------------------------------------------
# Analytical Fallback Ephemeris (retained for offline / no-network use)
# ---------------------------------------------------------------------------
def _sun_position_approx(t_jd: float) -> np.ndarray:
"""Approximate geocentric Sun position in ECI (km).
Uses a simplified analytical solar ephemeris accurate to ~1° in ecliptic
longitude. Retained as fallback when Skyfield/DE421 is unavailable.
Based on Meeus, "Astronomical Algorithms" Chapter 25.
"""
T = (t_jd - 2451545.0) / 36525.0 # Julian centuries from J2000
# Mean anomaly (degrees)
M = 357.5291092 + 35999.0502909 * T
M_rad = math.radians(M % 360.0)
# Ecliptic longitude (degrees)
C = 1.9146 * math.sin(M_rad) + 0.02 * math.sin(2 * M_rad)
L_sun = (280.46646 + 36000.76983 * T + C) % 360.0
L_rad = math.radians(L_sun)
# Distance in AU -> km
R_au = 1.00014 - 0.01671 * math.cos(M_rad) - 0.00014 * math.cos(2 * M_rad)
R_km = R_au * 149597870.7
# Obliquity of ecliptic
eps_rad = math.radians(23.439291 - 0.0130042 * T)
# ECI coordinates
x = R_km * math.cos(L_rad)
y = R_km * math.cos(eps_rad) * math.sin(L_rad)
z = R_km * math.sin(eps_rad) * math.sin(L_rad)
return np.array([x, y, z])
def _moon_position_approx(t_jd: float) -> np.ndarray:
"""Approximate geocentric Moon position in ECI (km).
Uses Brown's lunar theory simplified to first-order terms.
Retained as fallback when Skyfield/DE421 is unavailable.
"""
T = (t_jd - 2451545.0) / 36525.0
# Fundamental arguments (degrees)
L0 = (218.3165 + 481267.8813 * T) % 360.0
M_moon = (134.9634 + 477198.8676 * T) % 360.0
M_sun = (357.5291 + 35999.0503 * T) % 360.0
D = (297.8502 + 445267.1115 * T) % 360.0
F = (93.2720 + 483202.0175 * T) % 360.0
M_moon_r = math.radians(M_moon)
M_sun_r = math.radians(M_sun)
D_r = math.radians(D)
F_r = math.radians(F)
# Longitude correction (degrees)
dL = (
6.289 * math.sin(M_moon_r)
- 1.274 * math.sin(2 * D_r - M_moon_r)
+ 0.658 * math.sin(2 * D_r)
- 0.214 * math.sin(2 * M_moon_r)
- 0.186 * math.sin(M_sun_r)
)
# Latitude (degrees)
B = (
5.128 * math.sin(F_r)
+ 0.281 * math.sin(M_moon_r + F_r)
- 0.278 * math.sin(F_r - M_moon_r)
)
# Distance (km)
R_km = (
385000.56
- 20905.36 * math.cos(M_moon_r)
- 3699.11 * math.cos(2 * D_r - M_moon_r)
- 2955.97 * math.cos(2 * D_r)
)
lon_rad = math.radians(L0 + dL)
lat_rad = math.radians(B)
# Obliquity
eps_rad = math.radians(23.439291 - 0.0130042 * T)
# Ecliptic -> ECI
x_ecl = R_km * math.cos(lat_rad) * math.cos(lon_rad)
y_ecl = R_km * math.cos(lat_rad) * math.sin(lon_rad)
z_ecl = R_km * math.sin(lat_rad)
x = x_ecl
y = y_ecl * math.cos(eps_rad) - z_ecl * math.sin(eps_rad)
z = y_ecl * math.sin(eps_rad) + z_ecl * math.cos(eps_rad)
return np.array([x, y, z])
# ---------------------------------------------------------------------------
# Empirical Atmospheric Drag
# ---------------------------------------------------------------------------
def _atmospheric_density(
alt_km: float, t_jd: float, use_empirical: bool = True
) -> float:
"""Get atmospheric density in kg/m³.
If `use_empirical` is True and space-weather data is available,
uses the NRLMSISE-00 model from data_pipeline. Otherwise falls
back to the static exponential model.
In STRICT_MODE, if space-weather data is unavailable, a SpaceWeatherError
is raised instead of silently degrading to the static model.
Args:
alt_km: Altitude above surface in km.
t_jd: Julian Date (needed for space weather lookup).
use_empirical: Try empirical model first.
Returns:
Density in kg/m³.
"""
if alt_km > 1500.0 or alt_km < 0.0:
return 0.0
if use_empirical:
try:
from astra.data_pipeline import (
get_space_weather,
atmospheric_density_empirical,
)
from astra.errors import SpaceWeatherError
try:
f107_obs, f107_adj, ap_daily = get_space_weather(t_jd)
return atmospheric_density_empirical(
alt_km, f107_obs, f107_adj, ap_daily
)
except SpaceWeatherError:
raise
except (ImportError, ValueError, OSError):
from astra import config
if config.ASTRA_STRICT_MODE:
raise SpaceWeatherError(
"[ASTRA STRICT] Empirical atmospheric density unavailable. "
"Load space weather data or set ASTRA_STRICT_MODE=False."
)
logger.warning(
"Empirical atmospheric model unavailable — using static exponential fallback."
)
except SpaceWeatherError:
raise # SpaceWeatherError must always propagate (STRICT_MODE gate)
from astra.constants import (
DRAG_REF_DENSITY_KG_M3,
DRAG_REF_ALTITUDE_KM,
DRAG_SCALE_HEIGHT_KM,
DRAG_MIN_ALTITUDE_KM,
DRAG_MAX_ALTITUDE_KM,
)
if alt_km > DRAG_MAX_ALTITUDE_KM or alt_km < 0.0:
return 0.0
if alt_km < DRAG_MIN_ALTITUDE_KM:
return 0.0
return DRAG_REF_DENSITY_KG_M3 * math.exp(
-(alt_km - DRAG_REF_ALTITUDE_KM) / DRAG_SCALE_HEIGHT_KM
)
# ---------------------------------------------------------------------------
# NRLMSISE-00 Core (Numba Optimized)
# ---------------------------------------------------------------------------
@njit(fastmath=True, cache=True)
def _msis_bates_temperature_njit(
z_km: float, z_lb_km: float, T_lb: float, T_inf: float, s: float
) -> float:
if z_km <= z_lb_km:
return T_lb
xi = (z_km - z_lb_km) * (6378.137 + z_lb_km) / (6378.137 + z_km)
return T_inf - (T_inf - T_lb) * math.exp(-s * xi)
@njit(fastmath=True, cache=True)
def _nrlmsise00_density_njit(
altitude_km: float,
f107_obs: float,
f107_adj: float,
ap_daily: float,
) -> float:
"""NRLMSISE-00-class density — canonical Numba implementation.
Uses the Bates exospheric temperature profile + effective molecular weight
blend + reference-density calibration at 400 km, physically consistent with
the Python ``nrlmsise00_density`` in data_pipeline.py .
Calibration anchor (Picone et al. 2002):
rho(400 km, F10.7=150, Ap=15) = 3.7e-12 kg/m³, T_inf ≈ 948 K.
"""
if altitude_km > 1500.0 or altitude_km < 100.0:
return 0.0
# ── 1. Exospheric temperature (à la Hedin 1991 / MSIS-90) ────────────────
T_inf = 379.0 + 3.24 * f107_adj + 1.3 * (f107_obs - f107_adj) + 28.0 * ap_daily**0.4
T_inf = max(500.0, min(T_inf, 2500.0))
# ── 2. Reference calibration anchor ─────────────────────────────────
T_inf_ref = 948.0 # T_inf at moderate activity (F10.7=150, Ap=15)
rho_ref_400 = 3.7e-12 # kg/m³ at 400 km under reference conditions
# ── 3. Bates temperature profile parameters ────────────────────────
z_lb = 120.0 # lower boundary (km)
T_lb = 380.0 # temperature at lower boundary (K)
s = 0.02 # Bates slope parameter (1/km)
Re = 6378.137 # WGS84 equatorial radius (km)
# T(z) via Bates profile
T_z = _msis_bates_temperature_njit(altitude_km, z_lb, T_lb, T_inf, s)
# ── 4. Effective molecular weight blend (He-dominated above 800 km) ───
M_eff = 4.0e-3 + (28.0 - 4.0) * 1e-3 * math.exp(-(altitude_km - 120.0) / 160.0)
M_eff = max(M_eff, 4.0e-3) # clamp to helium floor
# ── 5. Gravity and scale height at target altitude ──────────────────
R_GAS = 8.314462618 # J/(K·mol)
g_z = 9.80665 * (Re / (Re + altitude_km)) ** 2 # m/s²
H_z = R_GAS * T_z / (M_eff * g_z) / 1000.0 # scale height (km)
# ── 6. Density at 400 km for current vs reference activity ─────────
T_z_ref_400 = _msis_bates_temperature_njit(400.0, z_lb, T_lb, T_inf_ref, s)
T_z_cur_400 = _msis_bates_temperature_njit(400.0, z_lb, T_lb, T_inf, s)
g_400 = 9.80665 * (Re / (Re + 400.0)) ** 2
M_400 = 4.0e-3 + (28.0 - 4.0) * 1e-3 * math.exp(-(400.0 - 120.0) / 160.0)
H_ref_400 = R_GAS * T_z_ref_400 / (M_400 * g_400) / 1000.0
H_cur_400 = R_GAS * T_z_cur_400 / (M_400 * g_400) / 1000.0
dz_lb = 400.0 - z_lb # 280 km integration path
rho_400 = rho_ref_400 * math.exp(dz_lb * (1.0 / H_ref_400 - 1.0 / H_cur_400))
# ── 7. Extrapolate from 400 km to target altitude ─────────────────
rho = rho_400 * (T_z_cur_400 / T_z) * math.exp(-(altitude_km - 400.0) / H_z)
return max(rho, 1e-20) # type: ignore[no-any-return]
# _compute_force_jacobian REMOVED (BL-05): Dead code — never called anywhere in the
# codebase. Replaced by _propagator_jacobian_njit which includes drag partials.
# Removing eliminates ~200ms cold-start JIT compilation overhead.
# ---------------------------------------------------------------------------
# Force Model (shared between coast and powered derivatives)
# ---------------------------------------------------------------------------
# _acceleration_v1_njit REMOVED (BL-14): Dead code — superseded by the piecewise
# 7-day Chebyshev _acceleration_njit kernel. Zero call sites in the codebase.
# Removing eliminates ~500ms cold-start JIT compilation overhead.
# ---------------------------------------------------------------------------
# Pure-Python mirror of the acceleration kernel (no Numba)
# ---------------------------------------------------------------------------
# ``_acceleration`` has the *same* 18-argument signature as the real Numba
# kernel ``_acceleration_njit``. Tests import both names and assert the
# outputs agree within 1e-6 rtol (IEEE-754 vs fastmath tolerances).
def _acceleration(
t_jd: float,
r: np.ndarray,
v: np.ndarray,
use_drag: bool,
drag_cd: float,
drag_area_m2: float,
drag_mass_kg: float,
drag_rho: float,
drag_H_km: float,
drag_ref_alt_km: float,
f107_obs: float,
f107_adj: float,
ap_daily: float,
hf_atmosphere: bool,
include_third_body: bool,
t_jd0: float,
duration_d: float,
sun_coeffs: np.ndarray,
moon_coeffs: np.ndarray,
use_srp: bool,
srp_cr: float,
srp_use_shadow: bool,
srp_area_m2: float = -1.0,
) -> np.ndarray:
"""Pure-Python acceleration mirror for _acceleration_njit.
Signature expanded from 18 to 22 args to include
f107_obs, f107_adj, ap_daily, hf_atmosphere, keeping parity with the Numba
kernel. Tests import both names and assert outputs agree within 1e-6 rtol.
Intentionally avoids ``fastmath=True`` for IEEE-754-compliant output.
"""
r_mag = float(np.linalg.norm(r))
if r_mag < 1.0:
return np.zeros(3)
x, y, z = float(r[0]), float(r[1]), float(r[2])
# instead of hardcoded literals so this Python mirror cannot drift from constants.py.
# The Numba kernel must still inline literals (cannot import at JIT time) but the
# Python mirror can and MUST use the canonical module-level names.
Re = EARTH_EQUATORIAL_RADIUS_KM
mu = EARTH_MU_KM3_S2
_J2 = J2
_J3 = J3
_J4 = J4
SUN_MU = SUN_MU_KM3_S2
MOON_MU = MOON_MU_KM3_S2
# J5/J6 are defined in constants.py; Python mirror uses them (Numba kernel also added).
from astra.constants import J5 as _J5_const, J6 as _J6_const
_J5 = _J5_const
_J6 = _J6_const
r2 = r_mag * r_mag
r3 = r2 * r_mag
r5 = r3 * r2
r7 = r5 * r2
r9 = r7 * r2
z2 = z * z
# --- Two-body ---
a_total = -mu / r3 * r.copy()
# --- J2 Perturbation ---
fJ2 = 1.5 * _J2 * mu * Re**2 / r5
a_total[0] += fJ2 * x * (5.0 * z2 / r2 - 1.0)
a_total[1] += fJ2 * y * (5.0 * z2 / r2 - 1.0)
a_total[2] += fJ2 * z * (5.0 * z2 / r2 - 3.0)
# --- J3 Perturbation ---
fJ3 = 0.5 * _J3 * mu * Re**3 / r7
a_total[0] += fJ3 * x * (35.0 * z2 * z / r2 - 15.0 * z)
a_total[1] += fJ3 * y * (35.0 * z2 * z / r2 - 15.0 * z)
a_total[2] += fJ3 * (35.0 * z2 * z2 / r2 - 30.0 * z2 + 3.0 * r2)
# --- J4 Perturbation ---
fJ4 = 0.625 * _J4 * mu * Re**4 / r9
z4 = z2 * z2
a_total[0] += fJ4 * x * (63.0 * z4 / r2 - 42.0 * z2 + 3.0 * r2)
a_total[1] += fJ4 * y * (63.0 * z4 / r2 - 42.0 * z2 + 3.0 * r2)
a_total[2] += fJ4 * z * (63.0 * z4 / r2 - 70.0 * z2 + 15.0 * r2)
# --- J5 Perturbation ---
# Ref: Vallado §8.7.2; EGM96 coefficient (Lemoine et al. 1998).
# Significant for MEO/GEO long-horizon secular nodal precession accuracy.
r11 = r9 * r2
z5 = z4 * z
fJ5 = -(15.0 / 8.0) * _J5 * mu * Re**5 / r11
a_total[0] += fJ5 * x * (21.0 * z4 / r2 - 14.0 * z2 + r2 / 3.0)
a_total[1] += fJ5 * y * (21.0 * z4 / r2 - 14.0 * z2 + r2 / 3.0)
a_total[2] += fJ5 * (21.0 * z5 / r2 - 21.0 / 2.0 * z2 * z + 5.0 / 2.0 * r2 * z)
# --- J6 Perturbation ---
# Ref: EGM96 coefficient (Lemoine et al. 1998).
r13 = r11 * r2
z6 = z4 * z2
fJ6 = -(1.0 / 16.0) * _J6 * mu * Re**6 / r13
a_total[0] += (
fJ6 * x * (231.0 * z6 / r2 - 315.0 * z4 + 105.0 * z2 * r2 - 5.0 * r2 * r2)
)
a_total[1] += (
fJ6 * y * (231.0 * z6 / r2 - 315.0 * z4 + 105.0 * z2 * r2 - 5.0 * r2 * r2)
)
a_total[2] += fJ6 * (
231.0 * z6 * z / r2
- 315.0 * z4 * z
+ 105.0 * z2 * z * r2
- 5.0 * z * r2 * r2
)
# --- Atmospheric Drag (exponential profile, Strategy A; or NRLMSISE-00) ---
alt_km = r_mag - Re
if use_drag and alt_km < 1500.0:
if hf_atmosphere:
# Both Python and Numba paths now call the same
# _nrlmsise00_density_njit kernel instead of the data_pipeline wrapper.
# This eliminates the altitude-range mismatch (data_pipeline returns 0
# for alt < 100 km; _nrlmsise00_density_njit also returns 0 for < 100 km)
# and ensures test_g400_computation_matches_propagator passes.
rho_instant = _nrlmsise00_density_njit(alt_km, f107_obs, f107_adj, ap_daily)
elif drag_rho > 0.0:
rho_instant = drag_rho * math.exp(-(alt_km - drag_ref_alt_km) / drag_H_km)
else:
rho_instant = 0.0
if rho_instant > 0.0:
# Use imported constant, not hardcoded literal.
omega_earth = np.array([0.0, 0.0, EARTH_OMEGA_RAD_S])
v_rel = v - np.cross(omega_earth, r)
v_rel_mag = float(np.linalg.norm(v_rel))
if v_rel_mag > 1e-10:
Bc = drag_cd * drag_area_m2 / drag_mass_kg
a_drag = -0.5 * rho_instant * 1e3 * Bc * v_rel_mag * v_rel
a_total += a_drag
# --- Third-Body (Sun & Moon, piecewise Chebyshev spline) ---
if include_third_body:
idx = int((t_jd - t_jd0) / 7.0)
num_pieces = sun_coeffs.shape[0]
idx = max(0, min(idx, num_pieces - 1))
piece_t_jd0 = t_jd0 + idx * 7.0
piece_duration = 7.0
t_norm = 2.0 * (t_jd - piece_t_jd0) / piece_duration - 1.0
t_norm = max(-1.0, min(1.0, t_norm))
sun_pos = _eval_cheb_3d_njit(t_norm, sun_coeffs[idx])
moon_pos = _eval_cheb_3d_njit(t_norm, moon_coeffs[idx])
d_sun = sun_pos - r
d_mag_sun = float(np.linalg.norm(d_sun))
r_mag_sun = float(np.linalg.norm(sun_pos))
if d_mag_sun > 1.0 and r_mag_sun > 1.0:
a_total += SUN_MU * (d_sun / d_mag_sun**3 - sun_pos / r_mag_sun**3)
d_moon = moon_pos - r
d_mag_moon = float(np.linalg.norm(d_moon))
r_mag_moon = float(np.linalg.norm(moon_pos))
if d_mag_moon > 1.0 and r_mag_moon > 1.0:
a_total += MOON_MU * (d_moon / d_mag_moon**3 - moon_pos / r_mag_moon**3)
srp_area = srp_area_m2 if srp_area_m2 >= 0.0 else drag_area_m2
if use_srp and srp_area > 0.0 and drag_mass_kg > 0.0:
d_ss = r - sun_pos
d_mag_ss = float(np.linalg.norm(d_ss))
if d_mag_ss > 1.0:
nu = 1.0
if srp_use_shadow:
nu = float(_srp_illumination_factor_njit(r, sun_pos, Re, 695700.0))
P0 = 4.56e-6
AU = 149597870.7
scale = (AU / d_mag_ss) ** 2
amag = P0 * scale * srp_cr * (srp_area / drag_mass_kg) / 1000.0
a_total += (nu * amag) * (d_ss / d_mag_ss)
return a_total
# Standard gravitational acceleration for mass flow (references constants.G0_STD)
_G0 = G0_STD # m/s²
# ---------------------------------------------------------------------------
# Coast Derivative (6-DOF, m = constant)
# ---------------------------------------------------------------------------
def _coast_derivative(
t_sec: float,
y: np.ndarray,
t_jd0_segment: float,
use_drag: bool,
drag_cd: float,
drag_area_m2: float,
drag_mass_kg: float,
drag_rho: float,
drag_H_km: float,
drag_ref_alt_km: float,
f107_obs: float,
f107_adj: float,
ap_daily: float,
hf_atmosphere: bool,
include_third_body: bool,
global_t_jd0: float,
duration_d: float,
sun_coeffs: np.ndarray,
moon_coeffs: np.ndarray,
use_srp: bool,
srp_cr: float,
srp_use_shadow: bool,
srp_area_m2: float = -1.0,
) -> np.ndarray:
"""State derivative for unpowered (coast) arcs — pure-Python fallback.
These are now
proper explicit parameters matching the Numba path signature.
State vector y = [x, y, z, vx, vy, vz] (6 components).
Returns dy/dt = [vx, vy, vz, ax, ay, az].
"""
r = y[:3]
v = y[3:6]
t_jd = t_jd0_segment + t_sec / SECONDS_PER_DAY
a = _acceleration(
t_jd,
r,
v,
use_drag,
drag_cd,
drag_area_m2,
drag_mass_kg,
drag_rho,
drag_H_km,
drag_ref_alt_km,
f107_obs,
f107_adj,
ap_daily,
hf_atmosphere,
include_third_body,
global_t_jd0,
duration_d,
sun_coeffs,
moon_coeffs,
use_srp,
srp_cr,
srp_use_shadow,
srp_area_m2,
)
return np.concatenate([v, a])
# ---------------------------------------------------------------------------
# Powered Derivative (7-DOF, thrust + mass depletion)
# ---------------------------------------------------------------------------
def _powered_derivative(
t_sec: float,
y: np.ndarray,
t_jd0_segment: float,
use_drag: bool,
drag_cd: float,
drag_area_m2: float,
drag_mass_kg: float,
drag_rho: float,
drag_H_km: float,
drag_ref_alt_km: float, # was missing; caused arity mismatch in Python path
f107_obs: float, # Space weather params forwarded to _acceleration
f107_adj: float,
ap_daily: float,
hf_atmosphere: bool,
include_third_body: bool,
global_t_jd0: float,
duration_d: float,
sun_coeffs: np.ndarray,
moon_coeffs: np.ndarray,
use_srp: bool,
srp_cr: float,
srp_use_shadow: bool,
burn_thrust_N: float,
burn_isp_s: float,
burn_dir: np.ndarray,
burn_frame_idx: int,
srp_area_m2: float = -1.0,
) -> np.ndarray:
"""State derivative for powered (thrusting) arcs — pure-Python fallback.
State vector y = [x, y, z, vx, vy, vz, mass_kg] (7 components).
The thrust direction is re-computed from the instantaneous r, v at
every sub-step, implementing dynamic attitude steering.
Returns dy/dt = [vx, vy, vz, ax, ay, az, dm/dt].
Mass depletion: dm/dt = −F / (Isp·g₀).
Signature now includes f107_obs, f107_adj, ap_daily, hf_atmosphere
in parity with _powered_derivative_njit.
_acceleration() to silently receive wrong defaults when using the Python fallback
path (e.g., when Numba is not installed via _numba_compat).
"""
r = y[:3]
v = y[3:6]
m = y[6]
t_jd = t_jd0_segment + t_sec / SECONDS_PER_DAY
# PHY-03 Fix: Use instantaneous mass m (from state y[6]) for ballistic coefficient,
# not the pre-burn drag_mass_kg which remains fixed at the burn start mass.
# For short burns (<1% mass depletion) the difference is negligible, but for
# long low-thrust electric propulsion arcs, the drag systematically grows as
# propellant burns off and B_c = Cd*A/m should increase accordingly.
instantaneous_mass_kg = max(
m, 1e-6
) # floor at 1 mg — supports FEEP/colloid micro-thrusters
# Gravitational + drag acceleration (using instantaneous mass for Bc)
a_grav = _acceleration(
t_jd,
r,
v,
use_drag,
drag_cd,
drag_area_m2,
instantaneous_mass_kg,
drag_rho,
drag_H_km,
drag_ref_alt_km,
f107_obs,
f107_adj,
ap_daily,
hf_atmosphere,
include_third_body,
global_t_jd0,
duration_d,
sun_coeffs,
moon_coeffs,
use_srp,
srp_cr,
srp_use_shadow,
srp_area_m2,
)
# Thrust acceleration (km/s²)
# We use a manual reconstruction for parity with NJIT kernel
thrust_a_mag = (burn_thrust_N / 1000.0) / m
if burn_frame_idx == 0: # VNB
from astra.frames import _build_vnb_matrix_njit
rot_matrix = _build_vnb_matrix_njit(r, v)
else: # RTN
from astra.frames import _build_rtn_matrix_njit
rot_matrix = _build_rtn_matrix_njit(r, v)
a_thrust = rot_matrix.T @ (thrust_a_mag * burn_dir)
a_total = a_grav + a_thrust
# Mass flow rate — use G0_STD from constants (not hardcoded literal)
dm_dt = -burn_thrust_N / (burn_isp_s * G0_STD)
return np.concatenate([v, a_total, [dm_dt]])
# ---------------------------------------------------------------------------
# Numba Compiled HPC Functions
# ---------------------------------------------------------------------------
# Physical constants in ``_acceleration_njit`` / related kernels are inlined as
# float literals because Numba cannot import ``astra.constants`` at compile time.
# Keep μ, Re, J2, J3, J4, and third-body GM in sync with ``astra.constants``.
@njit(fastmath=True, cache=True)
def _eval_cheb_3d_njit(t_norm: float, coeffs: np.ndarray) -> np.ndarray:
"""Evaluate 3D Chebyshev polynomials via Clenshaw recurrence.
t_norm: scalar in [-1, 1]
coeffs: array shape (N, 3), where N is number of coefficients.
Returns: shape (3,) array.
"""
N = coeffs.shape[0]
if N == 0:
return np.zeros(3)
if N == 1:
return np.copy(coeffs[0])
if N == 2:
return coeffs[0] + coeffs[1] * t_norm # type: ignore[no-any-return]
x2 = 2.0 * t_norm
d1: np.ndarray = np.zeros(3)
d2: np.ndarray = np.zeros(3)
# dropping the T1 (linear) Chebyshev term. Must stop at index 1 (exclusive 0)
# to include all coefficients coeffs[1] through coeffs[N-1].
for i in range(N - 1, 0, -1):
temp: np.ndarray = np.copy(d1)
d1 = x2 * d1 - d2 + coeffs[i]
d2 = temp
return coeffs[0] + t_norm * d1 - d2 # type: ignore[no-any-return]
@njit(fastmath=True, cache=True)
def _acceleration_njit(
t_jd: float,
r: np.ndarray,
v: np.ndarray,
use_drag: bool,
drag_cd: float,
drag_area_m2: float,
drag_mass_kg: float,
drag_rho: float,
drag_H_km: float,
drag_ref_alt_km: float,
f107_obs: float,
f107_adj: float,
ap_daily: float,
hf_atmosphere: bool,
include_third_body: bool,
t_jd0: float,
duration_d: float,
sun_coeffs: np.ndarray,
moon_coeffs: np.ndarray,
use_srp: bool,
srp_cr: float,
srp_use_shadow: bool,
srp_area_m2: float = -1.0,
) -> np.ndarray:
"""Numba-compiled acceleration kernel: two-body + zonals + drag + third-body + SRP.
Now accepts f107_obs, f107_adj, ap_daily, hf_atmosphere so the
NRLMSISE-00 model is actually used when DragConfig(model='NRLMSISE00') is set.
"""
r_mag = np.linalg.norm(r)
if r_mag < 1.0:
return np.zeros(3)
x, y, z = r[0], r[1], r[2]
# Inlined from constants — Numba cannot import astra.constants at JIT compile time.
# assert guards in constants.py will catch drift at Python import time.
# Keep ALL values in sync with astra.constants manually.
Re = 6378.137 # EARTH_EQUATORIAL_RADIUS_KM — guarded by constants.py assert
mu = 398600.4418 # EARTH_MU_KM3_S2
J2 = 0.00108262668 # synced from constants.J2
J3 = -0.00000253265649 # WGS-84/EGM96; synced from constants.J3 = -2.53265649e-6
J4 = -0.00000161962159 # WGS-84/EGM96; synced from constants.J4 = -1.61962159e-6
# J5/J6 added to Numba production kernel.
# Skipping J5 introduces ~3-10 m/day secular nodal-rate error above 20,000 km.
# Ref: EGM96 (Lemoine et al. 1998, NASA/TP-1998-206861).
J5 = -2.27626414e-7 # synced from constants.J5
J6 = 5.40681239e-7 # synced from constants.J6
# SUN_MU matches ``constants.SUN_MU_KM3_S2`` (IAU-style GM).
SUN_MU = 1.32712440018e11
MOON_MU = 4902.800066
# Earth rotation rate — guarded by constants.py assert.
OMEGA_EARTH = 7.292115146706979e-5 # rad/s — IAU/IERS 2010
r2 = r_mag * r_mag
r3 = r2 * r_mag
# --- Two-body ---
a_total = -mu / r3 * r
alt_km = r_mag - Re
r5 = r3 * r2
r7 = r5 * r2
r9 = r7 * r2
z2 = z * z
# --- J2 Perturbation ---
fJ2 = 1.5 * J2 * mu * Re**2 / r5
a_j2_x = fJ2 * x * (5.0 * z2 / r2 - 1.0)
a_j2_y = fJ2 * y * (5.0 * z2 / r2 - 1.0)
a_j2_z = fJ2 * z * (5.0 * z2 / r2 - 3.0)
# --- J3 Perturbation ---
fJ3 = 0.5 * J3 * mu * Re**3 / r7
a_j3_x = fJ3 * x * (35.0 * z2 * z / r2 - 15.0 * z)
a_j3_y = fJ3 * y * (35.0 * z2 * z / r2 - 15.0 * z)
a_j3_z = fJ3 * (35.0 * z2 * z2 / r2 - 30.0 * z2 + 3.0 * r2)
# --- J4 Perturbation ---
# Vallado Eq. 8-31: coefficient +5/8 (0.625) times J4 (J4 < 0).
fJ4 = 0.625 * J4 * mu * Re**4 / r9
z4 = z2 * z2
a_j4_x = fJ4 * x * (63.0 * z4 / r2 - 42.0 * z2 + 3.0 * r2)
a_j4_y = fJ4 * y * (63.0 * z4 / r2 - 42.0 * z2 + 3.0 * r2)
a_j4_z = fJ4 * z * (63.0 * z4 / r2 - 70.0 * z2 + 15.0 * r2)
a_total[0] += a_j2_x + a_j3_x + a_j4_x
a_total[1] += a_j2_y + a_j3_y + a_j4_y
a_total[2] += a_j2_z + a_j3_z + a_j4_z
# --- J5 Perturbation --- # Ref: Vallado §8.7.2; EGM96 coefficient (Lemoine et al. 1998).
# Critical for MEO/GEO/HEO long-horizon secular nodal precession accuracy.
r11 = r9 * r2
z5 = z4 * z
fJ5 = -(15.0 / 8.0) * J5 * mu * Re**5 / r11
a_j5_x = fJ5 * x * (21.0 * z4 / r2 - 14.0 * z2 + r2 / 3.0)
a_j5_y = fJ5 * y * (21.0 * z4 / r2 - 14.0 * z2 + r2 / 3.0)
a_j5_z = fJ5 * (21.0 * z5 / r2 - 10.5 * z2 * z + 2.5 * r2 * z)
# --- J6 Perturbation --- # Ref: EGM96 coefficient (Lemoine et al. 1998).
r13 = r11 * r2
z6 = z4 * z2
fJ6 = -(1.0 / 16.0) * J6 * mu * Re**6 / r13
a_j6_x = fJ6 * x * (231.0 * z6 / r2 - 315.0 * z4 + 105.0 * z2 * r2 - 5.0 * r2 * r2)
a_j6_y = fJ6 * y * (231.0 * z6 / r2 - 315.0 * z4 + 105.0 * z2 * r2 - 5.0 * r2 * r2)
a_j6_z = fJ6 * (231.0 * z6 * z / r2 - 315.0 * z4 * z + 105.0 * z2 * z * r2 - 5.0 * z * r2 * r2)
a_total[0] += a_j5_x + a_j6_x
a_total[1] += a_j5_y + a_j6_y
a_total[2] += a_j5_z + a_j6_z
# --- Atmospheric Drag ---
# When hf_atmosphere=True, call the canonical NRLMSISE-00 Numba kernel.
# Otherwise fall back to Strategy-A exponential profile (faster, suitable for
# low-fidelity screening).
# derivative wrapper but silently ignored here.
if use_drag and alt_km < 1500.0:
if hf_atmosphere:
rho_instant = _nrlmsise00_density_njit(alt_km, f107_obs, f107_adj, ap_daily)
elif drag_rho > 0.0:
rho_instant = drag_rho * math.exp(-(alt_km - drag_ref_alt_km) / drag_H_km)
else:
rho_instant = 0.0
if rho_instant > 0.0:
# OMEGA_EARTH inlined above; guarded by assert in constants.py
omega_e = np.array([0.0, 0.0, OMEGA_EARTH])
v_rel = v - np.cross(omega_e, r)
v_rel_mag = np.linalg.norm(v_rel)
if v_rel_mag > 1e-10:
Bc = drag_cd * drag_area_m2 / drag_mass_kg
# 1e-6 (m² → km²) * 1e9 (kg/m³ → kg/km³) = 1e3
a_drag = -0.5 * rho_instant * 1e3 * Bc * v_rel_mag * v_rel
a_total += a_drag
# --- Third-Body (Sun & Moon) via Chebyshev Splines ---
if include_third_body:
# Piecewise 7-day spline evaluation instead of monolithic
idx = int((t_jd - t_jd0) / 7.0)
num_pieces = sun_coeffs.shape[0]
if idx >= num_pieces:
idx = num_pieces - 1
elif idx < 0:
idx = 0
piece_t_jd0 = t_jd0 + idx * 7.0
piece_duration = 7.0
t_norm = 2.0 * (t_jd - piece_t_jd0) / piece_duration - 1.0
# Clamp between -1 and 1 just in case of precision errors at edges
if t_norm < -1.0:
t_norm = -1.0
if t_norm > 1.0:
t_norm = 1.0
sun_pos = _eval_cheb_3d_njit(t_norm, sun_coeffs[idx])
moon_pos = _eval_cheb_3d_njit(t_norm, moon_coeffs[idx])
# Sun Gravity
d_sun = sun_pos - r
d_mag_sun = np.linalg.norm(d_sun)
r_mag_sun = np.linalg.norm(sun_pos)
if d_mag_sun > 1.0 and r_mag_sun > 1.0:
a_total += SUN_MU * (
d_sun / (d_mag_sun * d_mag_sun * d_mag_sun)
- sun_pos / (r_mag_sun * r_mag_sun * r_mag_sun)
)
# Moon Gravity
d_moon = moon_pos - r
d_mag_moon = np.linalg.norm(d_moon)
r_mag_moon = np.linalg.norm(moon_pos)
if d_mag_moon > 1.0 and r_mag_moon > 1.0:
a_total += MOON_MU * (
d_moon / (d_mag_moon * d_mag_moon * d_mag_moon)
- moon_pos / (r_mag_moon * r_mag_moon * r_mag_moon)
)
srp_area = srp_area_m2 if srp_area_m2 >= 0.0 else drag_area_m2
if use_srp and srp_area > 0.0 and drag_mass_kg > 0.0:
d_ss = r - sun_pos
d_mag_ss = np.linalg.norm(d_ss)
if d_mag_ss > 1.0:
nu = 1.0
if srp_use_shadow:
# High-fidelity conical shadow (PHY-D)
nu = _srp_illumination_factor_njit(r, sun_pos, Re, 695700.0)
# Inlined from astra.constants to support Numba JIT. Guarded in constants.py.
P0 = 4.56e-6
AU = 149597870.7
scale = (AU / d_mag_ss) * (AU / d_mag_ss)
amag = P0 * scale * srp_cr * (srp_area / drag_mass_kg) / 1000.0
a_total += (nu * amag) * (d_ss / d_mag_ss)
return a_total # type: ignore[no-any-return]
@njit(fastmath=True, cache=True)
def _propagator_jacobian_njit(
r: np.ndarray,
v: np.ndarray,
use_drag: bool,
drag_cd: float,
drag_area_m2: float,
drag_mass_kg: float,
drag_rho: float,
drag_H_km: float,
drag_ref_alt_km: float,
f107_obs: float,
f107_adj: float,
ap_daily: float,
hf_atmosphere: bool,
include_third_body: bool,
t_jd: float,
global_t_jd0: float,
duration_d: float,
sun_coeffs: np.ndarray,
moon_coeffs: np.ndarray,
use_srp: bool,
srp_cr: float,
srp_use_shadow: bool,
srp_area_m2: float = -1.0,
) -> np.ndarray:
"""6×6 force Jacobian F = dẋ/dx via central-difference, using the full
propagator acceleration kernel ``_acceleration_njit``.
Replaced the former gravity-only ``_compute_force_jacobian`` (removed in
v3.6.0, BL-05) which was used in the inline STM paths. Key corrections:
* **F-03**: The previous Jacobian omitted drag partials entirely (∂a_drag/∂v = 0),
causing the STM covariance to grow too slowly in along-track for any LEO orbit
with drag enabled — Pc computations were systematically low.
* **F-02**: ``eps_v`` is set to 1e-6 km/s (1 mm/s) matching standard
astrodynamics FDM practice (Montenbruck & Gill §7.4.1). The previous
covariance-module value of 1e-4 km/s (100 m/s) averaged drag partials
over a regime where drag is nonlinearly velocity-dependent, producing
∂a/∂v errors of 2–10× at altitudes < 500 km.
Arguments mirror _acceleration_njit so the STM paths pass their local
parameters without additional look-ups.
"""
# eps_v = 1e-6 km/s (1 mm/s) — not 1e-4 (100 m/s)
eps_r = 1e-4 # 100 m — adequate for position partials (gravity curvature)
eps_v = 1e-6 # 1 mm/s — tight enough to capture velocity-linear drag partials
J = np.zeros((6, 6))
# dr/dt = v → upper-right 3×3 = I₃
J[0, 3] = 1.0
J[1, 4] = 1.0
J[2, 5] = 1.0
# ∂a/∂r (lower-left 3×3)
for i in range(3):
r_p = r.copy()
r_m = r.copy()
r_p[i] += eps_r
r_m[i] -= eps_r
a_p = _acceleration_njit(
t_jd, r_p, v, use_drag, drag_cd, drag_area_m2, drag_mass_kg,
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily,
hf_atmosphere, include_third_body, global_t_jd0, duration_d,
sun_coeffs, moon_coeffs, use_srp, srp_cr, srp_use_shadow, srp_area_m2,
)
a_m = _acceleration_njit(
t_jd, r_m, v, use_drag, drag_cd, drag_area_m2, drag_mass_kg,
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily,
hf_atmosphere, include_third_body, global_t_jd0, duration_d,
sun_coeffs, moon_coeffs, use_srp, srp_cr, srp_use_shadow, srp_area_m2,
)
J[3:6, i] = (a_p - a_m) / (2.0 * eps_r)
# ∂a/∂v (lower-right 3×3) — captures drag which is v-dependent
for i in range(3):
v_p = v.copy()
v_m = v.copy()
v_p[i] += eps_v
v_m[i] -= eps_v
a_p = _acceleration_njit(
t_jd, r, v_p, use_drag, drag_cd, drag_area_m2, drag_mass_kg,
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily,
hf_atmosphere, include_third_body, global_t_jd0, duration_d,
sun_coeffs, moon_coeffs, use_srp, srp_cr, srp_use_shadow, srp_area_m2,
)
a_m = _acceleration_njit(
t_jd, r, v_m, use_drag, drag_cd, drag_area_m2, drag_mass_kg,
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily,
hf_atmosphere, include_third_body, global_t_jd0, duration_d,
sun_coeffs, moon_coeffs, use_srp, srp_cr, srp_use_shadow, srp_area_m2,
)
J[3:6, 3 + i] = (a_p - a_m) / (2.0 * eps_v)
return J # type: ignore[no-any-return]
@njit(fastmath=True, cache=True)
def _coast_derivative_njit(
t_sec: float,
y: np.ndarray,
t_jd0: float,
use_drag: bool,
drag_cd: float,
drag_area_m2: float,
drag_mass_kg: float,
drag_rho: float,
drag_H_km: float,
drag_ref_alt_km: float,
f107_obs: float,
f107_adj: float,
ap_daily: float,
hf_atmosphere: bool,
include_third_body: bool,
global_t_jd0: float,
duration_d: float,
sun_coeffs: np.ndarray,
moon_coeffs: np.ndarray,
use_srp: bool,
srp_cr: float,
srp_use_shadow: bool,
srp_area_m2: float = -1.0,
) -> np.ndarray:
"""State derivative for unpowered (coast) arcs using Numba."""
r = y[:3]
v = y[3:6]
t_jd = t_jd0 + t_sec / SECONDS_PER_DAY
a = _acceleration_njit(
t_jd,
r,
v,
use_drag,
drag_cd,
drag_area_m2,
drag_mass_kg,
drag_rho,
drag_H_km,
drag_ref_alt_km,
f107_obs,
f107_adj,
ap_daily,
hf_atmosphere,
include_third_body,
global_t_jd0,
duration_d,
sun_coeffs,
moon_coeffs,
use_srp,
srp_cr,
srp_use_shadow,
srp_area_m2,
)
# 42 components = 6 (r, v) + 36 (Phi)
if len(y) == 42:
Phi = y[6:].reshape((6, 6))
# Use _propagator_jacobian_njit (which includes drag partials).
# Replaces the former _compute_force_jacobian (removed in v3.6.0, BL-05).
F = _propagator_jacobian_njit(
r, v, use_drag, drag_cd, drag_area_m2, drag_mass_kg,
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily,
hf_atmosphere, include_third_body, t_jd, global_t_jd0, duration_d,
sun_coeffs, moon_coeffs, use_srp, srp_cr, srp_use_shadow, srp_area_m2
)
Phi_dot = np.dot(F, Phi)
dy = np.empty(42)
dy[0:3] = v
dy[3:6] = a
dy[6:] = Phi_dot.flatten()
return dy
dy = np.empty(6)
dy[0:3] = v
dy[3:6] = a
return dy
@njit(fastmath=True, cache=True)
def _powered_derivative_njit(
t_sec: float,
y: np.ndarray,
t_jd0: float,
use_drag: bool,
drag_cd: float,
drag_area_m2: float,
drag_mass_kg: float,
drag_rho: float,
drag_H_km: float,
drag_ref_alt_km: float,
f107_obs: float,
f107_adj: float,
ap_daily: float,
hf_atmosphere: bool,
include_third_body: bool,
global_t_jd0: float,
duration_d: float,
sun_coeffs: np.ndarray,
moon_coeffs: np.ndarray,
use_srp: bool,
srp_cr: float,
srp_use_shadow: bool,
burn_thrust_N: float,
burn_isp_s: float,
burn_dir: np.ndarray,
burn_frame_idx: int,
srp_area_m2: float = -1.0,
) -> np.ndarray:
"""State derivative for powered (thrusting) arcs using Numba."""
r = y[:3]
v = y[3:6]
m = y[6]
t_jd = t_jd0 + t_sec / SECONDS_PER_DAY
# PHY-03 Fix (Numba path): Use instantaneous mass from state vector y[6] for
# ballistic coefficient instead of pre-burn drag_mass_kg.
# Clamp at 1g to avoid division by zero in degenerate edge cases.
instantaneous_mass_kg = m if m > 1e-3 else 1e-3
a_grav = _acceleration_njit(
t_jd,
r,
v,
use_drag,
drag_cd,
drag_area_m2,
instantaneous_mass_kg,
drag_rho,
drag_H_km,
drag_ref_alt_km,
f107_obs,
f107_adj,
ap_daily,
hf_atmosphere,
include_third_body,
global_t_jd0,
duration_d,
sun_coeffs,
moon_coeffs,
use_srp,
srp_cr,
srp_use_shadow,
srp_area_m2,
)
# Assign norms and use them to guard the VNB/RTN frame builder.
# _build_vnb_matrix_njit to divide by np.linalg.norm(vel) without a guard.
# A near-zero velocity (apoapsis of extreme HEO, or halted simulation) caused
# NaN thrust acceleration that silently poisoned all subsequent states.
v_mag = max(1e-12, np.linalg.norm(v))
thrust_a_mag = (burn_thrust_N / 1000.0) / m
if v_mag < 1e-6:
# Cannot define VNB/RTN frame — velocity is degenerate (e.g. apoapsis
# of a very eccentric orbit or a stalled simulation). Skip thrust for
# this sub-step so the orbit integrator can recover naturally rather
# than propagating NaN into all subsequent states.
a_total = a_grav
else:
# rot_matrix from frames.py defines the ECI -> VNB (or RTN) rotation
# (its rows are the basis vectors). To map thrust *from* the maneuver frame
# *into* the inertial frame, we must use its transpose.
if burn_frame_idx == 0:
rot_matrix = _build_vnb_matrix_njit(r, v)
else:
rot_matrix = _build_rtn_matrix_njit(r, v)
a_thrust = rot_matrix.T @ (thrust_a_mag * burn_dir)
a_total = a_grav + a_thrust
# Mass flow rate
g0 = 9.80665e-3 # km/s^2
dm_dt = -(burn_thrust_N / 1000.0) / (burn_isp_s * g0)
# 43 components = 7 (r, v, m) + 36 (Phi)
if len(y) == 43:
Phi = y[7:].reshape((6, 6))
# Use _propagator_jacobian_njit (drag-aware) instead of gravity-only Jacobian.
F = _propagator_jacobian_njit(
r, v, use_drag, drag_cd, drag_area_m2, instantaneous_mass_kg,
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily,
hf_atmosphere, include_third_body, t_jd, global_t_jd0, duration_d,
sun_coeffs, moon_coeffs, use_srp, srp_cr, srp_use_shadow, srp_area_m2
)
Phi_dot = np.dot(F, Phi)
dy = np.empty(43)
dy[0:3] = v
dy[3:6] = a_total
dy[6] = dm_dt
dy[7:] = Phi_dot.flatten()
return dy
dy = np.empty(7)
dy[0:3] = v
dy[3:6] = a_total
dy[6] = dm_dt
return dy
def _compute_scale_height(f107_obs: float, f107_adj: float, ap_daily: float) -> float:
"""Compute empirical atmospheric scale height (km) at 400 km reference altitude."""
# Use the local njit kernel to avoid circular imports from data_pipeline
rho0 = _nrlmsise00_density_njit(400.0, f107_obs, f107_adj, ap_daily)
rho1 = _nrlmsise00_density_njit(401.0, f107_obs, f107_adj, ap_daily)
if rho0 > 1e-20 and rho1 > 1e-20:
H_km = -1.0 / (math.log(rho1 / rho0))
return max(20.0, min(H_km, 150.0))
return 58.5
def _compute_planetary_splines(
t_jd0: float, duration_s: float, use_de: bool
) -> tuple[np.ndarray, np.ndarray, float]:
duration_d = duration_s / 86400.0
# Avoid Chebyshev degree cliffs by chunking long propagations
# into piecewise 7-day splines with fixed degree 35 (5 nodes/day), preventing
# M-level node scaling on multi-year orbits while fully satisfying lunar freq.
deg = 35
num_pieces = max(1, int(np.ceil(duration_d / 7.0)))
nodes_norm = np.cos(np.pi * (2 * np.arange(deg + 1) + 1) / (2 * (deg + 1)))
sun_coeffs = np.zeros((num_pieces, deg + 1, 3))
moon_coeffs = np.zeros((num_pieces, deg + 1, 3))
# TEME-frame samples so the Numba kernel matches the SGP4 propagator frame.
sun_fn = _sun_position_de if use_de else _sun_position_approx
moon_fn = _moon_position_de if use_de else _moon_position_approx
for i in range(num_pieces):
piece_t_jd0 = t_jd0 + i * 7.0
piece_dur = 7.0
t_nodes = piece_t_jd0 + 0.5 * piece_dur * (nodes_norm + 1.0)
sun_pos = np.zeros((deg + 1, 3))
moon_pos = np.zeros((deg + 1, 3))
for j, t in enumerate(t_nodes):
sun_pos[j] = sun_fn(t)
moon_pos[j] = moon_fn(t)
for dim in range(3):
sun_coeffs[i, :, dim] = cheb.chebfit(nodes_norm, sun_pos[:, dim], deg)
moon_coeffs[i, :, dim] = cheb.chebfit(nodes_norm, moon_pos[:, dim], deg)
return sun_coeffs, moon_coeffs, duration_d
# ---------------------------------------------------------------------------
# Segmented Cowell Integrator
# ---------------------------------------------------------------------------
@dataclass
class _PropagatorContext:
t_jd0: float
duration_d: float
dt_out: float
include_third_body: bool
use_drag: bool
use_empirical_drag: bool
hf_atmosphere: bool
use_srp: bool
srp_cr: float
srp_use_shadow: bool
include_stm: bool
rtol: Optional[float]
atol: Any
coast_rtol: float
coast_atol: float
powered_rtol: float
powered_atol: float
sun_coeffs: np.ndarray
moon_coeffs: np.ndarray
drag_cd: float
drag_area_m2: float
srp_area_m2: float
current_P0: np.ndarray
snc_config: Optional[SNCConfig] = None
def _prepare_maneuvers(maneuvers: list[FiniteBurn], mass_kg: Optional[float]) -> tuple[list[FiniteBurn], float]:
from astra.maneuver import validate_burn, validate_burn_sequence
from astra import config
if mass_kg is None:
if config.ASTRA_STRICT_MODE:
from astra.errors import ManeuverError
raise ManeuverError(
"[ASTRA STRICT] Spacecraft mass_kg is required for powered maneuver propagation. "
"Set NumericalState.mass_kg or disable strict mode via astra.config.ASTRA_STRICT_MODE=False."
)
logger.warning(
"Maneuvers specified but initial mass_kg is None. "
"Defaulting to 1000.0 kg — drag and Tsiolkovsky accuracy are compromised. "
"Provide mass_kg in NumericalState for physical accuracy."
)
mass_kg = 1000.0
burns = sorted(maneuvers, key=lambda b: b.epoch_ignition_jd)
validate_burn_sequence(burns)
running_mass = mass_kg
for burn in burns:
validate_burn(burn, running_mass)
running_mass -= burn.mass_flow_rate_kg_s * burn.duration_s
return burns, mass_kg
def _prepare_drag_environment(t_jd: float, drag_ref_alt_km: float, use_drag: bool, use_empirical_drag: bool) -> tuple[float, float, float, float, float, float]:
f107_obs, f107_adj, ap_daily = 150.0, 150.0, 15.0
drag_rho = 0.0
drag_H_km = 58.515
if use_drag:
if use_empirical_drag:
try:
from astra.data_pipeline import get_space_weather, atmospheric_density_empirical
f107_obs, f107_adj, ap_daily = get_space_weather(t_jd)
drag_rho = atmospheric_density_empirical(drag_ref_alt_km, f107_obs, f107_adj, ap_daily)
drag_H_km = _compute_scale_height(f107_obs, f107_adj, ap_daily)
except Exception as exc:
from astra import config
if config.ASTRA_STRICT_MODE:
from astra.errors import SpaceWeatherError
raise SpaceWeatherError(
f"[ASTRA STRICT] Space weather fetch failed during trajectory propagation: {exc}"
) from exc
logger.warning(
"Space weather fetch failed during trajectory propagation. "
"Falling back to exponential atmosphere model. Drag accuracy degraded. (%r)",
exc
)
drag_rho = 0.0
if drag_rho == 0.0:
from astra.constants import DRAG_REF_DENSITY_KG_M3, DRAG_SCALE_HEIGHT_KM
drag_rho = DRAG_REF_DENSITY_KG_M3
drag_H_km = DRAG_SCALE_HEIGHT_KM
drag_ref_alt_km = 400.0
return drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily
def _integrate_segment(
seg_start_s: float, seg_end_s: float, active_burn: Optional[FiniteBurn],
current_r: np.ndarray, current_v: np.ndarray, current_mass: Optional[float],
current_phi_flat: np.ndarray, ctx: _PropagatorContext,
drag_mass_kg: float, drag_rho: float, drag_H_km: float, drag_ref_alt_km: float,
f107_obs: float, f107_adj: float, ap_daily: float
) -> tuple[bool, str, list[NumericalState], np.ndarray, np.ndarray, float, np.ndarray, float]:
seg_duration = seg_end_s - seg_start_s
global_t_start = seg_start_s
global_t_end = seg_end_s
t_out = []
first_grid = math.ceil(global_t_start / ctx.dt_out) * ctx.dt_out
t = first_grid
while t <= global_t_end + 1e-9:
if t >= global_t_start - 1e-9:
t_out.append(t - global_t_start)
t += ctx.dt_out
if not t_out or t_out[0] > 1e-9:
t_out.insert(0, 0.0)
if t_out[-1] < seg_duration - 1e-9:
t_out.append(seg_duration)
t_eval = np.array(sorted(set(t_out)))
t_eval = t_eval[t_eval <= seg_duration + 1e-9]
segment_states = []
if active_burn is not None and current_mass is not None:
y0 = np.concatenate([current_r, current_v, [current_mass]])
if ctx.include_stm:
y0 = np.concatenate([y0, current_phi_flat])
b_thrust = active_burn.thrust_N
b_isp = active_burn.isp_s
b_dir = np.array(active_burn.direction)
b_idx = 0 if active_burn.frame.value == "VNB" else 1
def deriv(t_sec: float, y: np.ndarray) -> np.ndarray:
return _powered_derivative_njit( # type: ignore[no-any-return]
t_sec, y, ctx.t_jd0 + seg_start_s / SECONDS_PER_DAY,
ctx.use_drag, ctx.drag_cd, ctx.drag_area_m2, drag_mass_kg,
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily,
ctx.hf_atmosphere, ctx.include_third_body, ctx.t_jd0, ctx.duration_d,
ctx.sun_coeffs, ctx.moon_coeffs, ctx.use_srp, ctx.srp_cr, ctx.srp_use_shadow,
b_thrust, b_isp, b_dir, b_idx, ctx.srp_area_m2
)
# Fix 5.5: Use user-provided powered_atol instead of magic numbers. Mass tolerance scales loosely.
atol_vec = np.array([ctx.powered_atol] * 6 + [max(1e-5, ctx.powered_atol * 1e7)])
if ctx.include_stm:
atol_vec = np.concatenate([atol_vec, np.full(36, ctx.powered_atol)])
sol = solve_ivp(
deriv, t_span=(0.0, seg_duration), y0=y0, method="DOP853", t_eval=t_eval,
rtol=ctx.rtol if ctx.rtol is not None else ctx.powered_rtol,
atol=ctx.atol if ctx.atol is not None else atol_vec,
)
is_powered = True
else:
y0 = np.concatenate([current_r, current_v])
if ctx.include_stm:
y0 = np.concatenate([y0, current_phi_flat])
def deriv(t_sec: float, y: np.ndarray) -> np.ndarray:
return _coast_derivative_njit( # type: ignore[no-any-return]
t_sec, y, ctx.t_jd0 + seg_start_s / SECONDS_PER_DAY,
ctx.use_drag, ctx.drag_cd, ctx.drag_area_m2, drag_mass_kg,
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily,
ctx.hf_atmosphere, ctx.include_third_body, ctx.t_jd0, ctx.duration_d,
ctx.sun_coeffs, ctx.moon_coeffs, ctx.use_srp, ctx.srp_cr, ctx.srp_use_shadow,
ctx.srp_area_m2
)
atol_vec = np.array([ctx.coast_atol] * 6)
if ctx.include_stm:
atol_vec = np.concatenate([atol_vec, np.full(36, ctx.coast_atol)])
sol = solve_ivp(
deriv, t_span=(0.0, seg_duration), y0=y0, method="DOP853", t_eval=t_eval,
rtol=ctx.rtol if ctx.rtol is not None else ctx.coast_rtol,
atol=ctx.atol if ctx.atol is not None else atol_vec,
)
is_powered = False
if sol.success and hasattr(sol, "y") and not np.all(np.isfinite(sol.y)):
sol.success = False
sol.message = "Numerical instability detected (NaN/Inf in state)"
if not sol.success:
return False, sol.message, [], current_r, current_v, current_mass or 0.0, current_phi_flat, drag_mass_kg
_cov_out = None
for i in range(len(sol.t)):
if ctx.include_stm:
phi_i = sol.y[7:43, i].reshape((6, 6)) if is_powered else sol.y[6:42, i].reshape((6, 6))
# Full mapping of 6x6 state covariance using the STM.
_cov_out = phi_i @ ctx.current_P0 @ phi_i.T
if ctx.snc_config is not None:
dt = sol.t[i]
q = ctx.snc_config.q_psd_m2_s3 / 1e6 # (m/s²)²/Hz -> (km/s²)²/Hz
Q_add = np.zeros((6, 6))
if ctx.snc_config.mode == "white_noise":
dt3_q = (dt**3 / 3.0) * q
dt2_q = (dt**2 / 2.0) * q
dt_q = dt * q
for j in range(3):
Q_add[j, j] = dt3_q
Q_add[j, j+3] = dt2_q
Q_add[j+3, j] = dt2_q
Q_add[j+3, j+3] = dt_q
_cov_out += Q_add
segment_states.append(
NumericalState(
t_jd=ctx.t_jd0 + (seg_start_s + sol.t[i]) / SECONDS_PER_DAY,
position_km=sol.y[:3, i].copy(),
velocity_km_s=sol.y[3:6, i].copy(),
mass_kg=float(sol.y[6, i]) if is_powered else (current_mass or 0.0),
covariance_km2=_cov_out,
)
)
r_out = sol.y[:3, -1].copy()
v_out = sol.y[3:6, -1].copy()
m_out = float(sol.y[6, -1]) if is_powered else (current_mass or 0.0)
if ctx.include_stm:
# Carry the fully propagated covariance, including SNC process noise,
# into the next segment and restart the segment-local STM at identity.
# Otherwise process noise added before a maneuver/coast boundary is lost.
if _cov_out is not None:
ctx.current_P0 = _cov_out.copy()
phi_out = np.eye(6, dtype=np.float64).flatten()
else:
phi_out = current_phi_flat
dm_out = m_out if is_powered else drag_mass_kg
return True, "", segment_states, r_out, v_out, m_out, phi_out, dm_out
[docs]
def propagate_cowell(
state0: NumericalState,
duration_s: float,
dt_out: float = 60.0,
drag_config: Optional[DragConfig] = None,
include_third_body: bool = True,
rtol: Optional[float] = None,
atol: Optional[float] = None,
maneuvers: Optional[list[FiniteBurn]] = None,
use_de: bool = True,
use_empirical_drag: bool = True,
include_stm: bool = False,
snc_config: Optional[SNCConfig] = None,
coast_rtol: float = 1e-8,
coast_atol: float = 1e-8,
powered_rtol: float = 1e-12,
powered_atol: float = 1e-12,
_precomputed_sun: Optional[np.ndarray] = None,
_precomputed_moon: Optional[np.ndarray] = None,
) -> list[NumericalState]:
"""Propagate an orbit using segmented Cowell's method with DOP853.
This is a mission-operations–grade numerical propagator that
automatically segments the integration timeline at engine
ignition/cutoff boundaries. Each segment uses the appropriate
derivative function:
- **Coast segments**: 6-DOF [r, v] — gravitational + drag.
- **Powered segments**: 7-DOF [r, v, m] — adds thrust and
Tsiolkovsky mass depletion.
The segmented approach ensures that ``solve_ivp`` never steps across
a force-model discontinuity, eliminating truncation error at burn
edges.
Known Limitations:
- Geopotential truncated at J6.
- Atmospheric scale height uses single-layer exponential profile;
multi-altitude molecular mass variation is a third-order effect.
- Powered-arc default mass absolute tolerance is 10⁻⁵ kg (0.01 g), which
is loose for micro-thrusters on small spacecraft; pass ``atol`` for
tighter mass conservation.
- SRP uses a dual-cone Earth umbra/penumbra shadow model
(``_srp_illumination_factor_dual_cone_njit``). The penumbra formula
is the standard Montenbruck & Gill §3.4.2 planar circle-circle
intersection — industry reference for Earth-orbit SRP shadow models.
Planar approximation error is O(β²/12) ≈ 1.7×10⁻⁷, below IEEE-754
double precision significance.
Args:
state0: Initial state (position + velocity + optional mass).
duration_s: Total propagation duration in seconds.
dt_out: Output time step in seconds (default 60 s).
drag_config: Optional atmospheric drag parameters.
include_third_body: Include Sun/Moon gravity.
rtol: Top-level relative tolerance override (WARNING: Silently discards any
fine-tuned ``coast_rtol`` and ``powered_rtol`` values).
atol: Top-level absolute tolerance override (WARNING: Silently discards any
fine-tuned ``coast_atol`` and ``powered_atol`` values).
maneuvers: Optional list of ``FiniteBurn`` definitions.
Burns must not overlap in time.
use_de: Use JPL DE421 for Sun/Moon (True) or analytical (False).
use_empirical_drag: Use F10.7/Ap drag model (True) or static (False).
coast_rtol: Relative tolerance for coast arcs (default 1e-8).
coast_atol: Absolute tolerance for coast arcs (default 1e-8).
For high-accuracy conjunction analysis, tighten to 1e-12.
powered_rtol: Relative tolerance for powered arcs (default 1e-12).
powered_atol: Absolute tolerance for powered arcs (default 1e-12).
Returns:
List of ``NumericalState`` objects at each output time step.
If maneuvers are present, each state includes the current mass.
"""
t_jd0 = state0.t_jd
if rtol is not None or atol is not None:
logger.warning(
"Top-level `rtol` or `atol` provided to propagate_cowell. "
"These will override `coast_rtol`, `powered_rtol`, etc., for all segments. "
"Prefer using the segment-specific tolerance arguments."
)
if t_jd0 < 2400000.5:
logger.warning(
f"Epoch {t_jd0} looks like MJD instead of JD. "
"SGP4 models expect standard Julian Dates (typically > 2400000)."
)
if duration_s > 72 * 3600:
logger.warning(
f"Long-duration Cowell propagation ({duration_s/3600:.1f}h > 72h) detected. "
"Numerical accuracy degrades for multi-day propagations due to unmodeled "
"perturbations (SRP variability, atmospheric weather). Verify force-model "
"completeness and tighten tolerances for mission-critical analysis."
) # Previous text incorrectly blamed SGP4 — this is the Cowell integrator.
burns: list[FiniteBurn] = []
mass_kg = state0.mass_kg
if maneuvers:
burns, mass_kg = _prepare_maneuvers(maneuvers, mass_kg)
segments = _build_segments(t_jd0, duration_s, burns)
if include_third_body:
if _precomputed_sun is not None and _precomputed_moon is not None:
sun_coeffs, moon_coeffs = _precomputed_sun, _precomputed_moon
duration_d = duration_s / 86400.0
else:
sun_coeffs, moon_coeffs, duration_d = _compute_planetary_splines(t_jd0, duration_s, use_de)
else:
sun_coeffs = np.zeros((1, 1, 3))
moon_coeffs = np.zeros((1, 1, 3))
duration_d = duration_s / 86400.0
use_drag = drag_config is not None
drag_cd = drag_config.cd if drag_config else 0.0
drag_area_m2 = drag_config.area_m2 if drag_config else 0.0
drag_mass_kg = drag_config.mass_kg if drag_config else 1.0
hf_atmosphere = False
if drag_config is not None and use_empirical_drag:
model_str = drag_config.model.upper()
if model_str == "NRLMSISE00":
hf_atmosphere = True
elif model_str == "EXPONENTIAL":
hf_atmosphere = False
else:
raise ValueError(f"Unsupported DragConfig model: {drag_config.model}. Supported models are 'NRLMSISE00' and 'EXPONENTIAL'.")
_srp_requested = drag_config is not None and getattr(drag_config, "include_srp", True)
use_srp = bool(_srp_requested and include_third_body)
if _srp_requested and not include_third_body:
logger.warning(
"DragConfig.include_srp=True but include_third_body=False: SRP requires "
"the Sun position vector computed by the third-body module. SRP is disabled. "
"Set include_third_body=True to enable SRP, or set DragConfig.include_srp=False "
"to suppress this warning."
)
srp_cr = float(drag_config.cr) if drag_config is not None else 1.5
srp_area_m2 = drag_area_m2
if drag_config is not None and drag_config.srp_area_m2 is not None:
srp_area_m2 = float(drag_config.srp_area_m2)
if srp_area_m2 < 0.0:
raise ValueError(
f"DragConfig.srp_area_m2 must be non-negative, got {srp_area_m2}."
)
srp_use_shadow = bool(drag_config is not None and getattr(drag_config, "srp_conical_shadow", True))
_r0_mag = float(np.linalg.norm(state0.position_km))
# drag_ref_alt_km is updated from current_r at each segment boundary below.
# This initial value is used only for the first segment & as the drag environment seed.
drag_ref_alt_km = max(0.0, _r0_mag - EARTH_EQUATORIAL_RADIUS_KM)
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily = _prepare_drag_environment(t_jd0, drag_ref_alt_km, use_drag, use_empirical_drag)
_initial_cov = getattr(state0, "covariance_km2", None)
# Default to 6x6 identity (scaled) if using STM.
if _initial_cov is not None:
current_P0 = _initial_cov.copy()
else:
if include_stm:
raise ValueError(
"propagate_cowell: `include_stm=True` requires `state0.covariance_km2` to be set. "
"Cannot propagate covariance without an initial uncertainty. "
"Set state0 = NumericalState(..., covariance_km2=np.diag("
"[sigma_r**2]*3 + [sigma_v**2]*3)) where sigma_r (km) and "
"sigma_v (km/s) are your 1-sigma position/velocity uncertainties."
)
current_P0 = np.eye(3, dtype=np.float64) * 1e-6
ctx = _PropagatorContext(
t_jd0=t_jd0, duration_d=duration_d, dt_out=dt_out, include_third_body=include_third_body,
use_drag=use_drag, use_empirical_drag=use_empirical_drag, hf_atmosphere=hf_atmosphere,
use_srp=use_srp, srp_cr=srp_cr, srp_use_shadow=srp_use_shadow, include_stm=include_stm,
rtol=rtol, atol=atol, coast_rtol=coast_rtol, coast_atol=coast_atol,
powered_rtol=powered_rtol, powered_atol=powered_atol,
sun_coeffs=sun_coeffs, moon_coeffs=moon_coeffs,
drag_cd=drag_cd, drag_area_m2=drag_area_m2, srp_area_m2=srp_area_m2, current_P0=current_P0,
snc_config=snc_config
)
all_states: list[NumericalState] = []
current_r = state0.position_km.copy()
current_v = state0.velocity_km_s.copy()
current_mass = mass_kg
current_phi_flat = np.eye(6, dtype=np.float64).flatten()
for seg_start_s, seg_end_s, active_burn in segments:
seg_duration = seg_end_s - seg_start_s
if seg_duration < 1e-9:
continue
if use_drag and use_empirical_drag:
# Recompute drag_ref_alt_km from the actual current position.
# Using the fixed initial altitude systematically underestimates drag
# as the satellite decays to lower altitudes over long propagations.
_current_r_mag = float(np.linalg.norm(current_r))
drag_ref_alt_km = max(0.0, _current_r_mag - EARTH_EQUATORIAL_RADIUS_KM)
drag_rho, drag_H_km, drag_ref_alt_km, f107_obs, f107_adj, ap_daily = _prepare_drag_environment(
t_jd0 + seg_start_s / 86400.0, drag_ref_alt_km, use_drag, use_empirical_drag
)
success, msg, seg_states, current_r, current_v, current_mass, current_phi_flat, drag_mass_kg = _integrate_segment(
seg_start_s, seg_end_s, active_burn,
current_r, current_v, current_mass,
current_phi_flat, ctx,
drag_mass_kg, drag_rho, drag_H_km, drag_ref_alt_km,
f107_obs, f107_adj, ap_daily
)
if not success:
logger.error(f"Integration failed: {msg}")
from astra import config
if config.ASTRA_STRICT_MODE:
from astra.errors import PropagationError
raise PropagationError(f"Cowell integration failed: {msg}", t_jd=t_jd0 + seg_start_s / SECONDS_PER_DAY)
else:
# Append a terminal error state to inform caller in relaxed mode (Fix 3.4)
all_states.append(
NumericalState(
t_jd=t_jd0 + seg_start_s / SECONDS_PER_DAY,
position_km=current_r,
velocity_km_s=current_v,
mass_kg=current_mass,
covariance_km2=ctx.current_P0 if ctx.include_stm else None,
error_message=msg,
)
)
break
all_states.extend(seg_states)
if all_states:
deduped = [all_states[0]]
for s in all_states[1:]:
if abs(s.t_jd - deduped[-1].t_jd) > 1e-12:
deduped.append(s)
all_states = deduped
logger.info(f"Propagation complete: {len(all_states)} states generated.")
return all_states
# ---------------------------------------------------------------------------
# Timeline Segmentation
# ---------------------------------------------------------------------------
def _build_segments(
t_jd0: float,
duration_s: float,
burns: list[FiniteBurn],
) -> list[tuple[float, float, Optional[FiniteBurn]]]:
"""Build an ordered list of (t_start_s, t_end_s, burn_or_None) segments.
Slices the total propagation window so that every burn arc and
every coast arc is its own contiguous segment. The integrator is
re-initialised at each boundary.
Args:
t_jd0: Epoch of propagation start (Julian Date).
duration_s: Total propagation time in seconds.
burns: Sorted list of FiniteBurn objects.
Returns:
List of (start_s, end_s, burn) tuples. burn is None for coast.
"""
segments: list[tuple[float, float, Optional[FiniteBurn]]] = []
cursor_s = 0.0
end_s = duration_s
# Robust overlap detection (SE-G) — build a set of burn
# indices to skip so that the processing loop below actually omits them,
# rather than just warning and then still scheduling the overlapping arc.
# In STRICT_MODE we raise immediately; in Relaxed mode we warn and skip.
import logging as _log_mod
_seg_log = _log_mod.getLogger(__name__)
skipped_indices: set[int] = set()
for i in range(len(burns) - 1):
b1, b2 = burns[i], burns[i + 1]
if b2.epoch_ignition_jd < b1.epoch_cutoff_jd - 1e-12:
from astra import config
from astra.errors import ManeuverError
msg = (
f"Temporal overlap detected: Burn[{i+1}] ignition "
f"{b2.epoch_ignition_jd:.8f} JD is before Burn[{i}] cutoff "
f"{b1.epoch_cutoff_jd:.8f} JD."
)
if config.ASTRA_STRICT_MODE:
raise ManeuverError(msg)
else:
_seg_log.warning(
"%s Burn[%d] will be skipped to avoid dual-thrust arc.", msg, i + 1
)
skipped_indices.add(i + 1) # skip the later (overlapping) burn
for idx, burn in enumerate(burns):
if idx in skipped_indices:
continue # AUDIT-B-03: actually omit the overlapping burn
# Convert burn epochs to seconds relative to t_jd0
ign_s = (burn.epoch_ignition_jd - t_jd0) * 86400.0
cut_s = (burn.epoch_cutoff_jd - t_jd0) * 86400.0
# Clamp to propagation window
ign_s = max(ign_s, 0.0)
cut_s = min(cut_s, end_s)
if ign_s >= end_s or cut_s <= 0.0:
continue # Burn is entirely outside the window
# Coast before this burn
if ign_s > cursor_s + 1e-9:
segments.append((cursor_s, ign_s, None))
# Powered arc
if cut_s > ign_s + 1e-9:
segments.append((ign_s, cut_s, burn))
cursor_s = cut_s
# Final coast after last burn
if cursor_s < end_s - 1e-9:
segments.append((cursor_s, end_s, None))
return segments
# ---------------------------------------------------------------------------
# Propagation at Arbitrary Epochs (AS-01)
# ---------------------------------------------------------------------------
[docs]
def propagate_cowell_at_times(
state0: "NumericalState",
times_jd: "np.ndarray",
*,
drag_config: Optional["DragConfig"] = None,
include_third_body: bool = True,
use_de: bool = True,
maneuvers: Optional["list[FiniteBurn]"] = None,
include_stm: bool = False,
snc_config: Optional["SNCConfig"] = None,
) -> "list[NumericalState]":
"""Propagate to specific Julian Date epochs using the Cowell integrator.
Unlike :func:`propagate_cowell` which returns states at uniform ``dt_out``
intervals, this function returns states at **caller-specified** epochs.
This is essential for:
- Orbit determination residual computation at observation times.
- Ephemeris generation at irregular cadences.
- State interpolation at conjunction screening epochs.
Internally propagates with a dense output step and interpolates to the
requested times using cubic spline interpolation on the propagated
trajectory. The underlying force model, maneuver handling, and STM
propagation are identical to :func:`propagate_cowell`.
Args:
state0: Initial state (position + velocity + optional mass/covariance).
times_jd: 1-D array of Julian Dates at which states are requested.
Must all be ≥ ``state0.t_jd``. Need not be sorted (output will
match the input order).
drag_config: Optional atmospheric drag parameters.
include_third_body: Include Sun/Moon gravity (default True).
use_de: Use JPL DE421 ephemeris for Sun/Moon (default True).
maneuvers: Optional list of FiniteBurn objects.
include_stm: Propagate State Transition Matrix for covariance.
snc_config: Process noise configuration.
Returns:
List of :class:`NumericalState` objects, one per requested epoch,
in the same order as ``times_jd``.
Raises:
ValueError: If ``times_jd`` is empty or contains epochs before
``state0.t_jd``.
Example::
import astra
import numpy as np
state0 = astra.NumericalState(
t_jd=2460000.5,
position_km=np.array([6778.0, 0.0, 0.0]),
velocity_km_s=np.array([0.0, 7.668, 0.0]),
)
obs_times = np.array([2460000.51, 2460000.52, 2460000.55])
states = astra.propagate_cowell_at_times(state0, obs_times)
for s in states:
print(f"t={s.t_jd:.5f} r={s.position_km}")
"""
import scipy.interpolate
times_jd = np.asarray(times_jd, dtype=float)
if times_jd.ndim != 1 or len(times_jd) == 0:
raise ValueError("times_jd must be a non-empty 1-D array.")
t0_jd = state0.t_jd
min_t = float(np.min(times_jd))
max_t = float(np.max(times_jd))
if min_t < t0_jd - 1e-10:
raise ValueError(
f"All times_jd must be ≥ state0.t_jd ({t0_jd:.8f}). "
f"Got min(times_jd) = {min_t:.8f}."
)
# Total propagation duration
duration_s = (max_t - t0_jd) * 86400.0
if duration_s < 1e-6:
# All requested times are at or before epoch — return per-epoch copies
# with the correct t_jd. Returning the same object N times would allow
# a caller mutation to corrupt all N entries simultaneously.
return [
NumericalState(
t_jd=float(t),
position_km=state0.position_km.copy(),
velocity_km_s=state0.velocity_km_s.copy(),
mass_kg=state0.mass_kg,
)
for t in times_jd
]
# Propagate with dense output (small dt_out for interpolation accuracy)
# Use 10s steps for LEO, or 60s steps if > 6 hours
dt_out = min(10.0, max(1.0, duration_s / 1000.0))
dense_states = propagate_cowell(
state0=state0,
duration_s=duration_s,
dt_out=dt_out,
drag_config=drag_config,
include_third_body=include_third_body,
use_de=use_de,
maneuvers=maneuvers,
include_stm=include_stm,
snc_config=snc_config,
)
if len(dense_states) < 2:
# Single-state trajectory — return a copy for each requested epoch.
s0 = dense_states[0]
return [
NumericalState(
t_jd=float(t),
position_km=s0.position_km.copy(),
velocity_km_s=s0.velocity_km_s.copy(),
mass_kg=s0.mass_kg,
)
for t in times_jd
]
# Build interpolation arrays
dense_t_jd = np.array([s.t_jd for s in dense_states])
dense_pos = np.array([s.position_km for s in dense_states])
dense_vel = np.array([s.velocity_km_s for s in dense_states])
# Use local seconds for conditioning
t_s = (dense_t_jd - t0_jd) * 86400.0
req_s = (times_jd - t0_jd) * 86400.0
# Clamp requested times to propagation range
t_min_s, t_max_s = t_s[0], t_s[-1]
req_s_clamped = np.clip(req_s, t_min_s, t_max_s)
spline_pos = scipy.interpolate.CubicSpline(t_s, dense_pos, bc_type="natural")
spline_vel = scipy.interpolate.CubicSpline(t_s, dense_vel, bc_type="natural")
# BL-01: Interpolate mass along the trajectory so mid-burn queries return
# the physically correct instantaneous mass, not always the final mass.
# For coast-only propagations every entry is None; fall back to None.
dense_masses_raw = [s.mass_kg for s in dense_states]
_has_mass = any(m is not None for m in dense_masses_raw)
if _has_mass:
dense_masses = np.array(
[m if m is not None else 0.0 for m in dense_masses_raw],
dtype=float,
)
spline_mass = scipy.interpolate.CubicSpline(t_s, dense_masses, bc_type="natural")
interp_mass_arr = spline_mass(req_s_clamped)
else:
interp_mass_arr = None
# Evaluate position and velocity at requested times
interp_pos = spline_pos(req_s_clamped)
interp_vel = spline_vel(req_s_clamped)
result = []
for k in range(len(times_jd)):
m_k: Optional[float] = (
float(interp_mass_arr[k]) if interp_mass_arr is not None else None
)
result.append(NumericalState(
t_jd=float(times_jd[k]),
position_km=interp_pos[k],
velocity_km_s=interp_vel[k],
mass_kg=m_k,
))
return result
# ---------------------------------------------------------------------------
# Batch High-Fidelity Propagation # ---------------------------------------------------------------------------
[docs]
def propagate_cowell_batch(
states: "dict[str, NumericalState]",
duration_s: float,
dt_out: float = 60.0,
drag_config: Optional["DragConfig"] = None,
maneuvers: Optional["dict[str, list[FiniteBurn]]"] = None,
include_third_body: bool = True,
include_stm: bool = False,
max_workers: Optional[int] = None,
use_empirical_drag: bool = True,
) -> dict[str, list["NumericalState"]]:
"""Propagate multiple satellites concurrently with the high-fidelity Cowell integrator.
This is a production batch wrapper around :func:`propagate_cowell` that
parallelises propagation over ``N`` initial states using a
``ThreadPoolExecutor``. It eliminates the ad-hoc ``ThreadPoolExecutor``
pattern that users had to replicate (and which already existed
*inline* inside :func:`find_conjunctions`).
The input uses ``dict[str, NumericalState]`` (satellite-ID → state) rather
than a list, which is consistent with the ``TrajectoryMap`` convention used
throughout ASTRA (e.g. ``find_conjunctions``). This also makes the key
explicit and avoids any ambiguity when the caller needs to correlate inputs
and outputs.
Args:
states: ``dict[satellite_id, NumericalState]`` — one entry per satellite.
The key (string) is used as the map key in the returned dict.
duration_s: Total propagation duration in seconds (identical for all).
dt_out: Output step size in seconds (default 60 s).
drag_config: Optional :class:`DragConfig`; applied uniformly to all
satellites unless overridden per-satellite in the future.
maneuvers: Optional ``dict[satellite_id, list[FiniteBurn]]``.
Missing keys default to an empty burn sequence.
include_third_body: Enable Sun/Moon third-body gravity.
include_stm: Whether to propagate the 6x6 State Transition Matrix.
max_workers: Thread pool size. Defaults to ``min(32, len(states))``.
use_empirical_drag: Whether to use empirical (F10.7/Ap) drag calculations.
Note:
Atmosphere model (NRLMSISE-00 or exponential) and SRP are controlled
via the ``drag_config`` argument. Set ``DragConfig(model='NRLMSISE00',
include_srp=True)`` to enable high-fidelity atmosphere and SRP;
these are NOT separate kwargs. Returns:
``dict[satellite_id, list[NumericalState]]`` — propagated state histories.
Satellites that fail propagation are absent from the dict; their error
is logged at WARNING level.
Raises:
ValueError: If ``states`` is empty or ``duration_s`` <= 0.
Example::
import astra, numpy as np
s1 = astra.NumericalState(t_jd=2460000.5,
position_km=np.array([6778.0, 0.0, 0.0]),
velocity_km_s=np.array([0.0, 7.668, 0.0]))
s2 = astra.NumericalState(t_jd=2460000.5,
position_km=np.array([6788.0, 0.0, 0.0]),
velocity_km_s=np.array([0.0, 7.658, 0.0]))
results = astra.propagate_cowell_batch(
{"ISS": s1, "DEBRIS-A": s2}, duration_s=3600.0
)
for sat_id, traj in results.items():
print(sat_id, len(traj), "steps")
"""
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
_log = logging.getLogger(__name__)
if not states:
raise ValueError("propagate_cowell_batch: states dict is empty.")
if duration_s <= 0.0:
raise ValueError(
f"propagate_cowell_batch: duration_s must be positive, got {duration_s}."
)
n = len(states)
effective_workers = max_workers if max_workers is not None else min(32, n)
burns_map = maneuvers or {}
results: dict[str, list[NumericalState]] = {}
def _worker(sat_id: str, state: "NumericalState") -> tuple[str, list["NumericalState"]]:
"""Single-satellite propagation task for the thread pool."""
# propagate_cowell does not accept hf_atmosphere / use_srp kwargs directly;
# those are handled via DragConfig.model and the drag_config's srp_cr field.
# use_empirical_drag=True enables NRLMSISE-00 when DragConfig.model='NRLMSISE00'.
traj = propagate_cowell(
state,
duration_s=duration_s,
dt_out=dt_out,
drag_config=drag_config,
maneuvers=burns_map.get(sat_id, []),
include_third_body=include_third_body,
include_stm=include_stm,
use_empirical_drag=use_empirical_drag,
)
return sat_id, traj
future_to_id: dict = {}
with ThreadPoolExecutor(max_workers=effective_workers) as executor:
for sat_id, state in states.items():
fut = executor.submit(_worker, sat_id, state)
future_to_id[fut] = sat_id
for fut in as_completed(future_to_id):
sat_id = future_to_id[fut]
try:
sid, traj = fut.result()
results[sid] = traj
except ValueError as ve:
# BL-07: Distinguish caller precondition failures (e.g. missing
# covariance_km2 for include_stm=True) from integration failures.
_log.error(
"propagate_cowell_batch: PRECONDITION FAILURE for %s — %s. "
"This is a caller error (e.g. missing covariance_km2 when "
"include_stm=True). The satellite was dropped from results.",
sat_id,
ve,
)
except Exception as exc:
_log.warning(
"propagate_cowell_batch: propagation failed for %s — %s",
sat_id,
exc,
)
return results