Source code for astra.debris

# astra/debris.py
"""ASTRA Core debris catalog filtering and statistics.
Pre-propagation filtering of debris catalogs. ALL filtering in this module
operates on DebrisObject parameters (derived TLE fields, NOT propagated
positions). Filtering is O(N) and involves no SGP4 calls.
"""
from __future__ import annotations
import math
from typing import Any, Optional
from astra.models import (
    DebrisObject,
    FilterConfig,
    SatelliteTLE,
    SatelliteOMM,
    SatelliteState,
)
from astra.utils import orbit_period, orbital_elements
from astra.constants import (
    EARTH_EQUATORIAL_RADIUS_KM,
    EARTH_MU_KM3_S2,
    TLE_AGE_LEO_MAX_DAYS,
    TLE_AGE_DEFAULT_MAX_DAYS,
)
[docs] def make_debris_object(source: SatelliteState) -> DebrisObject: """Build a DebrisObject from a SatelliteTLE or SatelliteOMM with derived elements. For TLE sources, elements are parsed directly from the raw TLE lines. For OMM sources, elements are read directly from the already-converted dataclass fields (no string parsing required). """ if isinstance(source, SatelliteTLE): elements = orbital_elements(source.line2) period_min = orbit_period(elements["mean_motion_rev_per_day"]) # Mean motion in rad/s for semi-major axis if period_min <= 0: n_rad_s = float("inf") a = 0.0 else: n_rad_s = (2.0 * math.pi) / (period_min * 60.0) a = (EARTH_MU_KM3_S2 / (n_rad_s**2)) ** (1.0 / 3.0) e = elements["eccentricity"] inclination_deg = elements["inclination_deg"] raan_deg = elements["raan_deg"] elif isinstance(source, SatelliteOMM): # OMM already carries all elements as clean floats — no string parsing. # mean_motion_rad_min → convert to rev/day for orbit_period() mean_motion_rev_day = source.mean_motion_rad_min * 1440.0 / (2.0 * math.pi) period_min = orbit_period(mean_motion_rev_day) if period_min <= 0: n_rad_s = float("inf") a = 0.0 else: n_rad_s = (2.0 * math.pi) / (period_min * 60.0) a = (EARTH_MU_KM3_S2 / (n_rad_s**2)) ** (1.0 / 3.0) e = source.eccentricity inclination_deg = math.degrees(source.inclination_rad) raan_deg = math.degrees(source.raan_rad) else: raise TypeError(f"Unsupported source type: {type(source).__name__}") perigee_km = a * (1.0 - e) - EARTH_EQUATORIAL_RADIUS_KM apogee_km = a * (1.0 + e) - EARTH_EQUATORIAL_RADIUS_KM altitude_km = a - EARTH_EQUATORIAL_RADIUS_KM # Harvest RCS from OMM if available rcs_m2 = getattr(source, "rcs_m2", None) return DebrisObject( source=source, altitude_km=altitude_km, inclination_deg=inclination_deg, period_minutes=period_min, raan_deg=raan_deg, eccentricity=e, apogee_km=apogee_km, perigee_km=perigee_km, object_class=source.object_type, rcs_m2=rcs_m2, )
[docs] def filter_altitude( objects: list[DebrisObject], min_km: float, max_km: float ) -> list[DebrisObject]: """Retain only objects whose mean orbital altitude is within bounds. Also filters objects whose perigee is pathologically low (perigee < min_km * 0.9) to discard quickly-decaying objects. Args: objects: List of DebrisObjects. min_km: Lower altitude bound (inclusive). max_km: Upper altitude bound (inclusive). Returns: Filtered list of DebrisObjects. """ results = [] for obj in objects: if min_km <= obj.altitude_km <= max_km: if obj.perigee_km >= (min_km * 0.9): results.append(obj) return results
[docs] def filter_region( objects: list[DebrisObject], lat_min_deg: float, lat_max_deg: float, lon_min_deg: Optional[float] = None, lon_max_deg: Optional[float] = None, ) -> list[DebrisObject]: """Retain objects whose ground track could pass through a bounding box. Uses a two-stage filter: **Stage 1 - Latitude (inclination-based):** An object with inclination *i* reaches latitudes up to ``i`` (prograde) or ``180 - i`` (retrograde). Objects whose latitude band does not overlap ``[lat_min_deg, lat_max_deg]`` are excluded immediately. **Stage 2 - Longitude (RAAN-based, if bounds are supplied):** For short-period orbits (period < 1440 min / 24 h), Earth's rotation causes the satellite's ground track to sweep all longitudes within a day, so the longitude filter is vacuous and all Stage-1 survivors are kept. For long-period orbits (GEO / HEO / deep-space), the ascending node is anchored near RAAN for the duration of interest, so a window check around RAAN is applied with a margin equal to the longitude swept in half an orbital period. This remains an **over-inclusive** pre-filter. Note: For hard longitude exclusion use ``propagate_trajectory()`` followed by manual geodetic post-filtering. Args: objects: List of DebrisObjects. lat_min_deg: Minimum latitude bound (degrees). lat_max_deg: Maximum latitude bound (degrees). lon_min_deg: Minimum longitude bound (degrees, -180 to +180). ``None`` disables longitude filtering entirely. lon_max_deg: Maximum longitude bound (degrees, -180 to +180). ``None`` disables longitude filtering entirely. Returns: Filtered list of DebrisObjects. Example:: # Retain objects that could pass over India (lat 8-37, lon 68-97) filtered = filter_region( objects, lat_min_deg=8.0, lat_max_deg=37.0, lon_min_deg=68.0, lon_max_deg=97.0, ) """ # Use `is not None` - NOT truthiness - so that # lon_min_deg=0.0 correctly activates the longitude filter path. apply_lon_filter = (lon_min_deg is not None) and (lon_max_deg is not None) results = [] for obj in objects: # ── Stage 1: Latitude ──────────────────────────────────────────────── inc = obj.inclination_deg max_lat_reached = inc if inc <= 90.0 else 180.0 - inc # Object latitude band: [-max_lat_reached, +max_lat_reached] if not (lat_min_deg <= max_lat_reached and lat_max_deg >= -max_lat_reached): continue # latitude bands do not overlap → skip # ── Stage 2: Longitude (if bounds supplied) ────────────────────────── if apply_lon_filter: # RAAN + inclination-based longitude pre-filter. # For orbits with period < 24 h (LEO/MEO), Earth's rotation (~360°/day) # means the ascending node sweeps every longitude within <= 1 day # regardless of RAAN. All such objects that pass Stage 1 are kept. period_min = obj.period_minutes if period_min <= 0.0 or period_min < 1440.0: # Short-period orbit: ascending node covers all longitudes in 24 h. results.append(obj) continue # Long-period orbit (GEO / HEO): ascending node stays near RAAN. # Apply a window check with margin for half-period nodal drift. raan = obj.raan_deg % 360.0 # normalise to [0, 360) lmin = float(lon_min_deg) % 360.0 # type: ignore[arg-type] lmax = float(lon_max_deg) % 360.0 # type: ignore[arg-type] # Half-period longitude drift: Earth rotates 360/day, so the node # drifts ~360*(P_min/1440)/2 degrees in half an orbital period. lon_half_sweep_deg = 360.0 * (period_min / 1440.0) / 2.0 if lon_half_sweep_deg >= 180.0: # If nodal drift covers 180+ degrees, it effectively sweeps # the entire globe in one period. Skip filtering. results.append(obj) continue lo = (lmin - lon_half_sweep_deg) % 360.0 hi = (lmax + lon_half_sweep_deg) % 360.0 if lo <= hi: in_band = lo <= raan <= hi else: # Band wraps through 0 degrees (e.g. lo=350, hi=10) in_band = raan >= lo or raan <= hi if not in_band: continue # RAAN outside reachable longitude band → skip results.append(obj) return results
[docs] def filter_time_window( objects: list[DebrisObject], t_start_jd: float, t_end_jd: float ) -> list[DebrisObject]: """Eliminate objects whose TLE epoch is too stale for predictions. Args: objects: List of DebrisObjects. t_start_jd: Window start as Julian Date. t_end_jd: Window end as Julian Date. Returns: Filtered list of DebrisObjects. """ results = [] for obj in objects: age_days = t_start_jd - obj.source.epoch_jd # Stale thresholds # Stricter threshold for LEO due to higher atmospheric drag is_stale = False if obj.altitude_km < 2000: if age_days > TLE_AGE_LEO_MAX_DAYS: is_stale = True else: if age_days > TLE_AGE_DEFAULT_MAX_DAYS: is_stale = True if not is_stale: results.append(obj) return results
[docs] def catalog_statistics(objects: list[DebrisObject]) -> dict[str, Any]: """Compute summary statistics across a debris catalog. Args: objects: List of DebrisObjects. Returns: Dictionary of computed statistics. """ if not objects: return { "total_count": 0, "by_type": {"PAYLOAD": 0, "ROCKET_BODY": 0, "DEBRIS": 0, "UNKNOWN": 0}, "by_regime": {"LEO": 0, "MEO": 0, "GEO": 0, "HEO": 0}, "altitude_mean_km": 0.0, "altitude_std_km": 0.0, "altitude_min_km": 0.0, "altitude_max_km": 0.0, "inclination_distribution": { "equatorial": 0, "inclined": 0, "polar": 0, "retrograde": 0, }, } import numpy as np altitudes = np.array([obj.altitude_km for obj in objects]) eccentricities = np.array([obj.eccentricity for obj in objects]) inclinations = np.array([obj.inclination_deg for obj in objects]) classes = [obj.object_class for obj in objects] # Regime classification via vectorized boolean indexing is_heo = eccentricities > 0.25 is_leo = (~is_heo) & (altitudes < 2000) is_geo = (~is_heo) & (altitudes >= 35000) & (altitudes <= 36000) is_meo = (~is_heo) & (~is_leo) & (~is_geo) by_regime = { "LEO": int(np.sum(is_leo)), "MEO": int(np.sum(is_meo)), "GEO": int(np.sum(is_geo)), "HEO": int(np.sum(is_heo)), } # Inclination distribution inclination_dist = { "equatorial": int(np.sum(inclinations < 10.0)), "inclined": int(np.sum((inclinations >= 10.0) & (inclinations < 80.0))), "polar": int(np.sum((inclinations >= 80.0) & (inclinations <= 90.0))), "retrograde": int(np.sum(inclinations > 90.0)), } # Object type counts from collections import Counter type_counts = Counter(classes) by_type = {"PAYLOAD": 0, "ROCKET_BODY": 0, "DEBRIS": 0, "UNKNOWN": 0} by_type.update({k: v for k, v in type_counts.items() if k in by_type}) return { "total_count": len(objects), "by_type": by_type, "by_regime": by_regime, "altitude_mean_km": float(np.mean(altitudes)), "altitude_std_km": float(np.std(altitudes)), "altitude_min_km": float(np.min(altitudes)), "altitude_max_km": float(np.max(altitudes)), "inclination_distribution": inclination_dist, }
[docs] def apply_filters( catalog: list[DebrisObject], config: FilterConfig ) -> list[DebrisObject]: """Execute the REQUIRED PIPELINE FUNCTION for filtering. Args: catalog: List of DebrisObjects to filter. config: FilterConfig data class outlining constraints. Returns: Filtered list of DebrisObjects. """ filtered = catalog # 1. Apply altitude filter if config.min_altitude_km is not None and config.max_altitude_km is not None: filtered = filter_altitude( filtered, config.min_altitude_km, config.max_altitude_km ) # 2. Apply region filter. Latitude bounds are independently useful; longitude # bounds further refine the same over-inclusive geographic pre-filter. has_lat = config.lat_min_deg is not None and config.lat_max_deg is not None has_lon = config.lon_min_deg is not None and config.lon_max_deg is not None if has_lat: # types in mypy expect float, but we verified not None filtered = filter_region( filtered, lat_min_deg=config.lat_min_deg, # type: ignore[arg-type] lat_max_deg=config.lat_max_deg, # type: ignore[arg-type] lon_min_deg=config.lon_min_deg if has_lon else None, lon_max_deg=config.lon_max_deg if has_lon else None, ) # 3. Apply time window filter if config.t_start_jd is not None and config.t_end_jd is not None: filtered = filter_time_window( filtered, t_start_jd=config.t_start_jd, t_end_jd=config.t_end_jd, ) # 4. Apply object type filter if config.object_types is not None: valid_types = set(config.object_types) filtered = [obj for obj in filtered if obj.object_class in valid_types] # 5. Apply max_objects cap if config.max_objects is not None and len(filtered) > config.max_objects: # The prompt specifies "max_objects cap (if provided)" filtered = filtered[: config.max_objects] return filtered