# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Simulation Result Data Structures
Contains the result dataclasses returned by the Simulation/Orchestrator classes.
"""
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
import numpy as np
import numpy.typing as npt
if TYPE_CHECKING:
from ..detectors.results import DetectorResult
from ..utilities.ray_data import RayBatch
@dataclass
class AdaptiveStepStats:
"""
Statistics about adaptive step sizes used during propagation.
Collects per-ray min/max/count from GPU adaptive kernels and
provides aggregate summaries.
Attributes
----------
step_min : ndarray, shape (N,)
Minimum step size used by each ray (meters).
step_max : ndarray, shape (N,)
Maximum step size used by each ray (meters).
step_count : ndarray, shape (N,)
Number of adaptive steps taken by each ray.
total_distance : ndarray, shape (N,)
Total geometric path length per ray (meters), for computing mean step.
"""
step_min: npt.NDArray[np.float32]
step_max: npt.NDArray[np.float32]
step_count: npt.NDArray[np.float32]
total_distance: npt.NDArray[np.float32]
@property
def global_min(self) -> float:
"""Smallest step size across all rays."""
valid = self.step_count > 0
if not np.any(valid):
return 0.0
return float(np.min(self.step_min[valid]))
@property
def global_max(self) -> float:
"""Largest step size across all rays."""
valid = self.step_count > 0
if not np.any(valid):
return 0.0
return float(np.max(self.step_max[valid]))
@property
def total_steps(self) -> int:
"""Total number of adaptive steps across all rays."""
return int(np.sum(self.step_count))
@property
def mean_step(self) -> float:
"""Mean step size (total distance / total steps)."""
total = float(np.sum(self.total_distance))
count = float(np.sum(self.step_count))
if count == 0:
return 0.0
return total / count
def summary(self) -> str:
"""Return a human-readable summary string."""
valid = self.step_count > 0
n_active = int(np.sum(valid))
if n_active == 0:
return "AdaptiveStepStats: no active rays"
per_ray_mean = np.where(
self.step_count > 0,
self.total_distance / self.step_count,
0.0,
)
return (
f"AdaptiveStepStats: {n_active} rays, "
f"{self.total_steps} total steps\n"
f" step size: min={self.global_min:.4g} m, "
f"max={self.global_max:.4g} m, "
f"mean={self.mean_step:.4g} m\n"
f" per-ray steps: min={int(np.min(self.step_count[valid]))}, "
f"max={int(np.max(self.step_count[valid]))}, "
f"mean={np.mean(self.step_count[valid]):.1f}\n"
f" per-ray mean step: min={np.min(per_ray_mean[valid]):.4g} m, "
f"max={np.max(per_ray_mean[valid]):.4g} m"
)
@staticmethod
def merge(stats_list: list["AdaptiveStepStats"]) -> "AdaptiveStepStats":
"""Merge multiple AdaptiveStepStats (e.g. from multiple bounces)."""
if not stats_list:
empty = np.empty(0, dtype=np.float32)
return AdaptiveStepStats(empty, empty, empty, empty)
# Concatenate per-ray arrays
return AdaptiveStepStats(
step_min=np.concatenate([s.step_min for s in stats_list]),
step_max=np.concatenate([s.step_max for s in stats_list]),
step_count=np.concatenate([s.step_count for s in stats_list]),
total_distance=np.concatenate([s.total_distance for s in stats_list]),
)
@dataclass
class SurfaceHitRecord:
"""
Record of ray hits on a surface.
Stores positions and directions of rays that hit a specific surface,
useful for visualization and analysis of intermediate surface interactions.
Attributes
----------
surface_name : str
Name of the surface.
positions : ndarray, shape (N, 3)
Hit positions in world coordinates.
directions : ndarray, shape (N, 3)
Ray directions at hit points.
intensities : ndarray, shape (N,)
Ray intensities at hit.
wavelengths : ndarray, shape (N,)
Ray wavelengths.
bounce : int
Which bounce iteration this hit occurred on (0-indexed).
"""
surface_name: str
positions: npt.NDArray[np.float32]
directions: npt.NDArray[np.float32]
intensities: npt.NDArray[np.float32]
wavelengths: npt.NDArray[np.float32]
bounce: int
@property
def num_hits(self) -> int:
"""Number of hits recorded."""
return len(self.positions)
def __repr__(self) -> str:
return (
f"SurfaceHitRecord(surface_name={self.surface_name!r}, "
f"num_hits={self.num_hits}, bounce={self.bounce})"
)
[docs]
@dataclass
class SimulationStatistics:
"""
Statistics from a simulation run.
Attributes
----------
total_rays_initial : int
Number of rays at simulation start.
total_rays_created : int
Total rays including split rays.
rays_detected : int
Rays that hit detector surfaces.
rays_absorbed : int
Rays terminated by absorber surfaces.
rays_terminated_intensity : int
Rays terminated due to low intensity.
rays_terminated_bounds : int
Rays that exited bounding sphere.
rays_terminated_max_bounces : int
Rays that reached max bounce limit.
bounces_completed : int
Number of bounce iterations completed.
max_depth_reached : int
Maximum tree depth from ray splitting.
"""
total_rays_initial: int = 0
total_rays_created: int = 0
rays_detected: int = 0
rays_absorbed: int = 0
rays_terminated_intensity: int = 0
rays_terminated_bounds: int = 0
rays_terminated_max_bounces: int = 0
bounces_completed: int = 0
max_depth_reached: int = 0
adaptive_step_stats: AdaptiveStepStats | None = None
[docs]
@dataclass
class SimulationResult:
"""
Complete result from a ray tracing simulation.
Attributes
----------
detected : DetectorResult
Rays that were recorded by detector surfaces.
remaining : RayBatch
Rays that are still active after simulation completed
(either didn't hit any surface or exceeded max bounces).
statistics : SimulationStatistics
Detailed simulation statistics.
detections_per_surface : dict
Number of detections per detector surface name.
surface_hits : dict or None
If track_surface_hits was enabled, contains SurfaceHitRecord objects
for each optical surface, keyed by surface name. Each surface may have
multiple records (one per bounce). If disabled, this is None.
Examples
--------
>>> result = sim.run(rays)
>>> print(f"Detected {result.statistics.rays_detected} rays")
>>> print(f"Completed in {result.statistics.bounces_completed} bounces")
>>>
>>> # Access detection results
>>> print(f"Total detected intensity: {result.detected.total_intensity:.3e}")
>>> times = result.detected.times
>>> positions = result.detected.positions
>>>
>>> # Access surface hits (if track_surface_hits=True)
>>> if result.surface_hits:
... for name, records in result.surface_hits.items():
... for rec in records:
... print(f"{name} bounce {rec.bounce}: {rec.num_hits} hits")
"""
detected: "DetectorResult"
remaining: "RayBatch"
statistics: SimulationStatistics
detections_per_surface: dict[str, int] = field(default_factory=dict)
surface_hits: dict[str, list[SurfaceHitRecord]] | None = None
@property
def num_detected(self) -> int:
"""Number of rays that hit detector surfaces."""
return self.detected.num_rays
@property
def num_remaining(self) -> int:
"""Number of rays still active."""
return self.remaining.num_rays
@property
def bounces(self) -> int:
"""Number of bounce iterations performed (backwards compatibility)."""
return self.statistics.bounces_completed
@property
def total_rays_processed(self) -> int:
"""Total rays processed including splits (backwards compatibility)."""
return self.statistics.total_rays_created
[docs]
def get_surface_hit_positions(self, surface_name: str) -> npt.NDArray[np.float32]:
"""
Get all hit positions for a specific surface across all bounces.
Parameters
----------
surface_name : str
Name of the surface.
Returns
-------
ndarray, shape (N, 3)
Concatenated hit positions from all bounces. Empty array if no hits
or if track_surface_hits was not enabled.
"""
if self.surface_hits is None or surface_name not in self.surface_hits:
return np.empty((0, 3), dtype=np.float32)
records = self.surface_hits[surface_name]
if not records:
return np.empty((0, 3), dtype=np.float32)
return np.concatenate([r.positions for r in records], axis=0)
@dataclass
class SurfaceHitStats:
"""
Statistics about surface hits during a simulation.
Attributes
----------
surface_name : str
Name of the surface.
hit_count : int
Number of rays that hit this surface.
mean_intensity : float
Mean intensity of rays hitting this surface.
total_intensity : float
Sum of intensities of rays hitting this surface.
"""
surface_name: str
hit_count: int
mean_intensity: float
total_intensity: float
def merge_detector_results(result_list: list) -> "DetectorResult":
"""
Merge multiple DetectorResult objects into a single instance.
Parameters
----------
result_list : list of DetectorResult
List of detector results to merge.
Returns
-------
DetectorResult
Combined detector results.
"""
from ..detectors.results import DetectorResult
return DetectorResult.merge(result_list)
# Backward compatibility: keep merge_recorded_rays as alias
def merge_recorded_rays(recorded_list: list) -> "DetectorResult":
"""
Merge multiple DetectorResult/RecordedRays into a single instance.
This function is provided for backward compatibility. New code should
use DetectorResult.merge() directly.
Parameters
----------
recorded_list : list
List of DetectorResult or RecordedRays to merge.
Returns
-------
DetectorResult
Combined results.
"""
from ..detectors.results import DetectorResult
if not recorded_list:
return DetectorResult.empty()
# Check if we have RecordedRays (old format) or DetectorResult (new format)
first = recorded_list[0]
if hasattr(first, "to_detection_events"):
# Already DetectorResult
return DetectorResult.merge(recorded_list)
# Convert RecordedRays to DetectorResult
converted = []
for r in recorded_list:
if r.num_rays > 0:
converted.append(DetectorResult.from_recorded_rays(r))
return DetectorResult.merge(converted)