Source code for lsurf.propagation.propagators.gradient

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
CPU-based Gradient Propagator for inhomogeneous media.

This module provides a propagator for materials with spatially-varying
refractive indices, using vectorized NumPy operations for efficiency.
"""

import numpy as np
import numpy.typing as npt

from ..propagator_protocol import MaterialFieldProtocol


[docs] class GradientPropagator: """ Propagator for inhomogeneous media using ray equation integration. This propagator handles materials with spatially-varying refractive indices by numerically integrating the ray equation (eikonal equation). Supports both Euler and RK4 integration methods. Attributes ---------- use_rk4 : bool If True, use RK4 integration; otherwise use Euler adaptive_stepping : bool If True, adapt step size to local gradient min_step_size : float Minimum step size in meters max_step_size : float Maximum step size in meters threads_per_block : int CUDA threads per block (legacy, not used for CPU) References ---------- .. [1] Born, M., & Wolf, E. (1999). Principles of Optics. Section 3.1. .. [2] Sharma, A., Kumar, D. V., & Ghatak, A. K. (1982). Tracing rays through graded-index media: a new method. Applied Optics, 21(6), 984-987. """
[docs] def __init__( self, use_rk4: bool | None = None, method: str | None = None, adaptive_stepping: bool = True, min_step_size: float = 1e-6, max_step_size: float = 1e-3, threads_per_block: int = 256, ): """ Initialize gradient propagator. Parameters ---------- use_rk4 : bool, optional Use RK4 instead of Euler (deprecated, use method instead) method : str, optional Integration method: "euler" or "rk4", default "rk4" adaptive_stepping : bool, optional Enable adaptive step sizing, default True min_step_size : float, optional Minimum step size in meters, default 1 μm max_step_size : float, optional Maximum step size in meters, default 1 mm threads_per_block : int, optional CUDA threads per block (legacy), default 256 """ # Handle both old (use_rk4) and new (method) parameter styles if method is not None: if method.lower() == "rk4": self.use_rk4 = True elif method.lower() == "euler": self.use_rk4 = False else: raise ValueError(f"Unknown method '{method}'. Use 'euler' or 'rk4'.") elif use_rk4 is not None: self.use_rk4 = use_rk4 else: self.use_rk4 = True # Default to RK4 self.adaptive_stepping = adaptive_stepping self.min_step_size = min_step_size self.max_step_size = max_step_size self.threads_per_block = threads_per_block
[docs] def compute_gradient_threshold(self, wavelength: float) -> float: """ Compute gradient threshold for switching to curved-path mode. Parameters ---------- wavelength : float Wavelength in meters Returns ------- float Gradient magnitude threshold in m^-1 Notes ----- Threshold is chosen such that ray curvature becomes significant over a distance of ~wavelength: |∇n|/n > 1/λ """ return 1.0 / wavelength
[docs] def propagate_step_cpu( self, positions: npt.NDArray[np.floating], directions: npt.NDArray[np.floating], active: npt.NDArray[np.bool_], material: MaterialFieldProtocol, step_size: float, wavelength: float, geometric_path_lengths: npt.NDArray[np.floating] | None = None, optical_path_lengths: npt.NDArray[np.floating] | None = None, accumulated_time: npt.NDArray[np.floating] | None = None, ) -> None: """ Propagate rays through gradient medium using CPU (vectorized NumPy). This method modifies arrays in-place using Euler or RK4 integration. Parameters ---------- positions : ndarray Ray positions, shape (N, 3), modified in-place. directions : ndarray Ray directions, shape (N, 3), modified in-place. active : ndarray Active ray mask, shape (N,). material : MaterialFieldProtocol Material with get_refractive_index and get_refractive_index_gradient. step_size : float Step size in meters. wavelength : float Wavelength in meters. geometric_path_lengths : ndarray, optional Geometric path lengths, shape (N,), updated in-place. optical_path_lengths : ndarray, optional Optical path lengths, shape (N,), updated in-place. accumulated_time : ndarray, optional Accumulated time, shape (N,), updated in-place. """ c = 299792458.0 # Speed of light active_mask = active.astype(bool) if not np.any(active_mask): return pos = positions[active_mask] dirs = directions[active_mask] if self.use_rk4: new_pos, new_dirs, n_avg = self._rk4_step_vectorized( pos, dirs, material, step_size, wavelength ) else: new_pos, new_dirs, n_avg = self._euler_step_vectorized( pos, dirs, material, step_size, wavelength ) positions[active_mask] = new_pos.astype(np.float32) directions[active_mask] = new_dirs.astype(np.float32) if geometric_path_lengths is not None: geometric_path_lengths[active_mask] += step_size if optical_path_lengths is not None: optical_path_lengths[active_mask] += (n_avg * step_size).astype(np.float32) if accumulated_time is not None: dt = (n_avg * step_size / c).astype(np.float32) accumulated_time[active_mask] += dt
def _euler_step_vectorized( self, positions: npt.NDArray[np.floating], directions: npt.NDArray[np.floating], material: MaterialFieldProtocol, step_size: float, wavelength: float, ) -> tuple[npt.NDArray, npt.NDArray, npt.NDArray]: """ Vectorized Euler step for gradient medium. Returns ------- new_positions : ndarray Updated positions. new_directions : ndarray Updated directions. n_values : ndarray Refractive index values for optical path calculation. """ x, y, z = positions[:, 0], positions[:, 1], positions[:, 2] dx, dy, dz = directions[:, 0], directions[:, 1], directions[:, 2] n = material.get_refractive_index(x, y, z, wavelength) grad = material.get_refractive_index_gradient(x, y, z, wavelength) grad_x, grad_y, grad_z = ( np.asarray(grad[0]), np.asarray(grad[1]), np.asarray(grad[2]), ) dot_product = dx * grad_x + dy * grad_y + dz * grad_z kappa_x = (grad_x - dot_product * dx) / n kappa_y = (grad_y - dot_product * dy) / n kappa_z = (grad_z - dot_product * dz) / n new_x = x + dx * step_size new_y = y + dy * step_size new_z = z + dz * step_size new_dx = dx + kappa_x * step_size new_dy = dy + kappa_y * step_size new_dz = dz + kappa_z * step_size norm = np.sqrt(new_dx**2 + new_dy**2 + new_dz**2) norm = np.where(norm < 1e-12, 1.0, norm) new_dx /= norm new_dy /= norm new_dz /= norm new_positions = np.column_stack([new_x, new_y, new_z]) new_directions = np.column_stack([new_dx, new_dy, new_dz]) return new_positions, new_directions, n def _rk4_step_vectorized( self, positions: npt.NDArray[np.floating], directions: npt.NDArray[np.floating], material: MaterialFieldProtocol, step_size: float, wavelength: float, ) -> tuple[npt.NDArray, npt.NDArray, npt.NDArray]: """ Vectorized RK4 step for gradient medium. Returns ------- new_positions : ndarray Updated positions. new_directions : ndarray Updated directions. n_avg : ndarray Average refractive index for optical path calculation. """ h = step_size h2 = h / 2.0 def compute_derivatives(pos, dirs): x, y, z = pos[:, 0], pos[:, 1], pos[:, 2] dx, dy, dz = dirs[:, 0], dirs[:, 1], dirs[:, 2] n = material.get_refractive_index(x, y, z, wavelength) grad = material.get_refractive_index_gradient(x, y, z, wavelength) grad_x, grad_y, grad_z = ( np.asarray(grad[0]), np.asarray(grad[1]), np.asarray(grad[2]), ) dot_product = dx * grad_x + dy * grad_y + dz * grad_z kappa_x = (grad_x - dot_product * dx) / n kappa_y = (grad_y - dot_product * dy) / n kappa_z = (grad_z - dot_product * dz) / n dr_ds = dirs.copy() dd_ds = np.column_stack([kappa_x, kappa_y, kappa_z]) return dr_ds, dd_ds, n def normalize_directions(dirs): norm = np.linalg.norm(dirs, axis=1, keepdims=True) norm = np.where(norm < 1e-12, 1.0, norm) return dirs / norm # k1 k1_r, k1_d, n0 = compute_derivatives(positions, directions) # k2 pos1 = positions + h2 * k1_r dirs1 = normalize_directions(directions + h2 * k1_d) k2_r, k2_d, n1 = compute_derivatives(pos1, dirs1) # k3 pos2 = positions + h2 * k2_r dirs2 = normalize_directions(directions + h2 * k2_d) k3_r, k3_d, n2 = compute_derivatives(pos2, dirs2) # k4 pos3 = positions + h * k3_r dirs3 = normalize_directions(directions + h * k3_d) k4_r, k4_d, n3 = compute_derivatives(pos3, dirs3) # Final RK4 combination new_positions = positions + (h / 6.0) * (k1_r + 2 * k2_r + 2 * k3_r + k4_r) new_directions = directions + (h / 6.0) * (k1_d + 2 * k2_d + 2 * k3_d + k4_d) new_directions = normalize_directions(new_directions) # Simpson's rule for average n n_avg = (n0 + 4 * n1 + n2) / 6.0 return new_positions, new_directions, n_avg # ======================================================================== # Unified Interface Methods (RayPropagatorProtocol) # ========================================================================
[docs] def propagate_step( self, rays, # RayBatch step_size: float, wavelength: float = 532e-9, material: MaterialFieldProtocol | None = None, ) -> None: """ Propagate rays by a single integration step. This is the unified interface method compatible with GPU propagators. Delegates to propagate_step_cpu internally. Parameters ---------- rays : RayBatch Ray batch containing positions, directions, and other ray properties. Modified in-place. step_size : float Integration step size in meters. wavelength : float, optional Wavelength in meters, default 532 nm. material : MaterialFieldProtocol, optional Material to propagate through. If not provided, must be set via a separate mechanism (e.g., stored during initialization). """ if material is None: raise ValueError( "material must be provided. CPU propagator requires explicit material." ) self.propagate_step_cpu( positions=rays.positions, directions=rays.directions, active=rays.active, material=material, step_size=step_size, wavelength=wavelength, geometric_path_lengths=rays.geometric_path_lengths, optical_path_lengths=rays.optical_path_lengths, accumulated_time=rays.accumulated_time, )
[docs] def propagate( self, rays, # RayBatch total_distance: float, step_size: float, wavelength: float = 532e-9, material: MaterialFieldProtocol | None = None, ) -> None: """ Propagate rays through a total distance. This is the unified interface method compatible with GPU propagators. Parameters ---------- rays : RayBatch Ray batch to propagate (modified in-place) total_distance : float Total distance to propagate in meters step_size : float Integration step size in meters wavelength : float, optional Wavelength in meters, default 532 nm material : MaterialFieldProtocol, optional Material to propagate through """ if material is None: raise ValueError( "material must be provided. CPU propagator requires explicit material." ) num_steps = int(total_distance / step_size) for _ in range(num_steps): self.propagate_step( rays=rays, step_size=step_size, wavelength=wavelength, material=material, )