# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Recording Sphere Detector for Earth-Scale Simulations
Provides a spherical detection surface at a specified altitude above Earth
for capturing rays in global-scale atmospheric simulations.
Examples
--------
>>> from lsurf.detectors.extended import RecordingSphereDetector
>>>
>>> # Create detector at 33 km altitude
>>> detector = RecordingSphereDetector(altitude=33000.0)
>>> result = detector.detect(rays)
>>> print(f"Detected {result.num_rays} rays")
>>>
>>> # Compute angular distribution
>>> coords = result.compute_angular_coordinates()
>>> elevation = np.degrees(coords['elevation'])
"""
import numpy as np
from numpy.typing import NDArray
from ...surfaces import EARTH_RADIUS
from ...utilities.ray_data import RayBatch
from ..base import DetectionEvent
from ..results import DetectorResult
[docs]
class RecordingSphereDetector:
"""
Spherical detection surface at a specified altitude above Earth.
Records all rays that intersect the sphere, capturing full ray state
for later analysis. Used for Earth-scale simulations where the sphere
is centered on Earth's center.
Parameters
----------
altitude : float
Altitude above Earth's surface in meters (default 33 km)
earth_center : tuple of float
Center of Earth, default (0, 0, -EARTH_RADIUS)
earth_radius : float
Earth radius in meters
name : str
Detector name for identification
Attributes
----------
altitude : float
Altitude above Earth's surface in meters.
sphere_radius : float
Radius of detection sphere (earth_radius + altitude).
center : ndarray, shape (3,)
Center position (Earth's center).
name : str
Detector name.
accumulated_result : DetectorResult
All accumulated detections since last clear().
Notes
-----
The recording sphere has radius = earth_radius + altitude, centered
at earth_center. This creates a sphere that surrounds Earth at the
specified altitude.
Examples
--------
>>> # Detector at 33 km altitude
>>> detector = RecordingSphereDetector(altitude=33000.0)
>>> result = detector.detect(rays)
>>>
>>> # Access angular coordinates
>>> coords = result.compute_angular_coordinates()
"""
[docs]
def __init__(
self,
altitude: float = 33000.0, # 33 km default
earth_center: tuple[float, float, float] = (0, 0, -EARTH_RADIUS),
earth_radius: float = EARTH_RADIUS,
name: str = "Recording Sphere",
):
"""
Initialize recording sphere detector.
Parameters
----------
altitude : float
Altitude above Earth's surface in meters.
earth_center : tuple of float
Center of Earth.
earth_radius : float
Earth radius in meters.
name : str
Detector name.
"""
self.name = name
self.altitude = altitude
self._earth_center = np.array(earth_center, dtype=np.float64)
self.earth_radius = earth_radius
self._sphere_radius = earth_radius + altitude
self._accumulated_result = DetectorResult.empty(name)
self._events: list[DetectionEvent] = []
@property
def sphere_radius(self) -> float:
"""Radius of the detection sphere in meters."""
return self._sphere_radius
@property
def center(self) -> NDArray[np.float64]:
"""Center position of the sphere (Earth's center)."""
return self._earth_center
@property
def accumulated_result(self) -> DetectorResult:
"""All accumulated detections since last clear()."""
return self._accumulated_result
@property
def events(self) -> list[DetectionEvent]:
"""
Backward compatibility: list of DetectionEvent objects.
For new code, use accumulated_result instead.
"""
return self._events
[docs]
def detect(
self,
rays: RayBatch,
current_time: float = 0.0,
accumulate: bool = True,
compute_travel_time: bool = True,
speed_of_light: float = 299792458.0,
max_propagation_distance: float | None = None,
) -> DetectorResult:
"""
Detect rays intersecting the recording sphere.
Parameters
----------
rays : RayBatch
Rays to detect
current_time : float
Current simulation time (unused, for interface compatibility)
accumulate : bool
Whether to accumulate results. Default is True.
compute_travel_time : bool
If True, add travel time to intersection to ray's accumulated time
speed_of_light : float
Speed of light for time computation
max_propagation_distance : float, optional
Maximum distance rays can propagate before detection (meters).
If None, no limit is applied.
Returns
-------
DetectorResult
Detection results for all intersecting rays
"""
active_mask = rays.active
if not np.any(active_mask):
return DetectorResult.empty(self.name)
origins = rays.positions[active_mask].astype(np.float64)
directions = rays.directions[active_mask].astype(np.float64)
active_indices = np.where(active_mask)[0]
# Ray-sphere intersection
# Ray: P = O + t*D
# Sphere: |P - C|^2 = R^2
oc = origins - self._earth_center
a = np.sum(directions**2, axis=1)
b = 2 * np.sum(oc * directions, axis=1)
c = np.sum(oc**2, axis=1) - self._sphere_radius**2
discriminant = b**2 - 4 * a * c
hit_mask = discriminant >= 0
if not np.any(hit_mask):
return DetectorResult.empty(self.name)
sqrt_disc = np.sqrt(discriminant[hit_mask])
t1 = (-b[hit_mask] - sqrt_disc) / (2 * a[hit_mask])
t2 = (-b[hit_mask] + sqrt_disc) / (2 * a[hit_mask])
# Take the positive intersection (rays going outward)
t = np.where(t1 > 0, t1, t2)
valid = t > 1e-6
# Apply max propagation distance if specified
if max_propagation_distance is not None:
valid = valid & (t < max_propagation_distance)
if not np.any(valid):
return DetectorResult.empty(self.name)
# Get indices into original rays
hit_indices = active_indices[hit_mask]
valid_indices = hit_indices[valid]
# Compute intersection positions
t_valid = t[valid]
hit_origins = origins[hit_mask][valid]
hit_directions = directions[hit_mask][valid]
intersection_positions = hit_origins + t_valid[:, np.newaxis] * hit_directions
# Compute times
if compute_travel_time:
times = rays.accumulated_time[valid_indices] + t_valid / speed_of_light
else:
times = rays.accumulated_time[valid_indices].copy()
# Get polarization vectors if available
polarization_vectors = None
if rays.polarization_vector is not None:
polarization_vectors = rays.polarization_vector[valid_indices].astype(
np.float32
)
result = DetectorResult(
positions=intersection_positions.astype(np.float32),
directions=hit_directions.astype(np.float32),
times=times.astype(np.float32),
intensities=rays.intensities[valid_indices].astype(np.float32),
wavelengths=rays.wavelengths[valid_indices].astype(np.float32),
ray_indices=valid_indices.astype(np.int32),
generations=rays.generations[valid_indices].astype(np.int32),
polarization_vectors=polarization_vectors,
detector_name=self.name,
)
if accumulate:
self._accumulated_result = DetectorResult.merge(
[self._accumulated_result, result]
)
self._events.extend(result.to_detection_events())
return result
[docs]
def detect_rays(
self,
rays: RayBatch,
compute_travel_time: bool = True,
speed_of_light: float = 299792458.0,
max_propagation_distance: float | None = None,
) -> DetectorResult:
"""
Detect rays (alias for detect with accumulate=False).
This method is provided for backward compatibility with code
that used the detect_rays() method.
Parameters
----------
rays : RayBatch
Rays to detect
compute_travel_time : bool
If True, add travel time to intersection
speed_of_light : float
Speed of light for time computation
max_propagation_distance : float, optional
Maximum distance rays can propagate
Returns
-------
DetectorResult
Detection results
"""
return self.detect(
rays,
accumulate=False,
compute_travel_time=compute_travel_time,
speed_of_light=speed_of_light,
max_propagation_distance=max_propagation_distance,
)
[docs]
def clear(self) -> None:
"""
Clear all recorded detections.
Resets the detector to its initial state with no recorded events.
"""
self._accumulated_result = DetectorResult.empty(self.name)
self._events = []
[docs]
def __repr__(self) -> str:
"""Return string representation."""
return (
f"RecordingSphereDetector(altitude={self.altitude/1000:.1f}km, "
f"rays={self._accumulated_result.num_rays})"
)
[docs]
def __len__(self) -> int:
"""Return number of detected rays."""
return self._accumulated_result.num_rays
# Backward compatibility methods from old Detector base class
[docs]
def get_arrival_times(self) -> NDArray[np.float64]:
"""Get array of all arrival times."""
return self._accumulated_result.times.astype(np.float64)
[docs]
def get_arrival_angles(
self, reference_direction: NDArray[np.float32]
) -> NDArray[np.float64]:
"""Get angles between ray directions and reference direction."""
if self._accumulated_result.is_empty:
return np.array([], dtype=np.float64)
ref = reference_direction / np.linalg.norm(reference_direction)
dir_norms = self._accumulated_result.directions / np.linalg.norm(
self._accumulated_result.directions, axis=1, keepdims=True
)
cos_angles = np.dot(dir_norms, ref)
cos_angles = np.clip(cos_angles, -1.0, 1.0)
return np.arccos(cos_angles).astype(np.float64)
[docs]
def get_intensities(self) -> NDArray[np.float64]:
"""Get array of all detected intensities."""
return self._accumulated_result.intensities.astype(np.float64)
[docs]
def get_wavelengths(self) -> NDArray[np.float64]:
"""Get array of all detected wavelengths."""
return self._accumulated_result.wavelengths.astype(np.float64)
[docs]
def get_positions(self) -> NDArray[np.float32]:
"""Get array of all detection positions."""
return self._accumulated_result.positions.astype(np.float32)
[docs]
def get_total_intensity(self) -> float:
"""Get sum of all detected intensities."""
return self._accumulated_result.total_intensity