# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Ray Propagation Engine with GPU Acceleration
This module implements the core ray propagation algorithms including both
straight-line propagation (for homogeneous media) and curved-path propagation
(for gradient media).
"""
import math
from enum import Enum
from typing import Protocol
import numpy as np
import numpy.typing as npt
# GPU support is optional
try:
from numba import cuda
HAS_CUDA = True
except ImportError:
class _FakeCuda:
"""Fake cuda module for when numba is not installed."""
@staticmethod
def jit(*args, **kwargs):
"""Return a no-op decorator."""
def decorator(func):
return func
# Handle both @cuda.jit and @cuda.jit(device=True)
if args and callable(args[0]):
return args[0]
return decorator
@staticmethod
def is_available():
return False
@staticmethod
def grid(n):
return 0
@staticmethod
def synchronize():
pass
cuda = _FakeCuda() # type: ignore[assignment]
HAS_CUDA = False
from ..materials.base import MaterialField
from .ray_data import Float32Array, RayBatch
[docs]
class PropagationMode(Enum):
"""
Ray propagation mode selector.
Attributes
----------
STRAIGHT_LINE : int
Use straight-line propagation (homogeneous media)
CURVED_PATH : int
Use ray equation integration (gradient media)
ADAPTIVE : int
Automatically select based on gradient magnitude
"""
STRAIGHT_LINE = 0
CURVED_PATH = 1
ADAPTIVE = 2
[docs]
class IPropagator(Protocol):
"""
Protocol (interface) for ray propagation implementations.
All propagators must implement this interface to ensure compatibility
with the simulation framework.
"""
[docs]
def propagate_step(
self,
rays: RayBatch,
step_size: float,
material: MaterialField,
) -> None:
"""
Propagate rays forward by one step.
Parameters
----------
rays : RayBatch
Ray batch to propagate (modified in-place)
step_size : float
Step size in meters
material : MaterialField
Material field defining refractive index
"""
...
[docs]
def compute_gradient_threshold(self, wavelength: float) -> float:
"""
Compute threshold for switching between propagation modes.
Parameters
----------
wavelength : float
Wavelength in meters
Returns
-------
float
Gradient magnitude threshold in m^-1
"""
...
[docs]
class PropagationConfig:
"""
Configuration for ray propagation.
Attributes
----------
mode : PropagationMode
Propagation mode
max_step_size : float
Maximum step size in meters
min_step_size : float
Minimum step size in meters
gradient_threshold : float
Threshold for adaptive mode switching (m^-1)
adaptive_stepping : bool
Enable adaptive step size based on gradient
rk4_integration : bool
Use RK4 instead of Euler for curved paths
Notes
-----
For gradient media, step size should satisfy:
Δs ≪ R_c where R_c = n/|∇n| is the radius of curvature
"""
[docs]
def __init__(
self,
mode: PropagationMode = PropagationMode.ADAPTIVE,
max_step_size: float = 1e-3,
min_step_size: float = 1e-6,
gradient_threshold: float = 1e-6,
adaptive_stepping: bool = True,
rk4_integration: bool = True,
):
"""
Initialize propagation configuration.
Parameters
----------
mode : PropagationMode, optional
Propagation mode, default ADAPTIVE
max_step_size : float, optional
Maximum step size in meters, default 1 mm
min_step_size : float, optional
Minimum step size in meters, default 1 μm
gradient_threshold : float, optional
Gradient threshold in m^-1, default 1e-6
adaptive_stepping : bool, optional
Enable adaptive stepping, default True
rk4_integration : bool, optional
Use RK4 integration, default True
"""
self.mode = mode
self.max_step_size = max_step_size
self.min_step_size = min_step_size
self.gradient_threshold = gradient_threshold
self.adaptive_stepping = adaptive_stepping
self.rk4_integration = rk4_integration
[docs]
def compute_step_size(
self, gradient_magnitude: float, refractive_index: float, wavelength: float
) -> float:
"""
Compute adaptive step size based on local conditions.
Parameters
----------
gradient_magnitude : float
|∇n| at current position (m^-1)
refractive_index : float
n at current position
wavelength : float
Wavelength in meters
Returns
-------
float
Recommended step size in meters
Notes
-----
Step size is chosen to resolve ray curvature:
Δs ≤ min(R_c/10, λ_medium) where R_c = n/|∇n|
"""
if not self.adaptive_stepping:
return self.max_step_size
# Radius of curvature
if gradient_magnitude > 1e-12:
radius_of_curvature = refractive_index / gradient_magnitude
step_from_curvature = radius_of_curvature / 10.0
else:
step_from_curvature = self.max_step_size
# Wavelength in medium
wavelength_medium = wavelength / refractive_index
# Take minimum, but clamp to configured limits
step_size = min(step_from_curvature, wavelength_medium)
step_size = np.clip(step_size, self.min_step_size, self.max_step_size)
return float(step_size)
# GPU device functions for ray propagation
# These are defined even when CUDA is not available (decorators become no-ops)
@cuda.jit(device=True)
def device_propagate_straight_line(
x: float, y: float, z: float, dx: float, dy: float, dz: float, step_size: float
) -> tuple[float, float, float]:
"""
Propagate ray in straight line on GPU device.
Parameters
----------
x, y, z : float
Current position
dx, dy, dz : float
Direction vector (must be normalized)
step_size : float
Distance to propagate
Returns
-------
tuple of float
New position (x', y', z')
"""
new_x = x + dx * step_size
new_y = y + dy * step_size
new_z = z + dz * step_size
return new_x, new_y, new_z
@cuda.jit(device=True)
def device_compute_ray_curvature(
n: float,
grad_n_x: float,
grad_n_y: float,
grad_n_z: float,
dx: float,
dy: float,
dz: float,
) -> tuple[float, float, float]:
"""
Compute ray curvature vector on GPU device.
The curvature vector determines how the ray direction changes:
κ = (∇n - (d̂·∇n)d̂) / n
Parameters
----------
n : float
Refractive index at current position
grad_n_x, grad_n_y, grad_n_z : float
Gradient of refractive index ∇n
dx, dy, dz : float
Current ray direction (unit vector)
Returns
-------
tuple of float
Curvature vector (κx, κy, κz) in m^-1
References
----------
.. [1] Born, M., & Wolf, E. (1999). Principles of Optics. Section 3.1.
"""
# Project gradient onto direction: d̂·∇n
dot_product = dx * grad_n_x + dy * grad_n_y + dz * grad_n_z
# Perpendicular component: ∇n - (d̂·∇n)d̂
perp_x = grad_n_x - dot_product * dx
perp_y = grad_n_y - dot_product * dy
perp_z = grad_n_z - dot_product * dz
# Curvature: κ = ∇n_perp / n
kappa_x = perp_x / n
kappa_y = perp_y / n
kappa_z = perp_z / n
return kappa_x, kappa_y, kappa_z
@cuda.jit(device=True)
def device_euler_step_gradient(
x: float,
y: float,
z: float,
dx: float,
dy: float,
dz: float,
n: float,
grad_n_x: float,
grad_n_y: float,
grad_n_z: float,
step_size: float,
) -> tuple[float, float, float, float, float, float]:
"""
Single Euler step for ray in gradient medium on GPU device.
Updates position and direction according to:
dr/ds = d̂
dd̂/ds = κ = ∇n_perp / n
Parameters
----------
x, y, z : float
Current position
dx, dy, dz : float
Current direction (unit vector)
n : float
Refractive index at current position
grad_n_x, grad_n_y, grad_n_z : float
Gradient ∇n at current position
step_size : float
Integration step size
Returns
-------
tuple of float
(new_x, new_y, new_z, new_dx, new_dy, new_dz)
"""
# Compute curvature
kappa_x, kappa_y, kappa_z = device_compute_ray_curvature(
n, grad_n_x, grad_n_y, grad_n_z, dx, dy, dz
)
# Update position: r' = r + d̂ * Δs
new_x = x + dx * step_size
new_y = y + dy * step_size
new_z = z + dz * step_size
# Update direction: d̂' = d̂ + κ * Δs
new_dx = dx + kappa_x * step_size
new_dy = dy + kappa_y * step_size
new_dz = dz + kappa_z * step_size
# Renormalize direction
norm = math.sqrt(new_dx**2 + new_dy**2 + new_dz**2)
if norm > 1e-12:
new_dx /= norm
new_dy /= norm
new_dz /= norm
return new_x, new_y, new_z, new_dx, new_dy, new_dz
@cuda.jit(device=True)
def device_normalize_vector(
vx: float, vy: float, vz: float
) -> tuple[float, float, float]:
"""
Normalize a 3D vector on GPU device.
Parameters
----------
vx, vy, vz : float
Vector components
Returns
-------
tuple of float
Normalized vector (or original if norm too small)
"""
norm = math.sqrt(vx**2 + vy**2 + vz**2)
if norm < 1e-12:
return vx, vy, vz
return vx / norm, vy / norm, vz / norm
@cuda.jit
def kernel_propagate_straight_line(
positions: Float32Array,
directions: Float32Array,
active: npt.NDArray[np.bool_],
geometric_path_lengths: Float32Array,
step_size: float,
) -> None:
"""
GPU kernel for straight-line ray propagation.
Parameters
----------
positions : Float32Array
Ray positions array (N, 3), modified in-place
directions : Float32Array
Ray directions array (N, 3)
active : BoolArray
Active ray mask (N,)
geometric_path_lengths : Float32Array
Geometric path lengths (N,), modified in-place
step_size : float
Distance to propagate
"""
idx = cuda.grid(1)
if idx >= positions.shape[0]:
return
if not active[idx]:
return
# Read current state
x = positions[idx, 0]
y = positions[idx, 1]
z = positions[idx, 2]
dx = directions[idx, 0]
dy = directions[idx, 1]
dz = directions[idx, 2]
# Propagate
new_x, new_y, new_z = device_propagate_straight_line(x, y, z, dx, dy, dz, step_size)
# Write new position
positions[idx, 0] = new_x
positions[idx, 1] = new_y
positions[idx, 2] = new_z
# Update path length
geometric_path_lengths[idx] += step_size
def propagate_straight_line_cpu(rays: RayBatch, step_size: float) -> None:
"""
Propagate rays in straight lines using CPU.
Parameters
----------
rays : RayBatch
Ray batch to propagate (modified in-place)
step_size : float
Distance to propagate in meters
Notes
-----
This function is appropriate for homogeneous media only.
"""
# Only process active rays
active_mask = rays.active
# Update positions: pos += step_size * direction
rays.positions[active_mask] += step_size * rays.directions[active_mask]
# Update geometric path lengths
rays.geometric_path_lengths[active_mask] += step_size
def propagate_straight_line_gpu(
rays: RayBatch, step_size: float, threads_per_block: int = 256
) -> None:
"""
Propagate rays in straight lines using GPU.
Parameters
----------
rays : RayBatch
Ray batch to propagate (modified in-place)
step_size : float
Distance to propagate in meters
threads_per_block : int, optional
CUDA threads per block, default 256
Notes
-----
This function is appropriate for homogeneous media only.
Use propagate_gradient_gpu for inhomogeneous media.
"""
if not cuda.is_available():
raise RuntimeError("CUDA is not available")
n_rays = rays.num_rays
blocks_per_grid = (n_rays + threads_per_block - 1) // threads_per_block
# Launch kernel
kernel_propagate_straight_line[blocks_per_grid, threads_per_block](
rays.positions,
rays.directions,
rays.active,
rays.geometric_path_lengths,
step_size,
)
cuda.synchronize()
[docs]
class StraightLinePropagator:
"""
Propagator for homogeneous media using straight-line propagation.
This is the simplest and fastest propagation mode, appropriate when
the refractive index gradient is negligible.
Automatically falls back to CPU if CUDA is not available.
"""
[docs]
def __init__(self, threads_per_block: int = 256, prefer_gpu: bool = True):
"""
Initialize straight-line propagator.
Parameters
----------
threads_per_block : int, optional
CUDA threads per block, default 256
prefer_gpu : bool, optional
If True, use GPU when available. If False, always use CPU.
"""
self.threads_per_block = threads_per_block
self._use_gpu = prefer_gpu and cuda.is_available()
[docs]
def propagate_step(
self,
rays: RayBatch,
step_size: float,
material: MaterialField,
) -> None:
"""
Propagate rays in straight lines.
Parameters
----------
rays : RayBatch
Ray batch to propagate (modified in-place)
step_size : float
Step size in meters
material : MaterialField
Material field (not used for straight-line propagation)
"""
if self._use_gpu:
propagate_straight_line_gpu(rays, step_size, self.threads_per_block)
else:
propagate_straight_line_cpu(rays, step_size)
[docs]
def compute_gradient_threshold(self, wavelength: float) -> float:
"""
Compute gradient threshold.
Parameters
----------
wavelength : float
Wavelength in meters
Returns
-------
float
Threshold value (infinity for straight-line only mode)
"""
return float("inf") # Always use straight line