# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Spherical Detector Implementation
Provides a spherical detector that detects rays passing within a specified
radius of a center point. Ideal for collecting rays from all directions.
Examples
--------
>>> from lsurf.detectors.small import SphericalDetector
>>>
>>> detector = SphericalDetector(
... center=(0, 0, 100),
... radius=10.0,
... name="Far-field detector"
... )
>>> result = detector.detect(rays)
>>> print(f"Detected {result.num_rays} rays")
"""
import numpy as np
from ...utilities.ray_data import RayBatch
from ..base import DetectionEvent
from ..results import DetectorResult
# Probe once, at import time, whether CUDA can be used. numba is treated as
# an optional dependency: if it cannot be imported the flag is simply False.
try:
    from numba import cuda

    # Coerce to a plain bool so the flag is always safe to use in `and`/`or`.
    CUDA_AVAILABLE = bool(cuda.is_available())
except ImportError:
    CUDA_AVAILABLE = False
class SphericalDetector:
    """
    Spherical detector centered at a point.

    Detects all rays that pass within a certain radius of the center point.
    Good for collecting rays from all directions without directional bias.

    Parameters
    ----------
    center : tuple of float
        Center position (x, y, z) in meters.
    radius : float
        Detection radius in meters.
    name : str, optional
        Detector name. Default is "Spherical Detector".
    use_gpu : bool, optional
        Whether to use GPU acceleration when available. Default is True.

    Attributes
    ----------
    center : ndarray, shape (3,)
        Detector center position (stored as float32).
    radius : float
        Detection radius.
    use_gpu : bool
        GPU acceleration flag.
    name : str
        Detector name.
    accumulated_result : DetectorResult
        All accumulated detections since last clear().

    Notes
    -----
    Detection is based on the closest approach distance between each ray
    and the center point. A ray is detected if this distance is less than
    or equal to the detection radius. In the CPU implementation only rays
    whose closest approach lies ahead of their current position are
    considered.

    The arrival time is computed assuming the ray travels through air
    (n approx 1.0) from its current position to the detection point.

    Examples
    --------
    >>> detector = SphericalDetector(
    ...     center=(0, 0, 100),
    ...     radius=10.0,
    ...     use_gpu=True
    ... )
    >>> result = detector.detect(reflected_rays)
    >>> print(f"Detected {result.num_rays} rays")
    """

    # Speed of light in m/s, used to convert the remaining path length
    # into a travel time on the CPU path.
    _SPEED_OF_LIGHT = 299792458.0
    # Refractive index assumed between the ray position and the detector (air).
    _REFRACTIVE_INDEX = 1.0

    def __init__(
        self,
        center: tuple[float, float, float],
        radius: float,
        name: str = "Spherical Detector",
        use_gpu: bool = True,
    ):
        """
        Initialize spherical detector.

        Parameters
        ----------
        center : tuple of float
            Center position (x, y, z) in meters.
        radius : float
            Detection radius in meters.
        name : str, optional
            Detector name. Default is "Spherical Detector".
        use_gpu : bool, optional
            Whether to use GPU acceleration. Default is True.
        """
        self.name = name
        self.center = np.array(center, dtype=np.float32)
        self.radius = radius
        self.use_gpu = use_gpu
        self._accumulated_result = DetectorResult.empty(name)
        # Keep events list for backward compatibility
        self._events: list[DetectionEvent] = []

    @property
    def accumulated_result(self) -> DetectorResult:
        """All accumulated detections since last clear()."""
        return self._accumulated_result

    @property
    def events(self) -> list[DetectionEvent]:
        """
        Backward compatibility: list of DetectionEvent objects.

        For new code, use accumulated_result instead.
        """
        return self._events

    def detect(
        self, rays: RayBatch, current_time: float = 0.0, accumulate: bool = True
    ) -> DetectorResult:
        """
        Detect rays that pass within detection radius.

        For each ray, find the closest approach to center. If within
        radius, record a detection event.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to test. ``None`` or an empty batch yields an empty
            result without touching the accumulated state.
        current_time : float, optional
            Current simulation time. Default is 0.0. It is forwarded to the
            back-end, but neither back-end in this class uses it; arrival
            times are derived from ``rays.accumulated_time``.
        accumulate : bool, optional
            Whether to accumulate results. Default is True.

        Returns
        -------
        DetectorResult
            Newly detected rays.
        """
        if rays is None or rays.num_rays == 0:
            return DetectorResult.empty(self.name)

        # Use GPU only if requested AND available
        if self.use_gpu and CUDA_AVAILABLE:
            result = self._detect_gpu(rays, current_time)
        else:
            result = self._detect_cpu(rays, current_time)

        if accumulate:
            self._accumulated_result = DetectorResult.merge(
                [self._accumulated_result, result]
            )
            # Keep the legacy events list in step with the accumulated result.
            self._events.extend(result.to_detection_events())

        return result

    def _build_result(
        self,
        rays: RayBatch,
        indices: np.ndarray,
        positions: np.ndarray,
        times: np.ndarray,
    ) -> DetectorResult:
        """
        Assemble a DetectorResult for the rays selected by ``indices``.

        Shared by the CPU and GPU paths so both emit identically typed output.

        Parameters
        ----------
        rays : RayBatch
            Batch the detected rays were taken from.
        indices : ndarray of int
            Indices into ``rays`` of the detected rays.
        positions : ndarray
            Detection position for each selected ray.
        times : ndarray
            Arrival time for each selected ray.

        Returns
        -------
        DetectorResult
            Result carrying per-ray data cast to float32 / int32.
        """
        # Generations and polarization are optional on the batch.
        generations = None
        if rays.generations is not None:
            generations = rays.generations[indices].astype(np.int32)

        polarization = None
        if rays.polarization_vector is not None:
            polarization = rays.polarization_vector[indices].astype(np.float32)

        return DetectorResult(
            positions=positions.astype(np.float32),
            directions=rays.directions[indices].astype(np.float32),
            times=times.astype(np.float32),
            intensities=rays.intensities[indices].astype(np.float32),
            wavelengths=rays.wavelengths[indices].astype(np.float32),
            ray_indices=indices.astype(np.int32),
            generations=generations,
            polarization_vectors=polarization,
            detector_name=self.name,
        )

    def _detect_gpu(self, rays: RayBatch, current_time: float = 0.0) -> DetectorResult:
        """
        GPU-accelerated detection.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to test.
        current_time : float
            Current simulation time. Not used by this implementation.

        Returns
        -------
        DetectorResult
            Newly detected rays.
        """
        # Imported lazily so the module can be loaded without the GPU back-end.
        from ...propagation.detector_gpu import detect_spherical_gpu

        hit_mask, hit_distances, hit_times = detect_spherical_gpu(
            rays.positions.astype(np.float32),
            rays.directions.astype(np.float32),
            rays.active,
            rays.accumulated_time.astype(np.float32),
            rays.wavelengths.astype(np.float32),
            rays.intensities.astype(np.float32),
            self.center,
            self.radius,
        )

        hit_indices = np.where(hit_mask)[0]
        if hit_indices.size == 0:
            return DetectorResult.empty(self.name)

        # Rebuild the detection point on the host: origin + distance * unit dir.
        hit_directions = rays.directions[hit_indices]
        unit_dirs = hit_directions / np.linalg.norm(
            hit_directions, axis=1, keepdims=True
        )
        closest_points = (
            rays.positions[hit_indices]
            + hit_distances[hit_indices, np.newaxis] * unit_dirs
        )

        return self._build_result(
            rays, hit_indices, closest_points, hit_times[hit_indices]
        )

    def _detect_cpu(self, rays: RayBatch, current_time: float = 0.0) -> DetectorResult:
        """
        CPU detection implementation.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to test.
        current_time : float
            Current simulation time. Not used by this implementation.

        Returns
        -------
        DetectorResult
            Newly detected rays.
        """
        active_mask = rays.active
        if not np.any(active_mask):
            return DetectorResult.empty(self.name)

        active_indices = np.where(active_mask)[0]
        origins = rays.positions[active_mask]
        directions = rays.directions[active_mask]

        # Vector from ray origin to sphere center
        to_center = self.center - origins

        # A zero-length direction normalizes to NaN; NaN fails the `> 0`
        # test below, so such rays are dropped. Silence the warning that the
        # 0/0 division would otherwise emit.
        with np.errstate(invalid="ignore", divide="ignore"):
            unit_dirs = directions / np.linalg.norm(
                directions, axis=1, keepdims=True
            )
            # Signed distance along each ray to its closest approach.
            t_closest = np.sum(to_center * unit_dirs, axis=1)
            # Only consider forward propagation
            forward_mask = t_closest > 0

        if not np.any(forward_mask):
            return DetectorResult.empty(self.name)

        forward_t = t_closest[forward_mask]
        closest_points = (
            origins[forward_mask] + forward_t[:, np.newaxis] * unit_dirs[forward_mask]
        )

        # Distance from closest-approach point to the center, against radius
        dists = np.linalg.norm(closest_points - self.center, axis=1)
        hit_mask = dists <= self.radius
        if not np.any(hit_mask):
            return DetectorResult.empty(self.name)

        # Map the surviving rays back to indices in the original batch.
        hit_indices = active_indices[forward_mask][hit_mask]
        hit_t = forward_t[hit_mask]

        # Arrival time = time so far + time to cover the remaining distance.
        extra_time = hit_t * self._REFRACTIVE_INDEX / self._SPEED_OF_LIGHT
        arrival_times = rays.accumulated_time[hit_indices] + extra_time

        return self._build_result(
            rays, hit_indices, closest_points[hit_mask], arrival_times
        )

    def clear(self) -> None:
        """
        Clear all recorded detections.

        Resets the detector to its initial state with no recorded events.
        """
        self._accumulated_result = DetectorResult.empty(self.name)
        self._events = []

    def __repr__(self) -> str:
        """Return string representation."""
        return (
            f"SphericalDetector(center={self.center.tolist()}, "
            f"radius={self.radius}, rays={self._accumulated_result.num_rays})"
        )

    def __len__(self) -> int:
        """Return number of detected rays."""
        return self._accumulated_result.num_rays

    # Backward compatibility methods from old Detector base class

    def get_arrival_times(self) -> np.ndarray:
        """
        Get array of all arrival times.

        Returns
        -------
        times : ndarray, shape (N,)
            Arrival times in seconds for all detected rays.
        """
        return self._accumulated_result.times.astype(np.float64)

    def get_arrival_angles(self, reference_direction: np.ndarray) -> np.ndarray:
        """
        Get angles between ray directions and reference direction.

        Parameters
        ----------
        reference_direction : array_like, shape (3,)
            Reference vector for angle calculation. Need not be normalized.

        Returns
        -------
        angles : ndarray, shape (N,)
            Angles in radians. Empty if nothing has been detected.

        Raises
        ------
        ValueError
            If detections exist and ``reference_direction`` has zero or
            non-finite length, for which no angle is defined.
        """
        if self._accumulated_result.is_empty:
            return np.array([], dtype=np.float64)

        # Accept lists/tuples as well as arrays.
        ref = np.asarray(reference_direction, dtype=np.float64)
        ref_norm = np.linalg.norm(ref)
        if not np.isfinite(ref_norm) or ref_norm == 0.0:
            raise ValueError("reference_direction must be a finite, non-zero vector")
        ref = ref / ref_norm

        directions = self._accumulated_result.directions
        unit_dirs = directions / np.linalg.norm(directions, axis=1, keepdims=True)

        # Clip to guard arccos against round-off slightly outside [-1, 1].
        cos_angles = np.clip(np.dot(unit_dirs, ref), -1.0, 1.0)
        return np.arccos(cos_angles).astype(np.float64)

    def get_intensities(self) -> np.ndarray:
        """
        Get array of all detected intensities.

        Returns
        -------
        intensities : ndarray, shape (N,)
            Intensity values for all detected rays.
        """
        return self._accumulated_result.intensities.astype(np.float64)

    def get_wavelengths(self) -> np.ndarray:
        """
        Get array of all detected wavelengths.

        Returns
        -------
        wavelengths : ndarray, shape (N,)
            Wavelengths in meters.
        """
        return self._accumulated_result.wavelengths.astype(np.float64)

    def get_positions(self) -> np.ndarray:
        """
        Get array of all detection positions.

        Returns
        -------
        positions : ndarray, shape (N, 3)
            3D positions where rays were detected, as float32 (unlike the
            other getters, which return float64).
        """
        return self._accumulated_result.positions.astype(np.float32)

    def get_total_intensity(self) -> float:
        """
        Get sum of all detected intensities.

        Returns
        -------
        float
            Total detected intensity.
        """
        return self._accumulated_result.total_intensity