Source code for lsurf.detectors.results

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
DetectorResult - Unified bulk numpy-based result container for detectors.

This module provides the DetectorResult class which is the primary return type
for all detectors and simulations. It replaces both List[DetectionEvent] and
RecordedRays with a unified interface.

Examples
--------
>>> from lsurf.detectors import DetectorResult
>>>
>>> # Create a result from detection
>>> result = detector.detect(rays)
>>> print(f"Detected {result.num_rays} rays")
>>> print(f"Total intensity: {result.total_intensity:.3e}")
>>>
>>> # Filter by wavelength
>>> visible = result.filter_by_wavelength(400e-9, 700e-9)
>>>
>>> # Compute statistics
>>> stats = result.compute_statistics()
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
from numpy.typing import NDArray

if TYPE_CHECKING:
    from ..utilities.recording_sphere import RecordedRays

# Optional h5py import
try:
    import h5py

    HAS_H5PY = True
except ImportError:
    HAS_H5PY = False

# Import EARTH_RADIUS from surfaces to avoid circular imports at module level
# Use lazy import in methods that need it


[docs] @dataclass class DetectorResult: """ Unified bulk numpy-based result container for detector outputs. This is the primary return type for all detectors and simulations, providing efficient bulk storage and analysis of detected rays. Attributes ---------- positions : ndarray, shape (N, 3) Intersection positions (meters) directions : ndarray, shape (N, 3) Ray directions at intersection (unit vectors) times : ndarray, shape (N,) Time of arrival (seconds) intensities : ndarray, shape (N,) Ray intensity at detection wavelengths : ndarray, shape (N,) Ray wavelength (meters) ray_indices : ndarray, shape (N,), optional Original ray indices in the source RayBatch generations : ndarray, shape (N,), optional Ray generation (number of surface interactions) polarization_vectors : ndarray, shape (N, 3), optional 3D polarization vectors (electric field direction) detector_name : str Name of the detector that produced this result metadata : dict Additional metadata (simulation parameters, etc.) Examples -------- >>> result = detector.detect(rays) >>> print(f"Detected {result.num_rays} rays") >>> print(f"Total intensity: {result.total_intensity:.3e}") >>> >>> # Filter by time window >>> early = result.filter_by_time(0, 1e-6) >>> >>> # Merge multiple results >>> combined = DetectorResult.merge([result1, result2]) """ # Core data (always present) positions: NDArray[np.float32] directions: NDArray[np.float32] times: NDArray[np.float32] intensities: NDArray[np.float32] wavelengths: NDArray[np.float32] # Optional data ray_indices: NDArray[np.int32] | None = None generations: NDArray[np.int32] | None = None polarization_vectors: NDArray[np.float32] | None = None # Metadata detector_name: str = "unnamed" metadata: dict[str, Any] = field(default_factory=dict) # ------------------------------------------------------------------------- # Properties # ------------------------------------------------------------------------- @property def num_rays(self) -> int: """Number of detected rays.""" return len(self.positions) @property def total_intensity(self) -> float: """Sum of all detected intensities.""" return float(np.sum(self.intensities)) @property def is_empty(self) -> bool: """Whether the result contains no rays.""" return self.num_rays == 0 # ------------------------------------------------------------------------- # Statistics and Analysis # -------------------------------------------------------------------------
[docs] def compute_statistics(self) -> dict[str, Any]: """ Compute summary statistics for the detected rays. Returns ------- dict Dictionary containing: - count: number of rays - total_intensity: sum of intensities - mean_time: average arrival time - std_time: arrival time standard deviation - min_time: earliest arrival - max_time: latest arrival - mean_wavelength: average wavelength - time_spread: max_time - min_time Examples -------- >>> stats = result.compute_statistics() >>> print(f"Detected {stats['count']} rays") >>> print(f"Time spread: {stats['time_spread']:.3e} s") """ if self.is_empty: return { "count": 0, "total_intensity": 0.0, "mean_time": 0.0, "std_time": 0.0, "min_time": 0.0, "max_time": 0.0, "mean_wavelength": 0.0, "time_spread": 0.0, } return { "count": self.num_rays, "total_intensity": self.total_intensity, "mean_time": float(np.mean(self.times)), "std_time": float(np.std(self.times)), "min_time": float(np.min(self.times)), "max_time": float(np.max(self.times)), "mean_wavelength": float(np.mean(self.wavelengths)), "time_spread": float(np.max(self.times) - np.min(self.times)), }
[docs] def compute_time_histogram( self, num_bins: int = 50, time_range: tuple[float, float] | None = None, weighted: bool = True, ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """ Compute arrival time distribution histogram. Parameters ---------- num_bins : int Number of histogram bins time_range : tuple, optional (min, max) time range. If None, uses data range. weighted : bool If True, weight by intensity. If False, count rays. Returns ------- bin_centers : ndarray Bin centers in seconds values : ndarray Histogram values (counts or intensity sum per bin) Examples -------- >>> times, counts = result.compute_time_histogram(num_bins=100) >>> plt.bar(times * 1e9, counts, width=(times[1]-times[0])*1e9) >>> plt.xlabel('Time (ns)') """ if self.is_empty: return np.array([], dtype=np.float64), np.array([], dtype=np.float64) if time_range is None: time_range = (float(np.min(self.times)), float(np.max(self.times))) # Handle case where all times are very similar time_span = time_range[1] - time_range[0] if time_span < 1e-12: weights = self.intensities if weighted else None total = np.sum(weights) if weights is not None else self.num_rays return np.array([np.mean(self.times)]), np.array([total]) weights = self.intensities if weighted else None values, bin_edges = np.histogram( self.times, bins=num_bins, range=time_range, weights=weights ) bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 return bin_centers, values.astype(np.float64)
[docs] def compute_angular_distribution( self, reference_direction: NDArray[np.float32], num_bins: int = 50, weighted: bool = True, ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """ Compute angular distribution histogram. Parameters ---------- reference_direction : ndarray, shape (3,) Reference direction for angle calculation num_bins : int Number of histogram bins weighted : bool If True, weight by intensity. If False, count rays. Returns ------- bin_centers : ndarray Bin centers in degrees (0-180) values : ndarray Histogram values Examples -------- >>> angles, counts = result.compute_angular_distribution( ... reference_direction=np.array([0, 0, 1]) ... ) """ if self.is_empty: return np.array([], dtype=np.float64), np.array([], dtype=np.float64) ref = reference_direction / np.linalg.norm(reference_direction) dir_norms = self.directions / np.linalg.norm( self.directions, axis=1, keepdims=True ) cos_angles = np.dot(dir_norms, ref) cos_angles = np.clip(cos_angles, -1.0, 1.0) angles = np.degrees(np.arccos(cos_angles)) weights = self.intensities if weighted else None values, bin_edges = np.histogram( angles, bins=num_bins, range=(0, 180), weights=weights ) bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 return bin_centers, values.astype(np.float64)
[docs] def compute_angular_coordinates( self, earth_center: NDArray[np.float64] | None = None, ) -> dict[str, NDArray[np.float32]]: """ Compute angular coordinates for all ray intersection points. Computes spherical coordinates (latitude/longitude) of intersection points on a detection sphere relative to Earth's center. Parameters ---------- earth_center : ndarray, optional Earth center position, default (0, 0, -EARTH_RADIUS) Returns ------- dict Dictionary with: - 'elevation': Latitude angle above equator (radians, -π/2 to π/2) - 'azimuth': Longitude angle (radians, -π to π) - 'zenith': Zenith angle from north pole (radians, 0 to π) - 'incidence': Angle between ray direction and outward radial (radians) Examples -------- >>> coords = result.compute_angular_coordinates() >>> elevation_deg = np.degrees(coords['elevation']) """ from ..surfaces import EARTH_RADIUS if self.is_empty: return { "elevation": np.array([], dtype=np.float32), "azimuth": np.array([], dtype=np.float32), "zenith": np.array([], dtype=np.float32), "incidence": np.array([], dtype=np.float32), } if earth_center is None: earth_center = np.array([0, 0, -EARTH_RADIUS], dtype=np.float64) # Vector from Earth center to intersection point to_pos = self.positions.astype(np.float64) - earth_center r = np.linalg.norm(to_pos, axis=1, keepdims=True) # Spherical coordinates of the intersection point elevation = np.arcsin(to_pos[:, 2] / r.squeeze()) _azimuth = np.arctan2(to_pos[:, 1], to_pos[:, 0]) # noqa: F841 # Incidence angle: angle between ray direction and outward radial radial = to_pos / r cos_incidence = np.sum(self.directions * radial, axis=1) incidence = np.arccos(np.clip(cos_incidence, -1.0, 1.0)) # For azimuth of direction, project onto local tangent plane global_z = np.array([0, 0, 1], dtype=np.float64) tangent_x = np.cross(global_z, radial) tangent_x_norm = np.linalg.norm(tangent_x, axis=1, keepdims=True) tangent_x = tangent_x / np.maximum(tangent_x_norm, 1e-10) tangent_y = np.cross(radial, tangent_x) dir_x = np.sum(self.directions * tangent_x, axis=1) dir_y = np.sum(self.directions * tangent_y, axis=1) dir_azimuth = np.arctan2(dir_y, dir_x) return { "elevation": elevation.astype(np.float32), "azimuth": dir_azimuth.astype(np.float32), "zenith": incidence.astype(np.float32), "incidence": incidence.astype(np.float32), }
[docs] def compute_viewing_angle_from_origin( self, origin: NDArray[np.float64] | None = None, ) -> NDArray[np.float32]: """ Compute viewing angle from horizontal at specified origin. Calculates the angle above the horizontal plane (XY plane) when viewing each intersection point from the origin position. Parameters ---------- origin : ndarray, optional Observer position, default (0, 0, 0) Returns ------- ndarray Viewing angle from horizontal in radians (-π/2 to π/2) Positive angles are above horizontal, negative below Examples -------- >>> viewing_angles = result.compute_viewing_angle_from_origin() >>> print(f"Mean viewing angle: {np.degrees(viewing_angles.mean()):.1f}°") """ if self.is_empty: return np.array([], dtype=np.float32) if origin is None: origin = np.array([0, 0, 0], dtype=np.float64) to_point = self.positions.astype(np.float64) - origin horiz_dist = np.sqrt(to_point[:, 0] ** 2 + to_point[:, 1] ** 2) vert_dist = to_point[:, 2] viewing_angle = np.arctan2(vert_dist, horiz_dist) return viewing_angle.astype(np.float32)
[docs] def compute_ray_direction_angles(self) -> dict[str, NDArray[np.float32]]: """ Compute elevation and azimuth angles of ray directions. Returns ------- dict Dictionary with: - 'elevation': Angle above horizontal plane in radians (-π/2 to π/2) - 'azimuth': Azimuth angle in horizontal plane in radians (-π to π) Examples -------- >>> angles = result.compute_ray_direction_angles() >>> print(f"Mean elevation: {np.degrees(angles['elevation'].mean()):.1f}°") """ if self.is_empty: return { "elevation": np.array([], dtype=np.float32), "azimuth": np.array([], dtype=np.float32), } elevation = np.arcsin(np.clip(self.directions[:, 2], -1.0, 1.0)) azimuth = np.arctan2(self.directions[:, 1], self.directions[:, 0]) return { "elevation": elevation.astype(np.float32), "azimuth": azimuth.astype(np.float32), }
# ------------------------------------------------------------------------- # Filtering # -------------------------------------------------------------------------
[docs] def filter(self, mask: NDArray[np.bool_]) -> "DetectorResult": """ Filter rays by boolean mask. Parameters ---------- mask : ndarray of bool Boolean mask, True for rays to keep Returns ------- DetectorResult Filtered result containing only selected rays Examples -------- >>> high_intensity = result.filter(result.intensities > 0.1) """ return DetectorResult( positions=self.positions[mask], directions=self.directions[mask], times=self.times[mask], intensities=self.intensities[mask], wavelengths=self.wavelengths[mask], ray_indices=( self.ray_indices[mask] if self.ray_indices is not None else None ), generations=( self.generations[mask] if self.generations is not None else None ), polarization_vectors=( self.polarization_vectors[mask] if self.polarization_vectors is not None else None ), detector_name=self.detector_name, metadata=self.metadata.copy(), )
[docs] def filter_by_wavelength( self, min_wavelength: float, max_wavelength: float ) -> "DetectorResult": """ Filter rays by wavelength range. Parameters ---------- min_wavelength : float Minimum wavelength in meters max_wavelength : float Maximum wavelength in meters Returns ------- DetectorResult Filtered result Examples -------- >>> visible = result.filter_by_wavelength(400e-9, 700e-9) """ mask = (self.wavelengths >= min_wavelength) & ( self.wavelengths <= max_wavelength ) return self.filter(mask)
[docs] def filter_by_time(self, min_time: float, max_time: float) -> "DetectorResult": """ Filter rays by time range. Parameters ---------- min_time : float Minimum time in seconds max_time : float Maximum time in seconds Returns ------- DetectorResult Filtered result Examples -------- >>> early_arrivals = result.filter_by_time(0, 1e-6) """ mask = (self.times >= min_time) & (self.times <= max_time) return self.filter(mask)
[docs] def filter_by_intensity( self, min_intensity: float = 0.0, max_intensity: float = float("inf") ) -> "DetectorResult": """ Filter rays by intensity range. Parameters ---------- min_intensity : float Minimum intensity max_intensity : float Maximum intensity Returns ------- DetectorResult Filtered result Examples -------- >>> bright = result.filter_by_intensity(min_intensity=0.1) """ mask = (self.intensities >= min_intensity) & (self.intensities <= max_intensity) return self.filter(mask)
# ------------------------------------------------------------------------- # Class Methods # -------------------------------------------------------------------------
[docs] @classmethod def empty(cls, detector_name: str = "unnamed") -> "DetectorResult": """ Create an empty DetectorResult. Parameters ---------- detector_name : str Name for the detector Returns ------- DetectorResult Empty result with zero rays Examples -------- >>> empty = DetectorResult.empty("my_detector") >>> print(empty.is_empty) # True """ return cls( positions=np.zeros((0, 3), dtype=np.float32), directions=np.zeros((0, 3), dtype=np.float32), times=np.zeros(0, dtype=np.float32), intensities=np.zeros(0, dtype=np.float32), wavelengths=np.zeros(0, dtype=np.float32), ray_indices=None, generations=None, polarization_vectors=None, detector_name=detector_name, )
[docs] @classmethod def merge(cls, results: list["DetectorResult"]) -> "DetectorResult": """ Merge multiple DetectorResults into one. Parameters ---------- results : list of DetectorResult Results to merge Returns ------- DetectorResult Combined result Examples -------- >>> combined = DetectorResult.merge([result1, result2, result3]) """ if not results: return cls.empty() non_empty = [r for r in results if not r.is_empty] if not non_empty: return cls.empty(results[0].detector_name if results else "unnamed") if len(non_empty) == 1: return non_empty[0] # Check for optional fields has_ray_indices = all(r.ray_indices is not None for r in non_empty) has_generations = all(r.generations is not None for r in non_empty) has_polarization = all(r.polarization_vectors is not None for r in non_empty) return cls( positions=np.vstack([r.positions for r in non_empty]), directions=np.vstack([r.directions for r in non_empty]), times=np.concatenate([r.times for r in non_empty]), intensities=np.concatenate([r.intensities for r in non_empty]), wavelengths=np.concatenate([r.wavelengths for r in non_empty]), ray_indices=( np.concatenate([r.ray_indices for r in non_empty]) if has_ray_indices else None ), generations=( np.concatenate([r.generations for r in non_empty]) if has_generations else None ), polarization_vectors=( np.vstack([r.polarization_vectors for r in non_empty]) if has_polarization else None ), detector_name=non_empty[0].detector_name, metadata=non_empty[0].metadata.copy(), )
# ------------------------------------------------------------------------- # Serialization # -------------------------------------------------------------------------
[docs] def save_npz(self, filepath: str | Path) -> None: """ Save to numpy .npz file. Parameters ---------- filepath : str or Path Output file path Examples -------- >>> result.save_npz("detections.npz") """ filepath = Path(filepath) filepath.parent.mkdir(parents=True, exist_ok=True) # Compute angular coordinates for convenience angular = self.compute_angular_coordinates() save_dict = { "positions": self.positions, "directions": self.directions, "times": self.times, "intensities": self.intensities, "wavelengths": self.wavelengths, "elevation": angular["elevation"], "azimuth": angular["azimuth"], "zenith": angular["zenith"], } if self.ray_indices is not None: save_dict["ray_indices"] = self.ray_indices if self.generations is not None: save_dict["generations"] = self.generations if self.polarization_vectors is not None: save_dict["polarization_vectors"] = self.polarization_vectors # Add metadata save_dict["meta_detector_name"] = np.array(self.detector_name) for key, value in self.metadata.items(): save_dict[f"meta_{key}"] = np.array(value) np.savez_compressed(filepath, **save_dict)
[docs] @classmethod def load_npz(cls, filepath: str | Path) -> "DetectorResult": """ Load from numpy .npz file. Parameters ---------- filepath : str or Path Input file path Returns ------- DetectorResult Loaded result Examples -------- >>> result = DetectorResult.load_npz("detections.npz") """ data = np.load(filepath, allow_pickle=True) # Extract metadata metadata = {} detector_name = "unnamed" for key in data.files: if key.startswith("meta_"): meta_key = key[5:] if meta_key == "detector_name": detector_name = str(data[key]) else: metadata[meta_key] = data[key] return cls( positions=data["positions"], directions=data["directions"], times=data["times"], intensities=data["intensities"], wavelengths=data["wavelengths"], ray_indices=data.get("ray_indices"), generations=data.get("generations"), polarization_vectors=data.get("polarization_vectors"), detector_name=detector_name, metadata=metadata, )
[docs] def save_hdf5( self, filepath: str | Path, compression: str = "gzip", ) -> None: """ Save to HDF5 file. Parameters ---------- filepath : str or Path Output file path compression : str Compression algorithm ('gzip', 'lzf', or None) Examples -------- >>> result.save_hdf5("detections.h5") """ if not HAS_H5PY: raise ImportError( "h5py is required for HDF5 support. Install with: pip install h5py" ) filepath = Path(filepath) filepath.parent.mkdir(parents=True, exist_ok=True) with h5py.File(filepath, "w") as f: # Create rays group rays_grp = f.create_group("rays") # Store ray data with compression rays_grp.create_dataset( "positions", data=self.positions, compression=compression ) rays_grp.create_dataset( "directions", data=self.directions, compression=compression ) rays_grp.create_dataset("times", data=self.times, compression=compression) rays_grp.create_dataset( "intensities", data=self.intensities, compression=compression ) rays_grp.create_dataset( "wavelengths", data=self.wavelengths, compression=compression ) # Optional fields if self.ray_indices is not None: rays_grp.create_dataset( "ray_indices", data=self.ray_indices, compression=compression ) if self.generations is not None: rays_grp.create_dataset( "generations", data=self.generations, compression=compression ) if self.polarization_vectors is not None: rays_grp.create_dataset( "polarization_vectors", data=self.polarization_vectors, compression=compression, ) # Compute and store angular coordinates angular = self.compute_angular_coordinates() angular_grp = f.create_group("angular") for key, value in angular.items(): angular_grp.create_dataset(key, data=value, compression=compression) # Store metadata meta_grp = f.create_group("metadata") meta_grp.attrs["num_rays"] = self.num_rays meta_grp.attrs["detector_name"] = self.detector_name meta_grp.attrs["save_time"] = datetime.now().isoformat() for key, value in self.metadata.items(): if isinstance(value, (int, float, str, bool)): meta_grp.attrs[key] = value elif isinstance(value, np.ndarray): meta_grp.create_dataset(key, data=value) elif isinstance(value, (list, tuple)): meta_grp.create_dataset(key, data=np.array(value)) else: meta_grp.attrs[key] = str(value)
[docs] @classmethod def load_hdf5(cls, filepath: str | Path) -> "DetectorResult": """ Load from HDF5 file. Parameters ---------- filepath : str or Path Input file path Returns ------- DetectorResult Loaded result Examples -------- >>> result = DetectorResult.load_hdf5("detections.h5") """ if not HAS_H5PY: raise ImportError( "h5py is required for HDF5 support. Install with: pip install h5py" ) with h5py.File(filepath, "r") as f: rays_grp = f["rays"] # Load required fields positions = rays_grp["positions"][...] directions = rays_grp["directions"][...] times = rays_grp["times"][...] intensities = rays_grp["intensities"][...] wavelengths = rays_grp["wavelengths"][...] # Load optional fields ray_indices = ( rays_grp["ray_indices"][...] if "ray_indices" in rays_grp else None ) generations = ( rays_grp["generations"][...] if "generations" in rays_grp else None ) polarization_vectors = ( rays_grp["polarization_vectors"][...] if "polarization_vectors" in rays_grp else None ) # Load metadata metadata = {} detector_name = "unnamed" if "metadata" in f: meta_grp = f["metadata"] detector_name = meta_grp.attrs.get("detector_name", "unnamed") for key, value in meta_grp.attrs.items(): if key not in ("num_rays", "detector_name", "save_time"): metadata[key] = value for key in meta_grp.keys(): metadata[key] = meta_grp[key][...] return cls( positions=positions, directions=directions, times=times, intensities=intensities, wavelengths=wavelengths, ray_indices=ray_indices, generations=generations, polarization_vectors=polarization_vectors, detector_name=detector_name, metadata=metadata, )
# ------------------------------------------------------------------------- # Backward Compatibility # -------------------------------------------------------------------------
[docs] def to_detection_events(self) -> list: """ Convert to list of DetectionEvent objects for backward compatibility. Returns ------- list of DetectionEvent List of individual detection events Examples -------- >>> events = result.to_detection_events() >>> for event in events: ... print(f"Ray {event.ray_index}: {event.intensity:.3f}") """ from .base import DetectionEvent events = [] for i in range(self.num_rays): ray_idx = int(self.ray_indices[i]) if self.ray_indices is not None else i event = DetectionEvent( ray_index=ray_idx, position=self.positions[i].copy(), direction=self.directions[i].copy(), time=float(self.times[i]), wavelength=float(self.wavelengths[i]), intensity=float(self.intensities[i]), ) events.append(event) return events
[docs] @classmethod def from_detection_events( cls, events: list, detector_name: str = "unnamed", generations: NDArray[np.int32] | None = None, polarization_vectors: NDArray[np.float32] | None = None, ) -> "DetectorResult": """ Create from list of DetectionEvent objects for backward compatibility. Parameters ---------- events : list of DetectionEvent List of detection events detector_name : str Detector name generations : ndarray, optional Ray generations if available polarization_vectors : ndarray, optional Polarization vectors if available Returns ------- DetectorResult Converted result Examples -------- >>> result = DetectorResult.from_detection_events(detector.events) """ if not events: return cls.empty(detector_name) return cls( positions=np.stack([e.position for e in events]).astype(np.float32), directions=np.stack([e.direction for e in events]).astype(np.float32), times=np.array([e.time for e in events], dtype=np.float32), intensities=np.array([e.intensity for e in events], dtype=np.float32), wavelengths=np.array([e.wavelength for e in events], dtype=np.float32), ray_indices=np.array([e.ray_index for e in events], dtype=np.int32), generations=generations, polarization_vectors=polarization_vectors, detector_name=detector_name, )
[docs] def to_recorded_rays(self) -> "RecordedRays": """ Convert to RecordedRays for backward compatibility. Returns ------- RecordedRays Converted RecordedRays object Notes ----- This is provided for backward compatibility during migration. New code should use DetectorResult directly. """ from ..utilities.recording_sphere import RecordedRays return RecordedRays( positions=self.positions, directions=self.directions, times=self.times, intensities=self.intensities, wavelengths=self.wavelengths, generations=( self.generations if self.generations is not None else np.zeros(self.num_rays, dtype=np.int32) ), polarization_vectors=self.polarization_vectors, )
[docs] @classmethod def from_recorded_rays( cls, recorded: "RecordedRays", detector_name: str = "unnamed", ray_indices: NDArray[np.int32] | None = None, ) -> "DetectorResult": """ Create from RecordedRays for backward compatibility. Parameters ---------- recorded : RecordedRays RecordedRays object to convert detector_name : str Detector name ray_indices : ndarray, optional Original ray indices if available Returns ------- DetectorResult Converted result Notes ----- This is provided for backward compatibility during migration. New code should use DetectorResult directly. """ # Use recorded.ray_indices if available and ray_indices parameter not provided final_ray_indices = ray_indices if final_ray_indices is None and hasattr(recorded, "ray_indices"): final_ray_indices = recorded.ray_indices return cls( positions=recorded.positions, directions=recorded.directions, times=recorded.times, intensities=recorded.intensities, wavelengths=recorded.wavelengths, ray_indices=final_ray_indices, generations=recorded.generations, polarization_vectors=recorded.polarization_vectors, detector_name=detector_name, )
# ------------------------------------------------------------------------- # Representation # -------------------------------------------------------------------------
[docs] def __repr__(self) -> str: """Return string representation.""" return ( f"DetectorResult(detector='{self.detector_name}', " f"rays={self.num_rays}, intensity={self.total_intensity:.3e})" )
[docs] def __len__(self) -> int: """Return number of detected rays.""" return self.num_rays
[docs] def __iter__(self): """ Iterate over detection events for backward compatibility. Yields DetectionEvent objects for each detected ray. New code should access arrays directly instead. """ from .base import DetectionEvent for i in range(self.num_rays): ray_idx = int(self.ray_indices[i]) if self.ray_indices is not None else i yield DetectionEvent( ray_index=ray_idx, position=self.positions[i].copy(), direction=self.directions[i].copy(), time=float(self.times[i]), wavelength=float(self.wavelengths[i]), intensity=float(self.intensities[i]), )
[docs] def __getitem__(self, index: int): """ Get a single detection event by index for backward compatibility. Parameters ---------- index : int Index of the detection event Returns ------- DetectionEvent Detection event at the specified index """ from .base import DetectionEvent if index < 0: index = self.num_rays + index if index < 0 or index >= self.num_rays: raise IndexError(f"Index {index} out of range for {self.num_rays} rays") ray_idx = ( int(self.ray_indices[index]) if self.ray_indices is not None else index ) return DetectionEvent( ray_index=ray_idx, position=self.positions[index].copy(), direction=self.directions[index].copy(), time=float(self.times[index]), wavelength=float(self.wavelengths[index]), intensity=float(self.intensities[index]), )