Source code for astra.models

# astra/models.py
"""ASTRA Core data models.
All persistent data structures used across the library are defined here as
frozen dataclasses.  This is the single source of truth for all inter-module
data types — no other module defines dataclasses.
Every dataclass uses ``frozen=True`` to enforce immutability.
"""
from __future__ import annotations
from typing import Any
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union
import numpy as np
from astra.constants import G0_STD as _G0_STD
# ---------------------------------------------------------------------------
# Type Aliases
# ---------------------------------------------------------------------------
# A map from NORAD ID to trajectory array.
# Shape of each array: (T, 3) where T = number of timesteps.
# Units: km, frame: TEME.
TrajectoryMap = dict[str, np.ndarray]
# NORAD ID → velocity array (same keys as TrajectoryMap).
# Shape of each array: (T, 3) where T = number of timesteps.
# Units: km/s, frame: TEME.
VelocityMap = dict[str, np.ndarray]
# A list of NORAD ID pairs (candidate conjunction pairs).
CandidatePairs = list[tuple[str, str]]
# Time array: array of Julian Dates.
TimeArray = np.ndarray  # shape (T,), dtype float64
def _quat_rotate(q: tuple[float, float, float, float], v: np.ndarray) -> np.ndarray:
    """Rotate vector v by quaternion q = (w, x, y, z)."""
    w, qx, qy, qz = q
    u = np.array([qx, qy, qz])
    return v + 2.0 * np.cross(u, np.cross(u, v) + w * v)
[docs] def projected_area_m2( dimensions_m: tuple[float, float, float], attitude_quaternion: tuple[float, float, float, float], rel_vel_direction: np.ndarray, ) -> float: """Compute projected cross-sectional area of a box onto the B-plane. Models the satellite as a rectangular cuboid with given dimensions, rotated by the attitude quaternion. Projects the 3 face normals onto the relative velocity direction and sums visible face areas. This replaces the isotropic sphere assumption for collision geometry. Args: dimensions_m: (length, width, height) in meters. attitude_quaternion: (w, x, y, z) unit quaternion. rel_vel_direction: (3,) unit vector along relative velocity. Returns: Projected area in m². """ length, w, h = dimensions_m q = attitude_quaternion # Body-frame face normals and their areas faces = [ (np.array([1.0, 0.0, 0.0]), w * h), # +X face (np.array([0.0, 1.0, 0.0]), length * h), # +Y face (np.array([0.0, 0.0, 1.0]), length * w), # +Z face ] total_area = 0.0 for normal_body, area in faces: # Rotate face normal to ECI frame normal_eci = _quat_rotate(q, normal_body) # Projected area contribution = |n · v_hat| * face_area # Factor of 2: both +/- faces can contribute cos_angle = abs(float(np.dot(normal_eci, rel_vel_direction))) total_area += cos_angle * area return total_area
# --------------------------------------------------------------------------- # Maneuver Frame Enumeration # ---------------------------------------------------------------------------
[docs] class ManeuverFrame(Enum): """Reference frame for specifying maneuver thrust direction. **VNB (Velocity-Normal-Binormal)** - **V:** Along the instantaneous velocity vector. - **N:** Along the orbital angular momentum (R × V). - **B:** Completes the right-handed triad (V × N). Preferred for orbit-raising and lowering burns: thrust stays aligned with velocity even on eccentric orbits. **RTN (Radial-Transverse-Normal)** / **RIC (Radial-Intrack-Crosstrack)** - **R:** Along the geocentric radial (position) vector. - **T:** Perpendicular to R in the orbital plane, roughly along-track. - **N:** Along the orbital angular momentum (R × V). Preferred for station-keeping and relative navigation. """ VNB = "VNB" RTN = "RTN"
# --------------------------------------------------------------------------- # FiniteBurn (Maneuver Definition) # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True) class FiniteBurn: """Finite-duration thrust maneuver for the 7-DOF Cowell propagator. Models a continuous engine burn with dynamically steered thrust (re-computed at every integration sub-step) and Tsiolkovsky-coupled mass depletion. All epochs are Julian Dates. Thrust direction is given as a *unit vector* in the chosen ``ManeuverFrame``; the engine magnitude is set by ``thrust_N``. """ epoch_ignition_jd: float duration_s: float thrust_N: float isp_s: float direction: tuple[float, float, float] frame: ManeuverFrame @property def epoch_cutoff_jd(self) -> float: """Julian Date of engine cutoff.""" return self.epoch_ignition_jd + self.duration_s / 86400.0 @property def mass_flow_rate_kg_s(self) -> float: """Propellant mass flow rate dm/dt (kg/s, positive value). Derived from the Tsiolkovsky relation: dm/dt = F / (Isp * g₀) """ # Import from constants instead of local literal. return self.thrust_N / (self.isp_s * _G0_STD) def __post_init__(self) -> None: """Validate burn parameters at construction time. Frozen dataclass: do not assign fields here except via ``object.__setattr__`` if normalization is ever added (initialization runs before immutability locks). Raises: ValueError: If any parameter is physically invalid. """ import numpy as _np if self.duration_s <= 0.0: raise ValueError( f"FiniteBurn.duration_s must be strictly positive, got {self.duration_s}." ) if self.thrust_N <= 0.0: raise ValueError( f"FiniteBurn.thrust_N must be strictly positive, got {self.thrust_N} N." ) if self.isp_s <= 0.0: raise ValueError( f"FiniteBurn.isp_s must be strictly positive, got {self.isp_s} s." ) d_mag = float(_np.linalg.norm(self.direction)) if abs(d_mag - 1.0) > 1e-6: raise ValueError( f"FiniteBurn.direction must be a unit vector (|d| = 1.0). " f"Got magnitude {d_mag:.8f}. Normalize before constructing FiniteBurn." )
# --------------------------------------------------------------------------- # SatelliteTLE # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True) class SatelliteTLE: """Parsed Two-Line Element set for a single orbital object. This is the entry point for all computations. Any satellite, debris, or rocket body that can be analysed in ASTRA Core must first exist as a ``SatelliteTLE``. """ norad_id: str name: str line1: str line2: str epoch_jd: float object_type: str classification_flag: str = "U" # 'U'=Unclassified, 'C'=Classified, 'S'=Secret bstar: float = 0.0 # NOTE: This is a security classification, NOT an object type. # object_type requires separate SATCAT enrichment. rcs_m2: Optional[float] = ( None # Radar Cross Section in **m²**. Required for high-fidelity SRP/drag. ) radius_m: Optional[float] = None dimensions_m: Optional[tuple[float, float, float]] = None # (length, width, height) attitude_quaternion: Optional[tuple[float, float, float, float]] = ( None # (w, x, y, z) ) attitude_mode: str = "TUMBLING" # Options: "NADIR", "TUMBLING", "INERTIAL"
[docs] @classmethod def from_strings(cls, line1: str, line2: str, name: str = "") -> "SatelliteTLE": """Create a SatelliteTLE directly from two raw lines, auto-calculating epoch. If ``name`` is omitted, a synthetic name ``NORAD-{id}`` is generated from the NORAD catalog number embedded in line 1, matching ``load_tle_catalog()``. Args: line1: Valid TLE line 1. line2: Valid TLE line 2. name: Optional name for the satellite. If empty, auto-generated. Returns: A fully initialized SatelliteTLE object. """ if not name: norad_id = line1[2:7].strip() if len(line1) >= 7 else "" name = f"NORAD-{norad_id}" if norad_id else "Unknown" from astra.tle import parse_tle return parse_tle(name, line1, line2)
# --------------------------------------------------------------------------- # SatelliteOMM # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True) class SatelliteOMM: """Parsed CCSDS Orbit Mean-Elements Message (OMM) for a single orbital object. Represents the modern, high-fidelity successor to the legacy TLE format. OMM JSON payloads are published by Space-Track.org and CelesTrak and contain additional physical metadata (mass, RCS) that TLEs structurally cannot carry. All angular fields are stored in **radians** (already converted by the OMM parser from the source's degree representation) to allow direct injection into ``Satrec.sgp4init()`` without further transformation. """ norad_id: str name: str epoch_jd: float object_type: str inclination_rad: float raan_rad: float argpo_rad: float mo_rad: float eccentricity: float mean_motion_rad_min: float bstar: float mean_motion_dot: float = 0.0 mean_motion_ddot: float = 0.0 rcs_m2: Optional[float] = None # Radar Cross Section in **m²**. mass_kg: Optional[float] = ( None # Spacecraft mass in **kg**. Mandatory for powered maneuvers. ) cd_area_over_mass: Optional[float] = None
[docs] @classmethod def from_dict(cls, record: dict[str, Any]) -> "SatelliteOMM": """Construct a ``SatelliteOMM`` from a raw OMM JSON dictionary. Convenience factory — delegates to ``astra.omm.parse_omm_record()`` so the caller does not need to import the parser module explicitly. Args: record: A single OMM record dictionary (as from ``json.loads()``). Returns: Fully populated ``SatelliteOMM`` instance. Example:: import json, astra records = json.loads(open("catalog.json").read()) sats = [astra.SatelliteOMM.from_dict(r) for r in records] """ from astra.omm import parse_omm_record return parse_omm_record(record)
# --------------------------------------------------------------------------- # SatelliteState – polymorphic Union across both data formats # --------------------------------------------------------------------------- #: Type alias accepted by all ASTRA-Core physics functions. #: Any function receiving a ``SatelliteState`` correctly handles #: both legacy TLE and modern OMM objects without modification. SatelliteState = Union[SatelliteTLE, SatelliteOMM] # --------------------------------------------------------------------------- # OrbitalState # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True) class OrbitalState: """Complete kinematic state of a single object at one instant. Produced by SGP4 propagation for a single satellite at a single time step. """ norad_id: str t_jd: float position_km: np.ndarray # shape (3,), dtype float64, TEME, km velocity_km_s: np.ndarray # shape (3,), dtype float64, TEME, km/s error_code: int
# --------------------------------------------------------------------------- # DebrisObject # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True) class DebrisObject: """Cataloged orbital object with pre-derived parameters for filtering. Contains the authoritative orbital source (either ``SatelliteTLE`` or ``SatelliteOMM``) plus derived orbital metrics that enable fast filtering **without propagation**. """ source: SatelliteState altitude_km: float inclination_deg: float period_minutes: float raan_deg: float eccentricity: float apogee_km: float perigee_km: float object_class: str rcs_m2: Optional[float] = None radius_m: Optional[float] = None # ------------------------------------------------------------------ # Backwards-compatibility shim # ------------------------------------------------------------------ @property def tle(self) -> SatelliteTLE: """Legacy accessor for TLE source. Raises if source is OMM. .. deprecated:: Use ``.source`` instead, which handles both TLE and OMM. """ if isinstance(self.source, SatelliteTLE): return self.source raise AttributeError( "This DebrisObject was built from an OMM record, not a TLE. " "Use `.source` to access the underlying SatelliteOMM." ) def __repr__(self) -> str: """Concise representation to avoid terminal hangs on large lists (A-06).""" sid = getattr(self.source, "norad_id", "UNK") return f"<DebrisObject NORAD={sid} Alt={self.altitude_km:.1f}km>"
# --------------------------------------------------------------------------- # ConjunctionEvent # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True) class ConjunctionEvent: """Detected close-approach event between two orbital objects. Primary output of conjunction analysis. Use ``collision_probability_nan`` for threshold checks without ``TypeError`` when Pc is unknown (``None`` becomes ``float('nan')``). """ object_a_id: str object_b_id: str tca_jd: float miss_distance_km: float relative_velocity_km_s: float collision_probability: Optional[float] # None when no covariance available risk_level: str position_a_km: np.ndarray # shape (3,), TEME, km position_b_km: np.ndarray # shape (3,), TEME, km covariance_source: str = "SYNTHETIC" # "CDM", "STM", "SYNTHETIC", or "UNAVAILABLE" @property def collision_probability_nan(self) -> float: """Collision probability as a ``float``, with ``None`` mapped to ``float('nan')``. Allows safe threshold comparisons without ``TypeError``:: pc = event.collision_probability_nan if not math.isnan(pc) and pc > 1e-4: alert(event) Returns: Pc in [0.0, 1.0] when covariance data was available, or ``float('nan')`` when ``collision_probability`` is ``None``. """ return ( float("nan") if self.collision_probability is None else self.collision_probability )
# --------------------------------------------------------------------------- # Observer # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True) class Observer: """Ground-based observation station. Required by all visibility functions. """ name: str latitude_deg: float longitude_deg: float elevation_m: float min_elevation_deg: float = 10.0 def __repr__(self) -> str: return f"<Observer '{self.name}' Lat={self.latitude_deg:.2f} Lon={self.longitude_deg:.2f}>"
# --------------------------------------------------------------------------- # PassEvent # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True) class PassEvent: """Satellite pass over a ground observer. A time interval during which the satellite's elevation angle exceeds the observer's minimum elevation threshold. Attributes: norad_id: NORAD catalog ID of the satellite. observer_name: Name of the ground observer/station. aos_jd: Acquisition of Signal time (Julian Date) - satellite rises above horizon. tca_jd: Time of Closest Approach (Julian Date) - maximum elevation. los_jd: Loss of Signal time (Julian Date) - satellite drops below horizon. max_elevation_deg: Maximum elevation angle during pass (degrees). azimuth_at_aos_deg: Azimuth at AOS (degrees, 0-360, N=0, E=90). azimuth_at_tca_deg: Azimuth at TCA (degrees, 0-360, N=0, E=90). azimuth_at_los_deg: Azimuth at LOS (degrees, 0-360, N=0, E=90). duration_seconds: Pass duration in seconds. satellite_illuminated: True if satellite is sunlit at TCA (for visual passes). observer_in_darkness: True if observer is in Earth's shadow at TCA. """ norad_id: str observer_name: str aos_jd: float tca_jd: float los_jd: float max_elevation_deg: float azimuth_at_aos_deg: float azimuth_at_tca_deg: float # Required - no default before fields without defaults azimuth_at_los_deg: float duration_seconds: float satellite_illuminated: bool = True # Is satellite in sunlight at TCA? observer_in_darkness: bool = False # Is observer in Earth's shadow at TCA?
# --------------------------------------------------------------------------- # FilterConfig # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True) class FilterConfig: """Filter parameters for the multi-stage debris filtering pipeline. Passed as a single configuration object to ``apply_filters()``. ``None`` fields are treated as "no constraint" for that dimension. """ min_altitude_km: Optional[float] = None max_altitude_km: Optional[float] = None lat_min_deg: Optional[float] = None lat_max_deg: Optional[float] = None lon_min_deg: Optional[float] = None lon_max_deg: Optional[float] = None t_start_jd: Optional[float] = None t_end_jd: Optional[float] = None object_types: Optional[tuple[str, ...]] = None max_objects: Optional[int] = ( None # Applied after other filters; keep first N survivors )