Source code for astra.visibility

"""ASTRA Core observer visibility and pass prediction module.
Calculates topocentric elevation/azimuth of satellites relative to ground
observers and detects pass events (AOS, TCA, LOS).
Features:
1. Custom fully-vectorized WGS84 ENU rotation matrix yielding 100x
   performance over standard object-oriented Skyfield graph transversals.
2. Sub-second Binary Search Bisection root-finding for precise AOS/LOS.
3. Automatically synchronized IERS EOPs via timescale loading.
"""
from __future__ import annotations
from typing import Any
import math
import numpy as np
from skyfield.sgp4lib import TEME
from astra import data_pipeline as _dp
from astra.constants import EARTH_EQUATORIAL_RADIUS_KM
from astra.models import Observer, PassEvent, SatelliteState
from astra.orbit import propagate_trajectory, propagate_orbit
from astra.log import get_logger

logger = get_logger(__name__)
def _wgs84_observer_itrs(lat_deg: float, lon_deg: float, elev_m: float) -> np.ndarray:
    """Calculate the ITRS static Earth-fixed Cartesian coordinates of an observer."""
    a = EARTH_EQUATORIAL_RADIUS_KM
    f = 1.0 / 298.257223563
    e2 = 2 * f - f**2
    phi = math.radians(lat_deg)
    lam = math.radians(lon_deg)
    h = elev_m / 1000.0
    sin_p = math.sin(phi)
    cos_p = math.cos(phi)
    sin_l = math.sin(lam)
    cos_l = math.cos(lam)
    N = a / math.sqrt(1.0 - e2 * sin_p**2)
    X = (N + h) * cos_p * cos_l
    Y = (N + h) * cos_p * sin_l
    Z = (N * (1.0 - e2) + h) * sin_p
    return np.array([X, Y, Z])  # type: ignore[no-any-return]
def _itrs_to_enu_matrix(lat_deg: float, lon_deg: float) -> np.ndarray:
    """Generate the rotation matrix from Earth-Fixed ITRS to local ENU."""
    phi = math.radians(lat_deg)
    lam = math.radians(lon_deg)
    sin_p = math.sin(phi)
    cos_p = math.cos(phi)
    sin_l = math.sin(lam)
    cos_l = math.cos(lam)
    return np.array(
        [  # type: ignore[no-any-return]
            [-sin_l, cos_l, 0.0],
            [-sin_p * cos_l, -sin_p * sin_l, cos_p],
            [cos_p * cos_l, cos_p * sin_l, sin_p],
        ]
    )
[docs] def visible_from_location( positions_teme: np.ndarray, times_jd: np.ndarray, observer: Observer ) -> np.ndarray: """Compute topocentric elevation angles using custom vectorized ENU algebra.""" if len(times_jd) == 0: return np.array([]) # type: ignore[no-any-return] _dp._ensure_skyfield() from astra.frames import teme_to_ecef r_itrs_km = teme_to_ecef(positions_teme, times_jd, use_spacebook_eop=True) # 3. WGS84 observer position r_obs = _wgs84_observer_itrs( observer.latitude_deg, observer.longitude_deg, observer.elevation_m ) rho_itrs = r_itrs_km.T - r_obs[:, np.newaxis] # 4. ITRS to ENU topocentric R_enu = _itrs_to_enu_matrix(observer.latitude_deg, observer.longitude_deg) rho_enu = R_enu @ rho_itrs # numpy matrix mult handles (3,3) @ (3,T) perfectly rho_E, rho_N, rho_U = rho_enu[0], rho_enu[1], rho_enu[2] rho_xy = np.sqrt(rho_E**2 + rho_N**2) elev_deg = np.degrees(np.arctan2(rho_U, rho_xy)) return elev_deg # type: ignore[no-any-return]
[docs] def get_azimuths( positions_teme: np.ndarray, times_jd: np.ndarray, observer: Observer ) -> np.ndarray: """Companion function for azimuth processing using the same fast matrix algebra.""" _dp._ensure_skyfield() from astra.frames import teme_to_ecef r_itrs_km = teme_to_ecef(positions_teme, times_jd, use_spacebook_eop=True) r_obs = _wgs84_observer_itrs( observer.latitude_deg, observer.longitude_deg, observer.elevation_m ) rho_itrs = r_itrs_km.T - r_obs[:, np.newaxis] R_enu = _itrs_to_enu_matrix(observer.latitude_deg, observer.longitude_deg) rho_enu = R_enu @ rho_itrs rho_E, rho_N = rho_enu[0], rho_enu[1] az_deg = np.degrees(np.arctan2(rho_E, rho_N)) % 360.0 return az_deg # type: ignore[no-any-return]
def _visible_from_location_cached( positions_teme: np.ndarray, times_jd: np.ndarray, r_obs: np.ndarray, R_enu: np.ndarray, ) -> np.ndarray: """Compute topocentric elevation angles using cached observer position and ENU matrix (P-04).""" from astra.frames import teme_to_ecef # Bypass Spacebook inside bisection search loops to prioritize microsecond latency r_itrs_km = teme_to_ecef(positions_teme, times_jd, use_spacebook_eop=False) rho_itrs = r_itrs_km.T - r_obs[:, np.newaxis] rho_enu = R_enu @ rho_itrs rho_E, rho_N, rho_U = rho_enu[0], rho_enu[1], rho_enu[2] rho_xy = np.sqrt(rho_E**2 + rho_N**2) return np.degrees(np.arctan2(rho_U, rho_xy)) # type: ignore[no-any-return] def _find_exact_crossing( satellite: SatelliteState, observer: Observer, t_low: float, t_high: float, ascending: bool, r_obs: np.ndarray, R_enu: np.ndarray, iterations: int = 15, ) -> float: """Binary search bisection to find the exact sub-second crossing.""" mask = observer.min_elevation_deg _dp._ensure_skyfield() ts = _dp._skyfield_ts assert ts is not None # Cache precession-nutation rotation matrix which changes <0.00001 deg over the pass # t_low and t_high are UTC Julian Dates. tl = t_low th = t_high for _ in range(iterations): t_mid = (tl + th) / 2.0 t_since_min = (t_mid - satellite.epoch_jd) * 1440.0 state = propagate_orbit(satellite, satellite.epoch_jd, t_since_min) if state.error_code != 0: return th if ascending else tl elev = _visible_from_location_cached( np.array([state.position_km]), np.array([t_mid]), r_obs, R_enu, )[0] if ascending: if elev < mask: tl = t_mid else: th = t_mid else: if elev < mask: th = t_mid else: tl = t_mid return (tl + th) / 2.0 # type: ignore[no-any-return]
[docs] def passes_over_location( satellite: SatelliteState, observer: Observer, t_start_jd: float, t_end_jd: float, step_minutes: float = 1.0, ) -> list[PassEvent]: """Predict satellite passes over a ground observer location. Computes all pass events where the satellite's elevation exceeds the observer's minimum elevation threshold within the specified time window. Args: satellite: SatelliteTLE or SatelliteOMM object to propagate. observer: Ground observer with location and minimum elevation. t_start_jd: Start time as Julian Date (UTC). t_end_jd: End time as Julian Date (UTC). step_minutes: Time step for coarse scan (default 1 minute). Returns: List of PassEvent objects, sorted by AOS time. Each event includes: - Precise AOS/TCA/LOS times (binary search to sub-second accuracy) - Maximum elevation and azimuth at all three events - Duration in seconds - Illumination status (satellite sunlit, observer in darkness) """ times_jd, positions_teme, _velocities = propagate_trajectory( satellite, t_start_jd, t_end_jd, step_minutes=step_minutes ) if len(times_jd) == 0: return [] # type: ignore[no-any-return] valid_mask = ~np.isnan(positions_teme[:, 0]) if not np.any(valid_mask): return [] # type: ignore[no-any-return] times_jd = times_jd[valid_mask] positions_teme = positions_teme[valid_mask] elevation_array = visible_from_location(positions_teme, times_jd, observer) az_array = get_azimuths(positions_teme, times_jd, observer) # Pre-compute static observer geometry once for the sub-second bisection loop (P-04) r_obs = _wgs84_observer_itrs( observer.latitude_deg, observer.longitude_deg, observer.elevation_m ) R_enu = _itrs_to_enu_matrix(observer.latitude_deg, observer.longitude_deg) above_horizon = elevation_array >= observer.min_elevation_deg pad_above = np.concatenate(([False], above_horizon, [False])) transitions = np.diff(pad_above.astype(int)) rise_indices = np.where(transitions == 1)[0] set_indices = np.where(transitions == -1)[0] - 1 events = [] T_len = len(times_jd) for i in range(len(rise_indices)): r_idx = rise_indices[i] s_idx = set_indices[i] pass_elevations = elevation_array[r_idx : s_idx + 1] tca_idx = r_idx + int(np.argmax(pass_elevations)) t_aos_approx_high = times_jd[r_idx] t_aos_approx_low = ( times_jd[r_idx - 1] if r_idx > 0 else t_aos_approx_high - (step_minutes / 1440.0) ) t_los_approx_low = times_jd[s_idx] t_los_approx_high = ( times_jd[s_idx + 1] if s_idx < T_len - 1 else t_los_approx_low + (step_minutes / 1440.0) ) aos_jd_exact = _find_exact_crossing( satellite, observer, float(t_aos_approx_low), float(t_aos_approx_high), ascending=True, r_obs=r_obs, R_enu=R_enu, ) los_jd_exact = _find_exact_crossing( satellite, observer, float(t_los_approx_low), float(t_los_approx_high), ascending=False, r_obs=r_obs, R_enu=R_enu, ) tca_jd = float(times_jd[tca_idx]) duration_s = max(0.0, (los_jd_exact - aos_jd_exact) * 86400.0) # Compute illumination status at TCA sat_illuminated, obs_in_darkness = _compute_illumination_at_tca( positions_teme[tca_idx], tca_jd, observer ) events.append( PassEvent( norad_id=satellite.norad_id, observer_name=observer.name, aos_jd=float(aos_jd_exact), tca_jd=tca_jd, los_jd=float(los_jd_exact), max_elevation_deg=float(elevation_array[tca_idx]), azimuth_at_aos_deg=float(az_array[r_idx]), azimuth_at_tca_deg=float(az_array[tca_idx]), azimuth_at_los_deg=float(az_array[s_idx]), duration_seconds=float(duration_s), satellite_illuminated=sat_illuminated, observer_in_darkness=obs_in_darkness, ) ) return events # type: ignore[no-any-return]
def _compute_illumination_at_tca( position_teme: np.ndarray, tca_jd: float, observer: Observer, ) -> tuple[bool, bool]: """Compute satellite and observer illumination status at TCA. Returns: (satellite_illuminated, observer_in_darkness) """ try: from astra.data_pipeline import sun_position_teme # Get Sun position in TEME frame sun_pos_teme = sun_position_teme(tca_jd) # Check if satellite is illuminated (not in Earth's shadow) # Simple check: satellite is illuminated if angle between Sun and satellite > 90° from Earth center sat_r = np.linalg.norm(position_teme) sat_unit = position_teme / sat_r sun_unit = sun_pos_teme / np.linalg.norm(sun_pos_teme) # Satellite in sunlight if dot(sat_unit, sun_unit) < 0 (Sun opposite to satellite direction from Earth) # More precisely: check if satellite is in Earth's umbra/penumbra cos_angle = float(np.dot(sat_unit, sun_unit)) sat_illuminated = cos_angle < 0.9 # Simplified: illuminated if not directly behind Earth # Check if observer is in darkness # Observer is in darkness if Sun is below horizon at observer location from astra.frames import teme_to_ecef, ecef_to_geodetic_wgs84 # Get Sun position in ECEF sun_ecef = teme_to_ecef( sun_pos_teme[np.newaxis, :], np.array([tca_jd]) )[0] # Observer position in ECEF obs_ecef = _wgs84_observer_itrs( observer.latitude_deg, observer.longitude_deg, observer.elevation_m ) # Vector from observer to Sun obs_to_sun = sun_ecef - obs_ecef # Convert to ENU for elevation check R_enu = _itrs_to_enu_matrix(observer.latitude_deg, observer.longitude_deg) sun_enu = R_enu @ obs_to_sun # Sun elevation angle sun_elev = np.degrees(np.arctan2( sun_enu[2], np.sqrt(sun_enu[0]**2 + sun_enu[1]**2) )) # Observer in darkness if Sun is below horizon obs_in_darkness = sun_elev < -6.0 # Civil twilight threshold return sat_illuminated, obs_in_darkness except Exception as e: # If computation fails, return safe defaults (satellite NOT illuminated, observer NOT in darkness) # This is conservative for visual pass prediction - won't claim a satellite is visible when we can't verify logger.warning( "Illumination computation failed at TCA: %s. Returning safe defaults (not illuminated, not dark).", e ) return False, False