Source code for lsurf.detectors.extended.recording_sphere

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
Recording Sphere Detector for Earth-Scale Simulations

Provides a spherical detection surface at a specified altitude above Earth
for capturing rays in global-scale atmospheric simulations.

Examples
--------
>>> from lsurf.detectors.extended import RecordingSphereDetector
>>>
>>> # Create detector at 33 km altitude
>>> detector = RecordingSphereDetector(altitude=33000.0)
>>> result = detector.detect(rays)
>>> print(f"Detected {result.num_rays} rays")
>>>
>>> # Compute angular distribution
>>> coords = result.compute_angular_coordinates()
>>> elevation = np.degrees(coords['elevation'])
"""

import numpy as np
from numpy.typing import NDArray

from ...surfaces import EARTH_RADIUS
from ...utilities.ray_data import RayBatch
from ..base import DetectionEvent
from ..results import DetectorResult


[docs] class RecordingSphereDetector: """ Spherical detection surface at a specified altitude above Earth. Records all rays that intersect the sphere, capturing full ray state for later analysis. Used for Earth-scale simulations where the sphere is centered on Earth's center. Parameters ---------- altitude : float Altitude above Earth's surface in meters (default 33 km) earth_center : tuple of float Center of Earth, default (0, 0, -EARTH_RADIUS) earth_radius : float Earth radius in meters name : str Detector name for identification Attributes ---------- altitude : float Altitude above Earth's surface in meters. sphere_radius : float Radius of detection sphere (earth_radius + altitude). center : ndarray, shape (3,) Center position (Earth's center). name : str Detector name. accumulated_result : DetectorResult All accumulated detections since last clear(). Notes ----- The recording sphere has radius = earth_radius + altitude, centered at earth_center. This creates a sphere that surrounds Earth at the specified altitude. Examples -------- >>> # Detector at 33 km altitude >>> detector = RecordingSphereDetector(altitude=33000.0) >>> result = detector.detect(rays) >>> >>> # Access angular coordinates >>> coords = result.compute_angular_coordinates() """
[docs] def __init__( self, altitude: float = 33000.0, # 33 km default earth_center: tuple[float, float, float] = (0, 0, -EARTH_RADIUS), earth_radius: float = EARTH_RADIUS, name: str = "Recording Sphere", ): """ Initialize recording sphere detector. Parameters ---------- altitude : float Altitude above Earth's surface in meters. earth_center : tuple of float Center of Earth. earth_radius : float Earth radius in meters. name : str Detector name. """ self.name = name self.altitude = altitude self._earth_center = np.array(earth_center, dtype=np.float64) self.earth_radius = earth_radius self._sphere_radius = earth_radius + altitude self._accumulated_result = DetectorResult.empty(name) self._events: list[DetectionEvent] = []
@property def sphere_radius(self) -> float: """Radius of the detection sphere in meters.""" return self._sphere_radius @property def center(self) -> NDArray[np.float64]: """Center position of the sphere (Earth's center).""" return self._earth_center @property def accumulated_result(self) -> DetectorResult: """All accumulated detections since last clear().""" return self._accumulated_result @property def events(self) -> list[DetectionEvent]: """ Backward compatibility: list of DetectionEvent objects. For new code, use accumulated_result instead. """ return self._events
[docs] def detect( self, rays: RayBatch, current_time: float = 0.0, accumulate: bool = True, compute_travel_time: bool = True, speed_of_light: float = 299792458.0, max_propagation_distance: float | None = None, ) -> DetectorResult: """ Detect rays intersecting the recording sphere. Parameters ---------- rays : RayBatch Rays to detect current_time : float Current simulation time (unused, for interface compatibility) accumulate : bool Whether to accumulate results. Default is True. compute_travel_time : bool If True, add travel time to intersection to ray's accumulated time speed_of_light : float Speed of light for time computation max_propagation_distance : float, optional Maximum distance rays can propagate before detection (meters). If None, no limit is applied. Returns ------- DetectorResult Detection results for all intersecting rays """ active_mask = rays.active if not np.any(active_mask): return DetectorResult.empty(self.name) origins = rays.positions[active_mask].astype(np.float64) directions = rays.directions[active_mask].astype(np.float64) active_indices = np.where(active_mask)[0] # Ray-sphere intersection # Ray: P = O + t*D # Sphere: |P - C|^2 = R^2 oc = origins - self._earth_center a = np.sum(directions**2, axis=1) b = 2 * np.sum(oc * directions, axis=1) c = np.sum(oc**2, axis=1) - self._sphere_radius**2 discriminant = b**2 - 4 * a * c hit_mask = discriminant >= 0 if not np.any(hit_mask): return DetectorResult.empty(self.name) sqrt_disc = np.sqrt(discriminant[hit_mask]) t1 = (-b[hit_mask] - sqrt_disc) / (2 * a[hit_mask]) t2 = (-b[hit_mask] + sqrt_disc) / (2 * a[hit_mask]) # Take the positive intersection (rays going outward) t = np.where(t1 > 0, t1, t2) valid = t > 1e-6 # Apply max propagation distance if specified if max_propagation_distance is not None: valid = valid & (t < max_propagation_distance) if not np.any(valid): return DetectorResult.empty(self.name) # Get indices into original rays hit_indices = active_indices[hit_mask] valid_indices = hit_indices[valid] # Compute intersection positions t_valid = t[valid] hit_origins = origins[hit_mask][valid] hit_directions = directions[hit_mask][valid] intersection_positions = hit_origins + t_valid[:, np.newaxis] * hit_directions # Compute times if compute_travel_time: times = rays.accumulated_time[valid_indices] + t_valid / speed_of_light else: times = rays.accumulated_time[valid_indices].copy() # Get polarization vectors if available polarization_vectors = None if rays.polarization_vector is not None: polarization_vectors = rays.polarization_vector[valid_indices].astype( np.float32 ) result = DetectorResult( positions=intersection_positions.astype(np.float32), directions=hit_directions.astype(np.float32), times=times.astype(np.float32), intensities=rays.intensities[valid_indices].astype(np.float32), wavelengths=rays.wavelengths[valid_indices].astype(np.float32), ray_indices=valid_indices.astype(np.int32), generations=rays.generations[valid_indices].astype(np.int32), polarization_vectors=polarization_vectors, detector_name=self.name, ) if accumulate: self._accumulated_result = DetectorResult.merge( [self._accumulated_result, result] ) self._events.extend(result.to_detection_events()) return result
[docs] def detect_rays( self, rays: RayBatch, compute_travel_time: bool = True, speed_of_light: float = 299792458.0, max_propagation_distance: float | None = None, ) -> DetectorResult: """ Detect rays (alias for detect with accumulate=False). This method is provided for backward compatibility with code that used the detect_rays() method. Parameters ---------- rays : RayBatch Rays to detect compute_travel_time : bool If True, add travel time to intersection speed_of_light : float Speed of light for time computation max_propagation_distance : float, optional Maximum distance rays can propagate Returns ------- DetectorResult Detection results """ return self.detect( rays, accumulate=False, compute_travel_time=compute_travel_time, speed_of_light=speed_of_light, max_propagation_distance=max_propagation_distance, )
[docs] def clear(self) -> None: """ Clear all recorded detections. Resets the detector to its initial state with no recorded events. """ self._accumulated_result = DetectorResult.empty(self.name) self._events = []
[docs] def __repr__(self) -> str: """Return string representation.""" return ( f"RecordingSphereDetector(altitude={self.altitude/1000:.1f}km, " f"rays={self._accumulated_result.num_rays})" )
[docs] def __len__(self) -> int: """Return number of detected rays.""" return self._accumulated_result.num_rays
# Backward compatibility methods from old Detector base class
[docs] def get_arrival_times(self) -> NDArray[np.float64]: """Get array of all arrival times.""" return self._accumulated_result.times.astype(np.float64)
[docs] def get_arrival_angles( self, reference_direction: NDArray[np.float32] ) -> NDArray[np.float64]: """Get angles between ray directions and reference direction.""" if self._accumulated_result.is_empty: return np.array([], dtype=np.float64) ref = reference_direction / np.linalg.norm(reference_direction) dir_norms = self._accumulated_result.directions / np.linalg.norm( self._accumulated_result.directions, axis=1, keepdims=True ) cos_angles = np.dot(dir_norms, ref) cos_angles = np.clip(cos_angles, -1.0, 1.0) return np.arccos(cos_angles).astype(np.float64)
[docs] def get_intensities(self) -> NDArray[np.float64]: """Get array of all detected intensities.""" return self._accumulated_result.intensities.astype(np.float64)
[docs] def get_wavelengths(self) -> NDArray[np.float64]: """Get array of all detected wavelengths.""" return self._accumulated_result.wavelengths.astype(np.float64)
[docs] def get_positions(self) -> NDArray[np.float32]: """Get array of all detection positions.""" return self._accumulated_result.positions.astype(np.float32)
[docs] def get_total_intensity(self) -> float: """Get sum of all detected intensities.""" return self._accumulated_result.total_intensity