"""ASTRA Core Data Ingestion Module.
Bridges the computation engine to live orbital data providers like CelesTrak
and Space-Track. Converts real-time API responses into ASTRA trajectory pipelines.

Supports dual-format ingestion:

- ``format="tle"`` (default): Returns a ``list[SatelliteTLE]``.
- ``format="json"`` (OMM): Returns a ``list[SatelliteOMM]``.

Example::

    # Legacy TLE (default, unchanged behaviour)
    tles = astra.fetch_celestrak_group("starlink")

    # Modern OMM with full physical metadata
    omms = astra.fetch_celestrak_group("starlink", format="json")
"""
from __future__ import annotations
from typing import Any
from typing import Literal, Union, cast
from astra.errors import AstraError
from astra.models import SatelliteTLE, SatelliteOMM
from astra.tle import load_tle_catalog
from astra.log import get_logger
from astra.version import __version__
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Module-wide HTTP session shared by every CelesTrak request made below.
_session = requests.Session()
# Retry policy handed to the transport adapter: at most 3 retries, with a
# backoff factor of 1.0, for the listed rate-limit / server-error statuses,
# and only for the HTTP methods named in ``allowed_methods``.
_retry = Retry(
    total=3,
    backoff_factor=1.0,
    status_forcelist=(429, 500, 502, 503, 504),
    allowed_methods=["HEAD", "GET", "OPTIONS"]
)
# The same adapter (and therefore the same retry policy) serves both schemes.
_adapter = HTTPAdapter(max_retries=_retry)
_session.mount("http://", _adapter)
_session.mount("https://", _adapter)
logger = get_logger(__name__)
# Headers sent with every request; the User-Agent identifies this client and
# embeds the installed ASTRA version.
_HEADERS = {
    "User-Agent": (
        f"ASTRA-Core/{__version__} (CelesTrak catalog client; "
        f"https://pypi.org/project/astra-core-engine/)"
    ),
}
# Primary ("legacy") GP endpoint, queried first by _fetch_group_raw.
_BASE_URL = "https://celestrak.org/NORAD/elements/gp.php"
# Supplemental GP endpoint, used as a fallback by _fetch_supplemental_raw.
_SUP_GP_URL = "https://celestrak.org/NORAD/elements/supplemental/sup-gp.php"
# Format names accepted by the fetch functions in this module.
FormatLiteral = Literal["tle", "json"]
# ---------------------------------------------------------------------------
# Internal Helpers
# ---------------------------------------------------------------------------
def _format_celestrak_supgp(fmt: FormatLiteral) -> str:
    """Return the ``FORMAT`` query value for sup-gp.php.

    CelesTrak expects the uppercase spellings ``TLE`` / ``JSON``. ``"tle"``
    maps to ``"TLE"``; any other value is sent as ``"JSON"``.
    """
    if fmt == "tle":
        return "TLE"
    return "JSON"
def _supplemental_params(group: str, fmt: FormatLiteral) -> dict[str, Any] | None:
    """Return the sup-gp.php query parameters for *group*, if a route exists.

    The group name is compared after stripping whitespace and lowercasing.

    * ``active`` has no ``FILE=active`` equivalent on the supplemental API,
      so ``None`` is returned.
    * ``gps-ops`` / ``gps_ops`` map to ``SOURCE=GPS-A`` (broadcast almanac),
      not the legacy ops list.
    * Every other group is requested as ``FILE=<normalized group>``.

    Args:
        group: CelesTrak group name as supplied by the caller.
        fmt: Requested format; converted to the uppercase ``FORMAT`` value.

    Returns:
        A parameter dict for sup-gp.php, or ``None`` when no supplemental
        route exists for the group.
    """
    normalized = group.strip().lower()
    if normalized == "active":
        return None

    format_value = _format_celestrak_supgp(fmt)
    if normalized in ("gps-ops", "gps_ops"):
        return {"SOURCE": "GPS-A", "FORMAT": format_value}
    return {"FILE": normalized, "FORMAT": format_value}
def _rate_limited(response: requests.Response) -> bool:
    """Return True when *response* is CelesTrak's rate-limit rejection.

    The rejection is recognised as HTTP 403 whose body contains the phrase
    "Data is updated once every 2 hours". The body is only inspected for
    403 responses.
    """
    if response.status_code != 403:
        return False
    return "Data is updated once every 2 hours" in response.text
def _legacy_response_triggers_supplemental(response: requests.Response) -> bool:
    """Decide whether a gp.php response should be retried via sup-gp.php.

    Returns True for any HTTP status of 500 or above, and for HTTP 200
    responses whose body is empty (after stripping whitespace) or contains
    "invalid query" (case-insensitive). All other responses return False.
    """
    status = response.status_code
    if status >= 500:
        return True
    if status != 200:
        return False

    # A 200 can still be a failure: CelesTrak may answer with an empty body
    # or an "invalid query" message instead of element data.
    body = (response.text or "").strip()
    return not body or "invalid query" in body.lower()
def _fetch_supplemental_raw(
    group: str, fmt: FormatLiteral, params: dict[str, str]
) -> str:
    """Download raw text/JSON from CelesTrak's supplemental sup-gp.php endpoint.

    Args:
        group: Group name, used only in error messages.
        fmt: Requested format, used only in error messages.
        params: Query parameters to send to sup-gp.php.

    Returns:
        The response body exactly as received (not stripped).

    Raises:
        AstraError: If the request fails, CelesTrak reports its rate limit,
            the HTTP status is an error, or the body is empty or contains
            "invalid query".
    """
    try:
        reply = _session.get(
            _SUP_GP_URL, params=params, headers=_HEADERS, timeout=20.0, verify=True
        )
        if _rate_limited(reply):
            raise AstraError(
                f"CelesTrak rate limit reached for group '{group}'. "
                f"Cached data should be used. {reply.text.strip()}"
            )
        reply.raise_for_status()
    except requests.RequestException as exc:
        raise AstraError(
            f"Failed to fetch CelesTrak group '{group}' [{fmt}] via supplemental sup-gp.php: {exc}"
        ) from exc

    # Validate a stripped copy, but hand the untouched body back to the caller.
    payload = reply.text
    stripped = (payload or "").strip()
    if not stripped or "invalid query" in stripped.lower():
        raise AstraError(
            f"CelesTrak supplemental sup-gp.php returned no usable data for group '{group}' [{fmt}]."
        )
    return payload
def _fetch_group_raw(group: str, fmt: FormatLiteral) -> str:
    """Download raw text/JSON from CelesTrak (legacy gp.php, then sup-gp.php if needed).

    The legacy endpoint is queried first. The request falls back to the
    supplemental endpoint when the legacy request raises a transport error,
    or when its response is judged unusable by
    ``_legacy_response_triggers_supplemental``. Groups with no supplemental
    mapping (``_supplemental_params`` returns ``None``) cannot fall back and
    fail immediately.

    Args:
        group: CelesTrak group name.
        fmt: ``"tle"`` or ``"json"``.

    Returns:
        The raw response body.

    Raises:
        AstraError: On rate limiting, on an HTTP error status, or when the
            legacy request fails and no supplemental route exists (or the
            supplemental request fails as well).
    """
    sup_params = _supplemental_params(group, fmt)
    try:
        # Pass the query through ``params`` so requests URL-encodes the group
        # name; interpolating it into the URL would let characters such as
        # '&', '#' or spaces corrupt the query string.
        response = _session.get(
            _BASE_URL,
            params={"GROUP": group, "FORMAT": fmt},
            headers=_HEADERS,
            timeout=20.0,
            verify=True,
        )
    except requests.RequestException as e:
        if sup_params is None:
            raise AstraError(
                f"Failed to fetch CelesTrak group '{group}' [{fmt}]: {e}"
            ) from e
        logger.info(
            "CelesTrak gp.php request failed for group %r [%s]; using supplemental sup-gp.php",
            group,
            fmt,
        )
        return _fetch_supplemental_raw(group, fmt, sup_params)

    # Rate limiting is checked before the fallback decision: it is reported
    # to the caller rather than retried against the supplemental endpoint.
    if _rate_limited(response):
        raise AstraError(
            f"CelesTrak rate limit reached for group '{group}'. "
            f"Cached data should be used. {response.text.strip()}"
        )

    if _legacy_response_triggers_supplemental(response):
        if sup_params is None:
            raise AstraError(
                f"CelesTrak gp.php failed for group '{group}' [{fmt}] "
                f"(HTTP {response.status_code}) and this group has no supplemental "
                "sup-gp.php mapping (e.g. GROUP=active)."
            )
        logger.info(
            "CelesTrak gp.php unavailable for group %r [%s]; using supplemental sup-gp.php",
            group,
            fmt,
        )
        return _fetch_supplemental_raw(group, fmt, sup_params)

    # Remaining error statuses (e.g. other 4xx) are surfaced as AstraError.
    try:
        response.raise_for_status()
    except requests.RequestException as e:
        raise AstraError(
            f"Failed to fetch CelesTrak group '{group}' [{fmt}]: {e}"
        ) from e
    return response.text
def _parse_response(
    text: str, fmt: FormatLiteral
) -> Union[list[SatelliteTLE], list[SatelliteOMM]]:
    """Parse a raw API response with the parser matching *fmt*.

    Args:
        text: Raw response body.
        fmt: ``"tle"`` to parse the lines with ``load_tle_catalog``;
            ``"json"`` to parse the body with ``parse_omm_json``.

    Returns:
        The parsed catalog produced by the selected parser.

    Raises:
        AstraError: If *fmt* is neither ``"tle"`` nor ``"json"``.
    """
    if fmt == "tle":
        return load_tle_catalog(text.splitlines())

    if fmt == "json":
        # Imported at call time, exactly as the module has always done.
        from astra.omm import parse_omm_json

        return parse_omm_json(text)

    raise AstraError(f"Unsupported format '{fmt}'. Use 'tle' or 'json'.")
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def fetch_celestrak_active(
    format: FormatLiteral = "tle",
) -> Union[list[SatelliteTLE], list[SatelliteOMM]]:
    """Fetch and parse CelesTrak's ``active`` satellite group.

    Only legacy ``gp.php`` serves this request: the full active catalog has
    no ``sup-gp.php`` equivalent, so there is no supplemental fallback.

    Args:
        format: ``"tle"`` (default) for legacy TLE format,
            ``"json"`` for modern OMM JSON with physical metadata.

    Returns:
        List of ``SatelliteTLE`` (format="tle") or ``SatelliteOMM`` (format="json").

    Raises:
        AstraError: If the download fails or the format is unsupported.
    """
    logger.info(f"Fetching active satellite catalog from CelesTrak [{format}]...")
    return _parse_response(_fetch_group_raw("active", format), format)
[docs]
def fetch_celestrak_group(
    group: str, format: FormatLiteral = "tle"
) -> Union[list[SatelliteTLE], list[SatelliteOMM]]:
    """Fetch and parse one constellation/group from CelesTrak.

    Valid groups include: ``'starlink'``, ``'gps-ops'``, ``'iridium-33-debris'``, etc.

    Legacy ``gp.php`` is tried first. If it fails (request error, HTTP 5xx,
    empty body, or invalid-query body), the client retries against
    supplemental ``sup-gp.php`` where a mapping exists. For ``gps-ops`` the
    supplemental path uses broadcast almanac data (``SOURCE=GPS-A``), not
    the legacy ops list.

    Args:
        group: CelesTrak group name string.
        format: ``"tle"`` (default) for legacy TLE format,
            ``"json"`` for modern OMM JSON with physical metadata.

    Returns:
        List of ``SatelliteTLE`` (format="tle") or ``SatelliteOMM`` (format="json").

    Raises:
        AstraError: If the download fails or the format is unsupported.
    """
    raw_body = _fetch_group_raw(group, format)
    parsed = _parse_response(raw_body, format)
    return parsed
[docs]
def fetch_celestrak_comprehensive(
    format: FormatLiteral = "tle",
    strict_mode: bool = False,
) -> Union[list[SatelliteTLE], list[SatelliteOMM]]:
    """Build a pseudo-full catalog from active payloads plus major debris clouds.

    CelesTrak does not expose a single unauthenticated 'all' endpoint, so the
    catalog is assembled group by group (~25,000+ objects) and deduplicated
    on each object's ``norad_id``; the first occurrence of an id wins.

    Args:
        format: ``"tle"`` (default) for legacy TLE format,
            ``"json"`` for modern OMM JSON with physical metadata.
        strict_mode: If True, raises an AstraError immediately if any group
            fails to download. If False (default), logs a warning and
            continues with the remaining groups.

    Returns:
        Deduplicated list of ``SatelliteTLE`` or ``SatelliteOMM`` objects.
        In non-strict mode the list may be incomplete when groups failed.

    Raises:
        AstraError: In strict mode, when any group fails to download.
    """
    groups = [
        "active",  # All active payloads (~15k)
        "1999-025",  # Fengyun-1C debris (~3k)
        "iridium-33-debris",  # Iridium 33 debris (~300)
        "cosmos-2251-debris",  # Cosmos 2251 debris (~1k)
        "1982-092",  # Cosmos 1408 debris (~500)
        "2019-006",  # MICROSAT-R debris (~100)
        "analyst",  # Analyst objects
    ]
    logger.info(
        f"Assembling comprehensive catalog from {len(groups)} CelesTrak groups [{format}]..."
    )

    merged: list[Any] = []
    known_ids: set[str] = set()
    for group_name in groups:
        try:
            logger.debug(f"Fetching group: {group_name}")
            for entry in fetch_celestrak_group(group_name, format=format):
                entry_id = entry.norad_id
                if entry_id in known_ids:
                    continue
                known_ids.add(entry_id)
                merged.append(entry)
        except AstraError as failure:
            if strict_mode:
                raise AstraError(
                    f"CelesTrak group '{group_name}' failed to download during comprehensive fetch. "
                    f"Strict mode is enabled. Error: {failure}"
                ) from failure
            # A skipped group must never be silent: a partial catalog can
            # hide conjunctions, so the caller is told exactly which group
            # is missing and how to fetch it separately.
            logger.warning(
                "CelesTrak group '%s' failed to download and will be EXCLUDED from the "
                "comprehensive catalog. The returned catalog is INCOMPLETE and may miss "
                "conjunction events involving objects from this group. Error: %s. "
                "Retry or fetch this group individually with fetch_celestrak_group(%r).",
                group_name,
                failure,
                group_name,
            )
    return merged
# ---------------------------------------------------------------------------
# Explicit OMM Sibling Functions (Discoverable API)
# ---------------------------------------------------------------------------
# These thin wrappers exist purely for discoverability.
# When a user types `astra.fetch_celestrak_` in their IDE, they immediately
# see both the TLE and OMM variants without needing to know about format=.
[docs]
def fetch_celestrak_active_omm() -> list[SatelliteOMM]:
    """Fetch CelesTrak's active satellite catalog as OMM JSON.

    Thin wrapper around ``fetch_celestrak_active(format="json")``. The
    returned ``SatelliteOMM`` objects include physical metadata unavailable
    in TLEs: mass, radar cross-section (RCS), and ballistic coefficient.

    Data formats: ✓ SatelliteOMM only (use ``fetch_celestrak_active`` for TLEs)

    Returns:
        List of ``SatelliteOMM`` objects for all active satellites.

    Example::

        import astra

        # OMM — high-fidelity with RCS and mass metadata
        satellites = astra.fetch_celestrak_active_omm()

        # TLE — legacy format (default)
        satellites = astra.fetch_celestrak_active()
    """
    omm_catalog = fetch_celestrak_active(format="json")
    return cast(list[SatelliteOMM], omm_catalog)
[docs]
def fetch_celestrak_group_omm(group: str) -> list[SatelliteOMM]:
    """Fetch one CelesTrak satellite group as OMM JSON.

    Thin wrapper around ``fetch_celestrak_group(group, format="json")``. The
    returned ``SatelliteOMM`` objects include physical metadata unavailable
    in TLEs: mass, radar cross-section (RCS), and ballistic coefficient.

    Data formats: ✓ SatelliteOMM only (use ``fetch_celestrak_group`` for TLEs)

    Args:
        group: CelesTrak group name (e.g. ``"starlink"``, ``"gps-ops"``).

    Returns:
        List of ``SatelliteOMM`` objects.

    Example::

        import astra

        # OMM — high-fidelity
        starlinks = astra.fetch_celestrak_group_omm("starlink")

        # TLE — legacy format (default)
        starlinks = astra.fetch_celestrak_group("starlink")
    """
    omm_group = fetch_celestrak_group(group, format="json")
    return cast(list[SatelliteOMM], omm_group)
[docs]
def fetch_celestrak_comprehensive_omm(*, strict_mode: bool = False) -> list[SatelliteOMM]:
    """Fetch a comprehensive multi-group catalog from CelesTrak in OMM JSON format.

    Thin wrapper around ``fetch_celestrak_comprehensive(format="json")``.
    Assembles ~25,000+ objects from active satellites, Fengyun-1C debris,
    Iridium 33 debris, Cosmos 2251 debris, and other major debris clouds.

    Data formats: ✓ SatelliteOMM only (use ``fetch_celestrak_comprehensive`` for TLEs)

    Args:
        strict_mode: Forwarded to ``fetch_celestrak_comprehensive``. If True,
            an AstraError is raised as soon as any group fails to download.
            If False (default), the failure is logged as a warning and the
            group is left out of the result.

    Returns:
        Deduplicated list of ``SatelliteOMM`` objects.

    Raises:
        AstraError: In strict mode, when any group fails to download.

    Example::

        import astra

        # OMM — high-fidelity comprehensive catalog
        catalog = astra.fetch_celestrak_comprehensive_omm()

        # Fail fast instead of returning a partial catalog
        catalog = astra.fetch_celestrak_comprehensive_omm(strict_mode=True)

        # TLE — legacy format
        catalog = astra.fetch_celestrak_comprehensive()
    """
    return cast(
        list[SatelliteOMM],
        fetch_celestrak_comprehensive(format="json", strict_mode=strict_mode),
    )