# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
CPU-based Gradient Propagator for inhomogeneous media.
This module provides a propagator for materials with spatially-varying
refractive indices, using vectorized NumPy operations for efficiency.
"""
import numpy as np
import numpy.typing as npt
from ..propagator_protocol import MaterialFieldProtocol
[docs]
class GradientPropagator:
"""
Propagator for inhomogeneous media using ray equation integration.
This propagator handles materials with spatially-varying refractive
indices by numerically integrating the ray equation (eikonal equation).
Supports both Euler and RK4 integration methods.
Attributes
----------
use_rk4 : bool
If True, use RK4 integration; otherwise use Euler
adaptive_stepping : bool
If True, adapt step size to local gradient
min_step_size : float
Minimum step size in meters
max_step_size : float
Maximum step size in meters
threads_per_block : int
CUDA threads per block (legacy, not used for CPU)
References
----------
.. [1] Born, M., & Wolf, E. (1999). Principles of Optics. Section 3.1.
.. [2] Sharma, A., Kumar, D. V., & Ghatak, A. K. (1982). Tracing rays
through graded-index media: a new method. Applied Optics, 21(6),
984-987.
"""
[docs]
def __init__(
self,
use_rk4: bool | None = None,
method: str | None = None,
adaptive_stepping: bool = True,
min_step_size: float = 1e-6,
max_step_size: float = 1e-3,
threads_per_block: int = 256,
):
"""
Initialize gradient propagator.
Parameters
----------
use_rk4 : bool, optional
Use RK4 instead of Euler (deprecated, use method instead)
method : str, optional
Integration method: "euler" or "rk4", default "rk4"
adaptive_stepping : bool, optional
Enable adaptive step sizing, default True
min_step_size : float, optional
Minimum step size in meters, default 1 μm
max_step_size : float, optional
Maximum step size in meters, default 1 mm
threads_per_block : int, optional
CUDA threads per block (legacy), default 256
"""
# Handle both old (use_rk4) and new (method) parameter styles
if method is not None:
if method.lower() == "rk4":
self.use_rk4 = True
elif method.lower() == "euler":
self.use_rk4 = False
else:
raise ValueError(f"Unknown method '{method}'. Use 'euler' or 'rk4'.")
elif use_rk4 is not None:
self.use_rk4 = use_rk4
else:
self.use_rk4 = True # Default to RK4
self.adaptive_stepping = adaptive_stepping
self.min_step_size = min_step_size
self.max_step_size = max_step_size
self.threads_per_block = threads_per_block
[docs]
def compute_gradient_threshold(self, wavelength: float) -> float:
"""
Compute gradient threshold for switching to curved-path mode.
Parameters
----------
wavelength : float
Wavelength in meters
Returns
-------
float
Gradient magnitude threshold in m^-1
Notes
-----
Threshold is chosen such that ray curvature becomes significant
over a distance of ~wavelength: |∇n|/n > 1/λ
"""
return 1.0 / wavelength
[docs]
def propagate_step_cpu(
self,
positions: npt.NDArray[np.floating],
directions: npt.NDArray[np.floating],
active: npt.NDArray[np.bool_],
material: MaterialFieldProtocol,
step_size: float,
wavelength: float,
geometric_path_lengths: npt.NDArray[np.floating] | None = None,
optical_path_lengths: npt.NDArray[np.floating] | None = None,
accumulated_time: npt.NDArray[np.floating] | None = None,
) -> None:
"""
Propagate rays through gradient medium using CPU (vectorized NumPy).
This method modifies arrays in-place using Euler or RK4 integration.
Parameters
----------
positions : ndarray
Ray positions, shape (N, 3), modified in-place.
directions : ndarray
Ray directions, shape (N, 3), modified in-place.
active : ndarray
Active ray mask, shape (N,).
material : MaterialFieldProtocol
Material with get_refractive_index and get_refractive_index_gradient.
step_size : float
Step size in meters.
wavelength : float
Wavelength in meters.
geometric_path_lengths : ndarray, optional
Geometric path lengths, shape (N,), updated in-place.
optical_path_lengths : ndarray, optional
Optical path lengths, shape (N,), updated in-place.
accumulated_time : ndarray, optional
Accumulated time, shape (N,), updated in-place.
"""
c = 299792458.0 # Speed of light
active_mask = active.astype(bool)
if not np.any(active_mask):
return
pos = positions[active_mask]
dirs = directions[active_mask]
if self.use_rk4:
new_pos, new_dirs, n_avg = self._rk4_step_vectorized(
pos, dirs, material, step_size, wavelength
)
else:
new_pos, new_dirs, n_avg = self._euler_step_vectorized(
pos, dirs, material, step_size, wavelength
)
positions[active_mask] = new_pos.astype(np.float32)
directions[active_mask] = new_dirs.astype(np.float32)
if geometric_path_lengths is not None:
geometric_path_lengths[active_mask] += step_size
if optical_path_lengths is not None:
optical_path_lengths[active_mask] += (n_avg * step_size).astype(np.float32)
if accumulated_time is not None:
dt = (n_avg * step_size / c).astype(np.float32)
accumulated_time[active_mask] += dt
def _euler_step_vectorized(
self,
positions: npt.NDArray[np.floating],
directions: npt.NDArray[np.floating],
material: MaterialFieldProtocol,
step_size: float,
wavelength: float,
) -> tuple[npt.NDArray, npt.NDArray, npt.NDArray]:
"""
Vectorized Euler step for gradient medium.
Returns
-------
new_positions : ndarray
Updated positions.
new_directions : ndarray
Updated directions.
n_values : ndarray
Refractive index values for optical path calculation.
"""
x, y, z = positions[:, 0], positions[:, 1], positions[:, 2]
dx, dy, dz = directions[:, 0], directions[:, 1], directions[:, 2]
n = material.get_refractive_index(x, y, z, wavelength)
grad = material.get_refractive_index_gradient(x, y, z, wavelength)
grad_x, grad_y, grad_z = (
np.asarray(grad[0]),
np.asarray(grad[1]),
np.asarray(grad[2]),
)
dot_product = dx * grad_x + dy * grad_y + dz * grad_z
kappa_x = (grad_x - dot_product * dx) / n
kappa_y = (grad_y - dot_product * dy) / n
kappa_z = (grad_z - dot_product * dz) / n
new_x = x + dx * step_size
new_y = y + dy * step_size
new_z = z + dz * step_size
new_dx = dx + kappa_x * step_size
new_dy = dy + kappa_y * step_size
new_dz = dz + kappa_z * step_size
norm = np.sqrt(new_dx**2 + new_dy**2 + new_dz**2)
norm = np.where(norm < 1e-12, 1.0, norm)
new_dx /= norm
new_dy /= norm
new_dz /= norm
new_positions = np.column_stack([new_x, new_y, new_z])
new_directions = np.column_stack([new_dx, new_dy, new_dz])
return new_positions, new_directions, n
def _rk4_step_vectorized(
self,
positions: npt.NDArray[np.floating],
directions: npt.NDArray[np.floating],
material: MaterialFieldProtocol,
step_size: float,
wavelength: float,
) -> tuple[npt.NDArray, npt.NDArray, npt.NDArray]:
"""
Vectorized RK4 step for gradient medium.
Returns
-------
new_positions : ndarray
Updated positions.
new_directions : ndarray
Updated directions.
n_avg : ndarray
Average refractive index for optical path calculation.
"""
h = step_size
h2 = h / 2.0
def compute_derivatives(pos, dirs):
x, y, z = pos[:, 0], pos[:, 1], pos[:, 2]
dx, dy, dz = dirs[:, 0], dirs[:, 1], dirs[:, 2]
n = material.get_refractive_index(x, y, z, wavelength)
grad = material.get_refractive_index_gradient(x, y, z, wavelength)
grad_x, grad_y, grad_z = (
np.asarray(grad[0]),
np.asarray(grad[1]),
np.asarray(grad[2]),
)
dot_product = dx * grad_x + dy * grad_y + dz * grad_z
kappa_x = (grad_x - dot_product * dx) / n
kappa_y = (grad_y - dot_product * dy) / n
kappa_z = (grad_z - dot_product * dz) / n
dr_ds = dirs.copy()
dd_ds = np.column_stack([kappa_x, kappa_y, kappa_z])
return dr_ds, dd_ds, n
def normalize_directions(dirs):
norm = np.linalg.norm(dirs, axis=1, keepdims=True)
norm = np.where(norm < 1e-12, 1.0, norm)
return dirs / norm
# k1
k1_r, k1_d, n0 = compute_derivatives(positions, directions)
# k2
pos1 = positions + h2 * k1_r
dirs1 = normalize_directions(directions + h2 * k1_d)
k2_r, k2_d, n1 = compute_derivatives(pos1, dirs1)
# k3
pos2 = positions + h2 * k2_r
dirs2 = normalize_directions(directions + h2 * k2_d)
k3_r, k3_d, n2 = compute_derivatives(pos2, dirs2)
# k4
pos3 = positions + h * k3_r
dirs3 = normalize_directions(directions + h * k3_d)
k4_r, k4_d, n3 = compute_derivatives(pos3, dirs3)
# Final RK4 combination
new_positions = positions + (h / 6.0) * (k1_r + 2 * k2_r + 2 * k3_r + k4_r)
new_directions = directions + (h / 6.0) * (k1_d + 2 * k2_d + 2 * k3_d + k4_d)
new_directions = normalize_directions(new_directions)
# Simpson's rule for average n
n_avg = (n0 + 4 * n1 + n2) / 6.0
return new_positions, new_directions, n_avg
# ========================================================================
# Unified Interface Methods (RayPropagatorProtocol)
# ========================================================================
[docs]
def propagate_step(
self,
rays, # RayBatch
step_size: float,
wavelength: float = 532e-9,
material: MaterialFieldProtocol | None = None,
) -> None:
"""
Propagate rays by a single integration step.
This is the unified interface method compatible with GPU propagators.
Delegates to propagate_step_cpu internally.
Parameters
----------
rays : RayBatch
Ray batch containing positions, directions, and other ray properties.
Modified in-place.
step_size : float
Integration step size in meters.
wavelength : float, optional
Wavelength in meters, default 532 nm.
material : MaterialFieldProtocol, optional
Material to propagate through. If not provided, must be set via
a separate mechanism (e.g., stored during initialization).
"""
if material is None:
raise ValueError(
"material must be provided. CPU propagator requires explicit material."
)
self.propagate_step_cpu(
positions=rays.positions,
directions=rays.directions,
active=rays.active,
material=material,
step_size=step_size,
wavelength=wavelength,
geometric_path_lengths=rays.geometric_path_lengths,
optical_path_lengths=rays.optical_path_lengths,
accumulated_time=rays.accumulated_time,
)
[docs]
def propagate(
self,
rays, # RayBatch
total_distance: float,
step_size: float,
wavelength: float = 532e-9,
material: MaterialFieldProtocol | None = None,
) -> None:
"""
Propagate rays through a total distance.
This is the unified interface method compatible with GPU propagators.
Parameters
----------
rays : RayBatch
Ray batch to propagate (modified in-place)
total_distance : float
Total distance to propagate in meters
step_size : float
Integration step size in meters
wavelength : float, optional
Wavelength in meters, default 532 nm
material : MaterialFieldProtocol, optional
Material to propagate through
"""
if material is None:
raise ValueError(
"material must be provided. CPU propagator requires explicit material."
)
num_steps = int(total_distance / step_size)
for _ in range(num_steps):
self.propagate_step(
rays=rays,
step_size=step_size,
wavelength=wavelength,
material=material,
)