Source code for lsurf.detectors.small.directional

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
Directional Detector Implementation

Provides a directional detector with limited angular acceptance.
Useful for modeling detectors with finite field of view.

Examples
--------
>>> from lsurf.detectors.small import DirectionalDetector
>>>
>>> detector = DirectionalDetector(
...     position=(0, 0, 100),
...     direction=(0, 0, -1),  # Looking back toward source
...     acceptance_angle=np.radians(10),  # 10 degree acceptance cone
...     radius=5.0,
...     name="Telescope detector"
... )
>>> result = detector.detect(rays)
"""

import numpy as np

from ...utilities.ray_data import RayBatch
from ..base import DetectionEvent
from ..results import DetectorResult


[docs] class DirectionalDetector: """ Detector with angular acceptance cone. Detects rays that pass within a specified radius AND arrive within a specified angular acceptance cone. Useful for modeling detectors with limited field of view such as telescopes or fiber couplers. Parameters ---------- position : tuple of float Detector position (x, y, z) in meters. direction : tuple of float Direction detector is pointing (acceptance cone axis). acceptance_angle : float Half-angle of acceptance cone in radians. radius : float Detection radius at position in meters. name : str, optional Detector name. Default is "Directional Detector". Attributes ---------- position : ndarray, shape (3,) Detector position. direction : ndarray, shape (3,) Unit vector pointing in detector viewing direction. acceptance_angle : float Acceptance cone half-angle in radians. radius : float Detection radius. name : str Detector name. accumulated_result : DetectorResult All accumulated detections since last clear(). Notes ----- A ray is detected if: 1. Its closest approach to the detector position is within radius 2. The angle between the ray direction and the detector direction (considering the ray as incoming) is within acceptance_angle Examples -------- >>> # 10-degree acceptance cone detector >>> detector = DirectionalDetector( ... position=(0, 0, 100), ... direction=(0, 0, -1), ... acceptance_angle=np.radians(10), ... radius=5.0 ... ) >>> result = detector.detect(rays) >>> print(f"Detected {result.num_rays} rays within acceptance cone") """
[docs] def __init__( self, position: tuple[float, float, float], direction: tuple[float, float, float], acceptance_angle: float, radius: float, name: str = "Directional Detector", ): """ Initialize directional detector. Parameters ---------- position : tuple of float Detector position in meters. direction : tuple of float Direction detector is pointing. acceptance_angle : float Acceptance cone half-angle in radians. radius : float Detection radius in meters. name : str, optional Detector name. """ self.name = name self.position = np.array(position, dtype=np.float32) self.direction = np.array(direction, dtype=np.float32) self.direction = self.direction / np.linalg.norm(self.direction) self.acceptance_angle = acceptance_angle self.radius = radius self._accumulated_result = DetectorResult.empty(name) self._events: list[DetectionEvent] = []
@property def accumulated_result(self) -> DetectorResult: """All accumulated detections since last clear().""" return self._accumulated_result @property def events(self) -> list[DetectionEvent]: """ Backward compatibility: list of DetectionEvent objects. For new code, use accumulated_result instead. """ return self._events
[docs] def detect( self, rays: RayBatch, current_time: float = 0.0, accumulate: bool = True ) -> DetectorResult: """ Detect rays within acceptance cone and detection radius. Parameters ---------- rays : RayBatch Ray batch to test. current_time : float, optional Current simulation time. Default is 0.0. accumulate : bool, optional Whether to accumulate results. Default is True. Returns ------- DetectorResult Newly detected rays. """ if rays is None or rays.num_rays == 0: return DetectorResult.empty(self.name) c = 299792458.0 # Speed of light in m/s n = 1.0 # Refractive index of air # Vectorized computation active_mask = rays.active if not np.any(active_mask): return DetectorResult.empty(self.name) origins = rays.positions[active_mask] directions = rays.directions[active_mask] active_indices = np.where(active_mask)[0] # Find closest approach to detector position oc = self.position - origins dir_norms = directions / np.linalg.norm(directions, axis=1, keepdims=True) t_closest = np.sum(oc * dir_norms, axis=1) # Only consider forward propagation forward_mask = t_closest > 0 if not np.any(forward_mask): return DetectorResult.empty(self.name) # Get closest points closest_points = ( origins[forward_mask] + t_closest[forward_mask, np.newaxis] * dir_norms[forward_mask] ) dists = np.linalg.norm(closest_points - self.position, axis=1) # Check if within radius in_radius = dists <= self.radius if not np.any(in_radius): return DetectorResult.empty(self.name) # Update indices for rays within radius radius_indices = active_indices[forward_mask][in_radius] radius_dir_norms = dir_norms[forward_mask][in_radius] radius_t_closest = t_closest[forward_mask][in_radius] radius_closest_points = closest_points[in_radius] # Check if ray direction is within acceptance cone # Negative because ray is incoming (traveling toward detector) cos_angles = np.dot(-radius_dir_norms, self.direction) cos_angles = np.clip(cos_angles, -1.0, 1.0) angles = np.arccos(cos_angles) in_cone = angles <= self.acceptance_angle if not np.any(in_cone): return DetectorResult.empty(self.name) # Get final results final_indices = radius_indices[in_cone] final_closest_points = radius_closest_points[in_cone] final_t_closest = radius_t_closest[in_cone] # Compute arrival times additional_times = final_t_closest * n / c arrival_times = rays.accumulated_time[final_indices] + additional_times result = DetectorResult( positions=final_closest_points.astype(np.float32), directions=rays.directions[final_indices].astype(np.float32), times=arrival_times.astype(np.float32), intensities=rays.intensities[final_indices].astype(np.float32), wavelengths=rays.wavelengths[final_indices].astype(np.float32), ray_indices=final_indices.astype(np.int32), generations=( rays.generations[final_indices].astype(np.int32) if rays.generations is not None else None ), polarization_vectors=( rays.polarization_vector[final_indices].astype(np.float32) if rays.polarization_vector is not None else None ), detector_name=self.name, ) if accumulate: self._accumulated_result = DetectorResult.merge( [self._accumulated_result, result] ) self._events.extend(result.to_detection_events()) return result
[docs] def clear(self) -> None: """ Clear all recorded detections. Resets the detector to its initial state with no recorded events. """ self._accumulated_result = DetectorResult.empty(self.name) self._events = []
[docs] def __repr__(self) -> str: """Return string representation.""" return ( f"DirectionalDetector(position={self.position.tolist()}, " f"acceptance_angle={np.degrees(self.acceptance_angle):.1f} deg, " f"rays={self._accumulated_result.num_rays})" )
[docs] def __len__(self) -> int: """Return number of detected rays.""" return self._accumulated_result.num_rays
# Backward compatibility methods from old Detector base class
[docs] def get_arrival_times(self) -> np.ndarray: """Get array of all arrival times.""" return self._accumulated_result.times.astype(np.float64)
[docs] def get_arrival_angles(self, reference_direction: np.ndarray) -> np.ndarray: """Get angles between ray directions and reference direction.""" if self._accumulated_result.is_empty: return np.array([], dtype=np.float64) ref = reference_direction / np.linalg.norm(reference_direction) dir_norms = self._accumulated_result.directions / np.linalg.norm( self._accumulated_result.directions, axis=1, keepdims=True ) cos_angles = np.dot(dir_norms, ref) cos_angles = np.clip(cos_angles, -1.0, 1.0) return np.arccos(cos_angles).astype(np.float64)
[docs] def get_intensities(self) -> np.ndarray: """Get array of all detected intensities.""" return self._accumulated_result.intensities.astype(np.float64)
[docs] def get_wavelengths(self) -> np.ndarray: """Get array of all detected wavelengths.""" return self._accumulated_result.wavelengths.astype(np.float64)
[docs] def get_positions(self) -> np.ndarray: """Get array of all detection positions.""" return self._accumulated_result.positions.astype(np.float32)
[docs] def get_total_intensity(self) -> float: """Get sum of all detected intensities.""" return self._accumulated_result.total_intensity