# Source code for lsurf.detectors.small.spherical

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
Spherical Detector Implementation

Provides a spherical detector that detects rays passing within a specified
radius of a center point. Ideal for collecting rays from all directions.

Examples
--------
>>> from lsurf.detectors.small import SphericalDetector
>>>
>>> detector = SphericalDetector(
...     center=(0, 0, 100),
...     radius=10.0,
...     name="Far-field detector"
... )
>>> result = detector.detect(rays)
>>> print(f"Detected {result.num_rays} rays")
"""

import numpy as np

from ...utilities.ray_data import RayBatch
from ..base import DetectionEvent
from ..results import DetectorResult

# Check if GPU is available.
#
# numba is treated as an optional dependency here: if it cannot be imported,
# the module still loads and CUDA_AVAILABLE is False. When numba is present,
# the flag is whatever cuda.is_available() reports at import time.
# SphericalDetector.detect() only takes the GPU path when this flag is true
# AND the detector was created with use_gpu=True.
try:
    from numba import cuda

    CUDA_AVAILABLE = cuda.is_available()
except ImportError:
    CUDA_AVAILABLE = False


class SphericalDetector:
    """
    Spherical detector centered at a point.

    Detects all rays that pass within a certain radius of the center point.
    Good for collecting rays from all directions without directional bias.

    Parameters
    ----------
    center : tuple of float
        Center position (x, y, z) in meters.
    radius : float
        Detection radius in meters. Must be non-negative.
    name : str, optional
        Detector name. Default is "Spherical Detector".
    use_gpu : bool, optional
        Whether to use GPU acceleration when available. Default is True.

    Attributes
    ----------
    center : ndarray, shape (3,)
        Detector center position (stored as float32).
    radius : float
        Detection radius.
    use_gpu : bool
        GPU acceleration flag.
    name : str
        Detector name.
    accumulated_result : DetectorResult
        All accumulated detections since last clear().

    Raises
    ------
    ValueError
        If ``center`` does not have exactly three components or ``radius``
        is negative or NaN.

    Notes
    -----
    Detection is based on the closest approach distance between each ray
    and the center point. A ray is detected if this distance is less than
    or equal to the detection radius. In the CPU implementation only rays
    whose closest approach lies strictly ahead of the ray origin are
    considered, and the recorded detection position is the point of closest
    approach.

    The CPU implementation computes the arrival time assuming the ray
    travels through air (n approx 1.0) from its current position to the
    detection point.

    Examples
    --------
    >>> detector = SphericalDetector(
    ...     center=(0, 0, 100),
    ...     radius=10.0,
    ...     use_gpu=True
    ... )
    >>> result = detector.detect(reflected_rays)
    >>> print(f"Detected {result.num_rays} rays")
    """

    def __init__(
        self,
        center: tuple[float, float, float],
        radius: float,
        name: str = "Spherical Detector",
        use_gpu: bool = True,
    ):
        """
        Initialize spherical detector.

        Parameters
        ----------
        center : tuple of float
            Center position (x, y, z) in meters.
        radius : float
            Detection radius in meters. Must be non-negative.
        name : str, optional
            Detector name. Default is "Spherical Detector".
        use_gpu : bool, optional
            Whether to use GPU acceleration. Default is True.

        Raises
        ------
        ValueError
            If ``center`` does not have exactly three components or
            ``radius`` is negative or NaN.
        """
        # Validate before touching any state so a bad argument fails loudly
        # here instead of as a broadcasting error deep inside detect().
        center_vec = np.array(center, dtype=np.float32)
        if center_vec.shape != (3,):
            raise ValueError(
                "center must have exactly 3 components (x, y, z), "
                f"got shape {center_vec.shape}"
            )
        # `not radius >= 0` (rather than `radius < 0`) also rejects NaN.
        if not radius >= 0:
            raise ValueError(f"radius must be non-negative, got {radius!r}")

        self.name = name
        self.center = center_vec
        self.radius = radius
        self.use_gpu = use_gpu
        self._accumulated_result = DetectorResult.empty(name)
        # Keep events list for backward compatibility
        self._events: list[DetectionEvent] = []

    @property
    def accumulated_result(self) -> "DetectorResult":
        """All accumulated detections since last clear()."""
        return self._accumulated_result

    @property
    def events(self) -> "list[DetectionEvent]":
        """
        Backward compatibility: list of DetectionEvent objects.

        For new code, use accumulated_result instead.
        """
        return self._events

    def detect(
        self,
        rays: "RayBatch",
        current_time: float = 0.0,
        accumulate: bool = True,
    ) -> "DetectorResult":
        """
        Detect rays that pass within detection radius.

        For each ray, find the closest approach to center. If within
        radius, record a detection event.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to test.
        current_time : float, optional
            Current simulation time. Default is 0.0. Forwarded to the
            CPU/GPU implementation.
        accumulate : bool, optional
            Whether to accumulate results. Default is True.

        Returns
        -------
        DetectorResult
            Newly detected rays. An empty result is returned when ``rays``
            is None or contains no rays.
        """
        if rays is None or rays.num_rays == 0:
            return DetectorResult.empty(self.name)

        # Use GPU only if requested AND available
        if self.use_gpu and CUDA_AVAILABLE:
            result = self._detect_gpu(rays, current_time)
        else:
            result = self._detect_cpu(rays, current_time)

        if accumulate:
            self._accumulated_result = DetectorResult.merge(
                [self._accumulated_result, result]
            )
            # Update events list for backward compatibility
            self._events.extend(result.to_detection_events())

        return result

    def _detect_gpu(
        self, rays: "RayBatch", current_time: float = 0.0
    ) -> "DetectorResult":
        """
        GPU-accelerated detection.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to test.
        current_time : float
            Current simulation time. Not used by this implementation.

        Returns
        -------
        DetectorResult
            Newly detected rays.
        """
        # Imported lazily so the module can be loaded without the GPU kernels.
        from ...propagation.detector_gpu import detect_spherical_gpu

        hit_mask, hit_distances, hit_times = detect_spherical_gpu(
            rays.positions.astype(np.float32),
            rays.directions.astype(np.float32),
            rays.active,
            rays.accumulated_time.astype(np.float32),
            rays.wavelengths.astype(np.float32),
            rays.intensities.astype(np.float32),
            self.center,
            self.radius,
        )

        hit_indices = np.where(hit_mask)[0]
        if len(hit_indices) == 0:
            return DetectorResult.empty(self.name)

        # Compute closest points: origin + distance * unit direction
        hit_directions = rays.directions[hit_indices]
        unit_dirs = hit_directions / np.linalg.norm(
            hit_directions, axis=1, keepdims=True
        )
        closest_points = (
            rays.positions[hit_indices]
            + hit_distances[hit_indices, np.newaxis] * unit_dirs
        )

        return DetectorResult(
            positions=closest_points.astype(np.float32),
            directions=hit_directions.astype(np.float32),
            times=hit_times[hit_indices].astype(np.float32),
            intensities=rays.intensities[hit_indices].astype(np.float32),
            wavelengths=rays.wavelengths[hit_indices].astype(np.float32),
            ray_indices=hit_indices.astype(np.int32),
            generations=(
                rays.generations[hit_indices].astype(np.int32)
                if rays.generations is not None
                else None
            ),
            polarization_vectors=(
                rays.polarization_vector[hit_indices].astype(np.float32)
                if rays.polarization_vector is not None
                else None
            ),
            detector_name=self.name,
        )

    def _detect_cpu(
        self, rays: "RayBatch", current_time: float = 0.0
    ) -> "DetectorResult":
        """
        CPU detection implementation.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to test.
        current_time : float
            Current simulation time. Not used by this implementation.

        Returns
        -------
        DetectorResult
            Newly detected rays.
        """
        c = 299792458.0  # Speed of light in m/s
        n = 1.0  # Refractive index of air

        # Vectorized computation
        active_mask = rays.active
        if not np.any(active_mask):
            return DetectorResult.empty(self.name)

        origins = rays.positions[active_mask]
        directions = rays.directions[active_mask]
        active_indices = np.where(active_mask)[0]

        # Vector from ray origin to sphere center
        oc = self.center - origins

        # Normalize directions. A zero-length direction cannot be
        # normalized; dividing by 1 instead keeps it a zero vector, which
        # gives t_closest == 0 and is rejected by the forward test below
        # (same outcome as before, without the 0/0 NaNs and warnings).
        norms = np.linalg.norm(directions, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        unit_dirs = directions / norms

        # Project onto ray direction
        t_closest = np.sum(oc * unit_dirs, axis=1)

        # Only consider forward propagation
        forward_mask = t_closest > 0
        if not np.any(forward_mask):
            return DetectorResult.empty(self.name)

        # Find closest points on rays
        closest_points = (
            origins[forward_mask]
            + t_closest[forward_mask, np.newaxis] * unit_dirs[forward_mask]
        )

        # Distance to center
        dists = np.linalg.norm(closest_points - self.center, axis=1)

        # Check if within radius
        hit_mask = dists <= self.radius
        if not np.any(hit_mask):
            return DetectorResult.empty(self.name)

        # Map the surviving rays back to indices into the full batch
        final_active_indices = active_indices[forward_mask][hit_mask]
        final_closest_points = closest_points[hit_mask]
        final_t_closest = t_closest[forward_mask][hit_mask]

        # Compute arrival times: path length * n / c added to elapsed time
        additional_times = final_t_closest * n / c
        arrival_times = rays.accumulated_time[final_active_indices] + additional_times

        return DetectorResult(
            positions=final_closest_points.astype(np.float32),
            directions=rays.directions[final_active_indices].astype(np.float32),
            times=arrival_times.astype(np.float32),
            intensities=rays.intensities[final_active_indices].astype(np.float32),
            wavelengths=rays.wavelengths[final_active_indices].astype(np.float32),
            ray_indices=final_active_indices.astype(np.int32),
            generations=(
                rays.generations[final_active_indices].astype(np.int32)
                if rays.generations is not None
                else None
            ),
            polarization_vectors=(
                rays.polarization_vector[final_active_indices].astype(np.float32)
                if rays.polarization_vector is not None
                else None
            ),
            detector_name=self.name,
        )

    def clear(self) -> None:
        """
        Clear all recorded detections.

        Resets the detector to its initial state with no recorded events.
        """
        self._accumulated_result = DetectorResult.empty(self.name)
        self._events = []

    def __repr__(self) -> str:
        """Return string representation."""
        return (
            f"SphericalDetector(center={self.center.tolist()}, "
            f"radius={self.radius}, rays={self._accumulated_result.num_rays})"
        )

    def __len__(self) -> int:
        """Return number of detected rays."""
        return self._accumulated_result.num_rays

    # Backward compatibility methods from old Detector base class

    def get_arrival_times(self) -> np.ndarray:
        """
        Get array of all arrival times.

        Returns
        -------
        times : ndarray, shape (N,)
            Arrival times in seconds for all detected rays (float64).
        """
        return self._accumulated_result.times.astype(np.float64)

    def get_arrival_angles(self, reference_direction: np.ndarray) -> np.ndarray:
        """
        Get angles between ray directions and reference direction.

        Parameters
        ----------
        reference_direction : array_like, shape (3,)
            Reference vector for angle calculation. Does not need to be
            normalized; lists and tuples are accepted.

        Returns
        -------
        angles : ndarray, shape (N,)
            Angles in radians (float64). Empty if nothing has been detected.

        Raises
        ------
        ValueError
            If detections exist and ``reference_direction`` has zero or
            non-finite length, since no angle is defined in that case.
        """
        if self._accumulated_result.is_empty:
            return np.array([], dtype=np.float64)

        # Coerce first so plain Python sequences work as well as ndarrays.
        ref = np.asarray(reference_direction, dtype=np.float64)
        ref_norm = np.linalg.norm(ref)
        if not (ref_norm > 0 and np.isfinite(ref_norm)):
            raise ValueError(
                "reference_direction must have a non-zero, finite length"
            )
        ref = ref / ref_norm

        directions = self._accumulated_result.directions
        unit_dirs = directions / np.linalg.norm(directions, axis=1, keepdims=True)

        # Clip guards arccos against round-off pushing |cos| slightly above 1.
        cos_angles = np.clip(np.dot(unit_dirs, ref), -1.0, 1.0)
        return np.arccos(cos_angles).astype(np.float64)

    def get_intensities(self) -> np.ndarray:
        """
        Get array of all detected intensities.

        Returns
        -------
        intensities : ndarray, shape (N,)
            Intensity values for all detected rays (float64).
        """
        return self._accumulated_result.intensities.astype(np.float64)

    def get_wavelengths(self) -> np.ndarray:
        """
        Get array of all detected wavelengths.

        Returns
        -------
        wavelengths : ndarray, shape (N,)
            Wavelengths in meters (float64).
        """
        return self._accumulated_result.wavelengths.astype(np.float64)

    def get_positions(self) -> np.ndarray:
        """
        Get array of all detection positions.

        Returns
        -------
        positions : ndarray, shape (N, 3)
            3D positions where rays were detected. Returned as float32,
            unlike the other accessors which return float64.
        """
        return self._accumulated_result.positions.astype(np.float32)

    def get_total_intensity(self) -> float:
        """
        Get sum of all detected intensities.

        Returns
        -------
        float
            Total detected intensity.
        """
        return self._accumulated_result.total_intensity