# Source code for lsurf.propagation.propagators.gpu_gradient

# The Clear BSD License
#
# Copyright (c) 2026 Tobias Heibges
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted (subject to the limitations in the disclaimer
# below) provided that the following conditions are met:
#
#      * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#
#      * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#      * Neither the name of the copyright holder nor the names of its
#      contributors may be used to endorse or promote products derived from this
#      software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
GPU-accelerated Gradient Propagator for inhomogeneous media.

This module provides a high-performance CUDA-based propagator for
materials with spatially-varying refractive indices.
"""

import numpy as np
import numpy.typing as npt

# GPU support is optional
try:
    from numba import cuda

    HAS_CUDA = True
except ImportError:

    class _FakeDeviceArray:
        """Stand-in for a numba device array when numba is not installed."""

        def __init__(self, arr):
            self._arr = arr

        def copy_to_host(self):
            """Return the wrapped host array unchanged."""
            return self._arr

    class _FakeCuda:
        """Stand-in for the numba.cuda module when numba is not installed."""

        @staticmethod
        def jit(*args, **kwargs):
            """No-op replacement for cuda.jit, bare or parameterized."""
            if args and callable(args[0]):
                # Used directly as @cuda.jit (no parentheses): pass through.
                return args[0]
            # Used as @cuda.jit(...): hand back a pass-through decorator.
            return lambda func: func

        @staticmethod
        def is_available():
            """CUDA is never available without numba."""
            return False

        @staticmethod
        def grid(n):
            """Always thread index 0 on the fake device."""
            return 0

        @staticmethod
        def synchronize():
            """Nothing to synchronize without a device."""
            pass

        @staticmethod
        def to_device(arr):
            """Wrap *arr* so that copy_to_host() round-trips it."""
            return _FakeDeviceArray(arr)

    cuda = _FakeCuda()  # type: ignore[assignment]
    HAS_CUDA = False

from ..propagator_protocol import GPUMaterialProtocol, GPUMaterialID


class GPUGradientPropagator:
    """
    High-performance GPU-based propagator for inhomogeneous materials.

    This propagator keeps all computation on the GPU for maximum performance.
    It works with any material that implements GPUMaterialProtocol, using the
    material's gpu_material_id to dispatch to the appropriate GPU kernels.

    Currently Supported Materials
    -----------------------------
    - ExponentialAtmosphere (GPUMaterialID.EXPONENTIAL_ATMOSPHERE)

    For materials without GPU support, use GradientPropagator.propagate_step_cpu()
    which works with any MaterialField via the CPU.

    Parameters
    ----------
    material : GPUMaterialProtocol
        The material to propagate through. Must implement GPUMaterialProtocol.
    method : str
        Integration method: 'euler' or 'rk4'. Default: 'rk4'
    threads_per_block : int
        CUDA threads per block. Default: 256

    Example
    -------
    >>> from lsurf.materials import ExponentialAtmosphere
    >>> atmosphere = ExponentialAtmosphere()
    >>> propagator = GPUGradientPropagator(atmosphere, method='rk4')
    >>> propagator.propagate(rays, total_distance=500e3, step_size=100.0)

    Raises
    ------
    ValueError
        If the material does not support GPU acceleration.
    """

    def __init__(
        self,
        material: GPUMaterialProtocol,
        method: str = "rk4",
        threads_per_block: int = 256,
        enable_absorption: bool = False,
    ):
        # Check if CUDA is available
        if not HAS_CUDA:
            raise ImportError(
                "CUDA support requires numba with CUDA. "
                "Use GradientPropagator for CPU-based propagation."
            )

        # Store material reference for CPU fallback methods
        self._material = material

        # Validate material has GPU support
        if not hasattr(material, "gpu_material_id"):
            raise ValueError(
                f"Material {type(material).__name__} does not support GPU acceleration. "
                f"Use GradientPropagator.propagate_step_cpu() instead."
            )

        # Validate material provides kernels
        if not hasattr(material, "get_gpu_kernels"):
            raise ValueError(
                f"Material {type(material).__name__} does not provide GPU kernels. "
                f"Implement get_gpu_kernels() method."
            )

        self._material_id = material.gpu_material_id

        if method not in ("euler", "rk4"):
            raise ValueError(f"method must be 'euler' or 'rk4', got {method}")
        self.method = method
        self.threads_per_block = threads_per_block
        self.enable_absorption = enable_absorption

        # Get kernels from the material
        self._kernels = material.get_gpu_kernels()
        self._kernel = self._kernels.get(method)
        if self._kernel is None:
            raise ValueError(
                f"Material {type(material).__name__} does not provide '{method}' kernel. "
                f"Available: {list(self._kernels.keys())}"
            )

        # Get absorption kernel if enabled
        self._absorption_kernel = None
        if enable_absorption:
            self._absorption_kernel = self._get_absorption_kernel()

        # Check if material uses device arrays (LUTs, grids, etc.)
        # Skip for materials with analytical duct kernels — they don't use LUT arrays,
        # and building the LUT can be very expensive.
        self._gpu_arrays = None
        self._device_arrays = None
        if hasattr(material, "get_gpu_arrays") and not hasattr(
            material, "get_duct_gpu_parameters"
        ):
            self._gpu_arrays = material.get_gpu_arrays()
        # Device arrays will be created on first propagate call

    def _get_absorption_kernel(self):
        """Get the appropriate absorption kernel based on material type."""
        from ..kernels.absorption.simple import _kernel_absorption_simple
        from ..kernels.absorption.grid import _kernel_absorption_grid

        if self._material_id == GPUMaterialID.SIMPLE_INHOMOGENEOUS:
            return _kernel_absorption_simple
        elif self._material_id == GPUMaterialID.EXPONENTIAL_ATMOSPHERE:
            return _kernel_absorption_simple  # Uses same LUT structure
        elif self._material_id == GPUMaterialID.GRID_INHOMOGENEOUS:
            return _kernel_absorption_grid
        else:
            raise ValueError(
                f"No absorption kernel available for material ID {self._material_id}"
            )

    def _apply_absorption(
        self,
        positions_d,
        directions_d,
        active_d,
        intensities_d,
        optical_depth_d,
        step_size: float,
        num_steps: int,
        blocks: int,
    ) -> None:
        """Apply absorption using the appropriate kernel.

        Launches the material-specific absorption kernel over the current
        device-side ray state; intensities and optical depth are updated
        in place on the device.
        """
        material_params = self._material.get_gpu_parameters()

        if self._material_id in (
            GPUMaterialID.SIMPLE_INHOMOGENEOUS,
            GPUMaterialID.EXPONENTIAL_ATMOSPHERE,
        ):
            # Simple kernel: positions, directions, active, intensities, optical_depth,
            # step_size, num_steps, center_x, center_y, center_z, ref_radius,
            # min_alt, delta_h, lut_size, lut_alpha
            # NOTE(review): assumes material_params holds at least 7 entries in this
            # order and that _device_arrays contains "lut_alpha" — confirm against
            # the material's get_gpu_parameters()/get_gpu_arrays() implementations.
            self._absorption_kernel[blocks, self.threads_per_block](
                positions_d,
                directions_d,
                active_d,
                intensities_d,
                optical_depth_d,
                float(step_size),
                num_steps,
                material_params[0],  # center_x
                material_params[1],  # center_y
                material_params[2],  # center_z
                material_params[3],  # ref_radius
                material_params[4],  # min_alt
                material_params[5],  # delta_h
                material_params[6],  # lut_size
                self._device_arrays["lut_alpha"],
            )
        elif self._material_id == GPUMaterialID.GRID_INHOMOGENEOUS:
            # Grid kernel: positions, directions, active, intensities, optical_depth,
            # step_size, num_steps, x_min, y_min, z_min, grid_dx, grid_dy, grid_dz,
            # nx, ny, nz, alpha_grid
            self._absorption_kernel[blocks, self.threads_per_block](
                positions_d,
                directions_d,
                active_d,
                intensities_d,
                optical_depth_d,
                float(step_size),
                num_steps,
                material_params[0],  # x_min
                material_params[1],  # y_min
                material_params[2],  # z_min
                material_params[3],  # grid_dx
                material_params[4],  # grid_dy
                material_params[5],  # grid_dz
                material_params[6],  # nx
                material_params[7],  # ny
                material_params[8],  # nz
                self._device_arrays["alpha_grid"],
            )

    @property
    def material(self) -> GPUMaterialProtocol:
        """The material being propagated through."""
        return self._material

    def propagate_step(
        self,
        rays,  # RayBatch
        step_size: float,
        wavelength: float = 532e-9,
    ) -> None:
        """
        Propagate rays by a single integration step.

        This is the unified interface method compatible with CPU propagators.
        Performs a single step propagation on GPU.

        Parameters
        ----------
        rays : RayBatch
            Ray batch containing positions, directions, and other ray
            properties. Modified in-place.
        step_size : float
            Integration step size in meters.
        wavelength : float, optional
            Wavelength in meters, default 532 nm.
        """
        # Single step is just propagate with total_distance = step_size
        self.propagate(
            rays=rays,
            total_distance=step_size,
            step_size=step_size,
            wavelength=wavelength,
        )

    def propagate(
        self,
        rays,  # RayBatch
        total_distance: float,
        step_size: float,
        wavelength: float = 532e-9,
        steps_per_kernel: int = 100,
        target_dn_frac: float | None = None,
    ) -> None:
        """
        Propagate rays through material on GPU.

        Modifies the rays object in-place.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to propagate (modified in-place)
        total_distance : float
            Total distance to propagate in meters
        step_size : float
            Integration step size in meters (max_step in adaptive mode)
        wavelength : float, optional
            Wavelength in meters, default 532 nm (green light)
        steps_per_kernel : int
            Number of steps per kernel launch. Higher values reduce kernel
            launch overhead but increase register pressure.
        target_dn_frac : float or None
            If provided, use adaptive stepping with this fractional
            refractivity change target. The step_size becomes max_step and
            min_step_size from the material is used as min_step.
        """
        num_rays = rays.num_rays

        # Prepare GPU arrays for ray state
        positions_d = cuda.to_device(rays.positions.astype(np.float32))
        directions_d = cuda.to_device(rays.directions.astype(np.float32))
        # NOTE(review): active_d is uploaded but never copied back below —
        # confirm the kernels do not deactivate rays, or copy it back to
        # rays.active after propagation.
        active_d = cuda.to_device(rays.active)
        geo_path_d = cuda.to_device(rays.geometric_path_lengths.astype(np.float32))
        opt_path_d = cuda.to_device(rays.optical_path_lengths.astype(np.float32))
        acc_time_d = cuda.to_device(rays.accumulated_time.astype(np.float32))

        # Prepare absorption arrays if enabled
        intensities_d = None
        optical_depth_d = None
        if self.enable_absorption:
            intensities_d = cuda.to_device(rays.intensities.astype(np.float32))
            if rays.optical_depth is not None:
                optical_depth_d = cuda.to_device(rays.optical_depth.astype(np.float32))
            else:
                optical_depth_d = cuda.to_device(np.zeros(num_rays, dtype=np.float32))

        # Transfer material arrays to device if needed (LUTs, grids, etc.)
        # Cached across calls: only uploaded on the first propagate().
        if self._gpu_arrays is not None and self._device_arrays is None:
            self._device_arrays = {}
            for name, arr in self._gpu_arrays.items():
                self._device_arrays[name] = cuda.to_device(arr.astype(np.float32))

        # Get material-specific kernel parameters from the material
        # Analytical duct kernels use dedicated parameters (no LUT arrays)
        if hasattr(self._material, "get_duct_gpu_parameters"):
            kernel_args = list(self._material.get_duct_gpu_parameters())
        else:
            material_params = self._material.get_gpu_parameters()
            kernel_args = list(material_params)
            if self._device_arrays is not None:
                # Append device arrays in the order the kernel expects
                # Convention: lut_n, lut_dn_dh for SimpleAtmosphere
                #             n_grid, grad_grid for GridAtmosphere
                for name in ["lut_n", "lut_dn_dh", "n_grid", "grad_grid"]:
                    if name in self._device_arrays:
                        kernel_args.append(self._device_arrays[name])

        # Configure kernel launch (ceil-divide rays over thread blocks)
        blocks = (num_rays + self.threads_per_block - 1) // self.threads_per_block

        if target_dn_frac is not None:
            # Adaptive mode: use adaptive kernel variant
            adaptive_key = f"{self.method}_adaptive"
            adaptive_kernel = self._kernels.get(adaptive_key)
            if adaptive_kernel is None:
                raise ValueError(
                    f"Material {type(self._material).__name__} does not provide "
                    f"'{adaptive_key}' kernel. "
                    f"Available: {list(self._kernels.keys())}"
                )

            # Allocate step statistics arrays (per-ray min/max/count of steps)
            d_step_stats_min = cuda.to_device(
                np.full(num_rays, np.float32(1e30), dtype=np.float32)
            )
            d_step_stats_max = cuda.to_device(np.zeros(num_rays, dtype=np.float32))
            d_step_stats_count = cuda.to_device(np.zeros(num_rays, dtype=np.float32))

            # Single kernel launch covers the full distance
            # NOTE(review): the docstring says min_step comes from the
            # material's min_step_size, but it is hard-coded here — confirm.
            # NOTE(review): absorption is not applied in adaptive mode even
            # when enable_absorption is True — confirm this is intended.
            min_step = 3e-4  # Default min step (0.3mm)
            adaptive_kernel[blocks, self.threads_per_block](
                positions_d,
                directions_d,
                active_d,
                geo_path_d,
                opt_path_d,
                acc_time_d,
                float(total_distance),
                float(min_step),
                float(step_size),  # max_step
                float(target_dn_frac),
                *kernel_args,
                float(wavelength),
                d_step_stats_min,
                d_step_stats_max,
                d_step_stats_count,
            )
        else:
            # Fixed-step mode (existing behavior)
            total_steps = int(total_distance / step_size)
            remaining_steps = total_steps
            while remaining_steps > 0:
                steps_this_call = min(remaining_steps, steps_per_kernel)
                self._kernel[blocks, self.threads_per_block](
                    positions_d,
                    directions_d,
                    active_d,
                    geo_path_d,
                    opt_path_d,
                    acc_time_d,
                    float(step_size),
                    steps_this_call,
                    *kernel_args,
                    float(wavelength),
                )
                # Apply absorption AFTER each batch (for proper path integration)
                if self.enable_absorption and self._absorption_kernel is not None:
                    self._apply_absorption(
                        positions_d,
                        directions_d,
                        active_d,
                        intensities_d,
                        optical_depth_d,
                        step_size,
                        steps_this_call,
                        blocks,
                    )
                remaining_steps -= steps_this_call

        # Synchronize and copy back
        cuda.synchronize()
        rays.positions[:] = positions_d.copy_to_host()
        rays.directions[:] = directions_d.copy_to_host()
        rays.geometric_path_lengths[:] = geo_path_d.copy_to_host()
        rays.optical_path_lengths[:] = opt_path_d.copy_to_host()
        rays.accumulated_time[:] = acc_time_d.copy_to_host()

        # Copy back absorption data if enabled
        if self.enable_absorption:
            rays.intensities[:] = intensities_d.copy_to_host()
            if rays.optical_depth is not None:
                rays.optical_depth[:] = optical_depth_d.copy_to_host()

        # Store adaptive step stats if available
        if target_dn_frac is not None:
            from ...simulation.result import AdaptiveStepStats

            self._last_adaptive_step_stats = AdaptiveStepStats(
                step_min=d_step_stats_min.copy_to_host(),
                step_max=d_step_stats_max.copy_to_host(),
                step_count=d_step_stats_count.copy_to_host(),
                total_distance=np.full(num_rays, total_distance, dtype=np.float32),
            )
        else:
            self._last_adaptive_step_stats = None

    @property
    def last_adaptive_step_stats(self):
        """Return adaptive step stats from the most recent propagate() call."""
        # getattr default guards against access before any propagate() call.
        return getattr(self, "_last_adaptive_step_stats", None)

    def propagate_with_history(
        self,
        rays,  # RayBatch
        total_distance: float,
        step_size: float,
        history_interval: int = 100,
        wavelength: float = 1.0e-6,
        target_dn_frac: float | None = None,
        history_distance_interval: float | None = None,
    ) -> dict:
        """
        Propagate rays and record position history at intervals.

        Parameters
        ----------
        rays : RayBatch
            Ray batch to propagate (modified in-place)
        total_distance : float
            Total distance to propagate in meters
        step_size : float
            Integration step size in meters (max_step in adaptive mode)
        history_interval : int
            Record history every N steps (fixed-step mode only)
        wavelength : float
            Wavelength in meters
        target_dn_frac : float or None
            If provided, use adaptive stepping with this fractional
            refractivity change target.
        history_distance_interval : float or None
            Distance between history records in adaptive mode. If None in
            adaptive mode, defaults to step_size * history_interval.

        Returns
        -------
        dict
            Dictionary with 'positions', 'directions', 'distances' arrays
        """
        num_rays = rays.num_rays

        # Prepare GPU arrays for ray state
        positions_d = cuda.to_device(rays.positions.astype(np.float32))
        directions_d = cuda.to_device(rays.directions.astype(np.float32))
        # NOTE(review): active_d is uploaded but never copied back — confirm
        # kernels do not deactivate rays, or copy it back after propagation.
        active_d = cuda.to_device(rays.active)
        geo_path_d = cuda.to_device(rays.geometric_path_lengths.astype(np.float32))
        opt_path_d = cuda.to_device(rays.optical_path_lengths.astype(np.float32))
        acc_time_d = cuda.to_device(rays.accumulated_time.astype(np.float32))

        # Transfer material arrays to device if needed (LUTs, grids, etc.)
        if self._gpu_arrays is not None and self._device_arrays is None:
            self._device_arrays = {}
            for name, arr in self._gpu_arrays.items():
                self._device_arrays[name] = cuda.to_device(arr.astype(np.float32))

        # Get material-specific kernel parameters
        # Analytical duct kernels use dedicated parameters (no LUT arrays)
        if hasattr(self._material, "get_duct_gpu_parameters"):
            kernel_args = list(self._material.get_duct_gpu_parameters())
        else:
            material_params = self._material.get_gpu_parameters()
            kernel_args = list(material_params)
            if self._device_arrays is not None:
                for name in ["lut_n", "lut_dn_dh", "n_grid", "grad_grid"]:
                    if name in self._device_arrays:
                        kernel_args.append(self._device_arrays[name])

        # Configure kernel launch
        blocks = (num_rays + self.threads_per_block - 1) // self.threads_per_block

        if target_dn_frac is not None:
            # Adaptive mode: record by distance intervals
            adaptive_key = f"{self.method}_adaptive"
            adaptive_kernel = self._kernels.get(adaptive_key)
            if adaptive_kernel is None:
                raise ValueError(
                    f"Material {type(self._material).__name__} does not provide "
                    f"'{adaptive_key}' kernel. "
                    f"Available: {list(self._kernels.keys())}"
                )

            dist_interval = (
                history_distance_interval
                if history_distance_interval is not None
                else step_size * history_interval
            )
            num_records = int(total_distance / dist_interval) + 2
            # NOTE(review): hard-coded min step, same value as propagate() —
            # confirm whether it should come from the material.
            min_step = 3e-4

            # Allocate step statistics arrays
            d_step_stats_min = cuda.to_device(
                np.full(num_rays, np.float32(1e30), dtype=np.float32)
            )
            d_step_stats_max = cuda.to_device(np.zeros(num_rays, dtype=np.float32))
            d_step_stats_count = cuda.to_device(np.zeros(num_rays, dtype=np.float32))

            # Storage for history
            position_history = np.zeros((num_records, num_rays, 3), dtype=np.float32)
            direction_history = np.zeros((num_records, num_rays, 3), dtype=np.float32)
            distance_history = np.zeros(num_records, dtype=np.float32)

            # Record initial state
            position_history[0] = positions_d.copy_to_host()
            direction_history[0] = directions_d.copy_to_host()
            distance_history[0] = 0.0
            record_idx = 1

            distance_covered = 0.0
            while distance_covered < total_distance:
                # Launch one chunk per history record interval
                chunk = min(dist_interval, total_distance - distance_covered)
                adaptive_kernel[blocks, self.threads_per_block](
                    positions_d,
                    directions_d,
                    active_d,
                    geo_path_d,
                    opt_path_d,
                    acc_time_d,
                    float(chunk),
                    float(min_step),
                    float(step_size),  # max_step
                    float(target_dn_frac),
                    *kernel_args,
                    float(wavelength),
                    d_step_stats_min,
                    d_step_stats_max,
                    d_step_stats_count,
                )
                distance_covered += chunk
                if record_idx < num_records:
                    cuda.synchronize()
                    position_history[record_idx] = positions_d.copy_to_host()
                    direction_history[record_idx] = directions_d.copy_to_host()
                    distance_history[record_idx] = distance_covered
                    record_idx += 1
        else:
            # Fixed-step mode (existing behavior)
            total_steps = int(total_distance / step_size)
            num_records = total_steps // history_interval + 1

            # Storage for history
            position_history = np.zeros((num_records, num_rays, 3), dtype=np.float32)
            direction_history = np.zeros((num_records, num_rays, 3), dtype=np.float32)
            distance_history = np.zeros(num_records, dtype=np.float32)

            # Record initial state
            position_history[0] = positions_d.copy_to_host()
            direction_history[0] = directions_d.copy_to_host()
            distance_history[0] = 0.0
            record_idx = 1

            steps_done = 0
            while steps_done < total_steps:
                # Advance to the next record boundary (or the end)
                steps_to_next_record = min(
                    history_interval - (steps_done % history_interval),
                    total_steps - steps_done,
                )
                self._kernel[blocks, self.threads_per_block](
                    positions_d,
                    directions_d,
                    active_d,
                    geo_path_d,
                    opt_path_d,
                    acc_time_d,
                    float(step_size),
                    steps_to_next_record,
                    *kernel_args,
                    float(wavelength),
                )
                steps_done += steps_to_next_record

                # Record if at interval
                if steps_done % history_interval == 0 and record_idx < num_records:
                    cuda.synchronize()
                    position_history[record_idx] = positions_d.copy_to_host()
                    direction_history[record_idx] = directions_d.copy_to_host()
                    distance_history[record_idx] = steps_done * step_size
                    record_idx += 1

        # Final copy back
        cuda.synchronize()
        rays.positions[:] = positions_d.copy_to_host()
        rays.directions[:] = directions_d.copy_to_host()
        rays.geometric_path_lengths[:] = geo_path_d.copy_to_host()
        rays.optical_path_lengths[:] = opt_path_d.copy_to_host()
        rays.accumulated_time[:] = acc_time_d.copy_to_host()

        # Store adaptive step stats if available
        if target_dn_frac is not None:
            from ...simulation.result import AdaptiveStepStats

            self._last_adaptive_step_stats = AdaptiveStepStats(
                step_min=d_step_stats_min.copy_to_host(),
                step_max=d_step_stats_max.copy_to_host(),
                step_count=d_step_stats_count.copy_to_host(),
                total_distance=np.full(num_rays, total_distance, dtype=np.float32),
            )
        else:
            self._last_adaptive_step_stats = None

        # Trim to the records actually written
        return {
            "positions": position_history[:record_idx],
            "directions": direction_history[:record_idx],
            "distances": distance_history[:record_idx],
        }

    def get_refractive_index(
        self,
        positions: npt.NDArray[np.floating],
        wavelength: float = 1.0e-6,
    ) -> npt.NDArray[np.float64]:
        """
        Compute refractive index at positions (CPU, delegates to material).

        Parameters
        ----------
        positions : ndarray (N, 3)
            Position coordinates
        wavelength : float
            Wavelength in meters

        Returns
        -------
        ndarray (N,)
            Refractive index at each position
        """
        x = positions[:, 0]
        y = positions[:, 1]
        z = positions[:, 2]
        return self._material.get_refractive_index(x, y, z, wavelength)
# Convenience alias GPUInhomogeneousPropagator = GPUGradientPropagator