Source code for lsurf.simulation.result

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
Simulation Result Data Structures

Contains the result dataclasses returned by the Simulation/Orchestrator classes.
"""

from dataclasses import dataclass, field
from typing import TYPE_CHECKING

import numpy as np
import numpy.typing as npt

if TYPE_CHECKING:
    from ..detectors.results import DetectorResult
    from ..utilities.ray_data import RayBatch


@dataclass
class AdaptiveStepStats:
    """
    Statistics about adaptive step sizes used during propagation.

    Collects per-ray min/max/count from GPU adaptive kernels and
    provides aggregate summaries.

    Attributes
    ----------
    step_min : ndarray, shape (N,)
        Minimum step size used by each ray (meters).
    step_max : ndarray, shape (N,)
        Maximum step size used by each ray (meters).
    step_count : ndarray, shape (N,)
        Number of adaptive steps taken by each ray.
    total_distance : ndarray, shape (N,)
        Total geometric path length per ray (meters), for computing mean step.
    """

    step_min: npt.NDArray[np.float32]
    step_max: npt.NDArray[np.float32]
    step_count: npt.NDArray[np.float32]
    total_distance: npt.NDArray[np.float32]

    @property
    def global_min(self) -> float:
        """Smallest step size across all rays."""
        valid = self.step_count > 0
        if not np.any(valid):
            return 0.0
        return float(np.min(self.step_min[valid]))

    @property
    def global_max(self) -> float:
        """Largest step size across all rays."""
        valid = self.step_count > 0
        if not np.any(valid):
            return 0.0
        return float(np.max(self.step_max[valid]))

    @property
    def total_steps(self) -> int:
        """Total number of adaptive steps across all rays."""
        return int(np.sum(self.step_count))

    @property
    def mean_step(self) -> float:
        """Mean step size (total distance / total steps)."""
        total = float(np.sum(self.total_distance))
        count = float(np.sum(self.step_count))
        if count == 0:
            return 0.0
        return total / count

    def summary(self) -> str:
        """Return a human-readable summary string."""
        valid = self.step_count > 0
        n_active = int(np.sum(valid))
        if n_active == 0:
            return "AdaptiveStepStats: no active rays"
        per_ray_mean = np.where(
            self.step_count > 0,
            self.total_distance / self.step_count,
            0.0,
        )
        return (
            f"AdaptiveStepStats: {n_active} rays, "
            f"{self.total_steps} total steps\n"
            f"  step size: min={self.global_min:.4g} m, "
            f"max={self.global_max:.4g} m, "
            f"mean={self.mean_step:.4g} m\n"
            f"  per-ray steps: min={int(np.min(self.step_count[valid]))}, "
            f"max={int(np.max(self.step_count[valid]))}, "
            f"mean={np.mean(self.step_count[valid]):.1f}\n"
            f"  per-ray mean step: min={np.min(per_ray_mean[valid]):.4g} m, "
            f"max={np.max(per_ray_mean[valid]):.4g} m"
        )

    @staticmethod
    def merge(stats_list: list["AdaptiveStepStats"]) -> "AdaptiveStepStats":
        """Merge multiple AdaptiveStepStats (e.g. from multiple bounces)."""
        if not stats_list:
            empty = np.empty(0, dtype=np.float32)
            return AdaptiveStepStats(empty, empty, empty, empty)

        # Concatenate per-ray arrays
        return AdaptiveStepStats(
            step_min=np.concatenate([s.step_min for s in stats_list]),
            step_max=np.concatenate([s.step_max for s in stats_list]),
            step_count=np.concatenate([s.step_count for s in stats_list]),
            total_distance=np.concatenate([s.total_distance for s in stats_list]),
        )


@dataclass
class SurfaceHitRecord:
    """
    Record of ray hits on a surface.

    Stores positions and directions of rays that hit a specific surface,
    useful for visualization and analysis of intermediate surface interactions.

    Attributes
    ----------
    surface_name : str
        Name of the surface.
    positions : ndarray, shape (N, 3)
        Hit positions in world coordinates.
    directions : ndarray, shape (N, 3)
        Ray directions at hit points.
    intensities : ndarray, shape (N,)
        Ray intensities at hit.
    wavelengths : ndarray, shape (N,)
        Ray wavelengths.
    bounce : int
        Which bounce iteration this hit occurred on (0-indexed).
    """

    surface_name: str
    positions: npt.NDArray[np.float32]
    directions: npt.NDArray[np.float32]
    intensities: npt.NDArray[np.float32]
    wavelengths: npt.NDArray[np.float32]
    bounce: int

    @property
    def num_hits(self) -> int:
        """Number of hits recorded."""
        return len(self.positions)

    def __repr__(self) -> str:
        return (
            f"SurfaceHitRecord(surface_name={self.surface_name!r}, "
            f"num_hits={self.num_hits}, bounce={self.bounce})"
        )


[docs] @dataclass class SimulationStatistics: """ Statistics from a simulation run. Attributes ---------- total_rays_initial : int Number of rays at simulation start. total_rays_created : int Total rays including split rays. rays_detected : int Rays that hit detector surfaces. rays_absorbed : int Rays terminated by absorber surfaces. rays_terminated_intensity : int Rays terminated due to low intensity. rays_terminated_bounds : int Rays that exited bounding sphere. rays_terminated_max_bounces : int Rays that reached max bounce limit. bounces_completed : int Number of bounce iterations completed. max_depth_reached : int Maximum tree depth from ray splitting. """ total_rays_initial: int = 0 total_rays_created: int = 0 rays_detected: int = 0 rays_absorbed: int = 0 rays_terminated_intensity: int = 0 rays_terminated_bounds: int = 0 rays_terminated_max_bounces: int = 0 bounces_completed: int = 0 max_depth_reached: int = 0 adaptive_step_stats: AdaptiveStepStats | None = None
[docs] @dataclass class SimulationResult: """ Complete result from a ray tracing simulation. Attributes ---------- detected : DetectorResult Rays that were recorded by detector surfaces. remaining : RayBatch Rays that are still active after simulation completed (either didn't hit any surface or exceeded max bounces). statistics : SimulationStatistics Detailed simulation statistics. detections_per_surface : dict Number of detections per detector surface name. surface_hits : dict or None If track_surface_hits was enabled, contains SurfaceHitRecord objects for each optical surface, keyed by surface name. Each surface may have multiple records (one per bounce). If disabled, this is None. Examples -------- >>> result = sim.run(rays) >>> print(f"Detected {result.statistics.rays_detected} rays") >>> print(f"Completed in {result.statistics.bounces_completed} bounces") >>> >>> # Access detection results >>> print(f"Total detected intensity: {result.detected.total_intensity:.3e}") >>> times = result.detected.times >>> positions = result.detected.positions >>> >>> # Access surface hits (if track_surface_hits=True) >>> if result.surface_hits: ... for name, records in result.surface_hits.items(): ... for rec in records: ... print(f"{name} bounce {rec.bounce}: {rec.num_hits} hits") """ detected: "DetectorResult" remaining: "RayBatch" statistics: SimulationStatistics detections_per_surface: dict[str, int] = field(default_factory=dict) surface_hits: dict[str, list[SurfaceHitRecord]] | None = None @property def num_detected(self) -> int: """Number of rays that hit detector surfaces.""" return self.detected.num_rays @property def num_remaining(self) -> int: """Number of rays still active.""" return self.remaining.num_rays @property def bounces(self) -> int: """Number of bounce iterations performed (backwards compatibility).""" return self.statistics.bounces_completed @property def total_rays_processed(self) -> int: """Total rays processed including splits (backwards compatibility).""" return self.statistics.total_rays_created
[docs] def get_surface_hit_positions(self, surface_name: str) -> npt.NDArray[np.float32]: """ Get all hit positions for a specific surface across all bounces. Parameters ---------- surface_name : str Name of the surface. Returns ------- ndarray, shape (N, 3) Concatenated hit positions from all bounces. Empty array if no hits or if track_surface_hits was not enabled. """ if self.surface_hits is None or surface_name not in self.surface_hits: return np.empty((0, 3), dtype=np.float32) records = self.surface_hits[surface_name] if not records: return np.empty((0, 3), dtype=np.float32) return np.concatenate([r.positions for r in records], axis=0)
@dataclass class SurfaceHitStats: """ Statistics about surface hits during a simulation. Attributes ---------- surface_name : str Name of the surface. hit_count : int Number of rays that hit this surface. mean_intensity : float Mean intensity of rays hitting this surface. total_intensity : float Sum of intensities of rays hitting this surface. """ surface_name: str hit_count: int mean_intensity: float total_intensity: float def merge_detector_results(result_list: list) -> "DetectorResult": """ Merge multiple DetectorResult objects into a single instance. Parameters ---------- result_list : list of DetectorResult List of detector results to merge. Returns ------- DetectorResult Combined detector results. """ from ..detectors.results import DetectorResult return DetectorResult.merge(result_list) # Backward compatibility: keep merge_recorded_rays as alias def merge_recorded_rays(recorded_list: list) -> "DetectorResult": """ Merge multiple DetectorResult/RecordedRays into a single instance. This function is provided for backward compatibility. New code should use DetectorResult.merge() directly. Parameters ---------- recorded_list : list List of DetectorResult or RecordedRays to merge. Returns ------- DetectorResult Combined results. """ from ..detectors.results import DetectorResult if not recorded_list: return DetectorResult.empty() # Check if we have RecordedRays (old format) or DetectorResult (new format) first = recorded_list[0] if hasattr(first, "to_detection_events"): # Already DetectorResult return DetectorResult.merge(recorded_list) # Convert RecordedRays to DetectorResult converted = [] for r in recorded_list: if r.num_rays > 0: converted.append(DetectorResult.from_recorded_rays(r)) return DetectorResult.merge(converted)