# astra/tle.py
"""ASTRA Core TLE parsing and validation.
This module is the entry point for all data entering ASTRA Core. It handles
parsing, validation, and batch loading of Two-Line Element (TLE) sets from
raw text.
"""
from __future__ import annotations
import logging
import numpy as np
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from astra.models import SatelliteState
from datetime import datetime, timedelta, timezone
from astra.errors import AstraError, InvalidTLEError
from astra.models import SatelliteTLE
logger = logging.getLogger(__name__)
# J2000 reference epoch for JD conversion (matching astra.time)
_J2000_JD = 2451545.0
_J2000_EPOCH = datetime(2000, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
def _parse_epoch_to_jd(epoch_str: str) -> float:
"""Convert TLE epoch string (YYDDD.FFFFFFFF) to Julian Date.
Uses integer-split arithmetic to avoid floating-point rounding errors
at the day/fraction boundary. The fractional day is accumulated
via timedelta microseconds rather than floating-point multiplication.
Years >= 57 are interpreted as 19YY, years < 57 as 20YY (standard TLE
two-digit year convention).
From calendar year 2057 onward, YY ≥ 57 maps to 19YY and collides with
early-spacecraft epochs; for archival or long-horizon data prefer CCSDS OMM
(full UTC epoch) over TLEs.
"""
try:
y = int(epoch_str[:2])
day_of_year = float(epoch_str[2:])
except ValueError as e:
raise ValueError(f"Invalid epoch format: {epoch_str}") from e
year = 2000 + y if y < 57 else 1900 + y
# Integer day + fractional-day both accumulated through timedelta
day_whole = int(day_of_year) # e.g. 123
day_frac = day_of_year - day_whole # e.g. 0.45678
# Convert fractional day to integer microseconds for precise accumulation
frac_us = round(day_frac * 86400 * 1_000_000)
dt = datetime(year, 1, 1, tzinfo=timezone.utc) + timedelta(
days=day_whole - 1, microseconds=frac_us
)
delta = dt - _J2000_EPOCH
return _J2000_JD + delta.total_seconds() / 86400.0
[docs]
def check_tle_staleness(
satellite: SatelliteState, target_jd: float | np.ndarray
) -> None:
"""Verify that the propagation time is within 30 days of the satellite epoch.
SGP4 accuracy degrades exponentially over time. For mission-critical analysis,
using TLEs older than 30 days is discouraged and blocked in STRICT mode.
Args:
satellite: The SatelliteTLE or SatelliteOMM object being propagated.
target_jd: The target Julian Date(s) for propagation.
Raises:
PropagationError: If delta > 30 days and ASTRA_STRICT_MODE is True.
"""
import numpy as np
from astra import config
from astra.errors import PropagationError
delta_days = np.abs(np.asanyarray(target_jd) - satellite.epoch_jd)
max_delta = np.max(delta_days)
if max_delta > 30.0:
msg = (
f"TLE/OMM for {satellite.name} (NORAD {satellite.norad_id}) is stale "
f"({max_delta:.2f} days from epoch). Max recommended SGP4 horizon is 30 days."
)
if config.ASTRA_STRICT_MODE:
raise PropagationError(
msg, norad_id=satellite.norad_id, t_jd=float(np.max(target_jd))
)
logger.warning(msg)
[docs]
def parse_tle(name: str, line1: str, line2: str) -> SatelliteTLE:
"""Parse three raw TLE lines into a validated SatelliteTLE object.
Args:
name: Object name string.
line1: TLE line 1.
line2: TLE line 2.
Returns:
A fully populated SatelliteTLE instance.
Raises:
InvalidTLEError: If any validation fails.
"""
# 1. Strip whitespace
name = name.strip()
line1 = line1.strip()
line2 = line2.strip()
# 2. Length checks (each line must be exactly 69 characters)
if len(line1) != 69:
raise InvalidTLEError(
f"Line 1 length is {len(line1)}, expected 69",
norad_id="UNKNOWN",
object_name=name,
invalid_line=line1,
reason="L1_LENGTH",
)
if len(line2) != 69:
raise InvalidTLEError(
f"Line 2 length is {len(line2)}, expected 69",
norad_id="UNKNOWN",
object_name=name,
invalid_line=line2,
reason="L2_LENGTH",
)
# 3. Prefix checks (line 1 must start with "1 ", line 2 with "2 ")
if not line1.startswith("1 "):
raise InvalidTLEError(
"Line 1 does not start with '1 '",
norad_id="UNKNOWN",
object_name=name,
invalid_line=line1,
reason="L1_PREFIX",
)
if not line2.startswith("2 "):
raise InvalidTLEError(
"Line 2 does not start with '2 '",
norad_id="UNKNOWN",
object_name=name,
invalid_line=line2,
reason="L2_PREFIX",
)
# 4. Checksum validation
try:
expected_cs1 = int(line1[68])
if _compute_checksum(line1) != expected_cs1:
raise InvalidTLEError(
"Line 1 checksum mismatch",
norad_id="UNKNOWN",
object_name=name,
invalid_line=line1,
reason="L1_CHECKSUM",
)
except ValueError:
raise InvalidTLEError(
"Line 1 checksum character is not a digit",
norad_id="UNKNOWN",
object_name=name,
invalid_line=line1,
reason="L1_CHECKSUM",
)
try:
expected_cs2 = int(line2[68])
if _compute_checksum(line2) != expected_cs2:
raise InvalidTLEError(
"Line 2 checksum mismatch",
norad_id="UNKNOWN",
object_name=name,
invalid_line=line2,
reason="L2_CHECKSUM",
)
except ValueError:
raise InvalidTLEError(
"Line 2 checksum character is not a digit",
norad_id="UNKNOWN",
object_name=name,
invalid_line=line2,
reason="L2_CHECKSUM",
)
# 5. NORAD ID consistency
norad_id = line1[2:7].strip()
norad_id2 = line2[2:7].strip()
if norad_id != norad_id2:
raise InvalidTLEError(
"NORAD ID mismatch between Line 1 and Line 2",
norad_id=norad_id,
object_name=name,
invalid_line=line2,
reason="ID_MISMATCH",
)
# 6. Extract epoch and convert to Julian Date
epoch_str = line1[18:32].strip()
try:
epoch_jd = _parse_epoch_to_jd(epoch_str)
except ValueError as e:
raise InvalidTLEError(
f"Failed to parse epoch: {e}",
norad_id=norad_id,
object_name=name,
invalid_line=line1,
reason="EPOCH_PARSE_ERROR",
)
# 7. Physical Bounds Checking (Input Poisoning Defense)
try:
ecco_str = line2[26:33].strip()
if ecco_str:
ecco = float("." + ecco_str)
if ecco < 0.0 or ecco >= 1.0:
raise ValueError(f"Eccentricity {ecco} out of bounds [0, 1) for SGP4")
incl_str = line2[8:16].strip()
if incl_str:
incl = float(incl_str)
if incl < 0.0 or incl > 180.0:
raise ValueError(f"Inclination {incl} out of bounds [0, 180]")
except ValueError as e:
raise InvalidTLEError(
f"Physical bounds violation in TLE: {e}",
norad_id=norad_id,
object_name=name,
invalid_line=line2,
reason="BOUNDS_VIOLATION",
)
# 8. Extract classification character (U=Unclassified, C=Classified, S=Secret)
# NOTE: This is a SECURITY classification, not an object type.
# Object type (PAYLOAD/DEBRIS/ROCKET_BODY) requires SATCAT lookup.
classification_flag = line1[7]
bstar_str = line1[53:61].strip()
try:
if bstar_str.startswith("-"):
bstar_sign = "-"
bstar_str = bstar_str[1:]
else:
bstar_sign = ""
bstar_str = bstar_str.replace(" ", "")
if len(bstar_str) >= 2 and bstar_str[-2] in "+-":
bstar = float(f"{bstar_sign}0.{bstar_str[:-2]}e{bstar_str[-2:]}")
else:
bstar = float(f"{bstar_sign}0.{bstar_str[:-1]}e{bstar_str[-1:]}")
except Exception:
bstar = 0.0
# 8. Default object_type to UNKNOWN — to be overridden by SATCAT enrichment
object_type = "UNKNOWN"
# 9. Instantiate and return
return SatelliteTLE(
norad_id=norad_id,
name=name,
line1=line1,
line2=line2,
epoch_jd=epoch_jd,
object_type=object_type,
classification_flag=classification_flag,
bstar=bstar,
)
[docs]
def validate_tle(name: str, line1: str, line2: str) -> bool:
"""Non-destructive validation of TLE strings.
Args:
name: Object name string.
line1: TLE line 1.
line2: TLE line 2.
Returns:
True if TLE is well-formed and checksums pass, False otherwise.
"""
try:
parse_tle(name, line1, line2)
return True
except InvalidTLEError:
return False
def _chunk_tle_lines(tle_lines: list[str]) -> list[tuple[str, str, str]]:
"""Group a list of TLE lines into triplets (name, line1, line2).
Supports both 3-line format (with name) and 2-line format (auto-generates 'Unknown').
Silently skips empty lines and invalid headers.
"""
lines = [L.strip() for L in tle_lines if L.strip()]
triplets = []
i = 0
n = len(lines)
while i < n:
if not lines[i].startswith("1 "):
if (
i + 1 < n
and lines[i + 1].startswith("1 ")
and i + 2 < n
and lines[i + 2].startswith("2 ")
):
triplets.append((lines[i], lines[i + 1], lines[i + 2]))
i += 3
else:
i += 1
continue
if i + 1 < n and lines[i + 1].startswith("2 "):
norad_id = lines[i][2:7].strip()
synthetic_name = f"NORAD-{norad_id}" if norad_id else "Unknown"
triplets.append((synthetic_name, lines[i], lines[i + 1]))
i += 2
else:
i += 1
return triplets
def _compute_checksum(line: str) -> int:
"""Compute the modulo-10 checksum of a TLE line.
The checksum is the sum of all digits in the line, plus 1 for each
minus sign (-). All other characters are ignored. The final sum
is calculated modulo 10.
"""
total = 0
# The checksum is calculated over the first 68 characters.
for char in line[:68]:
if char.isdigit():
total += int(char)
elif char == "-":
total += 1
return total % 10
[docs]
def load_tle_catalog(tle_lines: list[str]) -> list[SatelliteTLE]:
"""Parse a batch of TLE text lines into SatelliteTLE objects.
Invalid TLEs are skipped with a logged warning, unless ``ASTRA_STRICT_MODE``
is enabled. In STRICT mode, a single invalid TLE will raise an
``InvalidTLEError`` and abort the entire catalog load to prevent partial ingestion.
Args:
tle_lines: A flat list of strings, typically expected to be in
triplets: name, line1, line2.
Returns:
List of successfully parsed SatelliteTLE objects.
Raises:
AstraError: If total parse failure occurs (result is empty but input
was non-empty).
"""
if not tle_lines:
return []
triplets = _chunk_tle_lines(tle_lines)
if not triplets and any(L.strip() for L in tle_lines):
raise AstraError("Failed to parse any TLE triplets from input lines.")
from astra.config import ASTRA_STRICT_MODE
results: list[SatelliteTLE] = []
for name, line1, line2 in triplets:
try:
sat = parse_tle(name, line1, line2)
results.append(sat)
except InvalidTLEError as e:
if ASTRA_STRICT_MODE:
# In strict mode, a single invalid TLE fails the entire load (SE-I).
raise
norad_id = e.norad_id or "UNKNOWN"
logger.warning(
f"Skipping invalid TLE for {name} (NORAD {norad_id}): {e.message}"
)
if not results and any(L.strip() for L in tle_lines):
raise InvalidTLEError(
"Total parse failure: no valid TLEs found in non-empty catalog input. "
"Verify that input lines follow 3-line (name + L1 + L2) or 2-line (L1 + L2) format.",
reason="TOTAL_PARSE_FAILURE",
)
return results