# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
DetectorResult - Unified bulk numpy-based result container for detectors.
This module provides the DetectorResult class which is the primary return type
for all detectors and simulations. It replaces both List[DetectionEvent] and
RecordedRays with a unified interface.
Examples
--------
>>> from lsurf.detectors import DetectorResult
>>>
>>> # Create a result from detection
>>> result = detector.detect(rays)
>>> print(f"Detected {result.num_rays} rays")
>>> print(f"Total intensity: {result.total_intensity:.3e}")
>>>
>>> # Filter by wavelength
>>> visible = result.filter_by_wavelength(400e-9, 700e-9)
>>>
>>> # Compute statistics
>>> stats = result.compute_statistics()
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
import numpy as np
from numpy.typing import NDArray
if TYPE_CHECKING:
from ..utilities.recording_sphere import RecordedRays
# Optional h5py import
try:
import h5py
HAS_H5PY = True
except ImportError:
HAS_H5PY = False
# Import EARTH_RADIUS from surfaces to avoid circular imports at module level
# Use lazy import in methods that need it
[docs]
@dataclass
class DetectorResult:
"""
Unified bulk numpy-based result container for detector outputs.
This is the primary return type for all detectors and simulations,
providing efficient bulk storage and analysis of detected rays.
Attributes
----------
positions : ndarray, shape (N, 3)
Intersection positions (meters)
directions : ndarray, shape (N, 3)
Ray directions at intersection (unit vectors)
times : ndarray, shape (N,)
Time of arrival (seconds)
intensities : ndarray, shape (N,)
Ray intensity at detection
wavelengths : ndarray, shape (N,)
Ray wavelength (meters)
ray_indices : ndarray, shape (N,), optional
Original ray indices in the source RayBatch
generations : ndarray, shape (N,), optional
Ray generation (number of surface interactions)
polarization_vectors : ndarray, shape (N, 3), optional
3D polarization vectors (electric field direction)
detector_name : str
Name of the detector that produced this result
metadata : dict
Additional metadata (simulation parameters, etc.)
Examples
--------
>>> result = detector.detect(rays)
>>> print(f"Detected {result.num_rays} rays")
>>> print(f"Total intensity: {result.total_intensity:.3e}")
>>>
>>> # Filter by time window
>>> early = result.filter_by_time(0, 1e-6)
>>>
>>> # Merge multiple results
>>> combined = DetectorResult.merge([result1, result2])
"""
# Core data (always present)
positions: NDArray[np.float32]
directions: NDArray[np.float32]
times: NDArray[np.float32]
intensities: NDArray[np.float32]
wavelengths: NDArray[np.float32]
# Optional data
ray_indices: NDArray[np.int32] | None = None
generations: NDArray[np.int32] | None = None
polarization_vectors: NDArray[np.float32] | None = None
# Metadata
detector_name: str = "unnamed"
metadata: dict[str, Any] = field(default_factory=dict)
# -------------------------------------------------------------------------
# Properties
# -------------------------------------------------------------------------
@property
def num_rays(self) -> int:
"""Number of detected rays."""
return len(self.positions)
@property
def total_intensity(self) -> float:
"""Sum of all detected intensities."""
return float(np.sum(self.intensities))
@property
def is_empty(self) -> bool:
"""Whether the result contains no rays."""
return self.num_rays == 0
# -------------------------------------------------------------------------
# Statistics and Analysis
# -------------------------------------------------------------------------
[docs]
def compute_statistics(self) -> dict[str, Any]:
"""
Compute summary statistics for the detected rays.
Returns
-------
dict
Dictionary containing:
- count: number of rays
- total_intensity: sum of intensities
- mean_time: average arrival time
- std_time: arrival time standard deviation
- min_time: earliest arrival
- max_time: latest arrival
- mean_wavelength: average wavelength
- time_spread: max_time - min_time
Examples
--------
>>> stats = result.compute_statistics()
>>> print(f"Detected {stats['count']} rays")
>>> print(f"Time spread: {stats['time_spread']:.3e} s")
"""
if self.is_empty:
return {
"count": 0,
"total_intensity": 0.0,
"mean_time": 0.0,
"std_time": 0.0,
"min_time": 0.0,
"max_time": 0.0,
"mean_wavelength": 0.0,
"time_spread": 0.0,
}
return {
"count": self.num_rays,
"total_intensity": self.total_intensity,
"mean_time": float(np.mean(self.times)),
"std_time": float(np.std(self.times)),
"min_time": float(np.min(self.times)),
"max_time": float(np.max(self.times)),
"mean_wavelength": float(np.mean(self.wavelengths)),
"time_spread": float(np.max(self.times) - np.min(self.times)),
}
[docs]
def compute_time_histogram(
self,
num_bins: int = 50,
time_range: tuple[float, float] | None = None,
weighted: bool = True,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""
Compute arrival time distribution histogram.
Parameters
----------
num_bins : int
Number of histogram bins
time_range : tuple, optional
(min, max) time range. If None, uses data range.
weighted : bool
If True, weight by intensity. If False, count rays.
Returns
-------
bin_centers : ndarray
Bin centers in seconds
values : ndarray
Histogram values (counts or intensity sum per bin)
Examples
--------
>>> times, counts = result.compute_time_histogram(num_bins=100)
>>> plt.bar(times * 1e9, counts, width=(times[1]-times[0])*1e9)
>>> plt.xlabel('Time (ns)')
"""
if self.is_empty:
return np.array([], dtype=np.float64), np.array([], dtype=np.float64)
if time_range is None:
time_range = (float(np.min(self.times)), float(np.max(self.times)))
# Handle case where all times are very similar
time_span = time_range[1] - time_range[0]
if time_span < 1e-12:
weights = self.intensities if weighted else None
total = np.sum(weights) if weights is not None else self.num_rays
return np.array([np.mean(self.times)]), np.array([total])
weights = self.intensities if weighted else None
values, bin_edges = np.histogram(
self.times, bins=num_bins, range=time_range, weights=weights
)
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
return bin_centers, values.astype(np.float64)
[docs]
def compute_angular_distribution(
self,
reference_direction: NDArray[np.float32],
num_bins: int = 50,
weighted: bool = True,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""
Compute angular distribution histogram.
Parameters
----------
reference_direction : ndarray, shape (3,)
Reference direction for angle calculation
num_bins : int
Number of histogram bins
weighted : bool
If True, weight by intensity. If False, count rays.
Returns
-------
bin_centers : ndarray
Bin centers in degrees (0-180)
values : ndarray
Histogram values
Examples
--------
>>> angles, counts = result.compute_angular_distribution(
... reference_direction=np.array([0, 0, 1])
... )
"""
if self.is_empty:
return np.array([], dtype=np.float64), np.array([], dtype=np.float64)
ref = reference_direction / np.linalg.norm(reference_direction)
dir_norms = self.directions / np.linalg.norm(
self.directions, axis=1, keepdims=True
)
cos_angles = np.dot(dir_norms, ref)
cos_angles = np.clip(cos_angles, -1.0, 1.0)
angles = np.degrees(np.arccos(cos_angles))
weights = self.intensities if weighted else None
values, bin_edges = np.histogram(
angles, bins=num_bins, range=(0, 180), weights=weights
)
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
return bin_centers, values.astype(np.float64)
[docs]
def compute_angular_coordinates(
self,
earth_center: NDArray[np.float64] | None = None,
) -> dict[str, NDArray[np.float32]]:
"""
Compute angular coordinates for all ray intersection points.
Computes spherical coordinates (latitude/longitude) of intersection points
on a detection sphere relative to Earth's center.
Parameters
----------
earth_center : ndarray, optional
Earth center position, default (0, 0, -EARTH_RADIUS)
Returns
-------
dict
Dictionary with:
- 'elevation': Latitude angle above equator (radians, -π/2 to π/2)
- 'azimuth': Longitude angle (radians, -π to π)
- 'zenith': Zenith angle from north pole (radians, 0 to π)
- 'incidence': Angle between ray direction and outward radial (radians)
Examples
--------
>>> coords = result.compute_angular_coordinates()
>>> elevation_deg = np.degrees(coords['elevation'])
"""
from ..surfaces import EARTH_RADIUS
if self.is_empty:
return {
"elevation": np.array([], dtype=np.float32),
"azimuth": np.array([], dtype=np.float32),
"zenith": np.array([], dtype=np.float32),
"incidence": np.array([], dtype=np.float32),
}
if earth_center is None:
earth_center = np.array([0, 0, -EARTH_RADIUS], dtype=np.float64)
# Vector from Earth center to intersection point
to_pos = self.positions.astype(np.float64) - earth_center
r = np.linalg.norm(to_pos, axis=1, keepdims=True)
# Spherical coordinates of the intersection point
elevation = np.arcsin(to_pos[:, 2] / r.squeeze())
_azimuth = np.arctan2(to_pos[:, 1], to_pos[:, 0]) # noqa: F841
# Incidence angle: angle between ray direction and outward radial
radial = to_pos / r
cos_incidence = np.sum(self.directions * radial, axis=1)
incidence = np.arccos(np.clip(cos_incidence, -1.0, 1.0))
# For azimuth of direction, project onto local tangent plane
global_z = np.array([0, 0, 1], dtype=np.float64)
tangent_x = np.cross(global_z, radial)
tangent_x_norm = np.linalg.norm(tangent_x, axis=1, keepdims=True)
tangent_x = tangent_x / np.maximum(tangent_x_norm, 1e-10)
tangent_y = np.cross(radial, tangent_x)
dir_x = np.sum(self.directions * tangent_x, axis=1)
dir_y = np.sum(self.directions * tangent_y, axis=1)
dir_azimuth = np.arctan2(dir_y, dir_x)
return {
"elevation": elevation.astype(np.float32),
"azimuth": dir_azimuth.astype(np.float32),
"zenith": incidence.astype(np.float32),
"incidence": incidence.astype(np.float32),
}
[docs]
def compute_viewing_angle_from_origin(
self,
origin: NDArray[np.float64] | None = None,
) -> NDArray[np.float32]:
"""
Compute viewing angle from horizontal at specified origin.
Calculates the angle above the horizontal plane (XY plane) when
viewing each intersection point from the origin position.
Parameters
----------
origin : ndarray, optional
Observer position, default (0, 0, 0)
Returns
-------
ndarray
Viewing angle from horizontal in radians (-π/2 to π/2)
Positive angles are above horizontal, negative below
Examples
--------
>>> viewing_angles = result.compute_viewing_angle_from_origin()
>>> print(f"Mean viewing angle: {np.degrees(viewing_angles.mean()):.1f}°")
"""
if self.is_empty:
return np.array([], dtype=np.float32)
if origin is None:
origin = np.array([0, 0, 0], dtype=np.float64)
to_point = self.positions.astype(np.float64) - origin
horiz_dist = np.sqrt(to_point[:, 0] ** 2 + to_point[:, 1] ** 2)
vert_dist = to_point[:, 2]
viewing_angle = np.arctan2(vert_dist, horiz_dist)
return viewing_angle.astype(np.float32)
[docs]
def compute_ray_direction_angles(self) -> dict[str, NDArray[np.float32]]:
"""
Compute elevation and azimuth angles of ray directions.
Returns
-------
dict
Dictionary with:
- 'elevation': Angle above horizontal plane in radians (-π/2 to π/2)
- 'azimuth': Azimuth angle in horizontal plane in radians (-π to π)
Examples
--------
>>> angles = result.compute_ray_direction_angles()
>>> print(f"Mean elevation: {np.degrees(angles['elevation'].mean()):.1f}°")
"""
if self.is_empty:
return {
"elevation": np.array([], dtype=np.float32),
"azimuth": np.array([], dtype=np.float32),
}
elevation = np.arcsin(np.clip(self.directions[:, 2], -1.0, 1.0))
azimuth = np.arctan2(self.directions[:, 1], self.directions[:, 0])
return {
"elevation": elevation.astype(np.float32),
"azimuth": azimuth.astype(np.float32),
}
# -------------------------------------------------------------------------
# Filtering
# -------------------------------------------------------------------------
[docs]
def filter(self, mask: NDArray[np.bool_]) -> "DetectorResult":
"""
Filter rays by boolean mask.
Parameters
----------
mask : ndarray of bool
Boolean mask, True for rays to keep
Returns
-------
DetectorResult
Filtered result containing only selected rays
Examples
--------
>>> high_intensity = result.filter(result.intensities > 0.1)
"""
return DetectorResult(
positions=self.positions[mask],
directions=self.directions[mask],
times=self.times[mask],
intensities=self.intensities[mask],
wavelengths=self.wavelengths[mask],
ray_indices=(
self.ray_indices[mask] if self.ray_indices is not None else None
),
generations=(
self.generations[mask] if self.generations is not None else None
),
polarization_vectors=(
self.polarization_vectors[mask]
if self.polarization_vectors is not None
else None
),
detector_name=self.detector_name,
metadata=self.metadata.copy(),
)
[docs]
def filter_by_wavelength(
self, min_wavelength: float, max_wavelength: float
) -> "DetectorResult":
"""
Filter rays by wavelength range.
Parameters
----------
min_wavelength : float
Minimum wavelength in meters
max_wavelength : float
Maximum wavelength in meters
Returns
-------
DetectorResult
Filtered result
Examples
--------
>>> visible = result.filter_by_wavelength(400e-9, 700e-9)
"""
mask = (self.wavelengths >= min_wavelength) & (
self.wavelengths <= max_wavelength
)
return self.filter(mask)
[docs]
def filter_by_time(self, min_time: float, max_time: float) -> "DetectorResult":
"""
Filter rays by time range.
Parameters
----------
min_time : float
Minimum time in seconds
max_time : float
Maximum time in seconds
Returns
-------
DetectorResult
Filtered result
Examples
--------
>>> early_arrivals = result.filter_by_time(0, 1e-6)
"""
mask = (self.times >= min_time) & (self.times <= max_time)
return self.filter(mask)
[docs]
def filter_by_intensity(
self, min_intensity: float = 0.0, max_intensity: float = float("inf")
) -> "DetectorResult":
"""
Filter rays by intensity range.
Parameters
----------
min_intensity : float
Minimum intensity
max_intensity : float
Maximum intensity
Returns
-------
DetectorResult
Filtered result
Examples
--------
>>> bright = result.filter_by_intensity(min_intensity=0.1)
"""
mask = (self.intensities >= min_intensity) & (self.intensities <= max_intensity)
return self.filter(mask)
# -------------------------------------------------------------------------
# Class Methods
# -------------------------------------------------------------------------
[docs]
@classmethod
def empty(cls, detector_name: str = "unnamed") -> "DetectorResult":
"""
Create an empty DetectorResult.
Parameters
----------
detector_name : str
Name for the detector
Returns
-------
DetectorResult
Empty result with zero rays
Examples
--------
>>> empty = DetectorResult.empty("my_detector")
>>> print(empty.is_empty) # True
"""
return cls(
positions=np.zeros((0, 3), dtype=np.float32),
directions=np.zeros((0, 3), dtype=np.float32),
times=np.zeros(0, dtype=np.float32),
intensities=np.zeros(0, dtype=np.float32),
wavelengths=np.zeros(0, dtype=np.float32),
ray_indices=None,
generations=None,
polarization_vectors=None,
detector_name=detector_name,
)
[docs]
@classmethod
def merge(cls, results: list["DetectorResult"]) -> "DetectorResult":
"""
Merge multiple DetectorResults into one.
Parameters
----------
results : list of DetectorResult
Results to merge
Returns
-------
DetectorResult
Combined result
Examples
--------
>>> combined = DetectorResult.merge([result1, result2, result3])
"""
if not results:
return cls.empty()
non_empty = [r for r in results if not r.is_empty]
if not non_empty:
return cls.empty(results[0].detector_name if results else "unnamed")
if len(non_empty) == 1:
return non_empty[0]
# Check for optional fields
has_ray_indices = all(r.ray_indices is not None for r in non_empty)
has_generations = all(r.generations is not None for r in non_empty)
has_polarization = all(r.polarization_vectors is not None for r in non_empty)
return cls(
positions=np.vstack([r.positions for r in non_empty]),
directions=np.vstack([r.directions for r in non_empty]),
times=np.concatenate([r.times for r in non_empty]),
intensities=np.concatenate([r.intensities for r in non_empty]),
wavelengths=np.concatenate([r.wavelengths for r in non_empty]),
ray_indices=(
np.concatenate([r.ray_indices for r in non_empty])
if has_ray_indices
else None
),
generations=(
np.concatenate([r.generations for r in non_empty])
if has_generations
else None
),
polarization_vectors=(
np.vstack([r.polarization_vectors for r in non_empty])
if has_polarization
else None
),
detector_name=non_empty[0].detector_name,
metadata=non_empty[0].metadata.copy(),
)
# -------------------------------------------------------------------------
# Serialization
# -------------------------------------------------------------------------
[docs]
def save_npz(self, filepath: str | Path) -> None:
"""
Save to numpy .npz file.
Parameters
----------
filepath : str or Path
Output file path
Examples
--------
>>> result.save_npz("detections.npz")
"""
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
# Compute angular coordinates for convenience
angular = self.compute_angular_coordinates()
save_dict = {
"positions": self.positions,
"directions": self.directions,
"times": self.times,
"intensities": self.intensities,
"wavelengths": self.wavelengths,
"elevation": angular["elevation"],
"azimuth": angular["azimuth"],
"zenith": angular["zenith"],
}
if self.ray_indices is not None:
save_dict["ray_indices"] = self.ray_indices
if self.generations is not None:
save_dict["generations"] = self.generations
if self.polarization_vectors is not None:
save_dict["polarization_vectors"] = self.polarization_vectors
# Add metadata
save_dict["meta_detector_name"] = np.array(self.detector_name)
for key, value in self.metadata.items():
save_dict[f"meta_{key}"] = np.array(value)
np.savez_compressed(filepath, **save_dict)
[docs]
@classmethod
def load_npz(cls, filepath: str | Path) -> "DetectorResult":
"""
Load from numpy .npz file.
Parameters
----------
filepath : str or Path
Input file path
Returns
-------
DetectorResult
Loaded result
Examples
--------
>>> result = DetectorResult.load_npz("detections.npz")
"""
data = np.load(filepath, allow_pickle=True)
# Extract metadata
metadata = {}
detector_name = "unnamed"
for key in data.files:
if key.startswith("meta_"):
meta_key = key[5:]
if meta_key == "detector_name":
detector_name = str(data[key])
else:
metadata[meta_key] = data[key]
return cls(
positions=data["positions"],
directions=data["directions"],
times=data["times"],
intensities=data["intensities"],
wavelengths=data["wavelengths"],
ray_indices=data.get("ray_indices"),
generations=data.get("generations"),
polarization_vectors=data.get("polarization_vectors"),
detector_name=detector_name,
metadata=metadata,
)
[docs]
def save_hdf5(
self,
filepath: str | Path,
compression: str = "gzip",
) -> None:
"""
Save to HDF5 file.
Parameters
----------
filepath : str or Path
Output file path
compression : str
Compression algorithm ('gzip', 'lzf', or None)
Examples
--------
>>> result.save_hdf5("detections.h5")
"""
if not HAS_H5PY:
raise ImportError(
"h5py is required for HDF5 support. Install with: pip install h5py"
)
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
with h5py.File(filepath, "w") as f:
# Create rays group
rays_grp = f.create_group("rays")
# Store ray data with compression
rays_grp.create_dataset(
"positions", data=self.positions, compression=compression
)
rays_grp.create_dataset(
"directions", data=self.directions, compression=compression
)
rays_grp.create_dataset("times", data=self.times, compression=compression)
rays_grp.create_dataset(
"intensities", data=self.intensities, compression=compression
)
rays_grp.create_dataset(
"wavelengths", data=self.wavelengths, compression=compression
)
# Optional fields
if self.ray_indices is not None:
rays_grp.create_dataset(
"ray_indices", data=self.ray_indices, compression=compression
)
if self.generations is not None:
rays_grp.create_dataset(
"generations", data=self.generations, compression=compression
)
if self.polarization_vectors is not None:
rays_grp.create_dataset(
"polarization_vectors",
data=self.polarization_vectors,
compression=compression,
)
# Compute and store angular coordinates
angular = self.compute_angular_coordinates()
angular_grp = f.create_group("angular")
for key, value in angular.items():
angular_grp.create_dataset(key, data=value, compression=compression)
# Store metadata
meta_grp = f.create_group("metadata")
meta_grp.attrs["num_rays"] = self.num_rays
meta_grp.attrs["detector_name"] = self.detector_name
meta_grp.attrs["save_time"] = datetime.now().isoformat()
for key, value in self.metadata.items():
if isinstance(value, (int, float, str, bool)):
meta_grp.attrs[key] = value
elif isinstance(value, np.ndarray):
meta_grp.create_dataset(key, data=value)
elif isinstance(value, (list, tuple)):
meta_grp.create_dataset(key, data=np.array(value))
else:
meta_grp.attrs[key] = str(value)
[docs]
@classmethod
def load_hdf5(cls, filepath: str | Path) -> "DetectorResult":
"""
Load from HDF5 file.
Parameters
----------
filepath : str or Path
Input file path
Returns
-------
DetectorResult
Loaded result
Examples
--------
>>> result = DetectorResult.load_hdf5("detections.h5")
"""
if not HAS_H5PY:
raise ImportError(
"h5py is required for HDF5 support. Install with: pip install h5py"
)
with h5py.File(filepath, "r") as f:
rays_grp = f["rays"]
# Load required fields
positions = rays_grp["positions"][...]
directions = rays_grp["directions"][...]
times = rays_grp["times"][...]
intensities = rays_grp["intensities"][...]
wavelengths = rays_grp["wavelengths"][...]
# Load optional fields
ray_indices = (
rays_grp["ray_indices"][...] if "ray_indices" in rays_grp else None
)
generations = (
rays_grp["generations"][...] if "generations" in rays_grp else None
)
polarization_vectors = (
rays_grp["polarization_vectors"][...]
if "polarization_vectors" in rays_grp
else None
)
# Load metadata
metadata = {}
detector_name = "unnamed"
if "metadata" in f:
meta_grp = f["metadata"]
detector_name = meta_grp.attrs.get("detector_name", "unnamed")
for key, value in meta_grp.attrs.items():
if key not in ("num_rays", "detector_name", "save_time"):
metadata[key] = value
for key in meta_grp.keys():
metadata[key] = meta_grp[key][...]
return cls(
positions=positions,
directions=directions,
times=times,
intensities=intensities,
wavelengths=wavelengths,
ray_indices=ray_indices,
generations=generations,
polarization_vectors=polarization_vectors,
detector_name=detector_name,
metadata=metadata,
)
# -------------------------------------------------------------------------
# Backward Compatibility
# -------------------------------------------------------------------------
[docs]
def to_detection_events(self) -> list:
"""
Convert to list of DetectionEvent objects for backward compatibility.
Returns
-------
list of DetectionEvent
List of individual detection events
Examples
--------
>>> events = result.to_detection_events()
>>> for event in events:
... print(f"Ray {event.ray_index}: {event.intensity:.3f}")
"""
from .base import DetectionEvent
events = []
for i in range(self.num_rays):
ray_idx = int(self.ray_indices[i]) if self.ray_indices is not None else i
event = DetectionEvent(
ray_index=ray_idx,
position=self.positions[i].copy(),
direction=self.directions[i].copy(),
time=float(self.times[i]),
wavelength=float(self.wavelengths[i]),
intensity=float(self.intensities[i]),
)
events.append(event)
return events
[docs]
@classmethod
def from_detection_events(
cls,
events: list,
detector_name: str = "unnamed",
generations: NDArray[np.int32] | None = None,
polarization_vectors: NDArray[np.float32] | None = None,
) -> "DetectorResult":
"""
Create from list of DetectionEvent objects for backward compatibility.
Parameters
----------
events : list of DetectionEvent
List of detection events
detector_name : str
Detector name
generations : ndarray, optional
Ray generations if available
polarization_vectors : ndarray, optional
Polarization vectors if available
Returns
-------
DetectorResult
Converted result
Examples
--------
>>> result = DetectorResult.from_detection_events(detector.events)
"""
if not events:
return cls.empty(detector_name)
return cls(
positions=np.stack([e.position for e in events]).astype(np.float32),
directions=np.stack([e.direction for e in events]).astype(np.float32),
times=np.array([e.time for e in events], dtype=np.float32),
intensities=np.array([e.intensity for e in events], dtype=np.float32),
wavelengths=np.array([e.wavelength for e in events], dtype=np.float32),
ray_indices=np.array([e.ray_index for e in events], dtype=np.int32),
generations=generations,
polarization_vectors=polarization_vectors,
detector_name=detector_name,
)
[docs]
def to_recorded_rays(self) -> "RecordedRays":
"""
Convert to RecordedRays for backward compatibility.
Returns
-------
RecordedRays
Converted RecordedRays object
Notes
-----
This is provided for backward compatibility during migration.
New code should use DetectorResult directly.
"""
from ..utilities.recording_sphere import RecordedRays
return RecordedRays(
positions=self.positions,
directions=self.directions,
times=self.times,
intensities=self.intensities,
wavelengths=self.wavelengths,
generations=(
self.generations
if self.generations is not None
else np.zeros(self.num_rays, dtype=np.int32)
),
polarization_vectors=self.polarization_vectors,
)
[docs]
@classmethod
def from_recorded_rays(
cls,
recorded: "RecordedRays",
detector_name: str = "unnamed",
ray_indices: NDArray[np.int32] | None = None,
) -> "DetectorResult":
"""
Create from RecordedRays for backward compatibility.
Parameters
----------
recorded : RecordedRays
RecordedRays object to convert
detector_name : str
Detector name
ray_indices : ndarray, optional
Original ray indices if available
Returns
-------
DetectorResult
Converted result
Notes
-----
This is provided for backward compatibility during migration.
New code should use DetectorResult directly.
"""
# Use recorded.ray_indices if available and ray_indices parameter not provided
final_ray_indices = ray_indices
if final_ray_indices is None and hasattr(recorded, "ray_indices"):
final_ray_indices = recorded.ray_indices
return cls(
positions=recorded.positions,
directions=recorded.directions,
times=recorded.times,
intensities=recorded.intensities,
wavelengths=recorded.wavelengths,
ray_indices=final_ray_indices,
generations=recorded.generations,
polarization_vectors=recorded.polarization_vectors,
detector_name=detector_name,
)
# -------------------------------------------------------------------------
# Representation
# -------------------------------------------------------------------------
[docs]
def __repr__(self) -> str:
"""Return string representation."""
return (
f"DetectorResult(detector='{self.detector_name}', "
f"rays={self.num_rays}, intensity={self.total_intensity:.3e})"
)
[docs]
def __len__(self) -> int:
"""Return number of detected rays."""
return self.num_rays
[docs]
def __iter__(self):
"""
Iterate over detection events for backward compatibility.
Yields DetectionEvent objects for each detected ray.
New code should access arrays directly instead.
"""
from .base import DetectionEvent
for i in range(self.num_rays):
ray_idx = int(self.ray_indices[i]) if self.ray_indices is not None else i
yield DetectionEvent(
ray_index=ray_idx,
position=self.positions[i].copy(),
direction=self.directions[i].copy(),
time=float(self.times[i]),
wavelength=float(self.wavelengths[i]),
intensity=float(self.intensities[i]),
)
[docs]
def __getitem__(self, index: int):
"""
Get a single detection event by index for backward compatibility.
Parameters
----------
index : int
Index of the detection event
Returns
-------
DetectionEvent
Detection event at the specified index
"""
from .base import DetectionEvent
if index < 0:
index = self.num_rays + index
if index < 0 or index >= self.num_rays:
raise IndexError(f"Index {index} out of range for {self.num_rays} rays")
ray_idx = (
int(self.ray_indices[index]) if self.ray_indices is not None else index
)
return DetectionEvent(
ray_index=ray_idx,
position=self.positions[index].copy(),
direction=self.directions[index].copy(),
time=float(self.times[index]),
wavelength=float(self.wavelengths[index]),
intensity=float(self.intensities[index]),
)