Source code for lsurf.utilities.propagation

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
Ray Propagation Engine with GPU Acceleration

This module implements the core ray propagation algorithms including both
straight-line propagation (for homogeneous media) and curved-path propagation
(for gradient media).
"""

import math
from enum import Enum
from typing import Protocol

import numpy as np
import numpy.typing as npt

# GPU support is optional
try:
    from numba import cuda

    HAS_CUDA = True
except ImportError:

    class _FakeCuda:
        """Fake cuda module for when numba is not installed."""

        @staticmethod
        def jit(*args, **kwargs):
            """Return a no-op decorator."""

            def decorator(func):
                return func

            # Handle both @cuda.jit and @cuda.jit(device=True)
            if args and callable(args[0]):
                return args[0]
            return decorator

        @staticmethod
        def is_available():
            return False

        @staticmethod
        def grid(n):
            return 0

        @staticmethod
        def synchronize():
            pass

    cuda = _FakeCuda()  # type: ignore[assignment]
    HAS_CUDA = False

from ..materials.base import MaterialField
from .ray_data import Float32Array, RayBatch


[docs] class PropagationMode(Enum): """ Ray propagation mode selector. Attributes ---------- STRAIGHT_LINE : int Use straight-line propagation (homogeneous media) CURVED_PATH : int Use ray equation integration (gradient media) ADAPTIVE : int Automatically select based on gradient magnitude """ STRAIGHT_LINE = 0 CURVED_PATH = 1 ADAPTIVE = 2
[docs] class IPropagator(Protocol): """ Protocol (interface) for ray propagation implementations. All propagators must implement this interface to ensure compatibility with the simulation framework. """
[docs] def propagate_step( self, rays: RayBatch, step_size: float, material: MaterialField, ) -> None: """ Propagate rays forward by one step. Parameters ---------- rays : RayBatch Ray batch to propagate (modified in-place) step_size : float Step size in meters material : MaterialField Material field defining refractive index """ ...
[docs] def compute_gradient_threshold(self, wavelength: float) -> float: """ Compute threshold for switching between propagation modes. Parameters ---------- wavelength : float Wavelength in meters Returns ------- float Gradient magnitude threshold in m^-1 """ ...
[docs] class PropagationConfig: """ Configuration for ray propagation. Attributes ---------- mode : PropagationMode Propagation mode max_step_size : float Maximum step size in meters min_step_size : float Minimum step size in meters gradient_threshold : float Threshold for adaptive mode switching (m^-1) adaptive_stepping : bool Enable adaptive step size based on gradient rk4_integration : bool Use RK4 instead of Euler for curved paths Notes ----- For gradient media, step size should satisfy: Δs ≪ R_c where R_c = n/|∇n| is the radius of curvature """
[docs] def __init__( self, mode: PropagationMode = PropagationMode.ADAPTIVE, max_step_size: float = 1e-3, min_step_size: float = 1e-6, gradient_threshold: float = 1e-6, adaptive_stepping: bool = True, rk4_integration: bool = True, ): """ Initialize propagation configuration. Parameters ---------- mode : PropagationMode, optional Propagation mode, default ADAPTIVE max_step_size : float, optional Maximum step size in meters, default 1 mm min_step_size : float, optional Minimum step size in meters, default 1 μm gradient_threshold : float, optional Gradient threshold in m^-1, default 1e-6 adaptive_stepping : bool, optional Enable adaptive stepping, default True rk4_integration : bool, optional Use RK4 integration, default True """ self.mode = mode self.max_step_size = max_step_size self.min_step_size = min_step_size self.gradient_threshold = gradient_threshold self.adaptive_stepping = adaptive_stepping self.rk4_integration = rk4_integration
[docs] def compute_step_size( self, gradient_magnitude: float, refractive_index: float, wavelength: float ) -> float: """ Compute adaptive step size based on local conditions. Parameters ---------- gradient_magnitude : float |∇n| at current position (m^-1) refractive_index : float n at current position wavelength : float Wavelength in meters Returns ------- float Recommended step size in meters Notes ----- Step size is chosen to resolve ray curvature: Δs ≤ min(R_c/10, λ_medium) where R_c = n/|∇n| """ if not self.adaptive_stepping: return self.max_step_size # Radius of curvature if gradient_magnitude > 1e-12: radius_of_curvature = refractive_index / gradient_magnitude step_from_curvature = radius_of_curvature / 10.0 else: step_from_curvature = self.max_step_size # Wavelength in medium wavelength_medium = wavelength / refractive_index # Take minimum, but clamp to configured limits step_size = min(step_from_curvature, wavelength_medium) step_size = np.clip(step_size, self.min_step_size, self.max_step_size) return float(step_size)
# GPU device functions for ray propagation # These are defined even when CUDA is not available (decorators become no-ops) @cuda.jit(device=True) def device_propagate_straight_line( x: float, y: float, z: float, dx: float, dy: float, dz: float, step_size: float ) -> tuple[float, float, float]: """ Propagate ray in straight line on GPU device. Parameters ---------- x, y, z : float Current position dx, dy, dz : float Direction vector (must be normalized) step_size : float Distance to propagate Returns ------- tuple of float New position (x', y', z') """ new_x = x + dx * step_size new_y = y + dy * step_size new_z = z + dz * step_size return new_x, new_y, new_z @cuda.jit(device=True) def device_compute_ray_curvature( n: float, grad_n_x: float, grad_n_y: float, grad_n_z: float, dx: float, dy: float, dz: float, ) -> tuple[float, float, float]: """ Compute ray curvature vector on GPU device. The curvature vector determines how the ray direction changes: κ = (∇n - (d̂·∇n)d̂) / n Parameters ---------- n : float Refractive index at current position grad_n_x, grad_n_y, grad_n_z : float Gradient of refractive index ∇n dx, dy, dz : float Current ray direction (unit vector) Returns ------- tuple of float Curvature vector (κx, κy, κz) in m^-1 References ---------- .. [1] Born, M., & Wolf, E. (1999). Principles of Optics. Section 3.1. """ # Project gradient onto direction: d̂·∇n dot_product = dx * grad_n_x + dy * grad_n_y + dz * grad_n_z # Perpendicular component: ∇n - (d̂·∇n)d̂ perp_x = grad_n_x - dot_product * dx perp_y = grad_n_y - dot_product * dy perp_z = grad_n_z - dot_product * dz # Curvature: κ = ∇n_perp / n kappa_x = perp_x / n kappa_y = perp_y / n kappa_z = perp_z / n return kappa_x, kappa_y, kappa_z @cuda.jit(device=True) def device_euler_step_gradient( x: float, y: float, z: float, dx: float, dy: float, dz: float, n: float, grad_n_x: float, grad_n_y: float, grad_n_z: float, step_size: float, ) -> tuple[float, float, float, float, float, float]: """ Single Euler step for ray in gradient medium on GPU device. Updates position and direction according to: dr/ds = d̂ dd̂/ds = κ = ∇n_perp / n Parameters ---------- x, y, z : float Current position dx, dy, dz : float Current direction (unit vector) n : float Refractive index at current position grad_n_x, grad_n_y, grad_n_z : float Gradient ∇n at current position step_size : float Integration step size Returns ------- tuple of float (new_x, new_y, new_z, new_dx, new_dy, new_dz) """ # Compute curvature kappa_x, kappa_y, kappa_z = device_compute_ray_curvature( n, grad_n_x, grad_n_y, grad_n_z, dx, dy, dz ) # Update position: r' = r + d̂ * Δs new_x = x + dx * step_size new_y = y + dy * step_size new_z = z + dz * step_size # Update direction: d̂' = d̂ + κ * Δs new_dx = dx + kappa_x * step_size new_dy = dy + kappa_y * step_size new_dz = dz + kappa_z * step_size # Renormalize direction norm = math.sqrt(new_dx**2 + new_dy**2 + new_dz**2) if norm > 1e-12: new_dx /= norm new_dy /= norm new_dz /= norm return new_x, new_y, new_z, new_dx, new_dy, new_dz @cuda.jit(device=True) def device_normalize_vector( vx: float, vy: float, vz: float ) -> tuple[float, float, float]: """ Normalize a 3D vector on GPU device. Parameters ---------- vx, vy, vz : float Vector components Returns ------- tuple of float Normalized vector (or original if norm too small) """ norm = math.sqrt(vx**2 + vy**2 + vz**2) if norm < 1e-12: return vx, vy, vz return vx / norm, vy / norm, vz / norm @cuda.jit def kernel_propagate_straight_line( positions: Float32Array, directions: Float32Array, active: npt.NDArray[np.bool_], geometric_path_lengths: Float32Array, step_size: float, ) -> None: """ GPU kernel for straight-line ray propagation. Parameters ---------- positions : Float32Array Ray positions array (N, 3), modified in-place directions : Float32Array Ray directions array (N, 3) active : BoolArray Active ray mask (N,) geometric_path_lengths : Float32Array Geometric path lengths (N,), modified in-place step_size : float Distance to propagate """ idx = cuda.grid(1) if idx >= positions.shape[0]: return if not active[idx]: return # Read current state x = positions[idx, 0] y = positions[idx, 1] z = positions[idx, 2] dx = directions[idx, 0] dy = directions[idx, 1] dz = directions[idx, 2] # Propagate new_x, new_y, new_z = device_propagate_straight_line(x, y, z, dx, dy, dz, step_size) # Write new position positions[idx, 0] = new_x positions[idx, 1] = new_y positions[idx, 2] = new_z # Update path length geometric_path_lengths[idx] += step_size def propagate_straight_line_cpu(rays: RayBatch, step_size: float) -> None: """ Propagate rays in straight lines using CPU. Parameters ---------- rays : RayBatch Ray batch to propagate (modified in-place) step_size : float Distance to propagate in meters Notes ----- This function is appropriate for homogeneous media only. """ # Only process active rays active_mask = rays.active # Update positions: pos += step_size * direction rays.positions[active_mask] += step_size * rays.directions[active_mask] # Update geometric path lengths rays.geometric_path_lengths[active_mask] += step_size def propagate_straight_line_gpu( rays: RayBatch, step_size: float, threads_per_block: int = 256 ) -> None: """ Propagate rays in straight lines using GPU. Parameters ---------- rays : RayBatch Ray batch to propagate (modified in-place) step_size : float Distance to propagate in meters threads_per_block : int, optional CUDA threads per block, default 256 Notes ----- This function is appropriate for homogeneous media only. Use propagate_gradient_gpu for inhomogeneous media. """ if not cuda.is_available(): raise RuntimeError("CUDA is not available") n_rays = rays.num_rays blocks_per_grid = (n_rays + threads_per_block - 1) // threads_per_block # Launch kernel kernel_propagate_straight_line[blocks_per_grid, threads_per_block]( rays.positions, rays.directions, rays.active, rays.geometric_path_lengths, step_size, ) cuda.synchronize()
[docs] class StraightLinePropagator: """ Propagator for homogeneous media using straight-line propagation. This is the simplest and fastest propagation mode, appropriate when the refractive index gradient is negligible. Automatically falls back to CPU if CUDA is not available. """
[docs] def __init__(self, threads_per_block: int = 256, prefer_gpu: bool = True): """ Initialize straight-line propagator. Parameters ---------- threads_per_block : int, optional CUDA threads per block, default 256 prefer_gpu : bool, optional If True, use GPU when available. If False, always use CPU. """ self.threads_per_block = threads_per_block self._use_gpu = prefer_gpu and cuda.is_available()
[docs] def propagate_step( self, rays: RayBatch, step_size: float, material: MaterialField, ) -> None: """ Propagate rays in straight lines. Parameters ---------- rays : RayBatch Ray batch to propagate (modified in-place) step_size : float Step size in meters material : MaterialField Material field (not used for straight-line propagation) """ if self._use_gpu: propagate_straight_line_gpu(rays, step_size, self.threads_per_block) else: propagate_straight_line_cpu(rays, step_size)
[docs] def compute_gradient_threshold(self, wavelength: float) -> float: """ Compute gradient threshold. Parameters ---------- wavelength : float Wavelength in meters Returns ------- float Threshold value (infinity for straight-line only mode) """ return float("inf") # Always use straight line