# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
GPU-accelerated Gradient Propagator for inhomogeneous media.
This module provides a high-performance CUDA-based propagator for
materials with spatially-varying refractive indices.
"""
import numpy as np
import numpy.typing as npt
# GPU support is optional
# GPU support is optional: fall back to inert stand-ins when numba is absent
# so that importing this module never fails on CPU-only machines.
try:
    from numba import cuda

    HAS_CUDA = True
except ImportError:

    class _FakeCuda:
        """Fake cuda module for when numba is not installed."""

        @staticmethod
        def jit(*args, **kwargs):
            """Return a no-op decorator.

            Supports both ``@cuda.jit`` (bare) and ``@cuda.jit(...)``
            (parameterized) usage.
            """

            def decorator(func):
                return func

            # Bare usage: @cuda.jit applied directly to the function.
            if args and callable(args[0]):
                return args[0]
            return decorator

        @staticmethod
        def is_available():
            # CPU-only environment: never report a CUDA device.
            return False

        @staticmethod
        def grid(n):
            # Placeholder thread index; never executed as a real kernel.
            return 0

        @staticmethod
        def synchronize():
            pass

        @staticmethod
        def to_device(arr):
            """Fake to_device that returns a wrapper."""
            return _FakeDeviceArray(arr)

    class _FakeDeviceArray:
        """Fake device array for when numba is not installed."""

        def __init__(self, arr):
            self._arr = arr

        def copy_to_host(self):
            return self._arr

    cuda = _FakeCuda()  # type: ignore[assignment]
    HAS_CUDA = False
from ..propagator_protocol import GPUMaterialProtocol, GPUMaterialID
class GPUGradientPropagator:
    """
    High-performance GPU-based propagator for inhomogeneous materials.

    This propagator keeps all computation on the GPU for maximum performance.
    It works with any material that implements GPUMaterialProtocol, using
    the material's gpu_material_id to dispatch to the appropriate GPU kernels.

    Currently Supported Materials
    -----------------------------
    - ExponentialAtmosphere (GPUMaterialID.EXPONENTIAL_ATMOSPHERE)

    For materials without GPU support, use GradientPropagator.propagate_step_cpu()
    which works with any MaterialField via the CPU.

    Parameters
    ----------
    material : GPUMaterialProtocol
        The material to propagate through. Must implement GPUMaterialProtocol.
    method : str
        Integration method: 'euler' or 'rk4'. Default: 'rk4'
    threads_per_block : int
        CUDA threads per block. Default: 256
    enable_absorption : bool
        If True, an absorption kernel is applied after each fixed-step
        kernel batch. Default: False

    Example
    -------
    >>> from lsurf.materials import ExponentialAtmosphere
    >>> atmosphere = ExponentialAtmosphere()
    >>> propagator = GPUGradientPropagator(atmosphere, method='rk4')
    >>> propagator.propagate(rays, total_distance=500e3, step_size=100.0)

    Raises
    ------
    ValueError
        If the material does not support GPU acceleration.
    """

    # Default minimum step for adaptive integration (0.3 mm). Shared by
    # propagate() and propagate_with_history().
    _DEFAULT_MIN_STEP = 3e-4

    # Order in which optional material device arrays are appended to kernel
    # args. Convention: lut_n, lut_dn_dh for SimpleAtmosphere;
    # n_grid, grad_grid for GridAtmosphere.
    _DEVICE_ARRAY_ORDER = ("lut_n", "lut_dn_dh", "n_grid", "grad_grid")

    def __init__(
        self,
        material: GPUMaterialProtocol,
        method: str = "rk4",
        threads_per_block: int = 256,
        enable_absorption: bool = False,
    ):
        # Check if CUDA is available
        if not HAS_CUDA:
            raise ImportError(
                "CUDA support requires numba with CUDA. "
                "Use GradientPropagator for CPU-based propagation."
            )
        # Store material reference for CPU fallback methods
        self._material = material
        # Validate material has GPU support
        if not hasattr(material, "gpu_material_id"):
            raise ValueError(
                f"Material {type(material).__name__} does not support GPU acceleration. "
                f"Use GradientPropagator.propagate_step_cpu() instead."
            )
        # Validate material provides kernels
        if not hasattr(material, "get_gpu_kernels"):
            raise ValueError(
                f"Material {type(material).__name__} does not provide GPU kernels. "
                f"Implement get_gpu_kernels() method."
            )
        self._material_id = material.gpu_material_id
        if method not in ("euler", "rk4"):
            raise ValueError(f"method must be 'euler' or 'rk4', got {method}")
        self.method = method
        self.threads_per_block = threads_per_block
        self.enable_absorption = enable_absorption
        # Get kernels from the material
        self._kernels = material.get_gpu_kernels()
        self._kernel = self._kernels.get(method)
        if self._kernel is None:
            raise ValueError(
                f"Material {type(material).__name__} does not provide '{method}' kernel. "
                f"Available: {list(self._kernels.keys())}"
            )
        # Get absorption kernel if enabled
        self._absorption_kernel = None
        if enable_absorption:
            self._absorption_kernel = self._get_absorption_kernel()
        # Check if material uses device arrays (LUTs, grids, etc.)
        # Skip for materials with analytical duct kernels — they don't use
        # LUT arrays, and building the LUT can be very expensive.
        self._gpu_arrays = None
        self._device_arrays = None
        if hasattr(material, "get_gpu_arrays") and not hasattr(
            material, "get_duct_gpu_parameters"
        ):
            self._gpu_arrays = material.get_gpu_arrays()
            # Device arrays will be created on first propagate call

    def _get_absorption_kernel(self):
        """Get the appropriate absorption kernel based on material type."""
        from ..kernels.absorption.simple import _kernel_absorption_simple
        from ..kernels.absorption.grid import _kernel_absorption_grid

        if self._material_id == GPUMaterialID.SIMPLE_INHOMOGENEOUS:
            return _kernel_absorption_simple
        elif self._material_id == GPUMaterialID.EXPONENTIAL_ATMOSPHERE:
            return _kernel_absorption_simple  # Uses same LUT structure
        elif self._material_id == GPUMaterialID.GRID_INHOMOGENEOUS:
            return _kernel_absorption_grid
        else:
            raise ValueError(
                f"No absorption kernel available for material ID {self._material_id}"
            )

    def _upload_material_arrays(self) -> None:
        """Transfer material arrays (LUTs, grids, etc.) to the device.

        Lazy one-time upload: runs only when the material provided host
        arrays and they have not yet been copied to the device.
        """
        if self._gpu_arrays is not None and self._device_arrays is None:
            self._device_arrays = {
                name: cuda.to_device(arr.astype(np.float32))
                for name, arr in self._gpu_arrays.items()
            }

    def _build_kernel_args(self) -> list:
        """Assemble material-specific positional arguments for kernel launch.

        Analytical duct kernels use dedicated parameters (no LUT arrays);
        otherwise the material's scalar parameters are followed by any
        device arrays in the order the kernels expect.
        """
        if hasattr(self._material, "get_duct_gpu_parameters"):
            return list(self._material.get_duct_gpu_parameters())
        kernel_args = list(self._material.get_gpu_parameters())
        if self._device_arrays is not None:
            for name in self._DEVICE_ARRAY_ORDER:
                if name in self._device_arrays:
                    kernel_args.append(self._device_arrays[name])
        return kernel_args

    def _get_adaptive_kernel(self):
        """Return the adaptive-step kernel variant for the current method.

        Raises
        ------
        ValueError
            If the material does not provide an adaptive kernel.
        """
        adaptive_key = f"{self.method}_adaptive"
        adaptive_kernel = self._kernels.get(adaptive_key)
        if adaptive_kernel is None:
            raise ValueError(
                f"Material {type(self._material).__name__} does not provide "
                f"'{adaptive_key}' kernel. "
                f"Available: {list(self._kernels.keys())}"
            )
        return adaptive_kernel

    def _record_adaptive_stats(
        self,
        d_step_stats_min,
        d_step_stats_max,
        d_step_stats_count,
        num_rays: int,
        total_distance: float,
    ) -> None:
        """Copy per-ray adaptive step statistics from device to host."""
        from ...simulation.result import AdaptiveStepStats

        self._last_adaptive_step_stats = AdaptiveStepStats(
            step_min=d_step_stats_min.copy_to_host(),
            step_max=d_step_stats_max.copy_to_host(),
            step_count=d_step_stats_count.copy_to_host(),
            total_distance=np.full(num_rays, total_distance, dtype=np.float32),
        )

    def _apply_absorption(
        self,
        positions_d,
        directions_d,
        active_d,
        intensities_d,
        optical_depth_d,
        step_size: float,
        num_steps: int,
        blocks: int,
    ) -> None:
        """Apply absorption using the appropriate kernel."""
        material_params = self._material.get_gpu_parameters()
        if self._material_id in (
            GPUMaterialID.SIMPLE_INHOMOGENEOUS,
            GPUMaterialID.EXPONENTIAL_ATMOSPHERE,
        ):
            # Simple kernel: positions, directions, active, intensities,
            # optical_depth, step_size, num_steps, center_x, center_y,
            # center_z, ref_radius, min_alt, delta_h, lut_size, lut_alpha
            self._absorption_kernel[blocks, self.threads_per_block](
                positions_d,
                directions_d,
                active_d,
                intensities_d,
                optical_depth_d,
                float(step_size),
                num_steps,
                material_params[0],  # center_x
                material_params[1],  # center_y
                material_params[2],  # center_z
                material_params[3],  # ref_radius
                material_params[4],  # min_alt
                material_params[5],  # delta_h
                material_params[6],  # lut_size
                self._device_arrays["lut_alpha"],
            )
        elif self._material_id == GPUMaterialID.GRID_INHOMOGENEOUS:
            # Grid kernel: positions, directions, active, intensities,
            # optical_depth, step_size, num_steps, x_min, y_min, z_min,
            # grid_dx, grid_dy, grid_dz, nx, ny, nz, alpha_grid
            self._absorption_kernel[blocks, self.threads_per_block](
                positions_d,
                directions_d,
                active_d,
                intensities_d,
                optical_depth_d,
                float(step_size),
                num_steps,
                material_params[0],  # x_min
                material_params[1],  # y_min
                material_params[2],  # z_min
                material_params[3],  # grid_dx
                material_params[4],  # grid_dy
                material_params[5],  # grid_dz
                material_params[6],  # nx
                material_params[7],  # ny
                material_params[8],  # nz
                self._device_arrays["alpha_grid"],
            )

    @property
    def material(self) -> GPUMaterialProtocol:
        """The material being propagated through."""
        return self._material

    def propagate_step(
        self,
        rays,  # RayBatch
        step_size: float,
        wavelength: float = 532e-9,
    ) -> None:
        """
        Propagate rays by a single integration step.

        This is the unified interface method compatible with CPU propagators.
        Performs a single step propagation on GPU.

        Parameters
        ----------
        rays : RayBatch
            Ray batch containing positions, directions, and other ray
            properties. Modified in-place.
        step_size : float
            Integration step size in meters.
        wavelength : float, optional
            Wavelength in meters, default 532 nm.
        """
        # Single step is just propagate with total_distance = step_size
        self.propagate(
            rays=rays,
            total_distance=step_size,
            step_size=step_size,
            wavelength=wavelength,
        )

    def propagate(
        self,
        rays,  # RayBatch
        total_distance: float,
        step_size: float,
        wavelength: float = 532e-9,
        steps_per_kernel: int = 100,
        target_dn_frac: float | None = None,
    ) -> None:
        """
        Propagate rays through material on GPU.

        Modifies the rays object in-place.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to propagate (modified in-place)
        total_distance : float
            Total distance to propagate in meters
        step_size : float
            Integration step size in meters (max_step in adaptive mode)
        wavelength : float, optional
            Wavelength in meters, default 532 nm (green light)
        steps_per_kernel : int
            Number of steps per kernel launch. Higher values reduce
            kernel launch overhead but increase register pressure.
        target_dn_frac : float or None
            If provided, use adaptive stepping with this fractional
            refractivity change target. The step_size becomes max_step
            and min_step_size from the material is used as min_step.
        """
        num_rays = rays.num_rays
        # Prepare GPU arrays for ray state
        positions_d = cuda.to_device(rays.positions.astype(np.float32))
        directions_d = cuda.to_device(rays.directions.astype(np.float32))
        active_d = cuda.to_device(rays.active)
        geo_path_d = cuda.to_device(rays.geometric_path_lengths.astype(np.float32))
        opt_path_d = cuda.to_device(rays.optical_path_lengths.astype(np.float32))
        acc_time_d = cuda.to_device(rays.accumulated_time.astype(np.float32))
        # Prepare absorption arrays if enabled
        intensities_d = None
        optical_depth_d = None
        if self.enable_absorption:
            intensities_d = cuda.to_device(rays.intensities.astype(np.float32))
            if rays.optical_depth is not None:
                optical_depth_d = cuda.to_device(rays.optical_depth.astype(np.float32))
            else:
                optical_depth_d = cuda.to_device(np.zeros(num_rays, dtype=np.float32))
        # Transfer material arrays to device if needed (LUTs, grids, etc.)
        self._upload_material_arrays()
        # Get material-specific kernel parameters from the material
        kernel_args = self._build_kernel_args()
        # Configure kernel launch
        blocks = (num_rays + self.threads_per_block - 1) // self.threads_per_block
        if target_dn_frac is not None:
            # Adaptive mode: use adaptive kernel variant.
            # NOTE(review): absorption is only applied in fixed-step mode;
            # the adaptive kernel path does not call the absorption kernel.
            adaptive_kernel = self._get_adaptive_kernel()
            # Allocate step statistics arrays
            d_step_stats_min = cuda.to_device(
                np.full(num_rays, np.float32(1e30), dtype=np.float32)
            )
            d_step_stats_max = cuda.to_device(np.zeros(num_rays, dtype=np.float32))
            d_step_stats_count = cuda.to_device(np.zeros(num_rays, dtype=np.float32))
            # Single kernel launch covers the full distance
            adaptive_kernel[blocks, self.threads_per_block](
                positions_d,
                directions_d,
                active_d,
                geo_path_d,
                opt_path_d,
                acc_time_d,
                float(total_distance),
                float(self._DEFAULT_MIN_STEP),
                float(step_size),  # max_step
                float(target_dn_frac),
                *kernel_args,
                float(wavelength),
                d_step_stats_min,
                d_step_stats_max,
                d_step_stats_count,
            )
        else:
            # Fixed-step mode: launch in batches of steps_per_kernel steps
            total_steps = int(total_distance / step_size)
            remaining_steps = total_steps
            while remaining_steps > 0:
                steps_this_call = min(remaining_steps, steps_per_kernel)
                self._kernel[blocks, self.threads_per_block](
                    positions_d,
                    directions_d,
                    active_d,
                    geo_path_d,
                    opt_path_d,
                    acc_time_d,
                    float(step_size),
                    steps_this_call,
                    *kernel_args,
                    float(wavelength),
                )
                # Apply absorption AFTER each batch (for proper path integration)
                if self.enable_absorption and self._absorption_kernel is not None:
                    self._apply_absorption(
                        positions_d,
                        directions_d,
                        active_d,
                        intensities_d,
                        optical_depth_d,
                        step_size,
                        steps_this_call,
                        blocks,
                    )
                remaining_steps -= steps_this_call
        # Synchronize and copy back
        cuda.synchronize()
        rays.positions[:] = positions_d.copy_to_host()
        rays.directions[:] = directions_d.copy_to_host()
        rays.geometric_path_lengths[:] = geo_path_d.copy_to_host()
        rays.optical_path_lengths[:] = opt_path_d.copy_to_host()
        rays.accumulated_time[:] = acc_time_d.copy_to_host()
        # Copy back absorption data if enabled
        if self.enable_absorption:
            rays.intensities[:] = intensities_d.copy_to_host()
            if rays.optical_depth is not None:
                rays.optical_depth[:] = optical_depth_d.copy_to_host()
        # Store adaptive step stats if available
        if target_dn_frac is not None:
            self._record_adaptive_stats(
                d_step_stats_min,
                d_step_stats_max,
                d_step_stats_count,
                num_rays,
                total_distance,
            )
        else:
            self._last_adaptive_step_stats = None

    @property
    def last_adaptive_step_stats(self):
        """Return adaptive step stats from the most recent propagate() call."""
        return getattr(self, "_last_adaptive_step_stats", None)

    def propagate_with_history(
        self,
        rays,  # RayBatch
        total_distance: float,
        step_size: float,
        history_interval: int = 100,
        wavelength: float = 1.0e-6,
        target_dn_frac: float | None = None,
        history_distance_interval: float | None = None,
    ) -> dict:
        """
        Propagate rays and record position history at intervals.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to propagate (modified in-place)
        total_distance : float
            Total distance to propagate in meters
        step_size : float
            Integration step size in meters (max_step in adaptive mode)
        history_interval : int
            Record history every N steps (fixed-step mode only)
        wavelength : float
            Wavelength in meters
        target_dn_frac : float or None
            If provided, use adaptive stepping with this fractional
            refractivity change target.
        history_distance_interval : float or None
            Distance between history records in adaptive mode.
            If None in adaptive mode, defaults to step_size * history_interval.

        Returns
        -------
        dict
            Dictionary with 'positions', 'directions', 'distances' arrays
        """
        num_rays = rays.num_rays
        # Prepare GPU arrays for ray state
        positions_d = cuda.to_device(rays.positions.astype(np.float32))
        directions_d = cuda.to_device(rays.directions.astype(np.float32))
        active_d = cuda.to_device(rays.active)
        geo_path_d = cuda.to_device(rays.geometric_path_lengths.astype(np.float32))
        opt_path_d = cuda.to_device(rays.optical_path_lengths.astype(np.float32))
        acc_time_d = cuda.to_device(rays.accumulated_time.astype(np.float32))
        # Transfer material arrays to device if needed (LUTs, grids, etc.)
        self._upload_material_arrays()
        # Get material-specific kernel parameters
        kernel_args = self._build_kernel_args()
        # Configure kernel launch
        blocks = (num_rays + self.threads_per_block - 1) // self.threads_per_block
        if target_dn_frac is not None:
            # Adaptive mode: record by distance intervals
            adaptive_kernel = self._get_adaptive_kernel()
            dist_interval = (
                history_distance_interval
                if history_distance_interval is not None
                else step_size * history_interval
            )
            num_records = int(total_distance / dist_interval) + 2
            # Allocate step statistics arrays
            d_step_stats_min = cuda.to_device(
                np.full(num_rays, np.float32(1e30), dtype=np.float32)
            )
            d_step_stats_max = cuda.to_device(np.zeros(num_rays, dtype=np.float32))
            d_step_stats_count = cuda.to_device(np.zeros(num_rays, dtype=np.float32))
            # Storage for history
            position_history = np.zeros((num_records, num_rays, 3), dtype=np.float32)
            direction_history = np.zeros((num_records, num_rays, 3), dtype=np.float32)
            distance_history = np.zeros(num_records, dtype=np.float32)
            # Record initial state
            position_history[0] = positions_d.copy_to_host()
            direction_history[0] = directions_d.copy_to_host()
            distance_history[0] = 0.0
            record_idx = 1
            distance_covered = 0.0
            while distance_covered < total_distance:
                chunk = min(dist_interval, total_distance - distance_covered)
                adaptive_kernel[blocks, self.threads_per_block](
                    positions_d,
                    directions_d,
                    active_d,
                    geo_path_d,
                    opt_path_d,
                    acc_time_d,
                    float(chunk),
                    float(self._DEFAULT_MIN_STEP),
                    float(step_size),  # max_step
                    float(target_dn_frac),
                    *kernel_args,
                    float(wavelength),
                    d_step_stats_min,
                    d_step_stats_max,
                    d_step_stats_count,
                )
                distance_covered += chunk
                if record_idx < num_records:
                    cuda.synchronize()
                    position_history[record_idx] = positions_d.copy_to_host()
                    direction_history[record_idx] = directions_d.copy_to_host()
                    distance_history[record_idx] = distance_covered
                    record_idx += 1
        else:
            # Fixed-step mode: record every history_interval steps
            total_steps = int(total_distance / step_size)
            num_records = total_steps // history_interval + 1
            # Storage for history
            position_history = np.zeros((num_records, num_rays, 3), dtype=np.float32)
            direction_history = np.zeros((num_records, num_rays, 3), dtype=np.float32)
            distance_history = np.zeros(num_records, dtype=np.float32)
            # Record initial state
            position_history[0] = positions_d.copy_to_host()
            direction_history[0] = directions_d.copy_to_host()
            distance_history[0] = 0.0
            record_idx = 1
            steps_done = 0
            while steps_done < total_steps:
                steps_to_next_record = min(
                    history_interval - (steps_done % history_interval),
                    total_steps - steps_done,
                )
                self._kernel[blocks, self.threads_per_block](
                    positions_d,
                    directions_d,
                    active_d,
                    geo_path_d,
                    opt_path_d,
                    acc_time_d,
                    float(step_size),
                    steps_to_next_record,
                    *kernel_args,
                    float(wavelength),
                )
                steps_done += steps_to_next_record
                # Record if at interval
                if steps_done % history_interval == 0 and record_idx < num_records:
                    cuda.synchronize()
                    position_history[record_idx] = positions_d.copy_to_host()
                    direction_history[record_idx] = directions_d.copy_to_host()
                    distance_history[record_idx] = steps_done * step_size
                    record_idx += 1
        # Final copy back
        cuda.synchronize()
        rays.positions[:] = positions_d.copy_to_host()
        rays.directions[:] = directions_d.copy_to_host()
        rays.geometric_path_lengths[:] = geo_path_d.copy_to_host()
        rays.optical_path_lengths[:] = opt_path_d.copy_to_host()
        rays.accumulated_time[:] = acc_time_d.copy_to_host()
        # Store adaptive step stats if available
        if target_dn_frac is not None:
            self._record_adaptive_stats(
                d_step_stats_min,
                d_step_stats_max,
                d_step_stats_count,
                num_rays,
                total_distance,
            )
        else:
            self._last_adaptive_step_stats = None
        return {
            "positions": position_history[:record_idx],
            "directions": direction_history[:record_idx],
            "distances": distance_history[:record_idx],
        }

    def get_refractive_index(
        self,
        positions: npt.NDArray[np.floating],
        wavelength: float = 1.0e-6,
    ) -> npt.NDArray[np.float64]:
        """
        Compute refractive index at positions (CPU, delegates to material).

        Parameters
        ----------
        positions : ndarray (N, 3)
            Position coordinates
        wavelength : float
            Wavelength in meters

        Returns
        -------
        ndarray (N,)
            Refractive index at each position
        """
        x = positions[:, 0]
        y = positions[:, 1]
        z = positions[:, 2]
        return self._material.get_refractive_index(x, y, z, wavelength)
# Backward-compatible convenience alias for callers that import the class
# under its older, more descriptive name.
GPUInhomogeneousPropagator = GPUGradientPropagator