Source code for lsurf.detectors.extended.local_sphere

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
Local Recording Sphere Detector for Local-Scale Simulations

Provides a spherical detection surface centered at a specified position
for local-scale simulations without Earth curvature effects.

Examples
--------
>>> from lsurf.detectors.extended import LocalRecordingSphereDetector
>>>
>>> # Create detector with 33 km radius at origin
>>> detector = LocalRecordingSphereDetector(radius=33000.0)
>>> result = detector.detect(rays)
>>> print(f"Detected {result.num_rays} rays")
"""

import numpy as np
from numpy.typing import NDArray

from ...utilities.ray_data import RayBatch
from ..base import DetectionEvent
from ..results import DetectorResult


[docs] class LocalRecordingSphereDetector: """ Spherical detection surface centered at a specified position. Records all rays that intersect the sphere, useful for local-scale simulations without Earth curvature effects. Parameters ---------- radius : float Sphere radius in meters (default 33 km) center : tuple of float Center position (default (0, 0, 0)) name : str Detector name for identification Attributes ---------- sphere_radius : float Radius of detection sphere in meters. center : ndarray, shape (3,) Center position of the sphere. name : str Detector name. accumulated_result : DetectorResult All accumulated detections since last clear(). Examples -------- >>> # Local detector at origin with 33 km radius >>> detector = LocalRecordingSphereDetector(radius=33000.0) >>> result = detector.detect(rays) >>> >>> # Detector at specific location >>> detector = LocalRecordingSphereDetector( ... radius=10000.0, ... center=(1000.0, 0.0, 500.0) ... ) """
[docs] def __init__( self, radius: float = 33000.0, center: tuple[float, float, float] = (0, 0, 0), name: str = "Local Recording Sphere", ): """ Initialize local recording sphere detector. Parameters ---------- radius : float Sphere radius in meters. center : tuple of float Center position. name : str Detector name. """ self.name = name self._radius = radius self._center = np.array(center, dtype=np.float64) self._accumulated_result = DetectorResult.empty(name) self._events: list[DetectionEvent] = []
@property def sphere_radius(self) -> float: """Radius of the detection sphere in meters.""" return self._radius @property def center(self) -> NDArray[np.float64]: """Center position of the sphere.""" return self._center @property def accumulated_result(self) -> DetectorResult: """All accumulated detections since last clear().""" return self._accumulated_result @property def events(self) -> list[DetectionEvent]: """ Backward compatibility: list of DetectionEvent objects. For new code, use accumulated_result instead. """ return self._events
[docs] def detect( self, rays: RayBatch, current_time: float = 0.0, accumulate: bool = True, compute_travel_time: bool = True, speed_of_light: float = 299792458.0, ) -> DetectorResult: """ Detect rays intersecting the recording sphere. Parameters ---------- rays : RayBatch Rays to detect current_time : float Current simulation time (unused, for interface compatibility) accumulate : bool Whether to accumulate results. Default is True. compute_travel_time : bool If True, add travel time to intersection to ray's accumulated time speed_of_light : float Speed of light for time computation Returns ------- DetectorResult Detection results for all intersecting rays """ active_mask = rays.active if not np.any(active_mask): return DetectorResult.empty(self.name) origins = rays.positions[active_mask].astype(np.float64) directions = rays.directions[active_mask].astype(np.float64) active_indices = np.where(active_mask)[0] # Ray-sphere intersection oc = origins - self._center a = np.sum(directions**2, axis=1) b = 2 * np.sum(oc * directions, axis=1) c = np.sum(oc**2, axis=1) - self._radius**2 discriminant = b**2 - 4 * a * c hit_mask = discriminant >= 0 if not np.any(hit_mask): return DetectorResult.empty(self.name) sqrt_disc = np.sqrt(discriminant[hit_mask]) t1 = (-b[hit_mask] - sqrt_disc) / (2 * a[hit_mask]) t2 = (-b[hit_mask] + sqrt_disc) / (2 * a[hit_mask]) # Take the positive intersection (forward along ray) # For rays inside the sphere, take t2 (exit point) t = np.where(t1 > 0, t1, t2) valid = t > 1e-6 if not np.any(valid): return DetectorResult.empty(self.name) # Get indices into original rays hit_indices = active_indices[hit_mask] valid_indices = hit_indices[valid] # Compute intersection positions t_valid = t[valid] hit_origins = origins[hit_mask][valid] hit_directions = directions[hit_mask][valid] intersection_positions = hit_origins + t_valid[:, np.newaxis] * hit_directions # Compute times if compute_travel_time: times = rays.accumulated_time[valid_indices] + t_valid / speed_of_light else: times = rays.accumulated_time[valid_indices].copy() # Get polarization vectors if available polarization_vectors = None if rays.polarization_vector is not None: polarization_vectors = rays.polarization_vector[valid_indices].astype( np.float32 ) result = DetectorResult( positions=intersection_positions.astype(np.float32), directions=hit_directions.astype(np.float32), times=times.astype(np.float32), intensities=rays.intensities[valid_indices].astype(np.float32), wavelengths=rays.wavelengths[valid_indices].astype(np.float32), ray_indices=valid_indices.astype(np.int32), generations=rays.generations[valid_indices].astype(np.int32), polarization_vectors=polarization_vectors, detector_name=self.name, ) if accumulate: self._accumulated_result = DetectorResult.merge( [self._accumulated_result, result] ) self._events.extend(result.to_detection_events()) return result
[docs] def detect_rays( self, rays: RayBatch, compute_travel_time: bool = True, speed_of_light: float = 299792458.0, ) -> DetectorResult: """ Detect rays (alias for detect with accumulate=False). This method is provided for backward compatibility with code that used the detect_rays() method. Parameters ---------- rays : RayBatch Rays to detect compute_travel_time : bool If True, add travel time to intersection speed_of_light : float Speed of light for time computation Returns ------- DetectorResult Detection results """ return self.detect( rays, accumulate=False, compute_travel_time=compute_travel_time, speed_of_light=speed_of_light, )
[docs] def record_rays(self, rays: RayBatch) -> DetectorResult: """ Record rays (backward compatibility alias for detect). Parameters ---------- rays : RayBatch Rays to record Returns ------- DetectorResult Detection results """ return self.detect(rays, accumulate=False)
[docs] def clear(self) -> None: """ Clear all recorded detections. Resets the detector to its initial state with no recorded events. """ self._accumulated_result = DetectorResult.empty(self.name) self._events = []
[docs] def __repr__(self) -> str: """Return string representation.""" return ( f"LocalRecordingSphereDetector(radius={self._radius/1000:.1f}km, " f"center={self._center.tolist()}, rays={self._accumulated_result.num_rays})" )
[docs] def __len__(self) -> int: """Return number of detected rays.""" return self._accumulated_result.num_rays
# Backward compatibility methods from old Detector base class
[docs] def get_arrival_times(self) -> NDArray[np.float64]: """Get array of all arrival times.""" return self._accumulated_result.times.astype(np.float64)
[docs] def get_arrival_angles( self, reference_direction: NDArray[np.float32] ) -> NDArray[np.float64]: """Get angles between ray directions and reference direction.""" if self._accumulated_result.is_empty: return np.array([], dtype=np.float64) ref = reference_direction / np.linalg.norm(reference_direction) dir_norms = self._accumulated_result.directions / np.linalg.norm( self._accumulated_result.directions, axis=1, keepdims=True ) cos_angles = np.dot(dir_norms, ref) cos_angles = np.clip(cos_angles, -1.0, 1.0) return np.arccos(cos_angles).astype(np.float64)
[docs] def get_intensities(self) -> NDArray[np.float64]: """Get array of all detected intensities.""" return self._accumulated_result.intensities.astype(np.float64)
[docs] def get_wavelengths(self) -> NDArray[np.float64]: """Get array of all detected wavelengths.""" return self._accumulated_result.wavelengths.astype(np.float64)
[docs] def get_positions(self) -> NDArray[np.float32]: """Get array of all detection positions.""" return self._accumulated_result.positions.astype(np.float32)
[docs] def get_total_intensity(self) -> float: """Get sum of all detected intensities.""" return self._accumulated_result.total_intensity