# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Local Recording Sphere Detector for Local-Scale Simulations
Provides a spherical detection surface centered at a specified position
for local-scale simulations without Earth curvature effects.
Examples
--------
>>> from lsurf.detectors.extended import LocalRecordingSphereDetector
>>>
>>> # Create detector with 33 km radius at origin
>>> detector = LocalRecordingSphereDetector(radius=33000.0)
>>> result = detector.detect(rays)
>>> print(f"Detected {result.num_rays} rays")
"""
import numpy as np
from numpy.typing import NDArray
from ...utilities.ray_data import RayBatch
from ..base import DetectionEvent
from ..results import DetectorResult
[docs]
class LocalRecordingSphereDetector:
"""
Spherical detection surface centered at a specified position.
Records all rays that intersect the sphere, useful for local-scale
simulations without Earth curvature effects.
Parameters
----------
radius : float
Sphere radius in meters (default 33 km)
center : tuple of float
Center position (default (0, 0, 0))
name : str
Detector name for identification
Attributes
----------
sphere_radius : float
Radius of detection sphere in meters.
center : ndarray, shape (3,)
Center position of the sphere.
name : str
Detector name.
accumulated_result : DetectorResult
All accumulated detections since last clear().
Examples
--------
>>> # Local detector at origin with 33 km radius
>>> detector = LocalRecordingSphereDetector(radius=33000.0)
>>> result = detector.detect(rays)
>>>
>>> # Detector at specific location
>>> detector = LocalRecordingSphereDetector(
... radius=10000.0,
... center=(1000.0, 0.0, 500.0)
... )
"""
[docs]
def __init__(
self,
radius: float = 33000.0,
center: tuple[float, float, float] = (0, 0, 0),
name: str = "Local Recording Sphere",
):
"""
Initialize local recording sphere detector.
Parameters
----------
radius : float
Sphere radius in meters.
center : tuple of float
Center position.
name : str
Detector name.
"""
self.name = name
self._radius = radius
self._center = np.array(center, dtype=np.float64)
self._accumulated_result = DetectorResult.empty(name)
self._events: list[DetectionEvent] = []
@property
def sphere_radius(self) -> float:
"""Radius of the detection sphere in meters."""
return self._radius
@property
def center(self) -> NDArray[np.float64]:
"""Center position of the sphere."""
return self._center
@property
def accumulated_result(self) -> DetectorResult:
"""All accumulated detections since last clear()."""
return self._accumulated_result
@property
def events(self) -> list[DetectionEvent]:
"""
Backward compatibility: list of DetectionEvent objects.
For new code, use accumulated_result instead.
"""
return self._events
[docs]
def detect(
self,
rays: RayBatch,
current_time: float = 0.0,
accumulate: bool = True,
compute_travel_time: bool = True,
speed_of_light: float = 299792458.0,
) -> DetectorResult:
"""
Detect rays intersecting the recording sphere.
Parameters
----------
rays : RayBatch
Rays to detect
current_time : float
Current simulation time (unused, for interface compatibility)
accumulate : bool
Whether to accumulate results. Default is True.
compute_travel_time : bool
If True, add travel time to intersection to ray's accumulated time
speed_of_light : float
Speed of light for time computation
Returns
-------
DetectorResult
Detection results for all intersecting rays
"""
active_mask = rays.active
if not np.any(active_mask):
return DetectorResult.empty(self.name)
origins = rays.positions[active_mask].astype(np.float64)
directions = rays.directions[active_mask].astype(np.float64)
active_indices = np.where(active_mask)[0]
# Ray-sphere intersection
oc = origins - self._center
a = np.sum(directions**2, axis=1)
b = 2 * np.sum(oc * directions, axis=1)
c = np.sum(oc**2, axis=1) - self._radius**2
discriminant = b**2 - 4 * a * c
hit_mask = discriminant >= 0
if not np.any(hit_mask):
return DetectorResult.empty(self.name)
sqrt_disc = np.sqrt(discriminant[hit_mask])
t1 = (-b[hit_mask] - sqrt_disc) / (2 * a[hit_mask])
t2 = (-b[hit_mask] + sqrt_disc) / (2 * a[hit_mask])
# Take the positive intersection (forward along ray)
# For rays inside the sphere, take t2 (exit point)
t = np.where(t1 > 0, t1, t2)
valid = t > 1e-6
if not np.any(valid):
return DetectorResult.empty(self.name)
# Get indices into original rays
hit_indices = active_indices[hit_mask]
valid_indices = hit_indices[valid]
# Compute intersection positions
t_valid = t[valid]
hit_origins = origins[hit_mask][valid]
hit_directions = directions[hit_mask][valid]
intersection_positions = hit_origins + t_valid[:, np.newaxis] * hit_directions
# Compute times
if compute_travel_time:
times = rays.accumulated_time[valid_indices] + t_valid / speed_of_light
else:
times = rays.accumulated_time[valid_indices].copy()
# Get polarization vectors if available
polarization_vectors = None
if rays.polarization_vector is not None:
polarization_vectors = rays.polarization_vector[valid_indices].astype(
np.float32
)
result = DetectorResult(
positions=intersection_positions.astype(np.float32),
directions=hit_directions.astype(np.float32),
times=times.astype(np.float32),
intensities=rays.intensities[valid_indices].astype(np.float32),
wavelengths=rays.wavelengths[valid_indices].astype(np.float32),
ray_indices=valid_indices.astype(np.int32),
generations=rays.generations[valid_indices].astype(np.int32),
polarization_vectors=polarization_vectors,
detector_name=self.name,
)
if accumulate:
self._accumulated_result = DetectorResult.merge(
[self._accumulated_result, result]
)
self._events.extend(result.to_detection_events())
return result
[docs]
def detect_rays(
self,
rays: RayBatch,
compute_travel_time: bool = True,
speed_of_light: float = 299792458.0,
) -> DetectorResult:
"""
Detect rays (alias for detect with accumulate=False).
This method is provided for backward compatibility with code
that used the detect_rays() method.
Parameters
----------
rays : RayBatch
Rays to detect
compute_travel_time : bool
If True, add travel time to intersection
speed_of_light : float
Speed of light for time computation
Returns
-------
DetectorResult
Detection results
"""
return self.detect(
rays,
accumulate=False,
compute_travel_time=compute_travel_time,
speed_of_light=speed_of_light,
)
[docs]
def record_rays(self, rays: RayBatch) -> DetectorResult:
"""
Record rays (backward compatibility alias for detect).
Parameters
----------
rays : RayBatch
Rays to record
Returns
-------
DetectorResult
Detection results
"""
return self.detect(rays, accumulate=False)
[docs]
def clear(self) -> None:
"""
Clear all recorded detections.
Resets the detector to its initial state with no recorded events.
"""
self._accumulated_result = DetectorResult.empty(self.name)
self._events = []
[docs]
def __repr__(self) -> str:
"""Return string representation."""
return (
f"LocalRecordingSphereDetector(radius={self._radius/1000:.1f}km, "
f"center={self._center.tolist()}, rays={self._accumulated_result.num_rays})"
)
[docs]
def __len__(self) -> int:
"""Return number of detected rays."""
return self._accumulated_result.num_rays
# Backward compatibility methods from old Detector base class
[docs]
def get_arrival_times(self) -> NDArray[np.float64]:
"""Get array of all arrival times."""
return self._accumulated_result.times.astype(np.float64)
[docs]
def get_arrival_angles(
self, reference_direction: NDArray[np.float32]
) -> NDArray[np.float64]:
"""Get angles between ray directions and reference direction."""
if self._accumulated_result.is_empty:
return np.array([], dtype=np.float64)
ref = reference_direction / np.linalg.norm(reference_direction)
dir_norms = self._accumulated_result.directions / np.linalg.norm(
self._accumulated_result.directions, axis=1, keepdims=True
)
cos_angles = np.dot(dir_norms, ref)
cos_angles = np.clip(cos_angles, -1.0, 1.0)
return np.arccos(cos_angles).astype(np.float64)
[docs]
def get_intensities(self) -> NDArray[np.float64]:
"""Get array of all detected intensities."""
return self._accumulated_result.intensities.astype(np.float64)
[docs]
def get_wavelengths(self) -> NDArray[np.float64]:
"""Get array of all detected wavelengths."""
return self._accumulated_result.wavelengths.astype(np.float64)
[docs]
def get_positions(self) -> NDArray[np.float32]:
"""Get array of all detection positions."""
return self._accumulated_result.positions.astype(np.float32)
[docs]
def get_total_intensity(self) -> float:
"""Get sum of all detected intensities."""
return self._accumulated_result.total_intensity